---
title: TSTPlus (Time Series Transformer)
keywords: fastai
sidebar: home_sidebar
summary: "This is an unofficial PyTorch implementation created by Ignacio Oguiza (timeseriesAI@gmail.com) based on TST (Zerveas, 2020) and Transformer (Vaswani, 2017)."
description: "This is an unofficial PyTorch implementation created by Ignacio Oguiza (timeseriesAI@gmail.com) based on TST (Zerveas, 2020) and Transformer (Vaswani, 2017)."
nb_path: "nbs/108c_models.TSTPlus.ipynb"
---
References:
No official implementation of TST is available as far as I know (as of Oct 10th, 2020).
Vaswani, A., Shazeer, N., Parmar, N., Uszkoreit, J., Jones, L., Gomez, A. N., ... & Polosukhin, I. (2017). Attention is all you need. In Advances in neural information processing systems (pp. 5998-6008).
He, R., Ravula, A., Kanagal, B., & Ainslie, J. (2020). Realformer: Transformer Likes Informed Attention. arXiv preprint arXiv:2012.11747.
This implementation is adapted to work with the rest of the `tsai` library, and contains some hyperparameters that are not available in the original implementation. I included them for experimentation.
In general, transformers require a lower lr compared to other time series models when used with the same datasets. It's important to use `learn.lr_find()` to learn what a good lr may be. In general, I've found that lr values between 1e-4 and 3e-4 work well.
The paper authors recommend standardizing data by feature. This can be done by adding `TSStandardize(by_var=True)` as a batch_tfm when creating the `TSDataLoaders`.
When using TST with a long time series, you may use `max_seq_len` to reduce the memory size and thus avoid GPU issues. By default it's set to 512.
I've tried different types of positional encoders. In my experience, the default one works just fine.
# Heatmap of the PositionalEncoding output, followed by its summary statistics.
# NOTE(review): the arguments (1000, 512) are presumably (q_len, d_model) — confirm against PositionalEncoding.
pe = PositionalEncoding(1000, 512)
pe = pe.detach().cpu().numpy()  # plain NumPy array for matplotlib
plt.pcolormesh(pe, cmap='viridis')
plt.title('PositionalEncoding')
plt.colorbar()
plt.show()
# Trailing expression: mean, std, min, max and shape of the encoding.
(pe.mean(), pe.std(), pe.min(), pe.max(), pe.shape)
cpe = Coord2dPosEncoding(1000, 512, exponential=True, normalize=True).cpu().numpy()
plt.pcolormesh(cpe, cmap='viridis')
plt.title('Coord2dPosEncoding')
plt.colorbar()
plt.show()
plt.plot(cpe.mean(0))
plt.show()
plt.plot(cpe.mean(1))
plt.show()
cpe.mean(), cpe.std(), cpe.min(), cpe.max()
cpe = Coord1dPosEncoding(1000, exponential=True, normalize=True).detach().cpu().numpy()
plt.pcolormesh(cpe, cmap='viridis')
plt.title('Coord1dPosEncoding')
plt.colorbar()
plt.show()
plt.plot(cpe.mean(1))
plt.show()
cpe.mean(), cpe.std(), cpe.min(), cpe.max(), cpe.shape
# Coord1dPosEncoding plotted with exponential=True and normalize=True.
# NOTE(review): this cell uses the same arguments as the Coord1dPosEncoding cell
# right before it — presumably a different configuration was intended; confirm.
cpe = (
    Coord1dPosEncoding(1000, exponential=True, normalize=True)
    .detach()
    .cpu()
    .numpy()
)
plt.pcolormesh(cpe, cmap='viridis')
plt.title('Coord1dPosEncoding')
plt.colorbar()
plt.show()
plt.plot(cpe.mean(axis=1))
plt.show()
(cpe.mean(), cpe.std(), cpe.min(), cpe.max())
# Smoke test of a single _TSTEncoderLayer called with both an attention mask
# and a key padding mask.
t = torch.rand(16, 50, 128)
attn_mask = torch.ones(50, 50).triu()  # shape: q_len x q_len
# NOTE(review): attn_mask is a float tensor while key_padding_mask is boolean —
# confirm the layer interprets a float mask the way this example intends.

# Boolean padding mask: the last 10 steps of samples 1, 3, 6 and 15 are flagged.
key_padding_mask = torch.zeros(16, 50, dtype=torch.bool)
key_padding_mask[[1, 3, 6, 15], -10:] = True
print('attn_mask', attn_mask.shape, 'key_padding_mask', key_padding_mask.shape)

encoder = _TSTEncoderLayer(
    q_len=50,
    d_model=128,
    n_heads=8,
    d_k=None,
    d_v=None,
    d_ff=512,
    res_dropout=0.1,
    store_attn=True,
    activation='gelu',
)
output = encoder(t, key_padding_mask=key_padding_mask, attn_mask=attn_mask)
output.shape
# Plot one attention map stored on the encoder layer created with store_attn=True.
cmap, figsize = 'viridis', (6, 5)
plt.figure(figsize=figsize)
# NOTE(review): encoder.attn[0][0] is presumably (first sample, first head) — confirm.
plt.pcolormesh(
    encoder.attn[0][0].detach().cpu().numpy(),
    cmap=cmap,
)
plt.title('Self-attention map')
plt.colorbar()
plt.show()
from tsai.models.utils import build_ts_model
# Build a standardized random batch and check TSTPlus end to end, both through
# the class constructor and through build_ts_model.
bs, c_in, c_out, seq_len = 8, 9, 2, 1_500  # c_in: aka channels, features, variables, dimensions
xb = torch.randn(bs, c_in, seq_len).to(device)

# standardize by channel by_var based on the training set
xb = xb.sub(xb.mean((0, 2), keepdim=True)).div(xb.std((0, 2), keepdim=True))

# Settings
max_seq_len, d_model, n_heads = 256, 128, 16
d_k = d_v = None  # if None --> d_model // n_heads
d_ff, n_layers = 256, 3
res_dropout = fc_dropout = 0.1
activation = "gelu"
pe, learn_pe = None, True
kwargs = {}

# One keyword set shared by both construction paths.
tst_kwargs = dict(
    max_seq_len=max_seq_len, d_model=d_model, n_heads=n_heads,
    d_k=d_k, d_v=d_v, d_ff=d_ff, res_dropout=res_dropout, activation=activation, n_layers=n_layers,
    fc_dropout=fc_dropout, pe=pe, learn_pe=learn_pe, **kwargs,
)

# Direct construction: output shape, and indices 0/1 map to backbone/head.
model = TSTPlus(c_in, c_out, seq_len, **tst_kwargs).to(device)
test_eq(model(xb).shape, [bs, c_out])
test_eq(model[0], model.backbone)
test_eq(model[1], model.head)

# Same checks for a model built through build_ts_model.
model2 = build_ts_model(TSTPlus, c_in, c_out, seq_len, **tst_kwargs).to(device)
test_eq(model2(xb).shape, [bs, c_out])
test_eq(model2[0], model2.backbone)
test_eq(model2[1], model2.head)
print(f'model parameters: {count_parameters(model)}')
# TSTPlus with pre_norm=True and otherwise default settings: output shape check.
model = TSTPlus(c_in, c_out, seq_len, pre_norm=True)
model = model.to(device)
test_eq(model(xb).shape, [bs, c_out])
# Longer input (seq_len=5000) with res_attention=True: output shape check.
bs, c_in, c_out, seq_len = 8, 9, 2, 5000  # c_in: aka channels, features, variables, dimensions
xb = torch.randn(bs, c_in, seq_len)

# standardize by channel by_var based on the training set
xb = xb.sub(xb.mean((0, 2), keepdim=True)).div(xb.std((0, 2), keepdim=True))

model = TSTPlus(c_in, c_out, seq_len, res_attention=True)
test_eq(model(xb).shape, [bs, c_out])
print(f'model parameters: {count_parameters(model)}')
# Swap in pooling-based custom heads (flatten=False) and check the output shape
# for each one: first create_pool_head, then create_pool_plus_head.
for _head_fn in (create_pool_head, create_pool_plus_head):
    custom_head = partial(_head_fn, concat_pool=True)
    model = TSTPlus(
        c_in, c_out, seq_len,
        max_seq_len=max_seq_len, d_model=d_model, n_heads=n_heads,
        d_k=d_k, d_v=d_v, d_ff=d_ff, res_dropout=res_dropout, activation=activation, n_layers=n_layers,
        fc_dropout=fc_dropout, pe=pe, learn_pe=learn_pe, flatten=False, custom_head=custom_head, **kwargs,
    )
    test_eq(model(xb).shape, [bs, c_out])
    print(f'model parameters: {count_parameters(model)}')
# Short-sequence configuration (seq_len=60, max_seq_len=120) with pe='zeros'.
bs, c_in, c_out, seq_len = 8, 9, 2, 60  # c_in: aka channels, features, variables, dimensions
xb = torch.randn(bs, c_in, seq_len)

# standardize by channel by_var based on the training set
xb = xb.sub(xb.mean((0, 2), keepdim=True)).div(xb.std((0, 2), keepdim=True))

# Settings
max_seq_len, d_model, n_heads = 120, 128, 16
d_k = d_v = None  # if None --> d_model // n_heads
d_ff, n_layers = 256, 3
res_dropout = fc_dropout = 0.1
act = "gelu"
pe, learn_pe = 'zeros', True
kwargs = {}
# kwargs = dict(kernel_size=5, padding=2)

# NOTE(review): this call passes `act=` whereas other TSTPlus calls on this page
# pass `activation=` — confirm which keyword TSTPlus actually consumes.
model = TSTPlus(
    c_in, c_out, seq_len,
    max_seq_len=max_seq_len, d_model=d_model, n_heads=n_heads,
    d_k=d_k, d_v=d_v, d_ff=d_ff, res_dropout=res_dropout, act=act, n_layers=n_layers,
    fc_dropout=fc_dropout, pe=pe, learn_pe=learn_pe, **kwargs,
)
test_eq(model(xb).shape, [bs, c_out])
print(f'model parameters: {count_parameters(model)}')
# Split the model into its two stages by index and check each stage's output rank.
body, head = model[0], model[1]
test_eq(body(xb).ndim, 3)  # body output is a 3-dimensional tensor
test_eq(head(body(xb)).ndim, 2)  # head output is a 2-dimensional tensor
head
# NOTE(review): show_pe presumably plots the model's positional encoding — confirm.
model.show_pe()
# Key padding mask derived from NaN values in the input.
model = TSTPlus(3, 2, 10)
xb = torch.randn(4, 3, 10)
yb = torch.randint(0, 2, (4,))

# Input without NaNs: the second item returned by _key_padding_mask is None.
test_eq(model.backbone._key_padding_mask(xb)[1], None)

# For two random samples, set the last 5 steps of every variable to NaN, plus
# step 1 of variable 0 only.
random_idxs = np.random.choice(len(xb), size=2, replace=False)
xb[random_idxs, :, -5:] = np.nan
xb[random_idxs, 0, 1] = np.nan

# Expected mask: True only at steps where every variable is NaN.
# NOTE(review): the input is cloned before these calls — presumably because
# _key_padding_mask modifies its argument in place; confirm.
test_eq(
    model.backbone._key_padding_mask(xb.clone())[1].data,
    torch.isnan(xb).all(1),
)
test_eq(model.backbone._key_padding_mask(xb.clone())[1].data.shape, (4, 10))
print(torch.isnan(xb).sum())

# Forward and backward pass on the NaN-containing batch.
pred = model(xb.clone())
loss = CrossEntropyLossFlat()(pred, yb)
loss.backward()
model.backbone._key_padding_mask(xb)[1].data.shape
# Key padding mask taken from a dedicated input variable (padding_var).
bs, c_in, seq_len, c_out = 4, 3, 10, 2
xb = torch.randn(bs, c_in, seq_len)
# The last variable holds sorted 0/1 flags, so any 1s sit at the end of each sequence.
xb[:, -1] = torch.randint(0, 2, (bs, seq_len)).sort().values

# Without padding_var (and without NaNs) no mask is returned.
model = TSTPlus(c_in, c_out, seq_len)
test_eq(model.backbone._key_padding_mask(xb)[1], None)

# With c_in == 3, padding_var=-1 and padding_var=2 select the same variable,
# and both are checked against the same expected mask.
for _padding_var in (-1, 2):
    model = TSTPlus(c_in, c_out, seq_len, padding_var=_padding_var)
    test_eq(model.backbone._key_padding_mask(xb)[1], (xb[:, -1] == 1))
test_eq(model(xb).shape, (bs, c_out))
# MultiTSTPlus: features given as a list ([2, 5]) versus a single int (7).
bs, c_in, c_out, seq_len = 8, 7, 2, 10  # c_in: aka channels, features, variables, dimensions
xb2 = torch.randn(bs, c_in, seq_len)

model1 = MultiTSTPlus([2, 5], c_out, seq_len)
model2 = MultiTSTPlus(c_in, c_out, seq_len)  # c_in == 7

# Both variants produce the same output shape ...
test_eq(model1(xb2).shape, (bs, c_out))
test_eq(model1(xb2).shape, model2(xb2).shape)
# ... but the list-configured model has more parameters.
test_eq(count_parameters(model1) > count_parameters(model2), True)
# MultiTSTPlus: features given as sizes ([2, 5]) versus as explicit index lists.
bs, c_in, c_out, seq_len = 8, 7, 2, 10  # c_in: aka channels, features, variables, dimensions
xb2 = torch.randn(bs, c_in, seq_len)

model1 = MultiTSTPlus([2, 5], c_out, seq_len)
model2 = MultiTSTPlus([[0, 2, 5], [0, 1, 3, 4, 6]], c_out, seq_len)

test_eq(model1(xb2).shape, (bs, c_out))
test_eq(model1(xb2).shape, model2(xb2).shape)
# Build a MultiTSTPlus with a y_range, split it with split_model, and check the
# output rank of each part.
model1 = MultiTSTPlus([2, 5], c_out, seq_len, y_range=(0.5, 5.5))
body, head = split_model(model1)
test_eq(body(xb2).ndim, 3)  # body output is a 3-dimensional tensor
test_eq(head(body(xb2)).ndim, 2)  # head output is a 2-dimensional tensor
head
# MultiTSTPlus with pre_norm=True. Run a forward pass so this configuration is
# actually exercised rather than only constructed; the expected output shape is
# the same (bs, c_out) used for the other MultiTSTPlus models fed with xb2.
model = MultiTSTPlus([2, 5], c_out, seq_len, pre_norm=True)
test_eq(model(xb2).shape, (bs, c_out))
# Replace the head of an existing MultiTSTPlus with change_model_head.
bs, n_vars, seq_len, c_out = 8, 3, 12, 2
xb = torch.rand(bs, n_vars, seq_len)

net = MultiTSTPlus(n_vars, c_out, seq_len)
change_model_head(net, create_pool_plus_head, concat_pool=False)
print(net(xb).shape)
net.head
# Build a MultiTSTPlus with conv_lin_3d_head as its custom head.
bs, n_vars, seq_len, c_out = 8, 3, 12, 10
xb = torch.rand(bs, n_vars, seq_len)

# NOTE(review): d=(5, 2) presumably sets the trailing output dimensions, whose
# product equals c_out (10) — confirm against conv_lin_3d_head.
new_head = partial(conv_lin_3d_head, d=(5, 2))
net = MultiTSTPlus(n_vars, c_out, seq_len, custom_head=new_head)
print(net(xb).shape)
net.head