--- title: Forecast using auto_arima at scale keywords: fastai sidebar: home_sidebar nb_path: "examples/AutoArima_vs_Prophet.ipynb" ---
The auto_arima
model is widely used to forecast time series in production and as a benchmark. However, the python implementation (pmdarima
) is so slow that prevent data scientist practioners from quickly iterating and deploying auto_arima
in production for a large number of time series. In this notebook we present Nixtla's auto_arima
based on the R implementation (developed by Rob Hyndman) and optimized using numba
.
import random
import time
from multiprocessing import cpu_count, Pool # for prophet
from itertools import product
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from prophet import Prophet
from statsforecast.core import StatsForecast
from statsforecast.models import auto_arima
from statsmodels.graphics.tsaplots import plot_acf
from sklearn.model_selection import ParameterGrid
The plot_grid
function defined below will be useful to plot different time series, and different models' forecasts.
def plot_grid(df_train, df_test=None, plot_random=True):
fig, axes = plt.subplots(4, 2, figsize = (24, 14))
unique_ids = df_train['unique_id'].unique()
assert len(unique_ids) >= 8, "Must provide at least 8 ts"
if plot_random:
unique_ids = random.sample(list(unique_ids), k=8)
else:
unique_uids = unique_ids[:8]
for uid, (idx, idy) in zip(unique_ids, product(range(4), range(2))):
train_uid = df_train.query('unique_id == @uid')
axes[idx, idy].plot(train_uid['ds'], train_uid['y'], label = 'y_train')
if df_test is not None:
max_ds = train_uid['ds'].max()
test_uid = df_test.query('unique_id == @uid')
for model in df_test.drop(['unique_id', 'ds'], axis=1).columns:
if all(np.isnan(test_uid[model])):
continue
axes[idx, idy].plot(test_uid['ds'], test_uid[model], label=model)
axes[idx, idy].set_title(f'M4 Hourly: {uid}')
axes[idx, idy].set_xlabel('Timestamp [t]')
axes[idx, idy].set_ylabel('Target')
axes[idx, idy].legend(loc='upper left')
axes[idx, idy].xaxis.set_major_locator(plt.MaxNLocator(20))
axes[idx, idy].grid()
fig.subplots_adjust(hspace=0.5)
plt.show()
def plot_autocorrelation_grid(df_train):
fig, axes = plt.subplots(4, 2, figsize = (24, 14))
unique_ids = df_train['unique_id'].unique()
assert len(unique_ids) >= 8, "Must provide at least 8 ts"
unique_ids = random.sample(list(unique_ids), k=8)
for uid, (idx, idy) in zip(unique_ids, product(range(4), range(2))):
train_uid = df_train.query('unique_id == @uid')
plot_acf(train_uid['y'].values, ax=axes[idx, idy],
title=f'ACF M4 Hourly {uid}')
axes[idx, idy].set_xlabel('Timestamp [t]')
axes[idx, idy].set_ylabel('Autocorrelation')
fig.subplots_adjust(hspace=0.5)
plt.show()
For testing purposes, we will use the Hourly dataset from the M4 competition.
!wget https://auto-arima-results.s3.amazonaws.com/M4-Hourly.csv
!wget https://auto-arima-results.s3.amazonaws.com/M4-Hourly-test.csv
train = pd.read_csv('M4-Hourly.csv')
test = pd.read_csv('M4-Hourly-test.csv').rename(columns={'y': 'y_test'})
In this example we will use a subset of the data to avoid waiting too long. You can modify the number of series if you want.
n_series = 16
uids = train['unique_id'].unique()[:n_series]
train = train.query('unique_id in @uids')
test = test.query('unique_id in @uids')
plot_grid(train, test)
Would nn autorregresive model be the right choice for our data? There is no doubt that we observe seasonal periods. The autocorrelation function (acf
) can help us to answer the question. Intuitively, we have to observe a decreasing correlation to opt for an AR model.
plot_autocorrelation_grid(train)
Thus, we observe a high autocorrelation for previous lags and also for the seasonal lags. Therefore, we will let auto_arima
to handle our data.
StatsForecast
receives a list of models to fit each time series. If you want to provide an extra argument to the model's function you have to pass a tuple of the form (model, arg1, arg2)
. Since we are dealing with Hourly data, it would be benefitial to use 24 as seasonality.
?auto_arima
As we see, we can pass season_length
to auto_arima
, so the definition of our models would be,
models = [(auto_arima, 24)]
fcst = StatsForecast(train.set_index('unique_id'), models=models, freq='H',
n_jobs=min(cpu_count(), n_series))
init = time.time()
with np.errstate(invalid='ignore'):
forecasts = fcst.forecast(48)
end = time.time()
time_nixtla = end - init
time_nixtla
forecasts.head()
forecasts = forecasts.reset_index()
test = test.merge(forecasts, how='left', on=['unique_id', 'ds'])
plot_grid(train, test)
You can use the StatsForecast
class to parallelize your own models. In this section we will use it to run the auto_arima
model from pmdarima
.
from pmdarima import auto_arima as auto_arima_p
def auto_arima_pmdarima(y, h, season_length):
mod = auto_arima_p(y, m=season_length,
with_intercept=False) #ensure comparability with Nixtla's implementation
return mod.predict(h)
n_series_pmdarima = 2
fcst = StatsForecast(
train.query('unique_id in ["H1", "H10"]').set_index('unique_id'),
models=[(auto_arima_pmdarima, 24)],
freq='H',
n_jobs=min(n_series_pmdarima, cpu_count())
)
init = time.time()
forecast_pmdarima = fcst.forecast(48)
end = time.time()
time_pmdarima = end - init
time_pmdarima
forecast_pmdarima.head()
forecast_pmdarima = forecast_pmdarima.reset_index()
test = test.merge(forecast_pmdarima, how='left', on=['unique_id', 'ds'])
plot_grid(train, test, plot_random=False)
Prophet
is designed to receive a pandas dataframe, so we cannot use StatForecast
. Therefore, we need to parallize from scratch.
params_grid = {'seasonality_mode': ['multiplicative','additive'],
'growth': ['linear', 'flat'],
'changepoint_prior_scale': [0.1, 0.2, 0.3, 0.4, 0.5],
'n_changepoints': [5, 10, 15, 20]}
grid = ParameterGrid(params_grid)
def fit_and_predict(index, ts):
df = ts.drop('unique_id', 1)
max_ds = df['ds'].max()
df['ds'] = pd.date_range(start='1970-01-01', periods=df.shape[0], freq='H')
df_val = df.tail(48)
df_train = df.drop(df_val.index)
y_val = df_val['y'].values
if len(df_train) >= 48:
val_results = {'losses': [], 'params': []}
for params in grid:
model = Prophet(seasonality_mode=params['seasonality_mode'],
growth=params['growth'],
weekly_seasonality=True,
daily_seasonality=True,
yearly_seasonality=True,
n_changepoints=params['n_changepoints'],
changepoint_prior_scale=params['changepoint_prior_scale'])
model = model.fit(df_train)
forecast = model.make_future_dataframe(periods=48,
include_history=False,
freq='H')
forecast = model.predict(forecast)
forecast['unique_id'] = index
forecast = forecast.filter(items=['unique_id', 'ds', 'yhat'])
loss = np.mean(abs(y_val - forecast['yhat'].values))
val_results['losses'].append(loss)
val_results['params'].append(params)
idx_params = np.argmin(val_results['losses'])
params = val_results['params'][idx_params]
else:
params = {'seasonality_mode': 'multiplicative',
'growth': 'flat',
'n_changepoints': 150,
'changepoint_prior_scale': 0.5}
model = Prophet(seasonality_mode=params['seasonality_mode'],
growth=params['growth'],
weekly_seasonality=True,
daily_seasonality=True,
yearly_seasonality=True,
n_changepoints=params['n_changepoints'],
changepoint_prior_scale=params['changepoint_prior_scale'])
model = model.fit(df)
forecast = model.make_future_dataframe(periods=48,
include_history=False,
freq='H')
forecast = model.predict(forecast)
forecast.insert(0, 'unique_id', index)
forecast['ds'] = np.arange(max_ds + 1, max_ds + 48 + 1)
forecast = forecast.filter(items=['unique_id', 'ds', 'yhat'])
return forecast
import logging
logging.getLogger('prophet').setLevel(logging.WARNING)
class suppress_stdout_stderr(object):
'''
A context manager for doing a "deep suppression" of stdout and stderr in
Python, i.e. will suppress all print, even if the print originates in a
compiled C/Fortran sub-function.
This will not suppress raised exceptions, since exceptions are printed
to stderr just before a script exits, and after the context manager has
exited (at least, I think that is why it lets exceptions through).
'''
def __init__(self):
# Open a pair of null files
self.null_fds = [os.open(os.devnull, os.O_RDWR) for x in range(2)]
# Save the actual stdout (1) and stderr (2) file descriptors.
self.save_fds = [os.dup(1), os.dup(2)]
def __enter__(self):
# Assign the null pointers to stdout and stderr.
os.dup2(self.null_fds[0], 1)
os.dup2(self.null_fds[1], 2)
def __exit__(self, *_):
# Re-assign the real stdout/stderr back to (1) and (2)
os.dup2(self.save_fds[0], 1)
os.dup2(self.save_fds[1], 2)
# Close the null files
for fd in self.null_fds + self.save_fds:
os.close(fd)
import os
init = time.time()
with suppress_stdout_stderr():
with Pool(cpu_count()) as pool:
forecast_prophet = pool.starmap(fit_and_predict, train.groupby('unique_id'))
end = time.time()
forecast_prophet = pd.concat(forecast_prophet).rename(columns={'yhat': 'prophet'})
time_prophet = end - init
time_prophet
forecast_prophet
test = test.merge(forecast_prophet, how='left', on=['unique_id', 'ds'])
plot_grid(train, test)
Since auto_arima
works with numba is useful to calculate the time for just one time series.
fcst = StatsForecast(train.query('unique_id == "H1"').set_index('unique_id'),
models=models, freq='H',
n_jobs=1)
init = time.time()
with np.errstate(invalid='ignore'):
forecasts = fcst.forecast(48)
end = time.time()
time_nixtla_1 = end - init
time_nixtla_1
times = pd.DataFrame({'n_series': np.arange(1, 414 + 1)})
times['pmdarima'] = time_pmdarima * times['n_series'] / n_series_pmdarima
times['prophet'] = time_prophet * times['n_series'] / n_series
times['auto_arima_nixtla'] = time_nixtla_1 + times['n_series'] * (time_nixtla - time_nixtla_1) / n_series
times = times.set_index('n_series')
fig, axes = plt.subplots(1, 2, figsize = (24, 7))
(times/3600).plot(ax=axes[0], linewidth=4)
np.log10(times).plot(ax=axes[1], linewidth=4)
axes[0].set_title('Time across models [Hours]', fontsize=22)
axes[1].set_title('Time across models [Log10 Scale]', fontsize=22)
axes[0].set_ylabel('Time [Hours]', fontsize=20)
axes[1].set_ylabel('Time Seconds [Log10 Scale]', fontsize=20)
fig.suptitle('Time comparison using M4-Hourly data', fontsize=27)
for ax in axes:
ax.set_xlabel('Number of Time Series [N]', fontsize=20)
ax.legend(prop={'size': 20})
ax.grid()
for label in (ax.get_xticklabels() + ax.get_yticklabels()):
label.set_fontsize(20)
fig.savefig('computational-efficiency.png', dpi=300)
name_models = test.drop(['unique_id', 'ds', 'y_test'], 1).columns.tolist()
test_pmdarima = test.query('unique_id in ["H1", "H10"]')
eval_pmdarima = []
for model in name_models:
mae = np.mean(abs(test_pmdarima[model] - test_pmdarima['y_test']))
eval_pmdarima.append({'model': model, 'mae': mae})
pd.DataFrame(eval_pmdarima).sort_values('mae')
eval_prophet = []
for model in name_models:
if 'pmdarima' in model:
continue
mae = np.mean(abs(test[model] - test['y_test']))
eval_prophet.append({'model': model, 'mae': mae})
pd.DataFrame(eval_prophet).sort_values('mae')