import os

from statsforecast.core import StatsForecast
from statsforecast.models import AutoETS
from statsforecast.utils import generate_series

# Return unique_id as a regular column instead of as the index
os.environ['NIXTLA_ID_AS_COL'] = '1'

n_series = 4
horizon = 7

# Synthetic daily panel data with columns unique_id, ds, y
series = generate_series(n_series)

sf = StatsForecast(
    models=[AutoETS(season_length=7)],
    freq='D',
)

# Local (pandas) cross-validation: 2 windows of `horizon` steps, cutoffs 24 steps apart
sf.cross_validation(df=series, h=horizon, step_size=24,
                    n_windows=2, level=[90]).head()

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Cast unique_id to plain strings so Spark can infer the column type
series['unique_id'] = series['unique_id'].astype(str)

# Convert the pandas DataFrame to a Spark DataFrame
sdf = spark.createDataFrame(series)

sf = StatsForecast(
    models=[AutoETS(season_length=7)],
    freq='D',
)
# Passing a Spark DataFrame distributes the work and returns a Spark DataFrame
sf.cross_validation(df=sdf, h=horizon, step_size=24,
                    n_windows=2, level=[90]).show()


FugueBackend

 FugueBackend (engine:Any=None, conf:Any=None, **transform_kwargs:Any)

FugueBackend for distributed computation.

This class uses the Fugue backend, which can distribute computation on Spark, Dask and Ray without any code rewrites.
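Conceptually, distributing StatsForecast work means splitting the panel by series and applying the same per-series function on every worker. The sketch below illustrates that idea with the standard library only; it is not Fugue's API. The helper names (`split_by_series`, `naive_forecast`, `run`) are hypothetical, integer timestamps are assumed, and a thread pool merely stands in for a Spark, Dask or Ray engine.

```python
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor


def split_by_series(rows):
    """Group long-format rows (dicts with unique_id, ds, y) by series id."""
    groups = defaultdict(list)
    for row in rows:
        groups[row["unique_id"]].append(row)
    # Sort each series by timestamp so the last row is the most recent one
    return [sorted(g, key=lambda r: r["ds"]) for g in groups.values()]


def naive_forecast(series_rows, h=2):
    """Hypothetical per-series task: repeat the last observed value h times."""
    uid = series_rows[0]["unique_id"]
    last_ds, last_y = series_rows[-1]["ds"], series_rows[-1]["y"]
    # Integer timestamps assumed, so future steps are last_ds + 1 ... last_ds + h
    return [{"unique_id": uid, "ds": last_ds + k, "Naive": last_y} for k in range(1, h + 1)]


def run(rows, mapper=map):
    """Apply naive_forecast to every series with any map-like callable."""
    results = mapper(naive_forecast, split_by_series(rows))
    return [row for part in results for row in part]


rows = [
    {"unique_id": "a", "ds": 1, "y": 10.0},
    {"unique_id": "a", "ds": 2, "y": 12.0},
    {"unique_id": "b", "ds": 1, "y": 5.0},
    {"unique_id": "b", "ds": 2, "y": 4.0},
]

serial = run(rows)  # local execution with the built-in map
with ThreadPoolExecutor(max_workers=2) as pool:
    parallel = run(rows, mapper=pool.map)  # same function, different "engine"
assert serial == parallel
```

Because the per-series function never depends on how it is scheduled, swapping the mapper changes where the work runs without touching the forecasting code, which is the property the "without any code rewrites" claim relies on.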

Parameter         Type  Default  Details
engine            Any   None     A selection between Spark, Dask, and Ray.
conf              Any   None     Engine configuration.
transform_kwargs  Any


FugueBackend.forecast

 FugueBackend.forecast (df:~AnyDataFrame, freq:Union[str,int],
                        models:List[Any], fallback_model:Optional[Any],
                        X_df:Optional[~AnyDataFrame], h:int,
                        level:Optional[List[int]], fitted:bool,
                        prediction_intervals:Optional[statsforecast.utils.ConformalIntervals],
                        id_col:str, time_col:str, target_col:str)

Memory-efficient core.StatsForecast predictions with FugueBackend.

This method uses Fugue's transform function, in combination with core.StatsForecast's forecast, to efficiently fit a list of StatsForecast models.

Parameter             Type                          Details
df                    AnyDataFrame                  DataFrame with ids, times, targets and exogenous variables.
freq                  Union[str, int]               Frequency of the data. Must be a valid pandas or polars offset alias, or an integer.
models                List[Any]                     List of instantiated statsforecast.models objects.
fallback_model        Optional[Any]                 Model to be used if a model fails (default None). Only works with the forecast and cross_validation methods.
X_df                  Optional[AnyDataFrame]        DataFrame with ids, times and future exogenous variables.
h                     int                           Forecast horizon.
level                 Optional[List[int]]           Confidence levels between 0 and 100 for prediction intervals.
fitted                bool                          Store in-sample predictions.
prediction_intervals  Optional[ConformalIntervals]  Configuration to calibrate prediction intervals (Conformal Prediction).
id_col                str                           Column that identifies each series.
time_col              str                           Column that identifies each timestep; its values can be timestamps or integers.
target_col            str                           Column that contains the target.
Returns               AnyDataFrame                  DataFrame with model columns for point predictions and probabilistic predictions for all fitted models.
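The `level` parameter requests prediction intervals, and the output gets one lower and one upper column per model and level (for example `Naive-lo-90` and `Naive-hi-90`, following StatsForecast's model-lo-level naming). As a rough illustration of what those columns contain, the sketch below computes a naive forecast with normal-approximation intervals, assuming the spread is estimated from one-step differences and widens with the square root of the step. This is a textbook naive interval, not necessarily the exact formula StatsForecast uses, and `naive_with_intervals` is a hypothetical helper.

```python
import math
from statistics import NormalDist


def naive_with_intervals(y, h, levels, model="Naive"):
    """Naive point forecasts plus symmetric normal intervals for each level."""
    last = y[-1]
    # One-step naive residuals are the first differences of the series
    residuals = [b - a for a, b in zip(y, y[1:])]
    # Root-mean-square of the residuals (no mean subtraction)
    sigma = math.sqrt(sum(r * r for r in residuals) / len(residuals))
    rows = []
    for step in range(1, h + 1):
        row = {model: last}
        for level in levels:
            # Two-sided quantile: level=90 uses the 0.95 quantile
            z = NormalDist().inv_cdf(0.5 + level / 200)
            half_width = z * sigma * math.sqrt(step)
            row[f"{model}-lo-{level}"] = last - half_width
            row[f"{model}-hi-{level}"] = last + half_width
        rows.append(row)
    return rows


forecast = naive_with_intervals([10.0, 12.0, 11.0, 13.0], h=2, levels=[80, 90])
for row in forecast:
    print(row)
```

Higher levels give wider intervals, and intervals widen as the step grows, which is the behavior to expect from the `-lo-`/`-hi-` columns.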


FugueBackend.cross_validation

 FugueBackend.cross_validation (df:~AnyDataFrame, freq:Union[str,int],
                                models:List[Any],
                                fallback_model:Optional[Any], h:int,
                                n_windows:int, step_size:int,
                                test_size:int, input_size:int,
                                level:Optional[List[int]], refit:bool,
                                fitted:bool,
                                prediction_intervals:Optional[statsforecast.utils.ConformalIntervals],
                                id_col:str, time_col:str, target_col:str)

Temporal cross-validation with core.StatsForecast and FugueBackend.

This method uses Fugue's transform function, in combination with core.StatsForecast's cross-validation, to efficiently fit a list of StatsForecast models through multiple training windows, in either a chained or a rolled manner.

The speed of StatsForecast's models, combined with Fugue's distributed computation, helps overcome the high computational cost of this evaluation technique. Temporal cross-validation gives a better measure of a model's generalization by increasing the length and diversity of the test set.

Parameter             Type                          Details
df                    AnyDataFrame                  DataFrame with ids, times, targets and exogenous variables.
freq                  Union[str, int]               Frequency of the data. Must be a valid pandas or polars offset alias, or an integer.
models                List[Any]                     List of instantiated statsforecast.models objects.
fallback_model        Optional[Any]                 Model to be used if a model fails (default None). Only works with the forecast and cross_validation methods.
h                     int                           Forecast horizon.
n_windows             int                           Number of windows used for cross-validation.
step_size             int                           Step size between each window.
test_size             int                           Length of the test set. If passed, set n_windows=None.
input_size            int                           Input size for each window; if not None, windows are rolled.
level                 Optional[List[int]]           Confidence levels between 0 and 100 for prediction intervals.
refit                 bool                          Whether or not to refit the models for each window. If an int, train the models every refit windows.
fitted                bool                          Store in-sample predictions.
prediction_intervals  Optional[ConformalIntervals]  Configuration to calibrate prediction intervals (Conformal Prediction).
id_col                str                           Column that identifies each series.
time_col              str                           Column that identifies each timestep; its values can be timestamps or integers.
target_col            str                           Column that contains the target.
Returns               AnyDataFrame                  DataFrame with model columns for point predictions and probabilistic predictions for all fitted models.
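The window parameters interact roughly as follows. This sketch assumes (from our reading of StatsForecast, worth checking against its source) that, when test_size is not given, it equals h + step_size * (n_windows - 1); that cutoffs are spaced step_size apart with the last window ending at the final observation; and that input_size, when set, keeps only the most recent input_size observations per window (rolled) instead of all history (chained). `cv_windows` is a hypothetical helper that returns integer positions, not a StatsForecast function.

```python
def cv_windows(n_obs, h, n_windows=1, step_size=1, test_size=None, input_size=None):
    """Return (train_start, train_end, test_end) position triples, end-exclusive."""
    if test_size is None:
        test_size = h + step_size * (n_windows - 1)
    windows = []
    # Cutoff offsets counted back from the end of the series
    for offset in range(-test_size, -h + 1, step_size):
        train_end = n_obs + offset  # also the first position of the test window
        # Chained windows start at 0; rolled windows keep the last input_size points
        train_start = 0 if input_size is None else max(0, train_end - input_size)
        windows.append((train_start, train_end, train_end + h))
    return windows


# Settings from the first example (h=7, step_size=24, n_windows=2) on 100 points
print(cv_windows(100, h=7, n_windows=2, step_size=24))
# [(0, 69, 76), (0, 93, 100)]
print(cv_windows(100, h=7, n_windows=2, step_size=24, input_size=30))
# [(39, 69, 76), (63, 93, 100)]
```

With step_size larger than h, as in the first example, the test windows do not overlap and some observations are never scored.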

Dask Distributed Predictions

Here we provide an example of distributing StatsForecast predictions with Fugue, executing the code on a Dask cluster.

To do this, we start a Dask client and create a Fugue DaskExecutionEngine from it.

import dask.dataframe as dd
from dask.distributed import Client
from fugue_dask import DaskExecutionEngine
from statsforecast import StatsForecast
from statsforecast.models import Naive
from statsforecast.utils import generate_series
# Generate Synthetic Panel Data
df = generate_series(10)
df['unique_id'] = df['unique_id'].astype(str)
df = dd.from_pandas(df, npartitions=10)

# Start a local Dask client and wrap it in a Fugue execution engine
dask_client = Client()
engine = DaskExecutionEngine(dask_client=dask_client)

We then instantiate StatsForecast as usual; because df is a Dask DataFrame, the computation is distributed through Fugue on Dask.

sf = StatsForecast(models=[Naive()], freq='D')

Distributed Forecast

For fast distributed predictions, StatsForecast uses FugueBackend as its backend, which operates like the original StatsForecast.forecast method.

It receives as input a DataFrame (here a Dask DataFrame) with columns [unique_id, ds, y] plus any exogenous variables. The ds (datestamp) column should be in a format pandas can parse, the y column must be numeric and holds the measurement we wish to forecast, and unique_id identifies each series in the panel data.
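Catching input problems locally is cheaper than discovering them on a cluster. The sketch below is a hypothetical helper (`check_long_format`), not part of StatsForecast: it checks that every row has unique_id, ds and y, that y is numeric, and that no series repeats a timestamp. It uses plain dictionaries so it runs without pandas.

```python
from numbers import Real


def check_long_format(rows, id_col="unique_id", time_col="ds", target_col="y"):
    """Return a list of problems found in long-format rows; empty means OK."""
    problems = []
    seen = set()
    for i, row in enumerate(rows):
        missing = {id_col, time_col, target_col} - set(row)
        if missing:
            problems.append(f"row {i}: missing {sorted(missing)}")
            continue
        y = row[target_col]
        # The target must be numeric (bool subclasses int, so reject it explicitly)
        if isinstance(y, bool) or not isinstance(y, Real):
            problems.append(f"row {i}: non-numeric {target_col}={y!r}")
        key = (row[id_col], row[time_col])
        if key in seen:
            problems.append(f"row {i}: duplicate timestamp {key}")
        seen.add(key)
    return problems


good = [{"unique_id": "a", "ds": "2000-01-01", "y": 1.0},
        {"unique_id": "a", "ds": "2000-01-02", "y": 2}]
bad = [{"unique_id": "a", "ds": "2000-01-01", "y": "1.0"},
       {"unique_id": "a", "ds": "2000-01-01", "y": 2.0},
       {"unique_id": "b", "y": 3.0}]
print(check_long_format(good))  # []
print(check_long_format(bad))
```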

# Distributed predictions with FugueBackend; .compute() collects the Dask result
sf.forecast(df=df, h=12).compute()

# Keep the in-sample (fitted) values so they can be retrieved afterwards
forecasts = sf.forecast(df=df, h=12, fitted=True).compute()
fitted_values = sf.forecast_fitted_values().compute()

Distributed Cross-Validation

For fast distributed temporal cross-validation, we use the cross_validation method, which operates like the original StatsForecast.cross_validation method.

# Distributed cross-validation with FugueBackend.
sf.cross_validation(df=df, h=12, n_windows=2).compute()
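The cross-validation result has one row per series, cutoff and forecast timestamp, with the actual value in y next to each model's prediction. Once collected locally (for example with .compute()), it can be scored per cutoff. The sketch below computes the mean absolute error per cutoff from plain dict rows; `mae_by_cutoff` is a hypothetical helper and the toy rows only mimic the shape of the output.

```python
from collections import defaultdict


def mae_by_cutoff(cv_rows, model="Naive"):
    """Mean absolute error of one model column, grouped by cutoff."""
    errors = defaultdict(list)
    for row in cv_rows:
        errors[row["cutoff"]].append(abs(row["y"] - row[model]))
    return {cutoff: sum(e) / len(e) for cutoff, e in errors.items()}


cv_rows = [  # toy rows shaped like cross_validation output
    {"unique_id": "a", "ds": 3, "cutoff": 2, "y": 5.0, "Naive": 4.0},
    {"unique_id": "a", "ds": 4, "cutoff": 2, "y": 7.0, "Naive": 4.0},
    {"unique_id": "a", "ds": 4, "cutoff": 3, "y": 7.0, "Naive": 5.0},
]
print(mae_by_cutoff(cv_rows))  # {2: 2.0, 3: 2.0}
```

Scoring per cutoff rather than pooling all rows shows whether a model's error is stable across windows, which is the point of evaluating on several windows.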