---
title: Data preprocessing
keywords: fastai
sidebar: home_sidebar
summary: "Functions used to preprocess time series (both X and y)."
description: "Functions used to preprocess time series (both X and y)."
nb_path: "nbs/016_data.preprocessing.ipynb"
---
{% raw %}
{% endraw %} {% raw %}
{% endraw %} {% raw %}
# Shared setup for the examples below: fetch the UCR dataset named by `dsid`
# together with its split indices, then wrap everything in TSDatasets.
dsid = 'NATOPS'
X, y, splits = get_UCR_data(dsid, return_split=False)
# One entry per item (assumed order: X, y): no transform for X, Categorize() for y.
tfms = [None, Categorize()]
dsets = TSDatasets(X, y, tfms=tfms, splits=splits)
{% endraw %} {% raw %}

class ToNumpyCategory[source]

ToNumpyCategory(**kwargs) :: Transform

Categorize a numpy batch

{% endraw %} {% raw %}
{% endraw %} {% raw %}
# Encode the labels `y` with ToNumpyCategory and preview the first 10 encoded values.
t = ToNumpyCategory()
y_cat = t(y)
y_cat[:10]
array([3, 2, 2, 3, 2, 4, 0, 5, 2, 1])
{% endraw %} {% raw %}
# Decoding must recover the original labels from both a torch tensor and a numpy array.
test_eq(t.decode(tensor(y_cat)), y)
test_eq(t.decode(np.array(y_cat)), y)
{% endraw %} {% raw %}

class OneHot[source]

OneHot(n_classes=None, **kwargs) :: Transform

One-hot encode/decode a batch

{% endraw %} {% raw %}
{% endraw %} {% raw %}
# One-hot encode the category-encoded labels and preview the first 10 rows.
oh_encoder = OneHot()
y_cat = ToNumpyCategory()(y)
oht = oh_encoder(y_cat)
oht[:10]
array([[0., 0., 0., 1., 0., 0.],
       [0., 0., 1., 0., 0., 0.],
       [0., 0., 1., 0., 0., 0.],
       [0., 0., 0., 1., 0., 0.],
       [0., 0., 1., 0., 0., 0.],
       [0., 0., 0., 0., 1., 0.],
       [1., 0., 0., 0., 0., 0.],
       [0., 0., 0., 0., 0., 1.],
       [0., 0., 1., 0., 0., 0.],
       [0., 1., 0., 0., 0., 0.]])
{% endraw %} {% raw %}
# Round-trip check on a torch tensor of random class indices.
n_classes = 10
n_samples = 100

t = torch.randint(0, n_classes, (n_samples,))
oh_encoder = OneHot()
oht = oh_encoder(t)
# The encoding has one column per class, and both argmax and decode recover the input.
test_eq(oht.shape, (n_samples, n_classes))
test_eq(torch.argmax(oht, dim=-1), t)
test_eq(oh_encoder.decode(oht), t)
{% endraw %} {% raw %}
# Same round-trip check as for torch tensors, this time on a numpy array.
n_classes = 10
n_samples = 100

a = np.random.randint(0, n_classes, (n_samples,))
oh_encoder = OneHot()
oha = oh_encoder(a)
# The encoding has one column per class, and both argmax and decode recover the input.
test_eq(oha.shape, (n_samples, n_classes))
test_eq(np.argmax(oha, axis=-1), a)
test_eq(oh_encoder.decode(oha), a)
{% endraw %} {% raw %}

class Nan2Value[source]

Nan2Value(value=0, median=False, by_sample_and_var=True) :: Transform

Replaces any NaN values with a predefined value or the median

{% endraw %} {% raw %}
{% endraw %} {% raw %}
# Build a random tensor and inject NaNs in three ways: a whole row, every
# value above 0.9, and the last 20 steps of a handful of samples.
o = TSTensor(torch.randn(16, 10, 100))
o[0,0] = float('nan')
o[o > .9] = float('nan')
o[[0,1,5,8,14,15], :, -20:] = float('nan')
nan_vals1 = torch.isnan(o).sum()
# Apply Nan2Value with its defaults, and with median=True using both settings
# of by_sample_and_var. Each pipeline gets its own clone of the input.
o2 = Pipeline(Nan2Value(), split_idx=0)(o.clone())
o3 = Pipeline(Nan2Value(median=True, by_sample_and_var=True), split_idx=0)(o.clone())
o4 = Pipeline(Nan2Value(median=True, by_sample_and_var=False), split_idx=0)(o.clone())
nan_vals2 = torch.isnan(o2).sum()
nan_vals3 = torch.isnan(o3).sum()
nan_vals4 = torch.isnan(o4).sum()
# The input contains NaNs; none of the three outputs may contain any.
test_ne(nan_vals1, 0)
test_eq(nan_vals2, 0)
test_eq(nan_vals3, 0)
test_eq(nan_vals4, 0)
{% endraw %} {% raw %}

class TSStandardize[source]

TSStandardize(mean=None, std=None, by_sample=False, by_var=False, by_step=False, eps=1e-08, use_single_batch=True, verbose=False) :: Transform

Standardizes batch of type TSTensor

Args:

- mean: you can pass a precalculated mean value as a torch tensor which is the one that will be used, or leave as None, in which case
    it will be estimated using a batch.
- std: you can pass a precalculated std value as a torch tensor which is the one that will be used, or leave as None, in which case
    it will be estimated using a batch. If both mean and std values are passed when instantiating TSStandardize, the rest of arguments won't be used.
- by_sample: if True, it will calculate mean and std for each individual sample. Otherwise based on the entire batch.
- by_var:
    * False: mean and std will be the same for all variables.
    * True: a different mean and std will be set for each variable.
    * a list of ints: (like [0,1,3]) a different mean and std will be set for each variable on the list. Variables not included in the list
    won't be standardized.
    * a list that contains a list/lists: (like [0, [1,3]]) a different mean and std will be set for each element of the list. If multiple elements are
    included in a list, the same mean and std will be set for those variables in the sublist/s. (in the example a mean and std is determined for
    variable 0, and another one for variables 1 & 3 - the same one). Variables not included in the list won't be standardized.
- by_step: if True, a different mean and std will be calculated for each time step. Otherwise they will be shared across all time steps.
- eps: it avoids dividing by 0
- use_single_batch: if True a single training batch will be used to calculate mean & std. Else the entire training set will be used.
{% endraw %} {% raw %}
{% endraw %} {% raw %}
# Standardize batches with by_sample=True and check that a training batch
# ends up with mean close to 0 and std close to 1 (tolerance 0.1).
batch_tfms=[TSStandardize(by_sample=True, by_var=False, verbose=True)]
dls = TSDataLoaders.from_dsets(dsets.train, dsets.valid, bs=128, num_workers=0, after_batch=batch_tfms)
xb, yb = next(iter(dls.train))
test_close(xb.mean(), 0, eps=1e-1)
test_close(xb.std(), 1, eps=1e-1)
{% endraw %} {% raw %}
# TSStandardize on data containing NaNs.
from tsai.data.validation import TimeSplitter
X_nan = np.random.rand(100, 5, 10)
# Half of the samples (chosen without replacement) get NaN across all of variable 0.
idxs = np.random.choice(len(X_nan), int(len(X_nan)*.5), False)
X_nan[idxs, 0] = float('nan')
# Another random half get NaN in the last 10 steps of variable 1
# (the whole sequence here, since it is 10 steps long).
idxs = np.random.choice(len(X_nan), int(len(X_nan)*.5), False)
X_nan[idxs, 1, -10:] = float('nan')
batch_tfms = TSStandardize(by_var=True)
dls = get_ts_dls(X_nan, batch_tfms=batch_tfms, splits=TimeSplitter(show_plot=False)(range_of(X_nan)))
# The mean and std stored on the transform must not contain NaNs...
test_eq(torch.isnan(dls.after_batch[0].mean).sum(), 0)
test_eq(torch.isnan(dls.after_batch[0].std).sum(), 0)
# ...while the standardized batch still has some NaNs, but not everywhere.
xb = first(dls.train)[0]
test_ne(torch.isnan(xb).sum(), 0)
test_ne(torch.isnan(xb).sum(), torch.isnan(xb).numel())
# Chaining Nan2Value after TSStandardize leaves no NaNs in the batch.
batch_tfms = [TSStandardize(by_var=True), Nan2Value()]
dls = get_ts_dls(X_nan, batch_tfms=batch_tfms, splits=TimeSplitter(show_plot=False)(range_of(X_nan)))
xb = first(dls.train)[0]
test_eq(torch.isnan(xb).sum(), 0)
{% endraw %} {% raw %}
# With by_sample=True, both training and validation batches should come out
# with mean close to 0 and std close to 1 (tolerance 0.1).
batch_tfms=[TSStandardize(by_sample=True, by_var=False, verbose=False)]
dls = TSDataLoaders.from_dsets(dsets.train, dsets.valid, bs=128, num_workers=0, after_batch=batch_tfms)
xb, yb = next(iter(dls.train))
test_close(xb.mean(), 0, eps=1e-1)
test_close(xb.std(), 1, eps=1e-1)
xb, yb = next(iter(dls.valid))
test_close(xb.mean(), 0, eps=1e-1)
test_close(xb.std(), 1, eps=1e-1)
{% endraw %} {% raw %}
# Same check through get_ts_dls with inplace=True and different batch sizes
# for the training and validation loaders.
tfms = [None, TSClassification()]
batch_tfms = TSStandardize(by_sample=True)
dls = get_ts_dls(X, y, splits=splits, tfms=tfms, batch_tfms=batch_tfms, bs=[64, 128], inplace=True)
xb, yb = dls.train.one_batch()
test_close(xb.mean(), 0, eps=1e-1)
test_close(xb.std(), 1, eps=1e-1)
xb, yb = dls.valid.one_batch()
test_close(xb.mean(), 0, eps=1e-1)
test_close(xb.std(), 1, eps=1e-1)
{% endraw %} {% raw %}
# Same check through get_ts_dls with inplace=False.
tfms = [None, TSClassification()]
batch_tfms = TSStandardize(by_sample=True, by_var=False, verbose=False)
dls = get_ts_dls(X, y, splits=splits, tfms=tfms, batch_tfms=batch_tfms, bs=[64, 128], inplace=False)
xb, yb = dls.train.one_batch()
test_close(xb.mean(), 0, eps=1e-1)
test_close(xb.std(), 1, eps=1e-1)
xb, yb = dls.valid.one_batch()
test_close(xb.mean(), 0, eps=1e-1)
test_close(xb.std(), 1, eps=1e-1)
{% endraw %} {% raw %}

Tensor.mul_min[source]

Tensor.mul_min(x:(Tensor, TSTensor, NumpyTensor), axes=(), keepdim=False)

{% endraw %} {% raw %}

TSTensor.mul_min[source]

TSTensor.mul_min(x:(Tensor, TSTensor, NumpyTensor), axes=(), keepdim=False)

{% endraw %} {% raw %}

NumpyTensor.mul_min[source]

NumpyTensor.mul_min(x:(Tensor, TSTensor, NumpyTensor), axes=(), keepdim=False)

{% endraw %} {% raw %}

Tensor.mul_max[source]

Tensor.mul_max(x:(Tensor, TSTensor, NumpyTensor), axes=(), keepdim=False)

{% endraw %} {% raw %}

TSTensor.mul_max[source]

TSTensor.mul_max(x:(Tensor, TSTensor, NumpyTensor), axes=(), keepdim=False)

{% endraw %} {% raw %}

NumpyTensor.mul_max[source]

NumpyTensor.mul_max(x:(Tensor, TSTensor, NumpyTensor), axes=(), keepdim=False)

{% endraw %} {% raw %}

class TSNormalize[source]

TSNormalize(min=None, max=None, range=(-1, 1), by_sample=False, by_var=False, by_step=False, clip_values=True, use_single_batch=True, verbose=False) :: Transform

Normalizes batch of type TSTensor

{% endraw %} {% raw %}
{% endraw %} {% raw %}
# TSNormalize with its defaults: a training batch must stay within [-1, 1].
batch_tfms = [TSNormalize()]
dls = TSDataLoaders.from_dsets(dsets.train, dsets.valid, bs=128, num_workers=0, after_batch=batch_tfms)
xb, yb = next(iter(dls.train))
assert xb.max() <= 1
assert xb.min() >= -1
{% endraw %} {% raw %}
# TSNormalize with by_sample=True: a training batch must stay within [-1, 1].
batch_tfms=[TSNormalize(by_sample=True, by_var=False, verbose=False)]
dls = TSDataLoaders.from_dsets(dsets.train, dsets.valid, bs=128, num_workers=0, after_batch=batch_tfms)
xb, yb = next(iter(dls.train))
assert xb.max() <= 1
assert xb.min() >= -1
{% endraw %} {% raw %}
# TSNormalize with grouped variables (by_var=[0, [1, 2]]), use_single_batch=False
# and clip_values=False. Only the variables listed in by_var are checked.
batch_tfms = [TSNormalize(by_var=[0, [1, 2]], use_single_batch=False, clip_values=False, verbose=False)]
dls = TSDataLoaders.from_dsets(dsets.train, dsets.valid, bs=128, num_workers=0, after_batch=batch_tfms)
xb, yb = next(iter(dls.train))
assert xb[:, [0, 1, 2]].max() <= 1
assert xb[:, [0, 1, 2]].min() >= -1
{% endraw %} {% raw %}

class TSClipOutliers[source]

TSClipOutliers(min=None, max=None, by_sample=False, by_var=False, verbose=False) :: Transform

Clips outliers in a batch of type TSTensor based on the IQR

{% endraw %} {% raw %}
{% endraw %} {% raw %}
# TSClipOutliers with explicit bounds min=-1 and max=1.
batch_tfms=[TSClipOutliers(-1, 1, verbose=True)]
dls = TSDataLoaders.from_dsets(dsets.train, dsets.valid, bs=128, num_workers=0, after_batch=batch_tfms)
xb, yb = next(iter(dls.train))
# The training batch stays inside the bounds and its extremes sit close to them.
assert xb.max() <= 1
assert xb.min() >= -1
test_close(xb.min(), -1, eps=1e-1)
test_close(xb.max(), 1, eps=1e-1)
# The same holds for a validation batch.
xb, yb = next(iter(dls.valid))
test_close(xb.min(), -1, eps=1e-1)
test_close(xb.max(), 1, eps=1e-1)
TSClipOutliers min=-1, max=1

{% endraw %} {% raw %}

class TSClip[source]

TSClip(min=-6, max=6) :: Transform

Clip batch of type TSTensor

{% endraw %} {% raw %}
{% endraw %} {% raw %}
# Widely spread random values must end up within TSClip's default bounds (-6, 6).
t = TSTensor(torch.randn(10, 20, 100)*10)
test_le(TSClip()(t).max().item(), 6)
test_ge(TSClip()(t).min().item(), -6)
{% endraw %} {% raw %}

class TSRobustScale[source]

TSRobustScale(median=None, min=None, max=None, by_sample=False, by_var=False, quantile_range=(25.0, 75.0), use_single_batch=True, verbose=False) :: Transform

This Scaler removes the median and scales the data according to the quantile range (defaults to IQR: Interquartile Range)

{% endraw %} {% raw %}
{% endraw %} {% raw %}
# Scale one training batch with TSRobustScale and confirm the values changed.
dls = TSDataLoaders.from_dsets(dsets.train, dsets.valid, num_workers=0)
xb, yb = next(iter(dls.train))
# `by_sample` is a boolean flag (default False), so pass the Python literal
# True; the original lowercase `true` is not a Python boolean.
clipped_xb = TSRobustScale(by_sample=True)(xb)
test_ne(clipped_xb, xb)
# Show the value range after and before scaling.
clipped_xb.min(), clipped_xb.max(), xb.min(), xb.max()
(TSTensor([-0.6101659536361694], device=cpu),
 TSTensor([1.1915178298950195], device=cpu),
 TSTensor([-2.6447908878326416], device=cpu),
 TSTensor([2.5255849361419678], device=cpu))
{% endraw %} {% raw %}

class TSDiff[source]

TSDiff(lag=1, pad=True) :: Transform

Differences batch of type TSTensor

{% endraw %} {% raw %}
{% endraw %} {% raw %}
# Consecutive values along the last axis differ by 1, so lag-1 differences
# average 1 and lag-2 differences average 2.
t = TSTensor(torch.arange(24).reshape(2,3,4))
# [..., 1:] skips the first step (presumably the padded position when pad=True).
test_eq(TSDiff()(t)[..., 1:].float().mean(), 1)
test_eq(TSDiff(lag=2, pad=False)(t).float().mean(), 2)
{% endraw %} {% raw %}

class TSLog[source]

TSLog(ex=None, **kwargs) :: Transform

Applies a log transform (of the values + 1) to a batch of type TSTensor. Accepts positive and negative numbers

{% endraw %} {% raw %}
{% endraw %} {% raw %}
# TSLog round trip on an input that mixes negative and positive values.
t = TSTensor(torch.rand(2,3,4)) * 2 - 1  # rescale from [0, 1) to [-1, 1)
tfm = TSLog()
enc_t = tfm(t)
# The encoding changes the values, and decodes recovers the original data.
test_ne(enc_t, t)
test_close(tfm.decodes(enc_t).data, t.data)
{% endraw %} {% raw %}

class TSCyclicalPosition[source]

TSCyclicalPosition(magnitude=None, **kwargs) :: Transform

Concatenates the position along the sequence as 2 additional variables (sine and cosine)

Args: magnitude: added for compatibility. It's not used.

{% endraw %} {% raw %}
{% endraw %} {% raw %}
# TSCyclicalPosition must add exactly 2 variables (dim 1) to the input.
bs, c_in, seq_len = 1,3,100
t = TSTensor(torch.rand(bs, c_in, seq_len))
enc_t = TSCyclicalPosition()(t)
test_ne(enc_t, t)
assert t.shape[1] == enc_t.shape[1] - 2
# Plot the last two variables of the first sample (the added position features).
plt.plot(enc_t[0, -2:].cpu().numpy().T)
plt.show()
{% endraw %} {% raw %}

class TSLinearPosition[source]

TSLinearPosition(magnitude=None, lin_range=(-1, 1), **kwargs) :: Transform

Concatenates the position along the sequence as 1 additional variable

Args: magnitude: added for compatibility. It's not used.

{% endraw %} {% raw %}
{% endraw %} {% raw %}
# TSLinearPosition must add exactly 1 variable (dim 1) to the input.
bs, c_in, seq_len = 1,3,100
t = TSTensor(torch.rand(bs, c_in, seq_len))
enc_t = TSLinearPosition()(t)
test_ne(enc_t, t)
assert t.shape[1] == enc_t.shape[1] - 1
# Plot the last variable of the first sample (the added position feature).
plt.plot(enc_t[0, -1].cpu().numpy().T)
plt.show()
{% endraw %} {% raw %}

class TSLogReturn[source]

TSLogReturn(lag=1, pad=True) :: Transform

Calculates log-return of batch of type TSTensor. For positive values only

{% endraw %} {% raw %}
{% endraw %} {% raw %}
# Each value doubles the previous one, so the test expects constant
# log-returns, i.e. a standard deviation of 0.
t = TSTensor([1,2,4,8,16,32,64,128,256]).float()
test_eq(TSLogReturn(pad=False)(t).std(), 0)
{% endraw %} {% raw %}

class TSAdd[source]

TSAdd(add) :: Transform

Add a defined amount to each batch of type TSTensor.

{% endraw %} {% raw %}
{% endraw %} {% raw %}
# TSAdd(1) must increase every element by 1.
t = TSTensor([1,2,3]).float()
test_eq(TSAdd(1)(t), TSTensor([2,3,4]).float())
{% endraw %}

y transforms

{% raw %}

class Preprocessor[source]

Preprocessor(preprocessor, **kwargs)

{% endraw %} {% raw %}
{% endraw %} {% raw %}
# Preprocessor(StandardScaler): fit on the first split only, transform all of
# `y`, and check that inverse_transform recovers the original values.
from tsai.data.validation import TimeSplitter
y = random_shuffle(np.random.randn(1000) * 10 + 5)
splits = TimeSplitter()(y)
preprocessor = Preprocessor(StandardScaler)
preprocessor.fit(y[splits[0]])
y_tfm = preprocessor.transform(y)
test_close(preprocessor.inverse_transform(y_tfm), y)
# Overlay histograms of the original and transformed targets.
plt.hist(y, 50, label='ori',)
plt.hist(y_tfm, 50, label='tfm')
plt.legend(loc='best')
plt.show()
{% endraw %} {% raw %}
# Preprocessor(RobustScaler): fit on the first split only, transform all of
# `y`, and check that inverse_transform recovers the original values.
y = random_shuffle(np.random.randn(1000) * 10 + 5)
splits = TimeSplitter()(y)
preprocessor = Preprocessor(RobustScaler)
preprocessor.fit(y[splits[0]])
y_tfm = preprocessor.transform(y)
test_close(preprocessor.inverse_transform(y_tfm), y)
# Overlay histograms of the original and transformed targets.
plt.hist(y, 50, label='ori',)
plt.hist(y_tfm, 50, label='tfm')
plt.legend(loc='best')
plt.show()
{% endraw %} {% raw %}
# Preprocessor(Normalizer): fit on the first split only, transform all of
# `y`, and check that inverse_transform recovers the original values.
y = random_shuffle(np.random.rand(1000) * 3 + .5)
splits = TimeSplitter()(y)
preprocessor = Preprocessor(Normalizer)
preprocessor.fit(y[splits[0]])
y_tfm = preprocessor.transform(y)
test_close(preprocessor.inverse_transform(y_tfm), y)
# Overlay histograms of the original and transformed targets.
plt.hist(y, 50, label='ori',)
plt.hist(y_tfm, 50, label='tfm')
plt.legend(loc='best')
plt.show()
{% endraw %} {% raw %}
# Preprocessor(BoxCox) on strictly positive targets: fit on the first split
# only, transform all of `y`, and check that inverse_transform recovers them.
y = random_shuffle(np.random.rand(1000) * 10 + 5)
splits = TimeSplitter()(y)
preprocessor = Preprocessor(BoxCox)
preprocessor.fit(y[splits[0]])
y_tfm = preprocessor.transform(y)
test_close(preprocessor.inverse_transform(y_tfm), y)
# Overlay histograms of the original and transformed targets.
plt.hist(y, 50, label='ori',)
plt.hist(y_tfm, 50, label='tfm')
plt.legend(loc='best')
plt.show()
{% endraw %} {% raw %}
# Preprocessor(YeoJohnshon) on beta-distributed targets: fit on the first
# split only, transform all of `y`, and check that inverse_transform recovers
# the original values. (A previous assignment to `y` that was immediately
# overwritten by this one has been dropped.)
y = np.random.beta(.5, .5, size=1000)
splits = TimeSplitter()(y)
preprocessor = Preprocessor(YeoJohnshon)
preprocessor.fit(y[splits[0]])
y_tfm = preprocessor.transform(y)
test_close(preprocessor.inverse_transform(y_tfm), y)
# Overlay histograms of the original and transformed targets.
plt.hist(y, 50, label='ori',)
plt.hist(y_tfm, 50, label='tfm')
plt.legend(loc='best')
plt.show()
{% endraw %} {% raw %}
# Preprocessor(Quantile) on negative, skewed targets: fit on the first split
# only and transform all of `y`.
y = - np.random.beta(1, .5, 10000) * 10
splits = TimeSplitter()(y)
preprocessor = Preprocessor(Quantile)
preprocessor.fit(y[splits[0]])
# The original and transformed targets are drawn in two separate figures.
plt.hist(y, 50, label='ori',)
y_tfm = preprocessor.transform(y)
plt.legend(loc='best')
plt.show()
plt.hist(y_tfm, 50, label='tfm')
plt.legend(loc='best')
plt.show()
# The round trip is checked with a looser tolerance (1e-1) than in the other examples.
test_close(preprocessor.inverse_transform(y_tfm), y, 1e-1)
{% endraw %} {% raw %}

ReLabeler[source]

ReLabeler(cm)

Changes the labels in a dataset based on a dictionary (class mapping). Args: cm = class mapping dictionary

{% endraw %} {% raw %}
{% endraw %} {% raw %}
# Draw 20 random labels from 'a'..'e', then remap them with a class-mapping
# dict that merges a/b into 'x', maps c to 'y' and merges d/e into 'z'.
vals = {0:'a', 1:'b', 2:'c', 3:'d', 4:'e'}
y = np.array([vals[i] for i in np.random.randint(0, 5, 20)])
labeler = ReLabeler(dict(a='x', b='x', c='y', d='z', e='z'))
y_new = labeler(y)
# Relabeling keeps the array shape.
test_eq(y.shape, y_new.shape)
y, y_new
(array(['b', 'e', 'd', 'c', 'd', 'e', 'e', 'd', 'a', 'e', 'd', 'e', 'e',
        'd', 'e', 'c', 'b', 'b', 'd', 'd'], dtype='<U1'),
 array(['x', 'z', 'z', 'y', 'z', 'z', 'z', 'z', 'x', 'z', 'z', 'z', 'z',
        'z', 'z', 'y', 'x', 'x', 'z', 'z'], dtype='<U1'))
{% endraw %}