--- title: ES-RNN: Exponential Smoothing Recurrent Neural Network keywords: fastai sidebar: home_sidebar summary: "API details." description: "API details." nb_path: "nbs/models_esrnn__esrnn.ipynb" ---
{% raw %}
{% endraw %} {% raw %}
{% endraw %} {% raw %}
{% endraw %} {% raw %}
{% endraw %} {% raw %}
{% endraw %} {% raw %}
{% endraw %} {% raw %}
{% endraw %} {% raw %}
{% endraw %}

Test ES-RNN model

{% raw %}
def test_no_leakage_season(n_series, input_size, output_size, seasonality):
    """
    Check that masked-out future values cannot leak into the forecasts.

    Builds a strictly-positive synthetic panel, masks the last
    `output_size` timestamps, then overwrites that masked region with
    extreme values (1e4, 0, -1e4). Because the region is masked, the
    model's forecasts must stay numerically identical across all variants.

    Parameters
    ----------
    n_series: int
        Number of series in the synthetic panel.
    input_size: int
        Length of the input window.
    output_size: int
        Forecast horizon (length of the masked region).
    seasonality: list of int
        Seasonal periods for the ES component.
    """
    t.manual_seed(1)
    model = _ESRNN(n_series=n_series, input_size=input_size, output_size=output_size, 
                   output_size_m=1, n_t=0, n_s=0, 
                   es_component='multiplicative', seasonality=seasonality, noise_std=0.000000001,
                   cell_type='GRU',
                   add_nl_layer=False, dilations=[[1, 2]], state_hsize=30)
    S = t.empty((n_series, 0))
    X = t.empty((n_series, 0))
    Y = t.normal(0, 1, (n_series, 2 * (input_size + output_size)))
    # Shift to strictly positive values: multiplicative ES requires y > 0.
    Y += Y.min().abs() + 10
    sample_mask = t.ones_like(Y)
    # BUGFIX: mask the whole forecast window. The original `-output_size`
    # (no colon) zeroed a single column only.
    sample_mask[:, -output_size:] = 0
    idxs = t.arange(n_series)
    
    # Testing different values for the masked window;
    # forecasts should be the same in all cases
    # except for numeric exceptions.
    # BUGFIX: clone per variant. `[Y] * 4` aliased a single tensor, so the
    # in-place edits below hit every entry and the final comparison was
    # vacuous (four identical inputs).
    Y_to_test = [Y.clone() for _ in range(4)]
    Y_to_test[1][:, -output_size:] = 10_000
    Y_to_test[2][:, -output_size:] = 0
    Y_to_test[3][:, -output_size:] = -10_000
    
    forecasts = []
    for Y in Y_to_test:
        # forward es and rnn
        windows_y_insample, windows_y_outsample, levels, seasonalities, sample_mask_w = model.es(S, Y, X, idxs, sample_mask=sample_mask)
        trends = model.rnn(windows_y_insample)
        trends = trends.permute(1, 0, 2)

        # predict es
        y_hat = model.es.predict(trends=trends, levels=levels, seasonalities=seasonalities)

        forecasts.append(y_hat)
        
    # BUGFIX: the assert message was a 2-tuple, which is always truthy;
    # use a single string instead.
    assert all(t.allclose(forecasts[0], forecast) for forecast in forecasts), \
        'Season leakage detected, please check.'
{% endraw %} {% raw %}
# Single seasonal period (6) shorter than the horizon (8).
test_no_leakage_season(n_series=10, input_size=10, output_size=8, seasonality=[6])
{% endraw %} {% raw %}
# Very short horizon (2).
test_no_leakage_season(n_series=10, input_size=10, output_size=2, seasonality=[6])
{% endraw %} {% raw %}
# Two seasonal periods, both longer than the input window.
test_no_leakage_season(n_series=10, input_size=10, output_size=8, seasonality=[12, 24])
{% endraw %} {% raw %}
# Nested seasonal periods (6 divides 12).
test_no_leakage_season(n_series=10, input_size=10, output_size=8, seasonality=[6, 12])
{% endraw %} {% raw %}
# Horizon (18) longer than the input window (10).
test_no_leakage_season(n_series=10, input_size=10, output_size=18, seasonality=[12])
{% endraw %} {% raw %}
# Degenerate seasonality of 1.
test_no_leakage_season(n_series=10, input_size=10, output_size=4, seasonality=[1])
{% endraw %} {% raw %}
# Long horizon combined with two seasonal periods.
test_no_leakage_season(n_series=10, input_size=10, output_size=18, seasonality=[12, 24])
{% endraw %}

ES-RNN model wrapper

{% raw %}
{% endraw %} {% raw %}

class ESRNN[source]

ESRNN(n_series:int, input_size:int, output_size:int, n_x:int=0, n_s:int=0, sample_freq:int=1, es_component:str='multiplicative', cell_type:str='LSTM', state_hsize:int=50, dilations:List[List[int]]=[[1, 2], [4, 8]], add_nl_layer:bool=False, seasonality:List[int]=[], learning_rate:float=0.001, lr_scheduler_step_size:int=9, lr_decay:float=0.9, per_series_lr_multip:float=1.0, gradient_eps:float=1e-08, gradient_clipping_threshold:float=20.0, rnn_weight_decay:float=0.0, noise_std:float=0.001, level_variability_penalty:float=20.0, testing_percentile:Union[int, List[T]]=50, training_percentile:Union[int, List[T]]=50, loss:str='SMYL', val_loss:str='MAE', frequency:str='D') :: LightningModule

Hooks to be used in LightningModule.

{% endraw %} {% raw %}
{% endraw %} {% raw %}

ESRNN.forecast[source]

ESRNN.forecast(Y_df:DataFrame, X_df:DataFrame=None, S_df:DataFrame=None, batch_size:int=1, trainer:Trainer=None)

Method for forecasting self.output_size periods after last timestamp of Y_df.

Parameters

Y_df: pd.DataFrame Dataframe with target time-series data, needs 'unique_id', 'ds' and 'y' columns. X_df: pd.DataFrame Dataframe with exogenous time-series data, needs 'unique_id' and 'ds' columns. Note that 'unique_id' and 'ds' must match Y_df plus the forecasting horizon. S_df: pd.DataFrame Dataframe with static data, needs 'unique_id' column. batch_size: int Batch size for forecasting. trainer: pl.Trainer Trainer object for model training and evaluation.

Returns

forecast_df: pd.DataFrame Dataframe with forecasts.

{% endraw %} {% raw %}
{% endraw %}

ES-RNN Univariate Example

{% raw %}
import matplotlib.pyplot as plt

from neuralforecast.data.datasets.epf import EPF, EPFInfo

from pytorch_lightning.callbacks import EarlyStopping
{% endraw %} {% raw %}
# Load the first EPF (electricity price forecasting) market group.
Y_df, X_df, _ = EPF.load(directory='./data', group=EPFInfo.groups[0])

# Keep a single exogenous feature.
X_df = X_df[['unique_id', 'ds', 'week_day']]

# Trimming series to avoid slow backprop through time.
# The 24 multipliers suggest hourly data: keep roughly the last
# 90 days of train + 30 days of test per series.
Y_df = Y_df.groupby('unique_id').tail(90*24+30*24)
X_df = X_df.groupby('unique_id').tail(90*24+30*24)

# Leveling Y_df (multiplicative model): shift the target so it is
# strictly positive (>= 20) — the multiplicative ES component cannot
# handle zero/negative values. The shift is reversed after prediction.
Y_min = Y_df.y.min()
Y_df.y = Y_df.y - Y_min + 20

# Train dataset: the last 30 days (ds_in_test) are held out.
train_dataset = TimeSeriesDataset(Y_df=Y_df, X_df=X_df,
                                  ds_in_test=30*24,
                                  is_test=False,
                                  input_size=7*24,
                                  output_size=24,
                                  verbose=True)

# Validation dataset: same split, but serving the held-out windows.
valid_dataset = TimeSeriesDataset(Y_df=Y_df, X_df=X_df,
                                  ds_in_test=30*24,
                                  is_test=True,
                                  input_size=7*24,
                                  output_size=24,
                                  verbose=True)

train_loader = TimeSeriesLoader(dataset=train_dataset,
                                batch_size=32,
                                shuffle=True)

valid_loader = TimeSeriesLoader(dataset=valid_dataset,
                                batch_size=1,
                                shuffle=False)
{% endraw %} {% raw %}
model = ESRNN(# Architecture parameters
    n_series=train_dataset.n_series,
    n_s=train_dataset.n_s,
    n_x=train_dataset.n_x,
    input_size=train_dataset.input_size,
    output_size=train_dataset.output_size,
    sample_freq=train_dataset.output_size,  # NOTE(review): presumably one window per horizon — confirm
    es_component='multiplicative',
    cell_type='LSTM',
    state_hsize=50,
    dilations=[[1, 24], [48, 168]],  # presumably daily/weekly dilations for hourly data
    add_nl_layer=False,
    # Regularization and optimization parameters
    learning_rate=0.005,
    lr_scheduler_step_size=10,
    lr_decay=0.9,
    per_series_lr_multip=1.5,
    gradient_eps=1e-8,
    gradient_clipping_threshold=50,
    rnn_weight_decay=0,
    noise_std=0.001,
    level_variability_penalty=10,
    testing_percentile=50,
    training_percentile=51,
    loss='SMYL',
    val_loss='MAE',
    seasonality=[24],  # daily cycle for hourly observations
    # NOTE(review): 'Y' (yearly) looks inconsistent with hourly data;
    # the signature's default is 'D' — confirm the intended frequency.
    frequency='Y'
)
{% endraw %} {% raw %}
# Stop when val_loss fails to improve by >= 1e-4 for 5 validation checks.
early_stopping = EarlyStopping(monitor="val_loss", 
                               min_delta=1e-4, 
                               patience=5, verbose=True, 
                               mode="min")

# NOTE(review): `progress_bar_refresh_rate` was removed in recent
# pytorch-lightning releases — pin the version or migrate if upgrading.
trainer = pl.Trainer(max_epochs=1, progress_bar_refresh_rate=1, 
                     log_every_n_steps=5, check_val_every_n_epoch=5,
                     callbacks=[early_stopping])
trainer.fit(model, train_loader, valid_loader)
{% endraw %} {% raw %}
# Run inference; each element of `outputs` is a per-batch
# (y_true, y_hat, sample_mask) tuple.
outputs = trainer.predict(model, valid_loader)

y_true, y_hat, sample_mask = zip(*outputs)
# Concatenate batches and move to CPU for numpy/matplotlib post-processing.
y_true = t.cat(y_true).cpu()
y_hat = t.cat(y_hat).cpu()
sample_mask = t.cat(sample_mask).cpu()

print("Original")
print("y_true.shape", y_true.shape)
print("y_hat.shape", y_hat.shape)
# Collapse dims 1 and 2 into a single time axis.
# NOTE(review): assumes dims 1/2 are (window, horizon) — check shapes above.
y_true = y_true.flatten(1,2)
y_hat = y_hat.flatten(1,2)
sample_mask = sample_mask.flatten(1,2)

print("\nFlatten")
print("y_true.shape", y_true.shape)
print("y_hat.shape", y_hat.shape)
print("sample_mask.shape", sample_mask.shape)
{% endraw %} {% raw %}
# Undo the leveling shift applied before training (y = y_orig - Y_min + 20)
# to recover the original price scale.
y_true = y_true + Y_min - 20
y_hat = y_hat + Y_min - 20
{% endraw %} {% raw %}
from scipy import stats

import pandas as pd

from neuralforecast.losses.numpy import mae, rmae, smape, rmse
from neuralforecast.data.datasets.epf import epf_naive_forecast

# Naive benchmark forecast, used as the rmae denominator below.
Y_naive_df = epf_naive_forecast(Y_df)

# Filter test hours: first series only, last 30 days (the test window).
y_true = y_true[0, -30*24:]
y_hat = y_hat[0, -30*24:]
y_naive = Y_naive_df.y_hat.values[-30*24:]

metrics = pd.Series({'mae' :  mae(y=y_true, y_hat=y_hat),
                     'rmae':  rmae(y=y_true, y_hat1=y_hat, y_hat2=y_naive),
                     'smape': smape(y=y_true, y_hat=y_hat),
                     'rmse':  rmse(y=y_true, y_hat=y_hat)})

print(metrics)
print('\n')
# Residual summary statistics (mean, variance, skewness, kurtosis).
print(stats.describe(y_true-y_hat))
print(f'model.training_percentile {model.training_percentile}')

# Visual check of which timestamps the evaluation mask covers.
plt.plot(sample_mask[0,:])
plt.show()
{% endraw %} {% raw %}
# Plot the first forecast week against the ground truth.
start = 0
end = 7 * 24

fig = plt.figure(figsize=(15, 6))
plt.plot(y_true[start:end], color='#628793', linewidth=0.4, label='true')
plt.plot(y_hat[start:end], color='peru', linewidth=0.4, label='forecast')
plt.ylabel('Price [EUR/MWh]', fontsize=15)
plt.xlabel('Date', fontsize=15)
plt.legend()
plt.grid()
plt.show()
{% endraw %}

ES-RNN Multivariate Example

{% raw %}
from neuralforecast.data.datasets.tourism import Tourism, TourismInfo

# Yearly subset of the Tourism competition dataset.
group = TourismInfo['Yearly']
print(group.name)
Y_df, *_ = Tourism.load(directory='./data', group=group.name)
{% endraw %} {% raw %}
# Identify series that are too short: size - horizon - 1 < 2 * horizon,
# i.e. fewer than 3*horizon + 1 observations. These are left-padded below.
fill_uids = Y_df.groupby('unique_id').size().sort_values() \
                .add(-group.horizon-1) \
                .loc[lambda x: x < 2 * group.horizon] \
                .index
{% endraw %} {% raw %}
# Left-pad each short series by prepending its first observed value at
# earlier timestamps until the series reaches the minimum usable length.
new_ys = []
for uid in fill_uids:
    y = Y_df.query('unique_id == @uid')

    # Number of timestamps to prepend for this series.
    ds_to_fill = 2 * group.horizon - y.shape[0] + group.horizon + 1

    # Dates strictly before the series start: [:ds_to_fill] drops the last
    # generated date, which coincides with the existing first timestamp.
    ds = pd.date_range(end = y.iloc[0].ds, periods = ds_to_fill + 1, freq = group.freq)[:ds_to_fill]
    new_y = pd.DataFrame({'ds': ds})
    new_y['unique_id'] = y.iloc[0].unique_id
    # Padded values repeat the first observed y.
    new_y['y'] = y.iloc[0].y
    
    new_ys.append(new_y)
new_ys = pd.concat(new_ys)
{% endraw %} {% raw %}
# Append the padding rows and restore (unique_id, ds) ordering.
Y_df = pd.concat([Y_df, new_ys]).sort_values(['unique_id', 'ds'], ignore_index=True)
{% endraw %} {% raw %}
# Train set: presumably holds out the last `horizon` timestamps of each
# series (ds_in_test) — verify against TimeSeriesDataset docs.
train_dataset = TimeSeriesDataset(Y_df=Y_df,
                                  ds_in_test=group.horizon,
                                  is_test=False,
                                  input_size=group.horizon,
                                  output_size=group.horizon,
                                  verbose=True)

# Validation set: ds_in_test=0 with is_test=True serves full series.
valid_dataset = TimeSeriesDataset(Y_df=Y_df,
                                  ds_in_test=0,
                                  is_test=True,
                                  input_size=group.horizon,
                                  output_size=group.horizon,
                                  verbose=True)
{% endraw %} {% raw %}
train_loader = TimeSeriesLoader(dataset=train_dataset,
                                batch_size=32,
                                eq_batch_size=True,  # NOTE(review): presumably forces equal-sized batches — confirm
                                shuffle=True)

# One series per batch, fixed order, for deterministic evaluation.
valid_loader = TimeSeriesLoader(dataset=valid_dataset,
                                batch_size=1,
                                shuffle=False)
{% endraw %} {% raw %}
# Pull one training batch to sanity-check tensor shapes.
dataloader = iter(train_loader)
batch = next(dataloader)
S, Y, X = batch['S'], batch['Y'], batch['X']  # static, target, exogenous tensors
available_mask = batch['available_mask']
idxs = batch['idxs']  # indices of the sampled series

print("S.shape", S.shape)
print("Y.shape", Y.shape)
print("X.shape", X.shape)
print("idxs.shape", idxs.shape)
{% endraw %} {% raw %}
# ES-RNN configuration for the yearly Tourism panel.
model = ESRNN(
    n_series=train_dataset.n_series,
    n_s=train_dataset.n_s,
    n_x=train_dataset.n_x,
    input_size=train_dataset.input_size,
    output_size=train_dataset.output_size,
    sample_freq=1,  # NOTE(review): presumably a window at every timestamp — confirm
    learning_rate=5e-3,
    lr_scheduler_step_size=100,
    lr_decay=0.9,
    per_series_lr_multip=1.5,
    gradient_eps=1e-8,
    gradient_clipping_threshold=10,
    rnn_weight_decay=0,
    noise_std=0.001,
    level_variability_penalty=10,
    testing_percentile=50,
    training_percentile=50,
    es_component='multiplicative',
    cell_type='GRU',
    state_hsize=50,
    # NOTE(review): dilations of 24/48/168 look copied from the hourly
    # example; for yearly data smaller dilations may be intended — confirm.
    dilations=[[24, 48], [168]],
    add_nl_layer=False,
    loss='SMYL',
    val_loss='MAE',
    seasonality=[1],  # degenerate seasonality for yearly data
    frequency='Y'
)
{% endraw %} {% raw %}
# Stop when val_loss fails to improve by >= 1e-4 for 1 validation check.
early_stopping = EarlyStopping(monitor="val_loss", 
                               min_delta=1e-4, 
                               patience=1, verbose=True, 
                               mode="min")
{% endraw %} {% raw %}
# NOTE(review): `progress_bar_refresh_rate` was removed in recent
# pytorch-lightning releases — pin the version or migrate if upgrading.
trainer = pl.Trainer(max_epochs=2, progress_bar_refresh_rate=1, 
                     log_every_n_steps=1, check_val_every_n_epoch=1,
                     callbacks=[early_stopping])
trainer.fit(model, train_loader, valid_loader)
{% endraw %} {% raw %}
# Predict on the validation loader (one series per batch, see loader above)
# and stack the per-batch tuples along dim 1.
# NOTE(review): `axis` is torch's accepted alias for `dim` in t.cat.
outputs = trainer.predict(model, valid_loader)
y, y_hat, mask = zip(*outputs)
y = t.cat(y, axis=1)
y_hat = t.cat(y_hat, axis=1)
mask = t.cat(mask, axis=1)
print("y_true.shape", y.shape)
print("y_hat.shape", y_hat.shape)
print("mask.shape", mask.shape)
{% endraw %} {% raw %}
# Forecast `output_size` periods past the last timestamp of Y_df
# (see ESRNN.forecast above).
model.forecast(Y_df)
{% endraw %}