Installation

As long as Ray is installed and configured, StatsForecast can use it. If you're executing on a distributed Ray cluster, make sure the statsforecast library is installed on all of the workers.
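A typical setup installs both packages with pip (exact version pins and extras are up to you; this is a minimal sketch):

```shell
pip install statsforecast "ray[default]"
```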

StatsForecast on Pandas

Before running on Ray, it’s recommended to test on a smaller Pandas dataset to make sure everything is working. This example also helps show the small differences when using Ray.

from statsforecast.core import StatsForecast
from statsforecast.models import AutoETS
from statsforecast.utils import generate_series

n_series = 4
horizon = 7

series = generate_series(n_series)

sf = StatsForecast(
    models=[AutoETS(season_length=7)],
    freq='D',
)
sf.forecast(df=series, h=horizon).head()
unique_id  ds          AutoETS
0          2000-08-10  5.261609
0          2000-08-11  6.196357
0          2000-08-12  0.282309
0          2000-08-13  1.264195
0          2000-08-14  2.262453

Executing on Ray

To run the forecasts distributed on Ray, pass a Ray Dataset instead of a pandas DataFrame. Because Ray Datasets have no index, unique_id must be a column rather than the index.

import ray
import logging
ray.init(logging_level=logging.ERROR)

# Ray has no index, so move unique_id into a regular column
series = series.reset_index()
series['unique_id'] = series['unique_id'].astype(str)

# Use the bulk (non-streaming) executor
ctx = ray.data.context.DatasetContext.get_current()
ctx.use_streaming_executor = False

ray_series = ray.data.from_pandas(series).repartition(4)
sf.forecast(df=ray_series, h=horizon).take(5)
(Ray execution logs and repartition/shuffle progress bars omitted.)
[{'unique_id': '0',
  'ds': datetime.datetime(2000, 8, 10, 0, 0),
  'AutoETS': 5.261609077453613},
 {'unique_id': '0',
  'ds': datetime.datetime(2000, 8, 11, 0, 0),
  'AutoETS': 6.196357250213623},
 {'unique_id': '0',
  'ds': datetime.datetime(2000, 8, 12, 0, 0),
  'AutoETS': 0.28230854868888855},
 {'unique_id': '0',
  'ds': datetime.datetime(2000, 8, 13, 0, 0),
  'AutoETS': 1.2641948461532593},
 {'unique_id': '0',
  'ds': datetime.datetime(2000, 8, 14, 0, 0),
  'AutoETS': 2.2624528408050537}]
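take returns plain Python dicts rather than a DataFrame. If you want the results back in tabular form, the records can be reassembled with pandas; a small sketch using two of the records from the output above:

```python
import datetime

import pandas as pd

# Records in the shape returned by `take` (values copied from the output above)
records = [
    {'unique_id': '0', 'ds': datetime.datetime(2000, 8, 10), 'AutoETS': 5.261609077453613},
    {'unique_id': '0', 'ds': datetime.datetime(2000, 8, 11), 'AutoETS': 6.196357250213623},
]

# Rebuild a DataFrame with the same layout as the pandas result
df = pd.DataFrame(records).set_index('unique_id')
print(df)
```

For the full result set, a Ray Dataset can also be materialized directly with its to_pandas() method instead of going through take.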