> ## Documentation Index
> Fetch the complete documentation index at: https://nixtlaverse.nixtla.io/llms.txt
> Use this file to discover all available pages before exploring further.

# Scalable Time Series Modeling with open-source projects

> How to Forecast 1M Time Series in 15 Minutes with Spark, Fugue and Nixtla's Statsforecast.

<figure>
  <a href="https://github.com/nixtla/statsforecast">
    <img src="https://img.shields.io/github/stars/nixtla/statsforecast?label=GitHub%20Stars&style=social.png" />
  </a>
</figure>

By [Fugue](https://github.com/fugue-project/) and
[Nixtla](https://github.com/nixtla/). Originally posted on
[TDS](https://medium.com/towards-data-science/distributed-forecast-of-1m-time-series-in-under-15-minutes-with-spark-nixtla-and-fugue-e9892da6fd5c).

> **TL:DR We will show how you can leverage the distributed power of
> Spark and the highly efficient code from StatsForecast to fit millions
> of models in a couple of minutes.**

Time-series modeling, analysis, and prediction of trends and
seasonalities for data collected over time is a rapidly growing category
of software applications.

Businesses, from electricity and economics to healthcare analytics,
collect time-series data daily to predict patterns and build better
data-driven product experiences. For example, temperature and humidity
prediction is used in manufacturing to prevent defects, streaming
metrics predictions help identify music’s popular artists, and sales
forecasting for thousands of SKUs across different locations in the
supply chain is used to optimize inventory costs. As data generation
increases, the forecasting necessities have evolved from modeling a few
time series to predicting millions.

## Motivation

[Nixtla](https://github.com/nixtla) is an open-source project focused on
state-of-the-art time series forecasting. They have a couple of
libraries such as
[StatsForecast](https://github.com/nixtla/statsforecast) for statistical
models, [NeuralForecast](https://github.com/nixtla/neuralforecast) for
deep learning, and
[HierarchicalForecast](https://github.com/nixtla/hierarchicalforecast)
for forecast aggregations across different levels of hierarchies. These
are production-ready time series libraries focused on different modeling
techniques.

This article looks at
[StatsForecast](https://github.com/nixtla/statsforecast), a
lightning-fast forecasting library with statistical and econometrics
models. The AutoARIMA model of Nixtla is 20x faster than
[pmdarima](http://alkaline-ml.com/pmdarima/), and the ETS (error, trend,
seasonal) models performed 4x faster than
[statsmodels](https://github.com/statsmodels/statsmodels) and are more
robust. The benchmarks and code to reproduce can be found
[here](https://github.com/Nixtla/statsforecast#-accuracy---speed). A
huge part of the performance increase is due to using a JIT compiler
called [numba](https://numba.pydata.org/) to achieve high speeds.

The faster iteration time means that data scientists can run more
experiments and converge to more accurate models faster. It also means
that running benchmarks at scale becomes easier.

In this article, we are interested in the scalability of the
StatsForecast library in fitting models over
[Spark](https://spark.apache.org/docs/latest/api/python/index.html) or
[Dask](https://github.com/dask/dask) using the
[Fugue](https://github.com/fugue-project/fugue/) library. This
combination will allow us to train a huge number of models distributedly
over a temporary cluster quickly.

## Experiment Setup

When dealing with large time series data, users normally have to deal
with thousands of logically independent time series (think of telemetry
of different users or different product sales). In this case, we can
train one big model over all of the series, or we can create one model
for each series. Both are valid approaches since the bigger model will
pick up trends across the population, while training thousands of models
may fit individual series data better.

> **Note**
>
> Note: to pick up both the micro and macro trends of the time series
> population in one model, check the Nixtla
> [HierarchicalForecast](https://github.com/Nixtla/hierarchicalforecast)
> library, but this is also more computationally expensive and trickier
> to scale.

This article will deal with the scenario where we train a couple of
models (AutoARIMA or ETS) per univariate time series. For this setup, we
group the full data by time series, and then train each model for each
group. The image below illustrates this. The distributed DataFrame can
either be a Spark or Dask DataFrame.

<figure>
  <img src="https://miro.medium.com/max/1400/0*HbHd-D8XmtN5F2bI.png" alt="AutoARIMA per partition" />

  <figcaption aria-hidden="true">AutoARIMA per partition</figcaption>
</figure>

Nixtla previously released benchmarks with
[Anyscale](https://www.anyscale.com/) on distributing this model
training on Ray. The setup and results can be found in this
[blog](https://www.anyscale.com/blog/how-nixtla-uses-ray-to-accurately-predict-more-than-a-million-time-series).
The results are also shown below. It took 2000 cpus to run one million
AutoARIMA models in 35 minutes. We’ll compare this against running on
Spark.

<figure>
  <img src="https://miro.medium.com/max/1400/0*bnlD5NAslUxfTniv.png" alt="StatsForecast on Ray results" />

  <figcaption aria-hidden="true">StatsForecast on Ray results</figcaption>
</figure>

## StatsForecast code

First, we’ll look at the StatsForecast code used to run the AutoARIMA
distributedly on [Ray](https://docs.ray.io/en/latest/index.html). This
is a simplified version to run the scenario with a one million time
series. It is also updated for the recent StatsForecast v1.0.0 release,
so it may look a bit different from the code in the previous benchmarks.

```python theme={null}
from time import time

import pandas as pd
from statsforecast.utils import generate_series
from statsforecast.models import AutoARIMA
from statsforecast.core import StatsForecast

series = generate_series(n_series=1000000, seed=1)

model = StatsForecast(df=series,
                      models=[AutoARIMA()],
                      freq='D',
                      n_jobs=-1,
              ray_address=ray_address)

init = time()
forecasts = model.forecast(7)
print(f'n_series: 1000000 total time: {(time() - init) / 60}')
```

The interface of StatsForecast is very minimal. It is already designed
to perform the AutoARIMA on each group of data. Just supplying the
ray\_address will make this code snippet run distributedly. Without it,
n\_jobswill indicate the number of parallel processes for forecasting.
model.forecast() will do the fit and predict in one step, and the input
to this method in the time horizon to forecast.

## Using Fugue to run on Spark and Dask

[Fugue](https://github.com/fugue-project/fugue) is an abstraction layer
that ports Python, Pandas, and SQL code to Spark and Dask. The most
minimal interface is the `transform()` function. This function takes in
a function and DataFrame, and brings it to Spark or Dask. We can use the
`transform()` function to bring StatsForecast execution to Spark.

There are two parts to the code below. First, we have the forecast logic
defined in the `forecast_series` function. Some parameters are hardcoded
for simplicity. The most important one is that `n_jobs=1`. This is
because Spark or Dask will already serve as the parallelization layer,
and having two stages of parallelism can cause resource deadlocks.

```python theme={null}
from fugue import transform

def forecast_series(df: pd.DataFrame, models) -> pd.DataFrame:
    tdf = df.set_index("unique_id")
    model = StatsForecast(df=tdf, models=models, freq='D', n_jobs=1)
    return model.forecast(7).reset_index()

transform(series.reset_index(),
          forecast_series,
          params=dict(models=[AutoARIMA()]),
          schema="unique_id:int, ds:date, AutoARIMA:float",
          partition={"by": "unique_id"},
          engine="spark"
          ).show()
```

Second, the `transform()` function is used to apply the
`forecast_series()` function on Spark. The first two arguments are the
DataFrame and function to be applied. Output schema is a requirement for
Spark, so we need to pass it in, and the partition argument will take
care of splitting the time series modelling by `unique_id`.

This code already works and returns a Spark DataFrame output.

## Nixtla’s FugueBackend

The `transform()` above is a general look at what Fugue can do. In
practice, the Fugue and Nixtla teams collaborated to add a more native
`FugueBackend` to the StatsForecast library. Along with it is a utility
`forecast()` function to simplify the forecasting interface. Below is an
end-to-end example of running StatsForecast on one million time series.

```python theme={null}
from statsforecast.distributed.utils import forecast
from statsforecast.distributed.fugue import FugueBackend
from statsforecast.models import AutoARIMA
from statsforecast.core import StatsForecast

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
backend = FugueBackend(spark, {"fugue.spark.use_pandas_udf":True})

forecast(spark.read.parquet("/tmp/1m.parquet"),
         [AutoARIMA()],
         freq="D",
         h=7,
         parallel=backend).toPandas()
```

We just need to create the FugueBackend, which takes in a SparkSession
and passes it to `forecast()`. This function can take either a DataFrame
or file path to the data. If a file path is provided, it will be loaded
with the parallel backend. In this example above, we replaced the file
each time we ran the experiment to generate benchmarks.

> **Caution**
>
> It’s also important to note that we can test locally before running
> the `forecast()` on full data. All we have to do is not supply
> anything for the parallel argument; everything will run on Pandas
> sequentially.

## Benchmark Results

The benchmark results can be seen below. As of the time of this writing,
Dask and Ray made recent releases, so only the Spark metrics are up to
date. We will make a follow-up article after running these experiments
with the updates.

<figure>
  <img src="https://miro.medium.com/max/1400/0*2ovS-D5XHQcVQobK.png" alt="Spark and Dask benchmarks for StatsForecast at scale" />

  <figcaption aria-hidden="true">Spark and Dask benchmarks for
  StatsForecast at scale</figcaption>
</figure>

> **Note**
>
> Note: The attempt was to use 2000 cpus but we were limited by
> available compute instances on AWS.

The important part here is that AutoARIMA trained one million time
series models in less than 15 minutes. The cluster configuration is
attached in the appendix. With very few lines of code, we were able to
orchestrate the training of these time series models distributedly.

## Conclusion

Training thousands of time series models distributedly normally takes a
lot of coding with Spark and Dask, but we were able to run these
experiments with very few lines of code. Nixtla’s StatsForecast offers
the ability to quickly utilize all of the compute resources available to
find the best model for each time series. All users need to do is supply
a relevant parallel backend (Ray or Fugue) to run on a cluster.

On the scale of one million timeseries, our total training time took 12
minutes for AutoARIMA. This is the equivalent of close to 400 cpu-hours
that we ran immediately, allowing data scientists to quickly iterate at
scale without having to write the explicit code for parallelization.
Because we used an ephemeral cluster, the cost is effectively the same
as running this sequentially on an EC2 instance (parallelized over all
cores).

## Resources

1. [Nixtla StatsForecast repo](https://github.com/Nixtla/statsforecast)
2. [StatsForecast docs](../../index)
3. [Fugue repo](https://github.com/fugue-project/fugue/)
4. [Fugue tutorials](https://fugue-tutorials.readthedocs.io/)

To chat with us:

1. [Fugue Slack](http://slack.fugue.ai/)
2. [Nixtla
   Slack](https://join.slack.com/t/nixtlaworkspace/shared_invite/zt-135dssye9-fWTzMpv2WBthq8NK0Yvu6A)

## Appendix

For anyone. interested in the cluster configuration, it can be seen
below. This will spin up a Databricks cluster. The important thing is
the node\_type\_id that has the machines used.

```text theme={null}
{
    "num_workers": 20,
    "cluster_name": "fugue-nixtla-2",
    "spark_version": "10.4.x-scala2.12",
    "spark_conf": {
        "spark.speculation": "true",
        "spark.sql.shuffle.partitions": "8000",
        "spark.sql.adaptive.enabled": "false",
        "spark.task.cpus": "1"
    },
    "aws_attributes": {
        "first_on_demand": 1,
        "availability": "SPOT_WITH_FALLBACK",
        "zone_id": "us-west-2c",
        "spot_bid_price_percent": 100,
        "ebs_volume_type": "GENERAL_PURPOSE_SSD",
        "ebs_volume_count": 1,
        "ebs_volume_size": 32
    },
    "node_type_id": "m5.24xlarge",
    "driver_node_type_id": "m5.2xlarge",
    "ssh_public_keys": [],
    "custom_tags": {},
    "spark_env_vars": {
        "MKL_NUM_THREADS": "1",
        "OPENBLAS_NUM_THREADS": "1",
        "VECLIB_MAXIMUM_THREADS": "1",
        "OMP_NUM_THREADS": "1",
        "NUMEXPR_NUM_THREADS": "1"
    },
    "autotermination_minutes": 20,
    "enable_elastic_disk": false,
    "cluster_source": "UI",
    "init_scripts": [],
    "runtime_engine": "STANDARD",
    "cluster_id": "0728-004950-oefym0ss"
}
```
