
# StatsForecast ETS and Facebook Prophet on Spark (M5)

> This notebook was originally executed on Databricks.

The purpose of this notebook is to create a scalability benchmark that
measures both training time and forecast accuracy. To that end, Nixtla’s
[StatsForecast](https://github.com/Nixtla/statsforecast) (using the ETS
model) is trained on the M5 dataset, with Spark distributing the
training. Facebook’s [Prophet](https://github.com/facebook/prophet)
model is used as the comparison.

The benchmark ran on an AWS cluster managed by Databricks, with 11
instances of type m5.2xlarge (8 cores and 32 GB RAM each) and Databricks
Runtime 10.4 LTS.
[This](https://d1r5llqwmkrl74.cloudfront.net/notebooks/RCG/Fine_Grained_Demand_Forecasting/index.html#Fine_Grained_Demand_Forecasting_1.html)
notebook was used as the base case.

The example uses the [M5
dataset](https://github.com/Mcompetitions/M5-methods/blob/master/M5-Competitors-Guide.pdf).
It consists of `30,490` bottom-level (item-store) time series.

## Main results

| Method        | Time (mins) | Performance (wRMSSE) |
| ------------- | ----------: | -------------------: |
| StatsForecast |         7.5 |                 0.68 |
| Prophet       |       18.23 |                 0.77 |
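The headline numbers can be compared directly. The short sketch below uses only the values in the table to compute how many times faster StatsForecast ran and how much lower its wRMSSE was relative to Prophet.

```python
# Values copied from the "Main results" table above.
results = {
    "StatsForecast": {"minutes": 7.5, "wrmsse": 0.68},
    "Prophet": {"minutes": 18.23, "wrmsse": 0.77},
}

sf, prophet = results["StatsForecast"], results["Prophet"]

# How many times faster StatsForecast finished on the same cluster.
speedup = prophet["minutes"] / sf["minutes"]

# Relative reduction in wRMSSE (lower is better) with respect to Prophet.
error_reduction = (prophet["wrmsse"] - sf["wrmsse"]) / prophet["wrmsse"]

print(f"Speedup: {speedup:.2f}x")                  # Speedup: 2.43x
print(f"wRMSSE reduction: {error_reduction:.1%}")  # wRMSSE reduction: 11.7%
```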

## Installing libraries

```python theme={null}
%pip install prophet "neuralforecast<1.0.0" "statsforecast[fugue]"
```

## StatsForecast pipeline

```python theme={null}
from time import time

from neuralforecast.data.datasets.m5 import M5, M5Evaluation
from statsforecast.distributed.utils import forecast
from statsforecast.distributed.fugue import FugueBackend
from statsforecast.models import ETS, SeasonalNaive
from statsforecast.core import StatsForecast

from pyspark.sql import SparkSession
```

```python theme={null}
spark = SparkSession.builder.getOrCreate()
# Fugue backend that distributes the StatsForecast work over the Spark cluster
backend = FugueBackend(spark, {"fugue.spark.use_pandas_udf": True})
```

### Forecast

With StatsForecast you don’t have to download the data first: the
distributed backend can read the file directly from its path (here, a
Parquet file on S3).

```python theme={null}
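init = time()
ets_forecasts = backend.forecast(
    "s3://m5-benchmarks/data/train/m5-target.parquet",  # read directly by the backend
    # ETS with weekly seasonality; 'ZAA' selects the error type automatically
    # and uses an additive trend and additive seasonality
    [ETS(season_length=7, model='ZAA')],
    freq="D",  # daily data
    h=28,      # forecast horizon in days
).toPandas()
end = time()
print(f'Minutes taken by StatsForecast on a Spark cluster: {(end - init) / 60}')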
```
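`h=28` with `freq="D"` asks for 28 consecutive daily values per series, starting the day after the last training date. As an illustration only, with a hypothetical last training date that is not taken from the dataset, the forecast dates can be generated with the standard library:

```python
from datetime import date, timedelta


def forecast_dates(last_train_date: date, h: int = 28) -> list[date]:
    """Return the h daily timestamps that follow last_train_date."""
    return [last_train_date + timedelta(days=step) for step in range(1, h + 1)]


# Hypothetical last training date, used only for illustration.
dates = forecast_dates(date(2021, 1, 31))
print(dates[0], dates[-1], len(dates))  # 2021-02-01 2021-02-28 28
```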

### Evaluating performance

The M5 competition was scored with the weighted root mean squared scaled
error (WRMSSE). Details of the metric are in the
[M5 Competitors’ Guide](https://github.com/Mcompetitions/M5-methods/blob/master/M5-Competitors-Guide.pdf).
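For reference, the M5 Competitors’ Guide defines the root mean squared scaled error of series $i$, with $n$ training observations and forecast horizon $h = 28$, by scaling the forecast error with the in-sample error of a one-step naive forecast. Per-series scores are then combined with weights $w_i$ based on each series’ dollar sales over the last 28 training days; the weights within each of the 12 aggregation levels $\mathcal{S}_l$ sum to 1, and the levels count equally:

$$
\mathrm{RMSSE}_i = \sqrt{\frac{1}{h}\,\frac{\sum_{t=n+1}^{n+h}\left(Y_{i,t}-\hat{Y}_{i,t}\right)^2}{\frac{1}{n-1}\sum_{t=2}^{n}\left(Y_{i,t}-Y_{i,t-1}\right)^2}}
$$

$$
\mathrm{WRMSSE} = \frac{1}{12}\sum_{l=1}^{12}\sum_{i\in\mathcal{S}_l} w_i\,\mathrm{RMSSE}_i
$$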

```python theme={null}
Y_hat = ets_forecasts.set_index(['unique_id', 'ds']).unstack()
Y_hat = Y_hat.droplevel(0, 1).reset_index()
```
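The two lines above reshape the long output (one row per series and date) into a wide table with one row per `unique_id` and one column per forecast date; `droplevel(0, 1)` removes the model-name level from the column index. A pure-Python sketch of the same reshaping, on made-up values and with a hypothetical `to_wide` helper:

```python
from collections import defaultdict


def to_wide(rows):
    """Pivot (unique_id, ds, value) rows into {unique_id: [values ordered by ds]}."""
    by_series = defaultdict(dict)
    for unique_id, ds, value in rows:
        by_series[unique_id][ds] = value
    # Sort each series by date so the "columns" line up across series.
    return {uid: [vals[ds] for ds in sorted(vals)] for uid, vals in by_series.items()}


# Made-up long-format forecasts: two series, two forecast dates each.
long_rows = [
    ("series_a", "2021-02-02", 1.4),
    ("series_a", "2021-02-01", 1.2),
    ("series_b", "2021-02-01", 0.3),
    ("series_b", "2021-02-02", 0.5),
]
wide = to_wide(long_rows)
print(wide)  # {'series_a': [1.2, 1.4], 'series_b': [0.3, 0.5]}
```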

```python theme={null}
*_, S_df = M5.load('./data')
# left merge: keep the series order of the static table S_df
Y_hat = S_df.merge(Y_hat, how='left', on=['unique_id'])
```
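The left merge on `unique_id` puts the forecasts in the row order of the static table `S_df` returned by `M5.load`, and a series without a forecast would get missing values instead of being dropped. A stdlib sketch of that alignment, with illustrative names and data:

```python
def align_to_reference(reference_ids, wide_forecasts):
    """Left-join forecasts onto a reference list of series IDs.

    Rows follow reference_ids; series missing from wide_forecasts get None,
    mirroring the NaNs a pandas left merge would produce.
    """
    return [(uid, wide_forecasts.get(uid)) for uid in reference_ids]


reference_ids = ["series_b", "series_a", "series_c"]
wide_forecasts = {"series_a": [1.2, 1.4], "series_b": [0.3, 0.5]}
aligned = align_to_reference(reference_ids, wide_forecasts)
print(aligned)
# [('series_b', [0.3, 0.5]), ('series_a', [1.2, 1.4]), ('series_c', None)]
```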

```python theme={null}
wrmsse_ets = M5Evaluation.evaluate(y_hat=Y_hat, directory='./data')
```

```python theme={null}
wrmsse_ets
```

|         | wrmsse   |
| ------- | -------- |
| Total   | 0.682358 |
| Level1  | 0.449115 |
| Level2  | 0.533754 |
| Level3  | 0.592317 |
| Level4  | 0.497086 |
| Level5  | 0.572189 |
| Level6  | 0.593880 |
| Level7  | 0.665358 |
| Level8  | 0.652183 |
| Level9  | 0.734492 |
| Level10 | 1.012633 |
| Level11 | 0.969902 |
| Level12 | 0.915380 |
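The `Total` row is consistent with an equal-weight average of the 12 level scores, as in the M5 definition of WRMSSE. Averaging the rounded values from the table reproduces it to five decimals:

```python
from statistics import mean

# Per-level wRMSSE of the ETS forecasts (Level1..Level12), copied from the table above.
ets_levels = [
    0.449115, 0.533754, 0.592317, 0.497086, 0.572189, 0.593880,
    0.665358, 0.652183, 0.734492, 1.012633, 0.969902, 0.915380,
]
total = mean(ets_levels)
print(f"{total:.5f}")  # 0.68236
```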

## Prophet pipeline

```python theme={null}
import logging
from time import time

import pandas as pd
from neuralforecast.data.datasets.m5 import M5, M5Evaluation
from prophet import Prophet
from pyspark.sql.types import (
    DateType, DoubleType, FloatType, StringType, StructField, StructType,
)

# silence informational messages from py4j (the Python-JVM bridge used by Spark)
logging.getLogger('py4j').setLevel(logging.ERROR)
```

### Load data

```python theme={null}
# expected structure of the training data set: one row per series and day
train_schema = StructType([
    StructField('unique_id', StringType()),
    StructField('ds', DateType()),
    StructField('y', DoubleType()),
])

# read the training file into a dataframe; Parquet files carry their own
# schema, so CSV-style options such as `header` do not apply here
train = spark.read.parquet('s3://m5-benchmarks/data/train/m5-target.parquet')

# make the dataframe queryable as a temporary view
train.createOrReplaceTempView('train')
```

```python theme={null}
sql_statement = '''
  SELECT
    unique_id,
    CAST(ds AS date) AS ds,
    y
  FROM train
  '''

# cast `ds` to a date and place all rows of each series in the same partition
m5_history = (
    spark
    .sql(sql_statement)
    .repartition(spark.sparkContext.defaultParallelism, 'unique_id')
).cache()
```
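`repartition(n, 'unique_id')` hash-partitions the rows by series, so every observation of a given series lands in the same partition. The sketch below only illustrates that idea: it uses `zlib.crc32` as a deterministic stand-in hash, whereas Spark uses its own hash function, so real partition assignments will differ.

```python
import zlib
from collections import defaultdict


def hash_partition(rows, num_partitions):
    """Assign each (unique_id, ds, y) row to a partition based on its unique_id.

    zlib.crc32 is only a stand-in for Spark's own hash function.
    """
    partitions = defaultdict(list)
    for row in rows:
        key = zlib.crc32(row[0].encode("utf-8")) % num_partitions
        partitions[key].append(row)
    return partitions


rows = [
    ("series_a", "2021-01-01", 3.0), ("series_b", "2021-01-01", 0.0),
    ("series_a", "2021-01-02", 2.0), ("series_b", "2021-01-02", 1.0),
]
parts = hash_partition(rows, num_partitions=4)

# Each partition holds complete series: no series is split across partitions.
for partition_rows in parts.values():
    print(sorted({r[0] for r in partition_rows}))
```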

### Forecast function using Prophet

```python theme={null}
def forecast(history_pd: pd.DataFrame) -> pd.DataFrame:
    # TRAIN MODEL
    # --------------------------------------
    # remove missing values (more likely at day-store-item level)
    history_pd = history_pd.dropna()

    # configure the model
    model = Prophet(
        growth='linear',
        daily_seasonality=False,
        weekly_seasonality=True,
        yearly_seasonality=True,
        seasonality_mode='multiplicative',
    )

    # train the model on the `ds` and `y` columns
    model.fit(history_pd)

    # BUILD FORECAST
    # --------------------------------------
    # predict the 28 days that follow the end of the history
    future_pd = model.make_future_dataframe(
        periods=28,
        freq='D',
        include_history=False,
    )
    forecast_pd = model.predict(future_pd)

    # ASSEMBLE EXPECTED RESULT SET
    # --------------------------------------
    # attach the series identifier and keep only the fields in result_schema
    forecast_pd['unique_id'] = history_pd['unique_id'].unique()[0]
    f_pd = forecast_pd[['unique_id', 'ds', 'yhat']]

    # return expected dataset
    return f_pd
```
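`forecast` receives all rows of one series and must return one row per forecast day with the fields of `result_schema` (`unique_id`, `ds`, `yhat`). To show just that contract without Prophet, here is a hypothetical stand-in in plain Python that swaps Prophet for a seasonal naive rule (repeat the last 7 observed days), the same idea as the `SeasonalNaive` model imported in the StatsForecast pipeline:

```python
from datetime import date, timedelta


def seasonal_naive_forecast(history, h=28, season_length=7):
    """Hypothetical stand-in for `forecast`: one series in, h forecast rows out.

    history: (unique_id, ds, y) tuples for a single series, ds a date, with at
    least season_length non-missing values. Returns (unique_id, ds, yhat) tuples.
    """
    # Drop missing values and order by date, as the Prophet version drops NaNs.
    history = sorted((r for r in history if r[2] is not None), key=lambda r: r[1])
    unique_id = history[0][0]
    last_date = history[-1][1]
    last_season = [r[2] for r in history[-season_length:]]
    # Step k reuses the value observed one season earlier.
    return [
        (unique_id, last_date + timedelta(days=k), last_season[(k - 1) % season_length])
        for k in range(1, h + 1)
    ]


start = date(2021, 1, 1)
history = [("series_a", start + timedelta(days=d), float(d % 7)) for d in range(14)]
rows = seasonal_naive_forecast(history)
print(len(rows), rows[0])  # 28 ('series_a', datetime.date(2021, 1, 15), 0.0)
```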

```python theme={null}
result_schema = StructType([
    StructField('unique_id', StringType()),
    StructField('ds', DateType()),
    StructField('yhat', FloatType()),
])
```

#### Training Prophet on the M5 dataset

```python theme={null}
init = time()
results = (
    m5_history
    .groupBy('unique_id')
    .applyInPandas(forecast, schema=result_schema)
    .toPandas()
)
end = time()
print(f'Minutes taken by Prophet on a Spark cluster: {(end - init) / 60}')
```
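`groupBy('unique_id').applyInPandas(forecast, schema=result_schema)` follows a split-apply-combine pattern: the rows are split by `unique_id`, `forecast` runs once per series on the executors, and the returned frames are combined into one Spark DataFrame that `toPandas()` collects on the driver. A single-process sketch of the same pattern, with a hypothetical mean-forecast function in place of Prophet:

```python
from collections import defaultdict


def apply_per_series(rows, fn):
    """Split rows by unique_id, apply fn to each group, and concatenate the outputs."""
    groups = defaultdict(list)
    for row in rows:                   # split
        groups[row[0]].append(row)
    results = []
    for group in groups.values():      # apply (Spark runs groups in parallel)
        results.extend(fn(group))      # combine
    return results


def mean_forecast(group, h=2):
    """Hypothetical per-series model: repeat the series mean for h steps.

    The integer step index stands in for a forecast date.
    """
    unique_id = group[0][0]
    level = sum(r[2] for r in group) / len(group)
    return [(unique_id, step, level) for step in range(1, h + 1)]


rows = [("series_a", 1, 2.0), ("series_b", 1, 5.0), ("series_a", 2, 4.0)]
print(apply_per_series(rows, mean_forecast))
# [('series_a', 1, 3.0), ('series_a', 2, 3.0), ('series_b', 1, 5.0), ('series_b', 2, 5.0)]
```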

### Evaluating performance

As with StatsForecast, performance is measured with the weighted root
mean squared scaled error (WRMSSE) used in the M5 competition. Details
of the metric are in the
[M5 Competitors’ Guide](https://github.com/Mcompetitions/M5-methods/blob/master/M5-Competitors-Guide.pdf).
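A minimal per-series RMSSE sketch following the definition in the M5 guide, plus the weighted sum applied within one aggregation level. It is an illustration only: `M5Evaluation.evaluate` also builds the 12 aggregation levels and the sales-based weights, which this sketch does not do, and `rmsse`/`wrmsse` here are hypothetical helper names.

```python
from math import sqrt


def rmsse(train, actual, forecast):
    """Root mean squared scaled error for a single series.

    train: in-sample observations Y_1..Y_n (n >= 2, not constant)
    actual, forecast: out-of-sample Y_{n+1}..Y_{n+h} and their forecasts
    """
    h, n = len(actual), len(train)
    # Numerator: mean squared forecast error over the horizon.
    mse_forecast = sum((a - f) ** 2 for a, f in zip(actual, forecast)) / h
    # Denominator: mean squared one-step naive error on the training data.
    mse_naive = sum((train[t] - train[t - 1]) ** 2 for t in range(1, n)) / (n - 1)
    return sqrt(mse_forecast / mse_naive)


def wrmsse(scores, weights):
    """Weighted sum of per-series RMSSE scores; weights are assumed to sum to 1."""
    return sum(w * s for w, s in zip(weights, scores))


train = [3.0, 5.0, 4.0, 6.0, 5.0]
print(rmsse(train, actual=[6.0, 4.0], forecast=[5.0, 5.0]))  # sqrt(1 / 2.5), about 0.632
print(wrmsse([0.5, 1.0], [0.75, 0.25]))                      # 0.625
```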

```python theme={null}
Y_hat = results.set_index(['unique_id', 'ds']).unstack()
Y_hat = Y_hat.droplevel(0, 1).reset_index()
```

```python theme={null}
*_, S_df = M5.load('./data')
# left merge: keep the series order of the static table S_df
Y_hat = S_df.merge(Y_hat, how='left', on=['unique_id'])
```

```python theme={null}
wrmsse = M5Evaluation.evaluate(y_hat=Y_hat, directory='./data')
```

```python theme={null}
wrmsse
```

|         | wrmsse   |
| ------- | -------- |
| Total   | 0.771800 |
| Level1  | 0.507905 |
| Level2  | 0.586328 |
| Level3  | 0.666686 |
| Level4  | 0.549358 |
| Level5  | 0.655003 |
| Level6  | 0.647176 |
| Level7  | 0.747047 |
| Level8  | 0.743422 |
| Level9  | 0.824667 |
| Level10 | 1.207069 |
| Level11 | 1.108780 |
| Level12 | 1.018163 |
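Putting the two evaluation tables side by side shows that the gap in the headline score is not driven by a single level. The sketch below, using only the values reported above, counts the levels where ETS scores lower and checks that Prophet’s `Total` is also the mean of its 12 levels:

```python
# Per-level wRMSSE (Level1..Level12) copied from the two evaluation tables.
ets = [0.449115, 0.533754, 0.592317, 0.497086, 0.572189, 0.593880,
       0.665358, 0.652183, 0.734492, 1.012633, 0.969902, 0.915380]
prophet = [0.507905, 0.586328, 0.666686, 0.549358, 0.655003, 0.647176,
           0.747047, 0.743422, 0.824667, 1.207069, 1.108780, 1.018163]

# Levels (1-based) where ETS has the lower, i.e. better, score.
ets_wins = [lvl for lvl, (e, p) in enumerate(zip(ets, prophet), start=1) if e < p]
print(f"ETS is better on {len(ets_wins)} of 12 levels")   # ETS is better on 12 of 12 levels
print(f"Prophet total: {sum(prophet) / len(prophet):.5f}")  # Prophet total: 0.77180
```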
