module mlforecast.distributed.forecast

Global Variables

  • DASK_INSTALLED
  • SPARK_INSTALLED
  • RAY_INSTALLED

class WindowInfo

WindowInfo(n_windows, window_size, step_size, i_window, input_size)

class DistributedMLForecast

Multi-backend distributed pipeline.

method __init__

__init__(
    models,
    freq: Union[int, str],
    lags: Optional[Iterable[int]] = None,
    lag_transforms: Optional[Dict[int, List[Union[Callable, Tuple[Callable, Any]]]]] = None,
    date_features: Optional[Iterable[Union[str, Callable]]] = None,
    num_threads: int = 1,
    target_transforms: Optional[List[Union[BaseTargetTransform, _BaseGroupedArrayTargetTransform]]] = None,
    engine=None,
    num_partitions: Optional[int] = None,
    lag_transforms_namer: Optional[Callable] = None
)
Create distributed forecast object.
Args:
  • models (regressor or list of regressors): Models that will be trained and used to compute the forecasts.
  • freq (str or int): Pandas offset alias, e.g. ‘D’, ‘W-THU’, or an integer denoting the frequency of the series.
  • lags (list of int, optional): Lags of the target to use as features. Defaults to None.
  • lag_transforms (dict of int to list of functions, optional): Mapping of target lags to their transformations. Defaults to None.
  • date_features (list of str or callable, optional): Features computed from the dates. Can be pandas date attributes or functions that will take the dates as input. Defaults to None.
  • num_threads (int): Number of threads to use when computing the features. Defaults to 1.
  • target_transforms (list of transformers, optional): Transformations that will be applied to the target before computing the features and restored after the forecasting step. Defaults to None.
  • engine (fugue execution engine, optional): Dask Client, Spark Session, etc. to use for the distributed computation. If None, it will be inferred from the input type. Defaults to None.
  • num_partitions (int, optional): Number of data partitions to use. If None, the default partitions provided by the AnyDataFrame used by the fit and cross_validation methods will be used. If a Ray Dataset is provided and num_partitions is None, the partitioning will be done by the id_col. Defaults to None.
  • lag_transforms_namer (callable, optional): Function that takes a transformation (either function or class), a lag and extra arguments and produces a name. Defaults to None.
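
For example, a minimal construction sketch for the Dask backend. It assumes lightgbm and the window_ops package are installed; DaskLGBMForecast ships with mlforecast and rolling_mean comes from window_ops:

from mlforecast.distributed import DistributedMLForecast
from mlforecast.distributed.models.dask.lgb import DaskLGBMForecast
from window_ops.rolling import rolling_mean

fcst = DistributedMLForecast(
    models=[DaskLGBMForecast()],
    freq='D',                                  # daily series
    lags=[7, 14],                              # raw target lags
    lag_transforms={7: [(rolling_mean, 4)]},   # 4-period rolling mean over lag 7
    date_features=['dayofweek'],               # pandas date attribute
)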

method cross_validation

cross_validation(
    df: ~AnyDataFrame,
    n_windows: int,
    h: int,
    id_col: str = 'unique_id',
    time_col: str = 'ds',
    target_col: str = 'y',
    step_size: Optional[int] = None,
    static_features: Optional[List[str]] = None,
    dropna: bool = True,
    keep_last_n: Optional[int] = None,
    refit: bool = True,
    before_predict_callback: Optional[Callable] = None,
    after_predict_callback: Optional[Callable] = None,
    input_size: Optional[int] = None
) → ~AnyDataFrame
Perform time series cross validation. Creates n_windows splits where each window has h test periods, trains the models, computes the predictions and merges the actuals.
Args:
  • df (dask, spark or ray DataFrame): Series data in long format.
  • n_windows (int): Number of windows to evaluate.
  • h (int): Number of test periods in each window.
  • id_col (str): Column that identifies each series. Defaults to ‘unique_id’.
  • time_col (str): Column that identifies each timestep, its values can be timestamps or integers. Defaults to ‘ds’.
  • target_col (str): Column that contains the target. Defaults to ‘y’.
  • step_size (int, optional): Step size between each cross validation window. If None it will be equal to h. Defaults to None.
  • static_features (list of str, optional): Names of the features that are static and will be repeated when forecasting. Defaults to None.
  • dropna (bool): Drop rows with missing values produced by the transformations. Defaults to True.
  • keep_last_n (int, optional): Keep only this many records from each series for the forecasting step. Can save time and memory if your features allow it. Defaults to None.
  • refit (bool): Retrain model for each cross validation window. If False, the models are trained at the beginning and then used to predict each window. Defaults to True.
  • before_predict_callback (callable, optional): Function to call on the features before computing the predictions. This function will take the input dataframe that will be passed to the model for predicting and should return a dataframe with the same structure. The series identifier is on the index. Defaults to None.
  • after_predict_callback (callable, optional): Function to call on the predictions before updating the targets. This function will take a pandas Series with the predictions and should return another one with the same structure. The series identifier is on the index. Defaults to None.
  • input_size (int, optional): Maximum training samples per series in each window. If None, will use an expanding window. Defaults to None.
Returns:
  • (dask, spark or ray DataFrame): Predictions for each window with the series id, timestamp, target value and predictions from each model.
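
A sketch of three non-overlapping validation windows, where ddf is a hypothetical long-format dask DataFrame with unique_id, ds and y columns and fcst is the object built above:

cv_res = fcst.cross_validation(
    df=ddf,
    n_windows=3,   # three train/test splits
    h=14,          # 14 test periods per window
    step_size=14,  # equal to h, so the windows don't overlap
)
cv_df = cv_res.compute()  # collect the distributed predictions as pandas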

method fit

fit(
    df: ~AnyDataFrame,
    id_col: str = 'unique_id',
    time_col: str = 'ds',
    target_col: str = 'y',
    static_features: Optional[List[str]] = None,
    dropna: bool = True,
    keep_last_n: Optional[int] = None
) → DistributedMLForecast
Apply the feature engineering and train the models.
Args:
  • df (dask, spark or ray DataFrame): Series data in long format.
  • id_col (str): Column that identifies each series. Defaults to ‘unique_id’.
  • time_col (str): Column that identifies each timestep, its values can be timestamps or integers. Defaults to ‘ds’.
  • target_col (str): Column that contains the target. Defaults to ‘y’.
  • static_features (list of str, optional): Names of the features that are static and will be repeated when forecasting. Defaults to None.
  • dropna (bool): Drop rows with missing values produced by the transformations. Defaults to True.
  • keep_last_n (int, optional): Keep only this many records from each series for the forecasting step. Can save time and memory if your features allow it. Defaults to None.
Returns:
  • (DistributedMLForecast): Forecast object with series values and trained models.
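
A sketch of fitting on a Dask collection, where series is a hypothetical pandas frame in long format:

import dask.dataframe as dd

ddf = dd.from_pandas(series, npartitions=4)  # roughly one partition per worker
fcst = fcst.fit(ddf)  # returns the same object, so calls can be chained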

method load

load(path: str, engine) → DistributedMLForecast
Load forecast object.
Args:
  • path (str): Directory with saved artifacts.
  • engine (fugue execution engine): Dask Client, Spark Session, etc. to use for the distributed computation.
Returns:
  • (DistributedMLForecast): Loaded forecast object.
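
A sketch of restoring a previously saved object on a Dask cluster (the ‘checkpoints’ path is hypothetical):

from distributed import Client
from mlforecast.distributed import DistributedMLForecast

client = Client()  # connect to (or start) a Dask cluster
fcst = DistributedMLForecast.load('checkpoints', engine=client)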

method predict

predict(
    h: int,
    before_predict_callback: Optional[Callable] = None,
    after_predict_callback: Optional[Callable] = None,
    X_df: Optional[DataFrame] = None,
    new_df: Optional[~AnyDataFrame] = None,
    ids: Optional[List[str]] = None
) → ~AnyDataFrame
Compute the predictions for the next horizon steps.
Args:
  • h (int): Forecast horizon.
  • before_predict_callback (callable, optional): Function to call on the features before computing the predictions. This function will take the input dataframe that will be passed to the model for predicting and should return a dataframe with the same structure. The series identifier is on the index. Defaults to None.
  • after_predict_callback (callable, optional): Function to call on the predictions before updating the targets. This function will take a pandas Series with the predictions and should return another one with the same structure. The series identifier is on the index. Defaults to None.
  • X_df (pandas DataFrame, optional): Dataframe with the future exogenous features. Should have the id column and the time column. Defaults to None.
  • new_df (dask or spark DataFrame, optional): Series data of new observations for which forecasts are to be generated. This dataframe should have the same structure as the one used to fit the model, including any features and time series data. If new_df is not None, the method will generate forecasts for the new observations. Defaults to None.
  • ids (list of str, optional): List with subset of ids seen during training for which the forecasts should be computed. Defaults to None.
Returns:
  • (dask, spark or ray DataFrame): Predictions for each series and timestep, with one column per model.
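
A sketch of the common calls, assuming fcst was fit as above and ‘store_1’ is a hypothetical series id:

preds = fcst.predict(h=14)                    # 14 steps ahead for every series
subset = fcst.predict(h=14, ids=['store_1'])  # restrict to the listed ids
local = preds.compute()                       # for dask collections; Spark would use toPandas()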

method preprocess

preprocess(
    df: ~AnyDataFrame,
    id_col: str = 'unique_id',
    time_col: str = 'ds',
    target_col: str = 'y',
    static_features: Optional[List[str]] = None,
    dropna: bool = True,
    keep_last_n: Optional[int] = None
) → ~AnyDataFrame
Add the features to the data.
Args:
  • df (dask, spark or ray DataFrame): Series data in long format.
  • id_col (str): Column that identifies each series. Defaults to ‘unique_id’.
  • time_col (str): Column that identifies each timestep, its values can be timestamps or integers. Defaults to ‘ds’.
  • target_col (str): Column that contains the target. Defaults to ‘y’.
  • static_features (list of str, optional): Names of the features that are static and will be repeated when forecasting. Defaults to None.
  • dropna (bool): Drop rows with missing values produced by the transformations. Defaults to True.
  • keep_last_n (int, optional): Keep only this many records from each series for the forecasting step. Can save time and memory if your features allow it. Defaults to None.
Returns:
  • (same type as df): df with added features.
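
A sketch for inspecting the generated features without training, with ddf as before:

features = fcst.preprocess(ddf)
features.head()  # original columns plus the lag, lag-transform and date features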

method save

save(path: str) → None
Save forecast object.
Args:
  • path (str): Directory where artifacts will be stored.
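
For example (the target directory is hypothetical):

fcst.save('checkpoints')  # persists the stored series partitions and trained models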

method to_local

to_local() → MLForecast
Convert this distributed forecast object into a local one. This pulls all the data from the remote machines, so you have to be sure that it fits in the scheduler/driver. If you’re not sure, use the save method instead.
Returns:
  • (MLForecast): Local forecast object.
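
A sketch, advisable only when the combined series fit in the driver’s memory:

local_fcst = fcst.to_local()            # plain MLForecast with all data in memory
local_preds = local_fcst.predict(h=14)  # regular pandas DataFrame, no cluster needed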

method update

update(df: DataFrame) → None
Update the values of the stored series.
Args:
  • df (pandas DataFrame): Dataframe with new observations.
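
A sketch, where new_obs is a hypothetical pandas frame with the same schema used for fit (unique_id, ds, y):

fcst.update(new_obs)        # append the latest observations to the stored series
preds = fcst.predict(h=14)  # forecasts now start right after the new values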