StatsForecast ETS and Facebook Prophet on Spark (M5)
This notebook was originally executed on Databricks.
The purpose of this notebook is to create a scalability benchmark (time and performance). To that end, Nixtla’s StatsForecast (using the ETS model) is trained on the M5 dataset, with Spark distributing the training. Facebook’s Prophet model is used as a comparison.
An AWS cluster of 11 m5.2xlarge instances (8 cores, 32 GB RAM each), mounted on Databricks with runtime 10.4 LTS, was used. This notebook was used as the base case.
The example uses the M5 dataset, which consists of 30,490 bottom-level time series.
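To inspect the data locally before launching the distributed job, you can use the same loader that the evaluation cells below rely on. A minimal sketch, assuming M5.load returns the target, exogenous, and structure dataframes in that order (the unpacking *_, S_df used later only requires the last element to be S_df):
# Sketch: peek at the M5 dataset locally. The distributed pipeline below
# reads the data straight from S3 instead, so this step is optional.
from neuralforecast.data.datasets.m5 import M5

Y_df, *_, S_df = M5.load('./data')  # assumption: (Y_df, X_df, S_df) ordering
print(Y_df.head())                  # long format: unique_id, ds, y
print(Y_df['unique_id'].nunique())  # expected: 30,490 bottom-level series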
Main results
Method | Time (mins) | Performance (wRMSSE) |
---|---|---|
StatsForecast | 7.5 | 0.68 |
Prophet | 18.23 | 0.77 |
Installing libraries
pip install prophet "neuralforecast<1.0.0" "statsforecast[fugue]"
StatsForecast pipeline
from time import time
from neuralforecast.data.datasets.m5 import M5, M5Evaluation
from statsforecast.distributed.utils import forecast
from statsforecast.distributed.fugue import FugueBackend
from statsforecast.models import ETS, SeasonalNaive
from statsforecast.core import StatsForecast
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
backend = FugueBackend(spark, {"fugue.spark.use_pandas_udf":True})
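For reference, the same model can also be run without Spark through the StatsForecast class imported above. A minimal local sketch, assuming the statsforecast 1.x API (where the constructor accepts the dataframe directly) and a pandas dataframe df with columns unique_id, ds, and y:
# Local (non-distributed) sketch for comparison; `df` is assumed to be a
# pandas dataframe with columns unique_id, ds, y.
sf = StatsForecast(df=df, models=[ETS(season_length=7, model='ZAA')], freq='D')
Y_hat_local = sf.forecast(h=28)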
Forecast
With StatsForecast you don’t have to download your data first: the distributed backend can read a file directly, here a parquet file stored on S3.
init = time()
ets_forecasts = backend.forecast(
    "s3://m5-benchmarks/data/train/m5-target.parquet",
    [ETS(season_length=7, model='ZAA')],
    freq="D",
    h=28,
).toPandas()
end = time()
print(f'Minutes taken by StatsForecast on a Spark cluster: {(end - init) / 60}')
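The imports above also bring in SeasonalNaive, which the benchmark itself does not use. As a sketch, a weekly seasonal naive baseline could be run through the same backend with an identical call:
# Sketch (not part of the benchmark): a SeasonalNaive baseline with
# weekly seasonality, run through the same distributed backend.
sn_forecasts = backend.forecast(
    "s3://m5-benchmarks/data/train/m5-target.parquet",
    [SeasonalNaive(season_length=7)],
    freq="D",
    h=28,
).toPandas()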
Evaluating performance
The M5 competition used the weighted root mean squared scaled error (wRMSSE) to measure performance; a sketch of its definition follows.
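As a sketch of the competition’s definitions, the RMSSE of a single series and its weighted aggregate are

$$\mathrm{RMSSE} = \sqrt{\frac{\tfrac{1}{h}\sum_{t=n+1}^{n+h}\left(y_t-\hat{y}_t\right)^2}{\tfrac{1}{n-1}\sum_{t=2}^{n}\left(y_t-y_{t-1}\right)^2}}, \qquad \mathrm{wRMSSE} = \sum_i w_i\,\mathrm{RMSSE}_i,$$

where $h = 28$ is the forecasting horizon, $n$ is the length of the training series, and the weights $w_i$ are proportional to each series’ dollar sales over the final 28 days of the training period.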
# pivot forecasts from long to wide: one row per series, one column per day
Y_hat = ets_forecasts.set_index(['unique_id', 'ds']).unstack()
Y_hat = Y_hat.droplevel(0, 1).reset_index()
# S_df describes the M5 aggregation structure; merging aligns rows for the evaluator
*_, S_df = M5.load('./data')
Y_hat = S_df.merge(Y_hat, how='left', on=['unique_id'])
wrmsse_ets = M5Evaluation.evaluate(y_hat=Y_hat, directory='./data')
wrmsse_ets
Level | wrmsse |
---|---|
Total | 0.682358 |
Level1 | 0.449115 |
Level2 | 0.533754 |
Level3 | 0.592317 |
Level4 | 0.497086 |
Level5 | 0.572189 |
Level6 | 0.593880 |
Level7 | 0.665358 |
Level8 | 0.652183 |
Level9 | 0.734492 |
Level10 | 1.012633 |
Level11 | 0.969902 |
Level12 | 0.915380 |
Prophet pipeline
import logging
from time import time
import pandas as pd
from neuralforecast.data.datasets.m5 import M5, M5Evaluation
from prophet import Prophet
from pyspark.sql.types import *
# silence py4j (Spark's Java bridge), which otherwise floods the driver log
logging.getLogger('py4j').setLevel(logging.ERROR)
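Depending on the Prophet version, the model itself logs through the prophet and cmdstanpy (or pystan) loggers; silencing those as well keeps the driver output clean. This step is optional:
# optional: Prophet's own loggers are also chatty during fitting
logging.getLogger('prophet').setLevel(logging.ERROR)
logging.getLogger('cmdstanpy').setLevel(logging.ERROR)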
Load the data
# structure of the training data set (shown for reference; parquet files
# embed their schema, so it is not passed to the reader)
train_schema = StructType([
    StructField('unique_id', StringType()),
    StructField('ds', DateType()),
    StructField('y', DoubleType())
])
# read the training file into a dataframe
train = spark.read.parquet('s3://m5-benchmarks/data/train/m5-target.parquet')
# make the dataframe queriable as a temporary view
train.createOrReplaceTempView('train')
sql_statement = '''
    SELECT
        unique_id AS unique_id,
        CAST(ds as date) as ds,
        y as y
    FROM train
    '''
# repartition by unique_id so every row of a series lands in the same partition
# (sc is predefined on Databricks; spark.sparkContext works everywhere)
m5_history = (
    spark
    .sql(sql_statement)
    .repartition(spark.sparkContext.defaultParallelism, ['unique_id'])
).cache()
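Repartitioning by unique_id guarantees that every row of a given series lands in the same partition, so each call of the Pandas UDF defined next receives a complete series. A quick sanity check:
# sanity check: the partition count should match the cluster's default parallelism
print(m5_history.rdd.getNumPartitions())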
Forecast function using Prophet
def forecast(history_pd: pd.DataFrame) -> pd.DataFrame:
    # TRAIN MODEL AS BEFORE
    # --------------------------------------
    # remove missing values (more likely at day-store-item level)
    history_pd = history_pd.dropna()

    # configure the model
    model = Prophet(
        growth='linear',
        daily_seasonality=False,
        weekly_seasonality=True,
        yearly_seasonality=True,
        seasonality_mode='multiplicative'
    )

    # train the model
    model.fit(history_pd)

    # --------------------------------------
    # BUILD FORECAST AS BEFORE
    # --------------------------------------
    # make predictions
    future_pd = model.make_future_dataframe(
        periods=28,
        freq='d',
        include_history=False
    )
    forecast_pd = model.predict(future_pd)

    # --------------------------------------
    # ASSEMBLE EXPECTED RESULT SET
    # --------------------------------------
    # get relevant fields from forecast; each group holds a single series,
    # so the first (and only) unique_id identifies it
    forecast_pd['unique_id'] = history_pd['unique_id'].unique()[0]
    f_pd = forecast_pd[['unique_id', 'ds', 'yhat']]

    # return expected dataset
    return f_pd
result_schema = StructType([
    StructField('unique_id', StringType()),
    StructField('ds', DateType()),
    StructField('yhat', FloatType()),
])
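Before launching the full distributed job, the pandas-level function can be smoke-tested on a single series. A sketch, where the series id is an arbitrary example rather than a required value:
# Sketch: run the UDF body locally on one series first.
# 'FOODS_1_001_CA_1' is an example id, not a required value.
sample_pd = m5_history.where("unique_id = 'FOODS_1_001_CA_1'").toPandas()
print(forecast(sample_pd).head())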
Training Prophet on the M5 dataset
init = time()
results = (
    m5_history
    .groupBy('unique_id')
    .applyInPandas(forecast, schema=result_schema)
).toPandas()
end = time()
print(f'Minutes taken by Prophet on a Spark cluster: {(end - init) / 60}')
Evaluating performance
As before, performance is evaluated with the weighted root mean squared scaled error (wRMSSE) defined above.
# pivot forecasts from long to wide: one row per series, one column per day
Y_hat = results.set_index(['unique_id', 'ds']).unstack()
Y_hat = Y_hat.droplevel(0, 1).reset_index()
# S_df describes the M5 aggregation structure; merging aligns rows for the evaluator
*_, S_df = M5.load('./data')
Y_hat = S_df.merge(Y_hat, how='left', on=['unique_id'])
wrmsse = M5Evaluation.evaluate(y_hat=Y_hat, directory='./data')
wrmsse
Level | wrmsse |
---|---|
Total | 0.771800 |
Level1 | 0.507905 |
Level2 | 0.586328 |
Level3 | 0.666686 |
Level4 | 0.549358 |
Level5 | 0.655003 |
Level6 | 0.647176 |
Level7 | 0.747047 |
Level8 | 0.743422 |
Level9 | 0.824667 |
Level10 | 1.207069 |
Level11 | 1.108780 |
Level12 | 1.018163 |