The purpose of this notebook is to benchmark the scalability (time and forecasting performance) of Nixtla's StatsForecast. To that end, StatsForecast's ETS model is trained on the M5 dataset, using Spark to distribute the training. Facebook's Prophet model is used as a comparison.

An AWS cluster (mounted on Databricks) of 11 m5.2xlarge instances (8 cores, 32 GB RAM each) with Databricks Runtime 10.4 LTS was used. This notebook was used as the base case.
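
As a quick sanity check before the benchmark, you can inspect the parallelism the cluster exposes to Spark (on Databricks a spark session is predefined in every notebook; otherwise use the one created below):

# total number of cores Spark can use across the cluster
print(spark.sparkContext.defaultParallelism)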

The example uses the M5 dataset, which consists of 30,490 bottom-level time series.
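
If you want to inspect the data before running anything distributed, the M5 loader from neuralforecast (used again in the evaluation sections below) returns the target, exogenous, and static dataframes; a minimal sketch:

from neuralforecast.data.datasets.m5 import M5

# downloads the dataset to ./data on the first call
Y_df, X_df, S_df = M5.load('./data')
Y_df['unique_id'].nunique()  # 30,490 bottom-level series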

Main results

Method          Time (mins)   Performance (wRMSSE)
StatsForecast   7.5           0.68
Prophet         18.23         0.77

Installing libraries

pip install prophet "neuralforecast<1.0.0" "statsforecast[fugue]"

StatsForecast pipeline

from time import time

from neuralforecast.data.datasets.m5 import M5, M5Evaluation  # M5 loader and official evaluation
from statsforecast.distributed.fugue import FugueBackend      # distributed execution via Fugue
from statsforecast.models import ETS

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
# use pandas UDFs so each worker runs the models over pandas DataFrames
backend = FugueBackend(spark, {"fugue.spark.use_pandas_udf": True})

Forecast

With StatsForecast you don't have to download your data: the distributed backend can read the file directly, here a parquet file stored on S3.

init = time()
ets_forecasts = backend.forecast(
    "s3://m5-benchmarks/data/train/m5-target.parquet",  # training data read directly from S3
    [ETS(season_length=7, model='ZAA')],  # weekly seasonality; Z/A/A = optimized error, additive trend and seasonality
    freq="D",
    h=28,  # 28-day horizon, as in the M5 competition
).toPandas()
end = time()
print(f'Minutes taken by StatsForecast on a Spark cluster: {(end - init) / 60}')

Evaluating performance

The M5 competition used the weighted root mean squared scaled error (WRMSSE). You can find the details of the metric in the M5 Participants' Guide.
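
For intuition, below is a minimal sketch of the per-series RMSSE that the metric is built on; the weighting across all 42,840 series and the 12 aggregation levels is what M5Evaluation.evaluate computes for you.

import numpy as np

def rmsse(y_train, y_test, y_hat):
    # numerator: mean squared forecast error over the 28-day horizon
    mse = np.mean((y_test - y_hat) ** 2)
    # denominator: mean squared one-step naive error on the training data
    scale = np.mean(np.diff(y_train) ** 2)
    return np.sqrt(mse / scale)

# WRMSSE is the weighted sum of rmsse over all series, each weighted by its
# dollar sales in the last 28 days of the training period.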

# pivot the forecasts to wide format: one row per series, one column per date
Y_hat = ets_forecasts.set_index(['unique_id', 'ds']).unstack()
Y_hat = Y_hat.droplevel(0, 1).reset_index()
# attach the static variables required by the evaluator
*_, S_df = M5.load('./data')
Y_hat = S_df.merge(Y_hat, how='left', on=['unique_id'])
wrmsse_ets = M5Evaluation.evaluate(y_hat=Y_hat, directory='./data')
wrmsse_ets
         wrmsse
Total    0.682358
Level1   0.449115
Level2   0.533754
Level3   0.592317
Level4   0.497086
Level5   0.572189
Level6   0.593880
Level7   0.665358
Level8   0.652183
Level9   0.734492
Level10  1.012633
Level11  0.969902
Level12  0.915380

Prophet pipeline

import logging
from time import time

import pandas as pd
from neuralforecast.data.datasets.m5 import M5, M5Evaluation
from prophet import Prophet
from pyspark.sql.types import StructType, StructField, StringType, DateType, DoubleType, FloatType

# silence informational messages from Spark's py4j gateway
logging.getLogger('py4j').setLevel(logging.ERROR)

Download data

# structure of the training data set
train_schema = StructType([
  StructField('unique_id', StringType()),
  StructField('ds', DateType()),
  StructField('y', DoubleType())
  ])

# read the training file into a dataframe
# (schema is set through .schema(); `header` and `schema` are CSV options
# and are not valid keyword arguments for spark.read.parquet)
train = (
  spark.read
    .schema(train_schema)
    .parquet('s3://m5-benchmarks/data/train/m5-target.parquet')
)
 
# make the dataframe queryable as a temporary view
train.createOrReplaceTempView('train')

sql_statement = '''
  SELECT
    unique_id,
    CAST(ds AS date) AS ds,
    y
  FROM train
  '''

# repartition by series id to spread the per-series work evenly across the
# cluster, and cache the result since it is reused when fitting the models
m5_history = (
  spark
    .sql(sql_statement)
    .repartition(spark.sparkContext.defaultParallelism, ['unique_id'])
  ).cache()

Forecast function using Prophet

def forecast(history_pd: pd.DataFrame) -> pd.DataFrame:

    # TRAIN MODEL AS BEFORE
    # --------------------------------------
    # remove missing values (more likely at day-store-item level)
    history_pd = history_pd.dropna()

    # configure the model
    model = Prophet(
        growth='linear',
        daily_seasonality=False,
        weekly_seasonality=True,
        yearly_seasonality=True,
        seasonality_mode='multiplicative'
    )

    # train the model
    model.fit(history_pd)
    # --------------------------------------

    # BUILD FORECAST AS BEFORE
    # --------------------------------------
    # make predictions for the 28-day horizon
    future_pd = model.make_future_dataframe(
        periods=28,
        freq='d',
        include_history=False
    )
    forecast_pd = model.predict(future_pd)
    # --------------------------------------

    # ASSEMBLE EXPECTED RESULT SET
    # --------------------------------------
    # each group contains a single series, so take its id from the history
    forecast_pd['unique_id'] = history_pd['unique_id'].unique()[0]
    f_pd = forecast_pd[['unique_id', 'ds', 'yhat']]
    # --------------------------------------

    # return expected dataset
    return f_pd

# structure of the result set produced by the forecast function
result_schema = StructType([
  StructField('unique_id', StringType()),
  StructField('ds', DateType()),
  StructField('yhat', FloatType()),
])
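
Before launching the distributed run, you can smoke-test the function on a single series pulled to the driver. The series id below is just an illustrative M5-style id; substitute any id present in your data.

# hypothetical local check on one series
sample_pd = (
  m5_history
    .filter("unique_id = 'FOODS_1_001_CA_1'")  # assumed example id
    .toPandas()
)
forecast(sample_pd).head()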

Training Prophet on the M5 dataset

init = time()
# fit one Prophet model per series in parallel and collect the forecasts
results = (
  m5_history
    .groupBy('unique_id')
    .applyInPandas(forecast, schema=result_schema)
  ).toPandas()
end = time()
print(f'Minutes taken by Prophet on a Spark cluster: {(end - init) / 60}')

Evaluating performance

The M5 competition used the weighted root mean squared scaled error (WRMSSE); see the StatsForecast evaluation above for the details of the metric.

# pivot the forecasts to wide format: one row per series, one column per date
Y_hat = results.set_index(['unique_id', 'ds']).unstack()
Y_hat = Y_hat.droplevel(0, 1).reset_index()
# attach the static variables required by the evaluator
*_, S_df = M5.load('./data')
Y_hat = S_df.merge(Y_hat, how='left', on=['unique_id'])
wrmsse = M5Evaluation.evaluate(y_hat=Y_hat, directory='./data')
wrmsse
         wrmsse
Total    0.771800
Level1   0.507905
Level2   0.586328
Level3   0.666686
Level4   0.549358
Level5   0.655003
Level6   0.647176
Level7   0.747047
Level8   0.743422
Level9   0.824667
Level10  1.207069
Level11  1.108780
Level12  1.018163