StatsForecast works on top of Spark, Dask, and Ray through Fugue. StatsForecast will read the input DataFrame and use the corresponding engine. For example, if the input is a Spark DataFrame, StatsForecast will use the existing Spark session to run the forecast.

A benchmark (using an older version of the syntax) can be found here, in which we forecasted one million time series in under 15 minutes.

Installation

As long as Spark is installed and configured, StatsForecast will be able to use it. If executing on a distributed Spark cluster, make sure the statsforecast library is installed on all the workers.
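
StatsForecast itself is distributed on PyPI, so a minimal setup on each node looks like the following (the exact command may vary with your environment and version):

pip install statsforecast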

StatsForecast on Pandas

Before running on Spark, it's recommended to test on a smaller Pandas dataset to make sure everything is working. This example also highlights the small differences introduced when using Spark.

from statsforecast.core import StatsForecast
from statsforecast.models import AutoETS
from statsforecast.utils import generate_series

n_series = 4
horizon = 7

series = generate_series(n_series)

sf = StatsForecast(
    models=[AutoETS(season_length=7)],
    freq='D',
)
sf.forecast(df=series, h=horizon).head()
                   ds   AutoETS
unique_id
0          2000-08-10  5.261609
0          2000-08-11  6.196357
0          2000-08-12  0.282309
0          2000-08-13  1.264195
0          2000-08-14  2.262453

Executing on Spark

To run the forecasts distributed on Spark, just pass in a Spark DataFrame instead. Because Spark DataFrames have no index, unique_id must be a column rather than the index.

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
# Make unique_id a column
series = series.reset_index()
series['unique_id'] = series['unique_id'].astype(str)

# Convert to Spark
sdf = spark.createDataFrame(series)

# Returns a Spark DataFrame
sf.forecast(df=sdf, h=horizon).show(5)
+---------+-------------------+----------+
|unique_id|                 ds|   AutoETS|
+---------+-------------------+----------+
|        1|2000-04-07 00:00:00|  4.312628|
|        1|2000-04-08 00:00:00|  5.228625|
|        1|2000-04-09 00:00:00|   6.24151|
|        1|2000-04-10 00:00:00|0.23369633|
|        1|2000-04-11 00:00:00|  1.173954|
+---------+-------------------+----------+
only showing top 5 rows
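
Because the result is a regular Spark DataFrame, you can keep transforming it in Spark or, if it fits in driver memory, collect it back to pandas with Spark's standard toPandas method (a small sketch):

# Collect the distributed forecasts back to the driver as a pandas DataFrame
forecasts = sf.forecast(df=sdf, h=horizon)
local_forecasts = forecasts.toPandas()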

Helpful Configuration

There are some Spark-specific configurations that may help optimize the workload: speculative execution re-launches straggler tasks, a high shuffle partition count spreads the many small per-series tasks across the cluster, disabling adaptive query execution stops Spark from coalescing those partitions back together, and one CPU per task maximizes the number of forecasts running concurrently.

"spark.speculation": "true",
"spark.sql.shuffle.partitions": "8000",
"spark.sql.adaptive.enabled": "false",
"spark.task.cpus": "1"