The FugueBackend class enables distributed computation for StatsForecast using Fugue, which provides a unified interface for Spark, Dask, and Ray backends without requiring code rewrites.

Overview

With FugueBackend, you can:
  • Distribute forecasting and cross-validation across clusters
  • Switch between Spark, Dask, and Ray without changing your code
  • Scale to large datasets with parallel processing
  • Maintain the same API as the standard StatsForecast interface

API Reference

FugueBackend

FugueBackend(engine=None, conf=None, **transform_kwargs)
Bases: ParallelBackend

FugueBackend for distributed computation. This class uses the Fugue backend, which is capable of distributing computation over Spark, Dask, and Ray without any code rewrites.

Parameters:
  • engine (ExecutionEngine, default None): A selection between Spark, Dask, and Ray.
  • conf (Config, default None): Engine configuration.
  • **transform_kwargs (Any): Additional kwargs for Fugue's transform method.
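For illustration, a minimal sketch of constructing the backend explicitly and calling its forecast method directly, per the signature documented below. This assumes the import path statsforecast.distributed.fugue from the library's source layout; in most workflows you would instead pass a distributed DataFrame to StatsForecast and let it select the backend automatically (see Quick Start below).

from pyspark.sql import SparkSession
from statsforecast.distributed.fugue import FugueBackend
from statsforecast.models import Naive
from statsforecast.utils import generate_series

spark = SparkSession.builder.getOrCreate()

# Prepare a Spark DataFrame with unique_id/ds/y columns
series = generate_series(4)
series['unique_id'] = series['unique_id'].astype(str)
sdf = spark.createDataFrame(series)

# Construct the backend with an explicit Spark engine
backend = FugueBackend(engine=spark)

# All arguments are keyword-only, per the signature below
fcst = backend.forecast(
    df=sdf,
    freq='D',
    models=[Naive()],
    fallback_model=None,
    X_df=None,
    h=7,
    level=None,
    fitted=False,
    prediction_intervals=None,
    id_col='unique_id',
    time_col='ds',
    target_col='y',
)
fcst.show()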

FugueBackend.forecast

forecast(
    *,
    df,
    freq,
    models,
    fallback_model,
    X_df,
    h,
    level,
    fitted,
    prediction_intervals,
    id_col,
    time_col,
    target_col
)
Memory-efficient core.StatsForecast predictions with FugueBackend. This method uses Fugue's transform function together with core.StatsForecast's forecast to efficiently fit a list of StatsForecast models. Parameters:
  • df (DataFrame, required): Input DataFrame containing time series data. Must have columns for series identifiers, timestamps, and target values. Can optionally include exogenous features.
  • freq (str or int, required): Frequency of the time series data. Must be a valid pandas or polars offset alias (e.g., 'D' for daily, 'M' for monthly, 'H' for hourly), or an integer representing the number of observations per cycle.
  • models (List[Any], required): List of instantiated StatsForecast model objects. Each model should implement the forecast interface. Models must have unique names, which can be set using the alias parameter.
  • fallback_model (Any, required): Model to use when a primary model fails during fitting or forecasting. Only works with the forecast and cross_validation methods. If None, exceptions from failing models will be raised.
  • X_df (DataFrame, required): DataFrame containing future exogenous variables. Required if any models use exogenous features. Must include future values for all time series over the forecast horizon.
  • h (int, required): Forecast horizon, the number of time steps ahead to predict.
  • level (List[float], required): Confidence levels between 0 and 100 for prediction intervals (e.g., [80, 95] for 80% and 95% intervals).
  • fitted (bool, required): If True, stores in-sample (fitted) predictions, which can be retrieved using forecast_fitted_values().
  • prediction_intervals (ConformalIntervals, required): Configuration for calibrating prediction intervals using Conformal Prediction.
  • id_col (str, required): Name of the column containing unique identifiers for each time series. Defaults to 'unique_id'.
  • time_col (str, required): Name of the column containing timestamps or time indices. Values can be timestamps (datetime) or integers. Defaults to 'ds'.
  • target_col (str, required): Name of the column containing the target variable to forecast. Defaults to 'y'.
Returns:
  • Any: pandas.DataFrame with model columns for the point predictions and probabilistic predictions of all fitted models.
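As an illustrative sketch of the exogenous-variable path through the higher-level StatsForecast interface: train_sdf and future_sdf below are hypothetical Spark DataFrames with the documented column layout (train_sdf carrying unique_id, ds, y plus exogenous columns; future_sdf carrying unique_id, ds and the same exogenous columns over the horizon).

from statsforecast import StatsForecast
from statsforecast.models import AutoARIMA, Naive

sf = StatsForecast(
    models=[AutoARIMA(season_length=7)],
    freq='D',
    fallback_model=Naive(),  # used when AutoARIMA fails on a series
)

# `train_sdf` and `future_sdf` are hypothetical Spark DataFrames;
# X_df supplies the future exogenous values required by the models
fcst = sf.forecast(df=train_sdf, h=14, X_df=future_sdf, level=[80, 95])
fcst.show()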

FugueBackend.cross_validation

cross_validation(
    *,
    df,
    freq,
    models,
    fallback_model,
    h,
    n_windows,
    step_size,
    test_size,
    input_size,
    level,
    refit,
    fitted,
    prediction_intervals,
    id_col,
    time_col,
    target_col
)
Temporal cross-validation with core.StatsForecast and FugueBackend. This method uses Fugue's transform function together with core.StatsForecast's cross_validation to efficiently fit a list of StatsForecast models over multiple training windows, in either a chained (expanding) or rolling manner. The speed of StatsForecast's models, combined with Fugue's distributed computation, helps overcome the high computational cost of this evaluation technique. Temporal cross-validation provides a better measure of a model's generalization by increasing the length and diversity of the test period. Parameters:
  • df (DataFrame, required): Input DataFrame containing time series data with columns for series identifiers, timestamps, and target values.
  • freq (str or int, required): Frequency of the time series data. Must be a valid pandas or polars offset alias (e.g., 'D' for daily, 'M' for monthly, 'H' for hourly), or an integer representing the number of observations per cycle.
  • models (List[Any], required): List of instantiated StatsForecast model objects. Each model should implement the forecast interface. Models must have unique names, which can be set using the alias parameter.
  • fallback_model (Any, required): Model to use when a primary model fails during fitting or forecasting. Only works with the forecast and cross_validation methods. If None, exceptions from failing models will be raised.
  • h (int, required): Forecast horizon for each validation window.
  • n_windows (int, required): Number of validation windows to create. Cannot be specified together with test_size.
  • step_size (int, required): Number of time steps between consecutive validation windows. Smaller values create overlapping windows.
  • test_size (int, required): Total size of the test period. If provided, n_windows is computed automatically. Overrides n_windows if specified.
  • input_size (int, required): Maximum number of training observations to use for each window. If None, uses expanding windows with all available history. If specified, uses rolling windows of fixed size.
  • level (List[float], required): Confidence levels between 0 and 100 for prediction intervals (e.g., [80, 95]).
  • refit (bool or int, required): Controls model refitting frequency. If True, refits models for every window. If False, fits once and uses the forward method. If an integer n, refits every n windows. Models must implement the forward method when refit is not True.
  • fitted (bool, required): If True, stores in-sample predictions for each window, accessible via cross_validation_fitted_values().
  • prediction_intervals (ConformalIntervals, required): Configuration for calibrating prediction intervals using Conformal Prediction. Requires level to be specified.
  • id_col (str, required): Name of the column containing unique identifiers for each time series. Defaults to 'unique_id'.
  • time_col (str, required): Name of the column containing timestamps or time indices. Defaults to 'ds'.
  • target_col (str, required): Name of the column containing the target variable. Defaults to 'y'.
Returns:
  • Any: pandas.DataFrame with model columns for the point predictions and probabilistic predictions of all fitted models.
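For illustration, a minimal sketch combining the windowing parameters; sdf is a hypothetical distributed DataFrame of daily series, and refit=2 assumes the models implement forward, as noted above.

from statsforecast import StatsForecast
from statsforecast.models import AutoETS

# `sdf` is a hypothetical Spark/Dask/Ray DataFrame of daily series
sf = StatsForecast(models=[AutoETS(season_length=7)], freq='D')

cv = sf.cross_validation(
    df=sdf,
    h=7,             # 7-day horizon per window
    n_windows=4,     # four validation windows
    step_size=7,     # windows spaced one week apart
    input_size=365,  # rolling training windows of at most one year
    refit=2,         # refit every second window (models need `forward`)
    level=[90],
)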

Quick Start

Basic Usage with Spark

from statsforecast.core import StatsForecast
from statsforecast.models import AutoARIMA, AutoETS
from statsforecast.utils import generate_series
from pyspark.sql import SparkSession

# Generate example data
n_series = 4
horizon = 7
series = generate_series(n_series)

# Create Spark session
spark = SparkSession.builder.getOrCreate()

# Convert unique_id to string and create Spark DataFrame
series['unique_id'] = series['unique_id'].astype(str)
sdf = spark.createDataFrame(series)

# Use StatsForecast with Spark DataFrame (automatically uses FugueBackend)
sf = StatsForecast(
    models=[AutoETS(season_length=7)],
    freq='D',
)

# Returns a Spark DataFrame
results = sf.cross_validation(
    df=sdf,
    h=horizon,
    step_size=24,
    n_windows=2,
    level=[90]
)
results.show()
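Since the result is a Spark DataFrame, the usual PySpark methods apply; for example, collecting a small result to the driver for local inspection (assuming it fits in driver memory):

# Materialize the distributed result locally as a pandas DataFrame
pdf = results.toPandas()
print(pdf.head())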

Basic Usage with pandas

from statsforecast import StatsForecast
from statsforecast.models import AutoETS
from statsforecast.utils import generate_series

# Generate data
series = generate_series(n_series=4)

# Standard usage (pandas/polars)
sf = StatsForecast(
    models=[AutoETS(season_length=7)],
    freq='D',
)

# Cross-validation with a pandas DataFrame
sf.cross_validation(
    df=series,
    h=7,
    step_size=24,
    n_windows=2,
    level=[90]
).head()

Dask Distributed Example

Here’s a complete example using Dask for distributed predictions:
import dask.dataframe as dd
from dask.distributed import Client
from fugue_dask import DaskExecutionEngine
from statsforecast import StatsForecast
from statsforecast.models import Naive
from statsforecast.utils import generate_series

# Generate synthetic panel data
df = generate_series(10)
df['unique_id'] = df['unique_id'].astype(str)
df = dd.from_pandas(df, npartitions=10)

# Instantiate a Dask client and a Fugue execution engine. Passing a Dask
# DataFrame is enough to trigger distributed execution; the engine object
# is only needed to configure the backend explicitly.
dask_client = Client()
engine = DaskExecutionEngine(dask_client=dask_client)

# Create StatsForecast instance
sf = StatsForecast(models=[Naive()], freq='D')

Distributed Forecast

The FugueBackend automatically handles distributed forecasting when you pass a Dask/Spark/Ray DataFrame:
# Distributed predictions
forecast_df = sf.forecast(df=df, h=12).compute()

# With fitted values
sf = StatsForecast(models=[Naive()], freq='D')
forecast_df = sf.forecast(df=df, h=12, fitted=True).compute()
fitted_df = sf.forecast_fitted_values().compute()

Distributed Cross-Validation

Perform distributed temporal cross-validation across your cluster:
# Distributed cross-validation
cv_results = sf.cross_validation(
    df=df,
    h=12,
    n_windows=2
).compute()

How It Works

  1. Automatic Detection: When you pass a Spark, Dask, or Ray DataFrame to StatsForecast methods, the FugueBackend is automatically used.
  2. Data Partitioning: Data is partitioned by unique_id, allowing parallel processing across different time series.
  3. Distributed Execution: Each partition is processed independently using the standard StatsForecast logic.
  4. Result Aggregation: Results are collected and returned in the same format as the input (Spark/Dask/Ray DataFrame), as the short check below illustrates.
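For instance, a minimal sanity check of the type round-trip, reusing sf and sdf from the Spark Quick Start above:

import pyspark.sql

# The output DataFrame type matches the input DataFrame type
fcst = sf.forecast(df=sdf, h=7)
assert isinstance(fcst, pyspark.sql.DataFrame)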

Supported Backends

  • Apache Spark: For large-scale distributed processing
  • Dask: For flexible distributed computing with Python
  • Ray: For modern distributed machine learning workloads
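Spark and Dask are shown in the examples above; for Ray, a hedged sketch along the same lines, assuming ray.data.from_pandas for the conversion per Ray's Data API:

import ray
from statsforecast import StatsForecast
from statsforecast.models import Naive
from statsforecast.utils import generate_series

series = generate_series(10)
series['unique_id'] = series['unique_id'].astype(str)
ray_df = ray.data.from_pandas(series)  # a Ray Dataset selects the Ray backend

sf = StatsForecast(models=[Naive()], freq='D')
fcst = sf.forecast(df=ray_df, h=7)
print(fcst.to_pandas().head())  # materialize the Ray Dataset locally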

Notes

  • Ensure your cluster has sufficient resources for the number of time series and models
  • The unique_id column should be string type for distributed operations
  • Use .compute() on Dask DataFrames to materialize results
  • Use .show() or .collect() on Spark DataFrames to view results
