FugueBackend
import os
# Return unique_id as a regular column instead of the index
os.environ['NIXTLA_ID_AS_COL'] = '1'
from statsforecast.core import StatsForecast
from statsforecast.models import AutoETS
from statsforecast.utils import generate_series
n_series = 4
horizon = 7
# Synthetic daily panel with columns unique_id, ds, y
series = generate_series(n_series)
sf = StatsForecast(
    models=[AutoETS(season_length=7)],
    freq='D',
)
# Local (pandas) cross-validation: 2 windows of 7 steps, 24 steps apart, 90% intervals
sf.cross_validation(df=series, h=horizon, step_size=24,
                    n_windows=2, level=[90]).head()
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
# Cast unique_id to plain strings so Spark can infer the schema
series['unique_id'] = series['unique_id'].astype(str)
# Convert the pandas DataFrame to a Spark DataFrame
sdf = spark.createDataFrame(series)
sf = StatsForecast(
    models=[AutoETS(season_length=7)],
    freq='D',
)
# Same call as above; with a Spark DataFrame as input it returns a Spark DataFrame
sf.cross_validation(df=sdf, h=horizon, step_size=24,
                    n_windows=2, level=[90]).show()
FugueBackend
FugueBackend (engine:Any=None, conf:Any=None, **transform_kwargs:Any)
*FugueBackend for distributed computation.
This class uses the Fugue backend to distribute computation on Spark, Dask and Ray without any code rewrites.*
Parameter | Type | Default | Details |
---|---|---|---|
engine | Any | None | Execution engine: a selection between Spark, Dask, and Ray. |
conf | Any | None | Engine configuration. |
transform_kwargs | Any | | Additional keyword arguments passed to Fugue's transform. |
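The reason no rewrite is needed is that each series is processed independently: the data can be partitioned by the id column and the same function applied to every partition, wherever it runs. The sketch below imitates that partition-and-apply pattern in plain Python, with a local thread pool standing in for a Spark, Dask or Ray cluster. The names partition_by_id, transform_by_id and naive_one_step are hypothetical; this illustrates the idea and is not Fugue's or StatsForecast's code.

```python
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Dict, List, Tuple

# A long-format row: (unique_id, ds, y); ds is an integer time step here.
Row = Tuple[str, int, float]


def partition_by_id(rows: List[Row]) -> Dict[str, List[Row]]:
    """Group rows by series id, keeping each series in time order."""
    groups: Dict[str, List[Row]] = defaultdict(list)
    for row in rows:
        groups[row[0]].append(row)
    return {uid: sorted(g, key=lambda r: r[1]) for uid, g in groups.items()}


def transform_by_id(
    rows: List[Row], fn: Callable[[List[Row]], List[Row]], workers: int = 4
) -> List[Row]:
    """Apply fn to every series independently and concatenate the results.

    The thread pool stands in for a cluster: fn only ever sees one series,
    so the partitions could run on any worker without changing fn.
    """
    groups = partition_by_id(rows)
    ids = sorted(groups)
    with ThreadPoolExecutor(max_workers=workers) as pool:
        results = list(pool.map(fn, [groups[uid] for uid in ids]))
    return [row for part in results for row in part]


def naive_one_step(series: List[Row]) -> List[Row]:
    """Hypothetical per-series model: repeat the last observation once."""
    uid, last_ds, last_y = series[-1]
    return [(uid, last_ds + 1, last_y)]


rows = [("a", 1, 1.0), ("b", 1, 5.0), ("a", 2, 2.0), ("b", 2, 6.0)]
print(transform_by_id(rows, naive_one_step))
```

Because the per-series function never looks across ids, swapping the thread pool for a distributed engine changes where the work runs but not what it computes.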
FugueBackend.forecast
FugueBackend.forecast (df:~AnyDataFrame, freq:Union[str,int], models:List[Any], fallback_model:Optional[Any], X_df:Optional[~AnyDataFrame], h:int, level:Optional[List[int]], fitted:bool, prediction_intervals:Optional[statsforecast.utils.ConformalIntervals], id_col:str, time_col:str, target_col:str)
*Memory-efficient core.StatsForecast predictions with FugueBackend.
This method uses Fugue's transform function, in combination with core.StatsForecast's forecast, to efficiently fit a list of StatsForecast models.*
Parameter | Type | Details |
---|---|---|
df | AnyDataFrame | DataFrame with ids, times, targets and exogenous variables. |
freq | Union[str, int] | Frequency of the data. Must be a valid pandas or polars offset alias, or an integer. |
models | List[Any] | List of instantiated models from statsforecast.models. |
fallback_model | Optional[Any] | Model to be used if a model fails (default None). Only works with the forecast and cross_validation methods. |
X_df | Optional[AnyDataFrame] | DataFrame with ids, times and future exogenous variables. |
h | int | Forecast horizon. |
level | Optional[List[int]] | Confidence levels between 0 and 100 for prediction intervals. |
fitted | bool | Store in-sample predictions. |
prediction_intervals | Optional[ConformalIntervals] | Configuration to calibrate prediction intervals (Conformal Prediction). |
id_col | str | Column that identifies each series. |
time_col | str | Column that identifies each timestep; its values can be timestamps or integers. |
target_col | str | Column that contains the target. |
Returns | Any | DataFrame with model columns for point predictions and probabilistic predictions for all fitted models. |
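The fallback_model parameter describes a simple rule: if a model fails on a series, a second model produces the forecast instead. A minimal per-series sketch of that rule follows. The naive and seasonal_naive functions are hypothetical stand-ins for StatsForecast models, chosen so that the primary model fails on short series; this shows the idea, not StatsForecast's implementation.

```python
from typing import Callable, List, Optional, Sequence

# A "model" here is any function (history, horizon) -> list of forecasts.
Model = Callable[[Sequence[float], int], List[float]]


def naive(y: Sequence[float], h: int) -> List[float]:
    """Repeat the last observation h times."""
    return [y[-1]] * h


def seasonal_naive(season_length: int) -> Model:
    """Repeat the last full season; fails if the series is too short."""
    def model(y: Sequence[float], h: int) -> List[float]:
        if len(y) < season_length:
            raise ValueError("series shorter than one season")
        last_season = list(y[-season_length:])
        return [last_season[i % season_length] for i in range(h)]
    return model


def forecast_with_fallback(
    y: Sequence[float], h: int, model: Model, fallback: Optional[Model] = None
) -> List[float]:
    """Try the primary model; on any error, use the fallback if one is given."""
    try:
        return model(y, h)
    except Exception:
        if fallback is None:
            raise
        return fallback(y, h)


print(forecast_with_fallback([1, 2, 3, 4, 5, 6, 7, 8], 3, seasonal_naive(7), naive))
print(forecast_with_fallback([1.0, 2.0, 3.0], 3, seasonal_naive(7), naive))
```

The first call has a full season of history and uses the primary model; the second is too short, so the naive fallback answers for that series alone.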
FugueBackend.cross_validation
FugueBackend.cross_validation (df:~AnyDataFrame, freq:Union[str,int], models:List[Any], fallback_model:Optional[Any], h:int, n_windows:int, step_size:int, test_size:int, input_size:int, level:Optional[List[int]], refit:bool, fitted:bool, prediction_intervals:Optional[statsforecast.utils.ConformalIntervals], id_col:str, time_col:str, target_col:str)
*Temporal cross-validation with core.StatsForecast and FugueBackend.
This method uses Fugue's transform function, in combination with core.StatsForecast's cross-validation, to efficiently fit a list of StatsForecast models through multiple training windows, in either a chained or a rolled manner.
The speed of StatsForecast.models, combined with Fugue's distributed computation, helps overcome the high computational cost of this evaluation technique.
Temporal cross-validation provides better measurements of a model's generalization by increasing the length and diversity of the test set.*
Parameter | Type | Details |
---|---|---|
df | AnyDataFrame | DataFrame with ids, times, targets and exogenous variables. |
freq | Union[str, int] | Frequency of the data. Must be a valid pandas or polars offset alias, or an integer. |
models | List[Any] | List of instantiated models from statsforecast.models. |
fallback_model | Optional[Any] | Model to be used if a model fails (default None). Only works with the forecast and cross_validation methods. |
h | int | Forecast horizon. |
n_windows | int | Number of windows used for cross-validation. |
step_size | int | Step size between each window. |
test_size | int | Length of the test set. If passed, set n_windows=None. |
input_size | int | Input size for each window; if not None, the windows are rolled. |
level | Optional[List[int]] | Confidence levels between 0 and 100 for prediction intervals. |
refit | bool | Whether or not to refit the model for each window. If an int, train the models every refit windows. |
fitted | bool | Store in-sample predictions. |
prediction_intervals | Optional[ConformalIntervals] | Configuration to calibrate prediction intervals (Conformal Prediction). |
id_col | str | Column that identifies each series. |
time_col | str | Column that identifies each timestep; its values can be timestamps or integers. |
target_col | str | Column that contains the target. |
Returns | Any | DataFrame with model columns for point predictions and probabilistic predictions for all fitted models. |
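How h, n_windows, step_size, test_size, input_size and refit interact can be laid out by hand for a single series. The sketch below assumes the test span covers h + step_size * (n_windows - 1) observations at the end of the series, that windows are expanding (chained) when input_size is None and rolled otherwise, and that with refit=False models are trained only in the first window. Window and cv_windows are hypothetical names, and the refit handling is an assumption, not a copy of StatsForecast's logic.

```python
from dataclasses import dataclass
from typing import List, Optional, Union


@dataclass(frozen=True)
class Window:
    train_start: int  # first training index (inclusive)
    cutoff: int       # end of training data (exclusive) = first test index
    test_end: int     # end of the h-step test span (exclusive)
    refit: bool       # whether models are (re)trained in this window


def cv_windows(
    n_obs: int,
    h: int,
    step_size: int,
    n_windows: Optional[int] = None,
    test_size: Optional[int] = None,
    input_size: Optional[int] = None,
    refit: Union[bool, int] = True,
) -> List[Window]:
    """Lay out temporal cross-validation windows over one series of n_obs points."""
    if test_size is None:
        if n_windows is None:
            raise ValueError("pass either n_windows or test_size")
        # The windows together cover h steps plus (n_windows - 1) shifts.
        test_size = h + step_size * (n_windows - 1)
    else:
        if (test_size - h) % step_size:
            raise ValueError("test_size - h must be a multiple of step_size")
        n_windows = (test_size - h) // step_size + 1
    if test_size > n_obs:
        raise ValueError("series too short for the requested windows")
    windows = []
    for i in range(n_windows):
        cutoff = n_obs - test_size + i * step_size
        # input_size=None: expanding (chained) windows; otherwise rolled windows.
        train_start = 0 if input_size is None else max(0, cutoff - input_size)
        if isinstance(refit, bool):  # check bool first: bool is a subclass of int
            # Assumption: without refit, models are trained once, in the first window.
            do_refit = refit or i == 0
        else:
            if refit < 1:
                raise ValueError("an integer refit must be >= 1")
            do_refit = i % refit == 0  # retrain every `refit` windows
        windows.append(Window(train_start, cutoff, cutoff + h, do_refit))
    return windows


# Same settings as the first example, on a hypothetical 100-point series.
for w in cv_windows(100, h=7, step_size=24, n_windows=2):
    print(w)
```

With 100 observations, h=7, step_size=24 and n_windows=2, the test span is 31 points long and the two 7-step test windows start at positions 69 and 93. Passing test_size=31 instead of n_windows yields the same two windows.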
Dask Distributed Predictions
Here we provide an example of distributing StatsForecast predictions with Fugue, executing the code on a Dask cluster.
To do this, we instantiate the FugueBackend class with a DaskExecutionEngine.
import dask.dataframe as dd
from dask.distributed import Client
from fugue_dask import DaskExecutionEngine
from statsforecast import StatsForecast
from statsforecast.models import Naive
from statsforecast.utils import generate_series
# Generate synthetic panel data
df = generate_series(10)
# Cast unique_id to plain strings before handing the data to Dask
df['unique_id'] = df['unique_id'].astype(str)
df = dd.from_pandas(df, npartitions=10)
# Start a Dask client and wrap it in Fugue's DaskExecutionEngine
dask_client = Client()
engine = DaskExecutionEngine(dask_client=dask_client)
We then create the StatsForecast object with the usual instantiation.
sf = StatsForecast(models=[Naive()], freq='D')
Distributed Forecast
For fast distributed predictions, we use FugueBackend as a backend that operates like the original StatsForecast.forecast method.
It receives as input a DataFrame (here a Dask DataFrame) with columns [unique_id, ds, y] and any exogenous variables, where the ds (datestamp) column should be in a format expected by Pandas. The y column must be numeric and represents the measurement we wish to forecast. The unique_id column uniquely identifies each series in the panel data.
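The expected long format can be illustrated without any cluster: one row per (unique_id, ds) pair, with a numeric y. The sketch below shows what a Naive forecast with freq='D' and a horizon h produces on such data, assuming Naive repeats each series' last observed value. Plain Python dicts stand in for DataFrame rows, naive_forecast is a hypothetical helper, and the Naive output key only mirrors the model column name; it is not StatsForecast's code.

```python
from datetime import date, timedelta
from typing import Dict, List


def naive_forecast(rows: List[Dict], h: int) -> List[Dict]:
    """Daily naive forecast for long-format rows with keys unique_id, ds, y."""
    last: Dict[str, Dict] = {}
    for row in rows:
        if not isinstance(row["y"], (int, float)):
            raise TypeError("y must be numeric")
        uid = row["unique_id"]
        # Keep the most recent observation of each series.
        if uid not in last or row["ds"] > last[uid]["ds"]:
            last[uid] = row
    out = []
    for uid in sorted(last):
        ds, y = last[uid]["ds"], last[uid]["y"]
        for step in range(1, h + 1):
            # freq='D': each future timestamp is one day after the previous one.
            out.append({"unique_id": uid, "ds": ds + timedelta(days=step), "Naive": y})
    return out


rows = [
    {"unique_id": "0", "ds": date(2000, 1, 1), "y": 1.0},
    {"unique_id": "0", "ds": date(2000, 1, 2), "y": 2.0},
    {"unique_id": "0", "ds": date(2000, 1, 3), "y": 3.0},
    {"unique_id": "1", "ds": date(2000, 1, 1), "y": 10.0},
    {"unique_id": "1", "ds": date(2000, 1, 2), "y": 11.0},
]
for r in naive_forecast(rows, h=2):
    print(r)
```

Each series is extended from its own last timestamp, which is why series of different lengths can share one panel.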
# Distributed predictions with FugueBackend.
sf.forecast(df=df, h=12).compute()
# Also store in-sample predictions (fitted=True) and retrieve them afterwards
sf = StatsForecast(models=[Naive()], freq='D')
forecasts = sf.forecast(df=df, h=12, fitted=True).compute()
fitted_values = sf.forecast_fitted_values().compute()
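With fitted=True the forecast call also stores in-sample predictions, which forecast_fitted_values returns. For a naive forecaster, a natural in-sample prediction at each step is the previous observation, so the first step has none. The sketch below assumes that definition; naive_fitted is a hypothetical helper, not StatsForecast's Naive.

```python
import math
from typing import List, Sequence


def naive_fitted(y: Sequence[float]) -> List[float]:
    """One-step-ahead in-sample naive predictions: y_hat[t] = y[t - 1]."""
    if not y:
        return []
    # No earlier observation exists for the first step, so it is NaN.
    return [math.nan] + list(y[:-1])


print(naive_fitted([1.0, 2.0, 4.0]))
```

Comparing these values with the observed y gives in-sample residuals, one per step after the first.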
Distributed Cross-Validation
For fast distributed temporal cross-validation, we use the cross_validation method, which operates like the original StatsForecast.cross_validation method.
# Distributed cross-validation with FugueBackend.
sf.cross_validation(df=df, h=12, n_windows=2).compute()
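Cross-validation output pairs each window's actual values with the model's predictions, tagged by the cutoff at which the window starts. A minimal sketch of summarizing such rows into one error per window follows, assuming rows with keys cutoff, y and a model column such as Naive. mae_by_cutoff is a hypothetical helper and the numbers below are illustrative, not results from this example.

```python
from collections import defaultdict
from typing import Dict, List


def mae_by_cutoff(rows: List[Dict], model: str) -> Dict[int, float]:
    """Mean absolute error of `model` for each cross-validation cutoff."""
    errors: Dict[int, List[float]] = defaultdict(list)
    for row in rows:
        errors[row["cutoff"]].append(abs(row["y"] - row[model]))
    return {cutoff: sum(e) / len(e) for cutoff, e in sorted(errors.items())}


# Hypothetical rows from two windows (integer cutoffs for simplicity).
cv_rows = [
    {"cutoff": 10, "y": 5.0, "Naive": 4.0},
    {"cutoff": 10, "y": 6.0, "Naive": 4.0},
    {"cutoff": 11, "y": 6.0, "Naive": 5.0},
]
print(mae_by_cutoff(cv_rows, "Naive"))
```

Looking at the error per cutoff, rather than one pooled number, shows whether a model's accuracy is stable across the windows.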