
TL;DR: We will show how you can leverage the distributed power of Spark and the highly efficient code from StatsForecast to fit millions of models in a couple of minutes.

Time-series modeling, analysis, and prediction of trends and seasonalities for data collected over time is a rapidly growing category of software applications. Businesses, from electricity and economics to healthcare analytics, collect time-series data daily to predict patterns and build better data-driven product experiences. For example, temperature and humidity prediction is used in manufacturing to prevent defects, streaming-metrics predictions help identify popular music artists, and sales forecasting for thousands of SKUs across different locations in the supply chain is used to optimize inventory costs. As data generation increases, forecasting needs have evolved from modeling a few time series to predicting millions.
Motivation
Nixtla is an open-source project focused on state-of-the-art time series forecasting. It maintains several libraries: StatsForecast for statistical models, NeuralForecast for deep learning, and HierarchicalForecast for forecast aggregations across different levels of hierarchies. These are production-ready time series libraries focused on different modeling techniques.

This article looks at StatsForecast, a lightning-fast forecasting library with statistical and econometrics models. Nixtla's AutoARIMA model is 20x faster than pmdarima, and its ETS (error, trend, seasonal) models are 4x faster than statsmodels and more robust. The benchmarks and the code to reproduce them can be found here. A huge part of the performance increase comes from using the numba JIT compiler to achieve high speeds. Faster iteration means data scientists can run more experiments and converge to accurate models sooner. It also means that running benchmarks at scale becomes easier.

In this article, we are interested in the scalability of the StatsForecast library when fitting models over Spark or Dask using the Fugue library. This combination lets us quickly train a huge number of models distributedly over a temporary cluster.

Experiment Setup
When dealing with large time series data, users normally have to deal with thousands of logically independent time series (think of telemetry for different users or sales of different products). In this case, we can train one big model over all of the series, or we can create one model for each series. Both are valid approaches: the bigger model will pick up trends across the population, while training thousands of models may fit individual series better.

Note: to pick up both the micro and macro trends of the time series population in one model, check the Nixtla HierarchicalForecast library, but this is also more computationally expensive and trickier to scale.

This article deals with the scenario where we train a couple of models (AutoARIMA or ETS) per univariate time series. For this setup, we group the full data by time series and then train a model for each group. The image below illustrates this. The distributed DataFrame can be either a Spark or Dask DataFrame.

Nixtla previously released benchmarks with Anyscale on distributing this model training on Ray. The setup and results can be found in this blog post, and the results are also shown below: it took 2,000 CPUs to run one million AutoARIMA models in 35 minutes. We'll compare this against running on Spark.
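As a point of reference for the rest of the article, StatsForecast expects each series in a long-format DataFrame with unique_id, ds, and y columns. A tiny hypothetical example of that layout (the store names and values are made up):

```python
import pandas as pd

# Hypothetical two-series dataset in the long format StatsForecast expects:
# one row per (series, timestamp), with each series identified by `unique_id`.
series = pd.DataFrame({
    "unique_id": ["store_1"] * 3 + ["store_2"] * 3,
    "ds": pd.to_datetime(["2022-01-01", "2022-01-02", "2022-01-03"] * 2),
    "y": [10.0, 12.0, 11.0, 5.0, 7.0, 6.0],
})
# Grouping by `unique_id` yields the logically independent series
# that each get their own model.
```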
StatsForecast code
First, we'll look at the StatsForecast code used to run AutoARIMA distributedly on Ray. This is a simplified version for the one million time series scenario. It is also updated for the recent StatsForecast v1.0.0 release, so it may look a bit different from the code in the previous benchmarks.
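The original snippet is not reproduced here, but a minimal sketch of that v1.0-era interface looks roughly like the following; the daily frequency, season_length=7, and 7-day horizon are illustrative assumptions, and exact constructor arguments can differ slightly between releases:

```python
from statsforecast import StatsForecast
from statsforecast.models import AutoARIMA

# `series` is a long-format DataFrame (unique_id, ds, y) like the example above.
# The v1.0-era API takes the DataFrame in the constructor, indexed by `unique_id`.
sf = StatsForecast(
    df=series.set_index("unique_id"),
    models=[AutoARIMA(season_length=7)],  # weekly seasonality on daily data
    freq="D",
    n_jobs=-1,  # use all local cores; Ray/Spark/Dask replace this when distributing
)
forecasts = sf.forecast(h=7)  # 7-step-ahead forecast for every series
```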
Using Fugue to run on Spark and Dask

Fugue is an abstraction layer that ports Python, Pandas, and SQL code to Spark and Dask. The most minimal interface is the transform() function. This function takes in a function and a DataFrame, and brings the execution to Spark or Dask. We can use the transform() function to bring StatsForecast execution to Spark.
There are two parts to the code below. First, we have the forecast logic defined in the forecast_series function. Some parameters are hardcoded for simplicity; the most important one is n_jobs=1, because Spark or Dask will already serve as the parallelization layer, and having two stages of parallelism can cause resource deadlocks. Second, the transform() function is used to apply forecast_series() on Spark. The first two arguments are the DataFrame and the function to be applied. Output schema is a requirement for Spark, so we need to pass it in, and the partition argument takes care of splitting the time series modeling by unique_id.
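The snippet itself is not shown here, so below is a hedged reconstruction of the pattern just described, assuming an existing SparkSession named spark and an input DataFrame df; the schema string, frequency, and horizon are illustrative:

```python
import pandas as pd
from fugue import transform
from statsforecast import StatsForecast
from statsforecast.models import AutoARIMA

def forecast_series(df: pd.DataFrame, models) -> pd.DataFrame:
    # Each call receives the rows of a single unique_id. Hardcoded for simplicity:
    # daily frequency, 7-day horizon, and n_jobs=1 because Spark already
    # provides the parallelism.
    tdf = df.set_index("unique_id")
    model = StatsForecast(df=tdf, models=models, freq="D", n_jobs=1)
    return model.forecast(h=7).reset_index()

# Spark requires the output schema up front, and partition={"by": "unique_id"}
# splits the modeling so each series is handled in its own task.
result = transform(
    df,
    forecast_series,
    params={"models": [AutoARIMA(season_length=7)]},
    schema="unique_id:str,ds:datetime,AutoARIMA:double",
    partition={"by": "unique_id"},
    engine=spark,
)
```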
This code already works and returns a Spark DataFrame as output.
Nixtla’s FugueBackend
The transform() above is a general look at what Fugue can do. In practice, the Fugue and Nixtla teams collaborated to add a more native FugueBackend to the StatsForecast library. Along with it is a utility forecast() function to simplify the forecasting interface. Below is an end-to-end example of running StatsForecast on one million time series.
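The end-to-end code is not reproduced here; the sketch below follows the distributed interface that shipped around StatsForecast v1.0 (module paths and argument names may have moved in later releases), and the S3 path is a placeholder rather than the real benchmark file:

```python
from pyspark.sql import SparkSession
from statsforecast.distributed.utils import forecast
from statsforecast.distributed.fugue import FugueBackend
from statsforecast.models import AutoARIMA

spark = SparkSession.builder.getOrCreate()

# FugueBackend wraps the SparkSession so StatsForecast can distribute the work.
backend = FugueBackend(spark, {"fugue.spark.use_pandas_udf": True})

forecasts = forecast(
    "s3://path/to/train.parquet",   # placeholder path; a DataFrame also works
    [AutoARIMA(season_length=7)],
    freq="D",
    h=7,
    parallel=backend,
)
```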
The forecast() function can take either a DataFrame or a file path to the data. If a file path is provided, it will be loaded with the parallel backend. In the example above, we replaced the file each time we ran the experiment to generate benchmarks.
Caution
It's also important to note that we can test locally before running forecast() on the full data. All we have to do is not supply anything for the parallel argument; everything will run sequentially on Pandas.
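For instance, a quick local sanity check on a small hypothetical sample (sample_df below) might look like this:

```python
# `sample_df` is a small pandas DataFrame with unique_id, ds, and y columns.
local_forecasts = forecast(
    sample_df,
    [AutoARIMA(season_length=7)],
    freq="D",
    h=7,
)  # no `parallel` argument, so everything runs sequentially on pandas
```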
Benchmark Results
The benchmark results can be seen below. As of this writing, Dask and Ray have made recent releases, so only the Spark metrics are up to date. We will publish a follow-up article after rerunning these experiments with the updated versions.

Note: the attempt was to use 2,000 CPUs, but we were limited by the compute instances available on AWS.

The important takeaway is that AutoARIMA trained one million time series models in less than 15 minutes. The cluster configuration is attached in the appendix. With very few lines of code, we were able to orchestrate the training of these time series models distributedly.