The purpose of this notebook is to create a scalability benchmark (time and performance). To that end, Nixtla's StatsForecast (using the ETS model) is trained on the M5 dataset, with Spark used to distribute the training. Facebook's Prophet model is used as a comparison.

An AWS cluster on Databricks with 11 instances of type m5.2xlarge (8 cores, 32 GB RAM each) and runtime 10.4 LTS was used. This notebook was used as the base case.

The example uses the M5 dataset, which consists of 30,490 bottom-level time series.
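To get a feel for the data before running anything distributed, you can load it locally with the same M5 loader used later for evaluation (a quick sanity check, assuming the loader's `(Y_df, X_df, S_df)` return signature and enough driver memory):

```python
from neuralforecast.data.datasets.m5 import M5

# Downloads the dataset to ./data on first call and returns the
# target series, exogenous variables, and static features.
Y_df, X_df, S_df = M5.load('./data')
print(Y_df['unique_id'].nunique())  # 30,490 bottom-level series
```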
## Main results

| Method | Time (mins) | Performance (wRMSSE) |
|---|---|---|
| StatsForecast | 7.5 | 0.68 |
| Prophet | 18.23 | 0.77 |
## Installing libraries

```bash
pip install prophet "neuralforecast<1.0.0" "statsforecast[fugue]"
```
## StatsForecast pipeline

```python
from time import time

from neuralforecast.data.datasets.m5 import M5, M5Evaluation
from statsforecast.distributed.utils import forecast
from statsforecast.distributed.fugue import FugueBackend
from statsforecast.models import ETS, SeasonalNaive
from statsforecast.core import StatsForecast
from pyspark.sql import SparkSession
```

```python
spark = SparkSession.builder.getOrCreate()
backend = FugueBackend(spark, {"fugue.spark.use_pandas_udf": True})
```
### Forecast

With StatsForecast you don't have to download your data first: the distributed backend can read a file directly.

```python
init = time()
ets_forecasts = backend.forecast(
    "s3://m5-benchmarks/data/train/m5-target.parquet",
    [ETS(season_length=7, model='ZAA')],
    freq="D",
    h=28,
).toPandas()
end = time()
print(f'Minutes taken by StatsForecast on a Spark cluster: {(end - init) / 60}')
```
The M5 competition used the weighted root mean squared scaled error (WRMSSE). You can find details of the metric here.
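In brief (a sketch of the competition's definition, with $h = 28$ the forecast horizon and $n$ the length of a training series): the RMSSE of one series scales the out-of-sample MSE by the mean squared one-step in-sample difference, and the WRMSSE averages the RMSSE over all 42,840 series of the M5 hierarchy with weights $w_i$ (summing to one) proportional to each series' cumulative dollar sales over the last 28 training days:

$$
\mathrm{RMSSE} = \sqrt{\frac{\frac{1}{h}\sum_{t=n+1}^{n+h} (y_t - \hat{y}_t)^2}{\frac{1}{n-1}\sum_{t=2}^{n} (y_t - y_{t-1})^2}},
\qquad
\mathrm{WRMSSE} = \sum_{i=1}^{42{,}840} w_i \, \mathrm{RMSSE}_i.
$$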
```python
# Pivot the forecasts from long to wide format (one column per date),
# attach the static hierarchy information, and evaluate.
Y_hat = ets_forecasts.set_index(['unique_id', 'ds']).unstack()
Y_hat = Y_hat.droplevel(0, 1).reset_index()
*_, S_df = M5.load('./data')
Y_hat = S_df.merge(Y_hat, how='left', on=['unique_id'])
wrmsse_ets = M5Evaluation.evaluate(y_hat=Y_hat, directory='./data')
wrmsse_ets
```
| | wrmsse |
|---|---|
| Total | 0.682358 |
| Level1 | 0.449115 |
| Level2 | 0.533754 |
| Level3 | 0.592317 |
| Level4 | 0.497086 |
| Level5 | 0.572189 |
| Level6 | 0.593880 |
| Level7 | 0.665358 |
| Level8 | 0.652183 |
| Level9 | 0.734492 |
| Level10 | 1.012633 |
| Level11 | 0.969902 |
| Level12 | 0.915380 |
## Prophet pipeline

```python
import logging
from time import time

import pandas as pd
from neuralforecast.data.datasets.m5 import M5, M5Evaluation
from prophet import Prophet
from pyspark.sql.types import *

# Silence noisy py4j logging from the Spark driver
logging.getLogger('py4j').setLevel(logging.ERROR)
```
### Download data

```python
# Schema for the target series: one row per (series, date) pair.
train_schema = StructType([
    StructField('unique_id', StringType()),
    StructField('ds', DateType()),
    StructField('y', DoubleType()),
])

# Parquet files embed their schema, so the CSV-style header/schema
# keyword arguments don't apply; enforce the schema with .schema().
train = (
    spark.read
    .schema(train_schema)
    .parquet('s3://m5-benchmarks/data/train/m5-target.parquet')
)
train.createOrReplaceTempView('train')
```

```python
sql_statement = '''
    SELECT
      unique_id AS unique_id,
      CAST(ds as date) as ds,
      y as y
    FROM train
    '''

# Repartition by series so each one lands entirely in a single
# partition, then cache, since the frame is reused by the training job.
m5_history = (
    spark
    .sql(sql_statement)
    .repartition(spark.sparkContext.defaultParallelism, ['unique_id'])
).cache()
```
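A quick check (not part of the original benchmark) that the cached frame is spread across the expected number of partitions before the expensive group-by:

```python
# Should match the cluster's default parallelism set above.
print(m5_history.rdd.getNumPartitions())
```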
### Forecast function using Prophet

```python
def forecast(history_pd: pd.DataFrame) -> pd.DataFrame:
    # Prophet can't handle missing values in the training frame
    history_pd = history_pd.dropna()

    # configure and fit the model on a single series
    model = Prophet(
        growth='linear',
        daily_seasonality=False,
        weekly_seasonality=True,
        yearly_seasonality=True,
        seasonality_mode='multiplicative'
    )
    model.fit(history_pd)

    # build a 28-day future frame and predict it
    future_pd = model.make_future_dataframe(
        periods=28,
        freq='d',
        include_history=False
    )
    forecast_pd = model.predict(future_pd)

    # keep the series identifier so results can be matched back
    forecast_pd['unique_id'] = history_pd['unique_id'].unique()[0]
    f_pd = forecast_pd[['unique_id', 'ds', 'yhat']]
    return f_pd
```
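Before launching the distributed job, it can help to smoke-test the function on a single series pulled to the driver (the series id below is just an illustrative M5-style id; any id present in the training set works):

```python
# Run the pandas UDF body directly on one series.
sample_pd = (
    m5_history
    .where("unique_id = 'FOODS_1_001_CA_1'")
    .toPandas()
)
print(forecast(sample_pd).head())  # 28 daily rows: unique_id, ds, yhat
```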
```python
# Output schema for the pandas UDF above.
result_schema = StructType([
    StructField('unique_id', StringType()),
    StructField('ds', DateType()),
    StructField('yhat', FloatType()),
])
```
### Training Prophet on the M5 dataset

```python
init = time()
results = (
    m5_history
    .groupBy('unique_id')
    .applyInPandas(forecast, schema=result_schema)
).toPandas()
end = time()
print(f'Minutes taken by Prophet on a Spark cluster: {(end - init) / 60}')
```
As before, the forecasts are evaluated with the weighted root mean squared scaled error used in the M5 competition.
```python
# Same reshaping and evaluation as for StatsForecast above.
Y_hat = results.set_index(['unique_id', 'ds']).unstack()
Y_hat = Y_hat.droplevel(0, 1).reset_index()
*_, S_df = M5.load('./data')
Y_hat = S_df.merge(Y_hat, how='left', on=['unique_id'])
wrmsse = M5Evaluation.evaluate(y_hat=Y_hat, directory='./data')
wrmsse
```
| | wrmsse |
|---|---|
| Total | 0.771800 |
| Level1 | 0.507905 |
| Level2 | 0.586328 |
| Level3 | 0.666686 |
| Level4 | 0.549358 |
| Level5 | 0.655003 |
| Level6 | 0.647176 |
| Level7 | 0.747047 |
| Level8 | 0.743422 |
| Level9 | 0.824667 |
| Level10 | 1.207069 |
| Level11 | 1.108780 |
| Level12 | 1.018163 |