As long as Ray is installed and configured, StatsForecast will be able
to use it. If executing on a distributed Ray cluster, make use the
statsforecast
library is installed across all the workers.
Before running on Ray, it’s recommended to test on a smaller Pandas
dataset to make sure everything is working. This example also helps show
the small differences when using Ray.
from statsforecast.core import StatsForecast
from statsforecast.models import (
AutoARIMA,
AutoETS,
)
from statsforecast.utils import generate_series
n_series = 4
horizon = 7
series = generate_series(n_series)
sf = StatsForecast(
models=[AutoETS(season_length=7)],
freq='D',
)
sf.forecast(df=series, h=horizon).head()
To run the forecasts distributed on Ray, just pass in a Ray Dataset
instead. Instead of having the unique_id
as an index, it needs to be a
column because Ray has no index.
import ray
import logging
ray.init(logging_level=logging.ERROR)
series = series.reset_index()
series['unique_id'] = series['unique_id'].astype(str)
ctx = ray.data.context.DatasetContext.get_current()
ctx.use_streaming_executor = False
ray_series = ray.data.from_pandas(series).repartition(4)
sf.forecast(df=ray_series, h=horizon).take(5)
2023-06-17 01:39:08,329 INFO bulk_executor.py:42 -- Executing DAG InputDataBuffer[Input] -> AllToAllOperator[Repartition]
Repartition 0: 0%| | 0/4 [00:00<?, ?it/s]
2023-06-17 01:39:09,554 INFO bulk_executor.py:42 -- Executing DAG InputDataBuffer[Input] -> AllToAllOperator[Repartition]
Repartition 0: 0%| | 0/16 [00:00<?, ?it/s]
2023-06-17 01:39:09,727 INFO bulk_executor.py:42 -- Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[MapBatches(add_simple_key)]
MapBatches(add_simple_key) 0: 0%| | 0/16 [00:00<?, ?it/s]
2023-06-17 01:39:11,134 INFO bulk_executor.py:42 -- Executing DAG InputDataBuffer[Input] -> AllToAllOperator[Sort] -> TaskPoolMapOperator[MapBatches(group_fn)]
Sort Sample 0: 0%| | 0/16 [00:00<?, ?it/s]
Shuffle Map 0: 0%| | 0/16 [00:00<?, ?it/s]
Shuffle Reduce 0: 0%| | 0/16 [00:00<?, ?it/s]
MapBatches(group_fn) 0: 0%| | 0/16 [00:00<?, ?it/s]
[{'unique_id': '0',
'ds': datetime.datetime(2000, 8, 10, 0, 0),
'AutoETS': 5.261609077453613},
{'unique_id': '0',
'ds': datetime.datetime(2000, 8, 11, 0, 0),
'AutoETS': 6.196357250213623},
{'unique_id': '0',
'ds': datetime.datetime(2000, 8, 12, 0, 0),
'AutoETS': 0.28230854868888855},
{'unique_id': '0',
'ds': datetime.datetime(2000, 8, 13, 0, 0),
'AutoETS': 1.2641948461532593},
{'unique_id': '0',
'ds': datetime.datetime(2000, 8, 14, 0, 0),
'AutoETS': 2.2624528408050537}]