The DistributedMLForecast class is a high-level abstraction that encapsulates all the steps in the pipeline (preprocessing, fitting the model, and computing predictions) and applies them in a distributed way.

To use DistributedMLForecast (as opposed to MLForecast) you need the following:

  1. You need to set up a cluster. We currently support dask, ray and spark.
  2. Your data needs to be a distributed collection (dask, ray or spark dataframe).
  3. You need to use a model that implements distributed training in your framework of choice, e.g. SynapseML for LightGBM in spark.
import sys

import fugue.api as fa  # used below to convert distributed dataframes to pandas
import git
import pandas as pd

from mlforecast.distributed import DistributedMLForecast
from mlforecast.lag_transforms import ExpandingMean, RollingMean
from mlforecast.target_transforms import Differences
from mlforecast.utils import generate_daily_series

Dask

import dask.dataframe as dd
from dask.distributed import Client

Client setup

client = Client(n_workers=2, threads_per_worker=1)

Here we define a client that connects to a dask.distributed.LocalCluster; however, it could be any other kind of cluster.
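
If you already have a running cluster you can instead point the client at its scheduler. A minimal sketch; the address is a placeholder you would replace with your scheduler's:

from dask.distributed import Client

# hypothetical scheduler address; replace with your cluster's
client = Client('tcp://scheduler-host:8786')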

Data setup

For dask, the data must be a dask.dataframe.DataFrame. You need to make sure that each time series is in only one partition, and it is recommended that you have as many partitions as you have workers. If you have more partitions than workers, make sure to set num_threads=1 to avoid nested parallelism.

The required input format is the same as for MLForecast, except that it’s a dask.dataframe.DataFrame instead of a pandas.DataFrame.

series = generate_daily_series(
    100,
    n_static_features=2,
    equal_ends=True,
    static_as_categorical=False,
    min_length=500,
    max_length=1_000,
)
npartitions = 10
partitioned_series = dd.from_pandas(series.set_index('unique_id'), npartitions=npartitions)  # make sure we split by the id_col
partitioned_series = partitioned_series.map_partitions(lambda df: df.reset_index())
partitioned_series['unique_id'] = partitioned_series['unique_id'].astype(str)  # categoricals aren't supported at the moment
partitioned_series
                unique_id  ds              y        static_0  static_1
npartitions=10
id_00           object     datetime64[ns]  float64  int64     int64
id_10           ...        ...             ...      ...       ...
...             ...        ...             ...      ...       ...
id_99           ...        ...             ...      ...       ...

Models

In order to perform distributed forecasting we need to use a model that is able to train in a distributed way using dask. The current implementations are DaskLGBMForecast and DaskXGBForecast, which are thin wrappers around the native distributed implementations.

from mlforecast.distributed.models.dask.lgb import DaskLGBMForecast
from mlforecast.distributed.models.dask.xgb import DaskXGBForecast
models = [DaskXGBForecast(random_state=0), DaskLGBMForecast(random_state=0)]

Training

Once we have our models we instantiate a DistributedMLForecast object defining our features. We can then call fit on this object passing our dask dataframe.

fcst = DistributedMLForecast(
    models=models,
    freq='D',
    target_transforms=[Differences([7])],
    lags=[7],
    lag_transforms={
        1: [ExpandingMean()],
        7: [RollingMean(window_size=14)],
    },
    date_features=['dayofweek', 'month'],
    num_threads=1,
    engine=client,
)
fcst.fit(partitioned_series)

Forecasting

Once we have our fitted models we can compute the predictions for the next 7 timesteps.

preds = fcst.predict(7).compute()
preds.head()
   unique_id  ds          DaskXGBForecast  DaskLGBMForecast
0  id_00      2002-09-27  20.999371        21.892795
1  id_00      2002-09-28  84.771692        83.002009
2  id_00      2002-09-29  162.389419       163.528475
3  id_00      2002-09-30  245.002456       245.472042
4  id_00      2002-10-01  317.240952       313.948840

Saving and loading

Once you’ve trained your model you can use the DistributedMLForecast.save method to save the artifacts for inference. Keep in mind that if you’re on a remote cluster you should use remote storage, such as S3, as the destination.

mlforecast uses fsspec to handle the different filesystems, so if you’re using S3, for example, you also need to install s3fs. If you’re using pip you can just include the aws extra, e.g. pip install 'mlforecast[aws,dask]', which will install the required dependencies to perform distributed training with dask and save to S3. If you’re using conda you’ll have to install them manually (conda install dask fsspec fugue s3fs).
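
fsspec also handles local paths, so when experimenting on a single machine you can save to a local directory instead; a minimal sketch, where the directory name is arbitrary:

# the destination can also be a local directory; fsspec resolves it the same way
fcst.save('local_artifacts_dir')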

# define unique name for CI
def build_unique_name(engine):
    pyver = f'{sys.version_info.major}_{sys.version_info.minor}'
    repo = git.Repo(search_parent_directories=True)
    sha = repo.head.object.hexsha
    return f'{sys.platform}-{pyver}-{engine}-{sha}'
save_dir = build_unique_name('dask')
save_path = f's3://nixtla-tmp/mlf/{save_dir}'
fcst.save(save_path)

Once you’ve saved your forecast object you can then load it back by specifying the path where it was saved along with an engine, which will be used to perform the distributed computations (in this case the dask client).

fcst2 = DistributedMLForecast.load(save_path, engine=client)

We can verify that this object produces the same results.

preds = fa.as_pandas(fcst.predict(10)).sort_values(['unique_id', 'ds']).reset_index(drop=True)
preds2 = fa.as_pandas(fcst2.predict(10)).sort_values(['unique_id', 'ds']).reset_index(drop=True)
pd.testing.assert_frame_equal(preds, preds2)

Converting to local

Another option to store your distributed forecast object is to first turn it into a local one and then save it. Keep in mind that in order to do that all the remote data from the series will have to be pulled into a single machine (the scheduler in dask, the driver in spark, etc.), so you have to be sure that it’ll fit in memory. It should consume about 2x the size of your target column (you can reduce this further by using the keep_last_n argument in the fit method, as sketched below).
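
As a reference, a minimal sketch of passing keep_last_n at fit time; the value 50 is an assumption and must cover the longest history your transforms need (here the difference of 7, lag 7 and a 14-step rolling window on top of it):

# keep only the last 50 values of each series for inference;
# 50 is an illustrative value, pick one that covers your lags and windows
fcst.fit(partitioned_series, keep_last_n=50)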

local_fcst = fcst.to_local()
local_preds = local_fcst.predict(10)
# we don't check the dtype because sometimes these are arrow dtypes
# or different precisions of float
pd.testing.assert_frame_equal(preds, local_preds, check_dtype=False)

Cross validation
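
The API is the same as for MLForecast: we specify the number of windows and the forecasting horizon, and we get back the predictions for each window along with the cutoff and the actual target values.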

cv_res = fcst.cross_validation(
    partitioned_series,
    n_windows=3,
    h=14,
)
cv_res.compute().head()
   unique_id  ds          DaskXGBForecast  DaskLGBMForecast  cutoff      y
0  id_00      2002-08-16  22.706938        21.967568         2002-08-15  11.878591
1  id_00      2002-08-17  95.885948        98.285482         2002-08-15  75.108162
2  id_00      2002-08-18  172.546631       171.527272        2002-08-15  175.278407
3  id_00      2002-08-19  238.256594       238.375726        2002-08-15  226.062025
4  id_00      2002-08-20  306.005923       305.146636        2002-08-15  318.433401

client.close()

Spark

Session setup

from pyspark.sql import SparkSession
spark = (
    SparkSession
    .builder
    .config("spark.jars.packages", "com.microsoft.azure:synapseml_2.12:0.10.2")
    .config("spark.jars.repositories", "https://mmlspark.azureedge.net/maven")
    .getOrCreate()
)

Data setup

For spark, the data must be a pyspark DataFrame. You need to make sure that each time series is in only one partition (which you can do using repartitionByRange, for example), and it is recommended that you have as many partitions as you have workers. If you have more partitions than workers, make sure to set num_threads=1 to avoid nested parallelism.

The required input format is the same as for MLForecast, i.e. it should have at least an id column, a time column and a target column.

numPartitions = 4
series = generate_daily_series(100, n_static_features=2, equal_ends=True, static_as_categorical=False)
spark_series = spark.createDataFrame(series).repartitionByRange(numPartitions, 'unique_id')

Models

In order to perform distributed forecasting we need to use a model that is able to train in a distributed way using spark. The current implementations are SparkLGBMForecast and SparkXGBForecast, which are thin wrappers around the native distributed implementations.

from mlforecast.distributed.models.spark.lgb import SparkLGBMForecast
from mlforecast.distributed.models.spark.xgb import SparkXGBForecast
models = [SparkLGBMForecast(seed=0), SparkXGBForecast(random_state=0)]

Training

fcst = DistributedMLForecast(
    models,
    freq='D',
    target_transforms=[Differences([7])],
    lags=[1],
    lag_transforms={
        1: [ExpandingMean()],
    },
    date_features=['dayofweek'],
)
fcst.fit(
    spark_series,
    static_features=['static_0', 'static_1'],
)

Forecasting

preds = fcst.predict(14).toPandas()
preds.head()
   unique_id  ds          SparkLGBMForecast  SparkXGBForecast
0  id_00      2001-05-15  431.677682         424.488985
1  id_00      2001-05-16  503.673189         502.923172
2  id_00      2001-05-17  8.150285           8.019412
3  id_00      2001-05-18  97.620923          97.031792
4  id_00      2001-05-19  194.568960         193.862475

Saving and loading

Once you’ve trained your model you can use the DistributedMLForecast.save method to save the artifacts for inference. Keep in mind that if you’re on a remote cluster you should use remote storage, such as S3, as the destination.

mlforecast uses fsspec to handle the different filesystems, so if you’re using S3, for example, you also need to install s3fs. If you’re using pip you can just include the aws extra, e.g. pip install 'mlforecast[aws,spark]', which will install the required dependencies to perform distributed training with spark and save to S3. If you’re using conda you’ll have to install them manually (conda install fsspec fugue pyspark s3fs).

save_dir = build_unique_name('spark')
save_path = f's3://nixtla-tmp/mlf/{save_dir}'
fcst.save(save_path)

Once you’ve saved your forecast object you can then load it back by specifying the path where it was saved along with an engine, which will be used to perform the distributed computations (in this case the spark session).

fcst2 = DistributedMLForecast.load(save_path, engine=spark)

We can verify that this object produces the same results.

preds = fa.as_pandas(fcst.predict(10)).sort_values(['unique_id', 'ds']).reset_index(drop=True)
preds2 = fa.as_pandas(fcst2.predict(10)).sort_values(['unique_id', 'ds']).reset_index(drop=True)
pd.testing.assert_frame_equal(preds, preds2)

Converting to local

Another option to store your distributed forecast object is to first turn it into a local one and then save it. Keep in mind that in order to do that all the remote data from the series will have to be pulled into a single machine (the scheduler in dask, the driver in spark, etc.), so you have to be sure that it’ll fit in memory. It should consume about 2x the size of your target column (you can reduce this further by using the keep_last_n argument in the fit method).

local_fcst = fcst.to_local()
local_preds = local_fcst.predict(10)
# we don't check the dtype because sometimes these are arrow dtypes
# or different precisions of float
pd.testing.assert_frame_equal(preds, local_preds, check_dtype=False)

Cross validation

cv_res = fcst.cross_validation(
    spark_series,
    n_windows=3,
    h=14,
).toPandas()
cv_res.head()
   unique_id  ds          SparkLGBMForecast  SparkXGBForecast  cutoff      y
0  id_12      2001-04-03  342.978379         341.930127        2001-04-02  328.907629
1  id_23      2001-04-03  429.591043         428.320398        2001-04-02  424.716749
2  id_26      2001-04-10  7.554284           7.707686          2001-04-02  19.814264
3  id_18      2001-04-11  98.885044          98.848126         2001-04-02  98.877898
4  id_00      2001-04-13  122.727000         117.713487        2001-04-02  98.526008

spark.stop()

Ray

Session setup

import ray
from ray.cluster_utils import Cluster
ray_cluster = Cluster(
    initialize_head=True,
    head_node_args={"num_cpus": 2}
)
ray.init(address=ray_cluster.address, ignore_reinit_error=True)
# add mock node to simulate a cluster
mock_node = ray_cluster.add_node(num_cpus=2)

Data setup

For ray, the data must be a ray Dataset (for example one created with ray.data.from_pandas). It is recommended that you have as many partitions as you have workers. If you have more partitions than workers, make sure to set num_threads=1 to avoid nested parallelism.

The required input format is the same as for MLForecast, i.e. it should have at least an id column, a time column and a target column.

series = generate_daily_series(
    100, n_static_features=2, equal_ends=True, static_as_categorical=False
)
# categorical ids aren't supported at the moment, so we convert them to string
series['unique_id'] = series['unique_id'].astype(str)
ray_series = ray.data.from_pandas(series)

Models

The ray integration supports lightgbm and xgboost through RayLGBMForecast and RayXGBForecast, which are thin wrappers around the native ray implementations (RayLGBMRegressor and RayXGBRegressor).

from mlforecast.distributed.models.ray.lgb import RayLGBMForecast
from mlforecast.distributed.models.ray.xgb import RayXGBForecast
models = [RayLGBMForecast(random_state=0), RayXGBForecast(random_state=0)]

Training

To control the number of partitions used by ray, pass num_partitions to DistributedMLForecast.

num_partitions = 4
fcst = DistributedMLForecast(
    models,
    freq='D',
    target_transforms=[Differences([7])],
    lags=[1],
    lag_transforms={
        1: [ExpandingMean()],
    },
    date_features=['dayofweek'],
    num_partitions=num_partitions, # Use num_partitions to reduce overhead
)
fcst.fit(
    ray_series,
    static_features=['static_0', 'static_1'],
)

Forecasting

preds = fcst.predict(14).to_pandas()
preds.head()
   unique_id  ds          RayLGBMForecast  RayXGBForecast
0  id_00      2001-05-15  431.677682       427.262462
1  id_00      2001-05-16  503.673189       502.605670
2  id_00      2001-05-17  8.150285         7.604773
3  id_00      2001-05-18  97.620923        97.582869
4  id_00      2001-05-19  194.568960       192.818578

Saving and loading

Once you’ve trained your model you can use the DistributedMLForecast.save method to save the artifacts for inference. Keep in mind that if you’re on a remote cluster you should use remote storage, such as S3, as the destination.

mlforecast uses fsspec to handle the different filesystems, so if you’re using S3, for example, you also need to install s3fs. If you’re using pip you can just include the aws extra, e.g. pip install 'mlforecast[aws,ray]', which will install the required dependencies to perform distributed training with ray and save to S3. If you’re using conda you’ll have to install them manually (conda install fsspec fugue ray s3fs).

save_dir = build_unique_name('ray')
save_path = f's3://nixtla-tmp/mlf/{save_dir}'
fcst.save(save_path)

Once you’ve saved your forecast object you can then load it back by specifying the path where it was saved along with an engine, which will be used to perform the distributed computations (in this case the ‘ray’ string).

fcst2 = DistributedMLForecast.load(save_path, engine='ray')

We can verify that this object produces the same results.

preds = fa.as_pandas(fcst.predict(10)).sort_values(['unique_id', 'ds']).reset_index(drop=True)
preds2 = fa.as_pandas(fcst2.predict(10)).sort_values(['unique_id', 'ds']).reset_index(drop=True)
pd.testing.assert_frame_equal(preds, preds2)

Converting to local

Another option to store your distributed forecast object is to first turn it into a local one and then save it. Keep in mind that in order to do that all the remote data from the series will have to be pulled into a single machine (the scheduler in dask, the driver in spark, etc.), so you have to be sure that it’ll fit in memory. It should consume about 2x the size of your target column (you can reduce this further by using the keep_last_n argument in the fit method).

local_fcst = fcst.to_local()
local_preds = local_fcst.predict(10)
# we don't check the dtype because sometimes these are arrow dtypes
# or different precisions of float
pd.testing.assert_frame_equal(preds, local_preds, check_dtype=False)

Cross validation

cv_res = fcst.cross_validation(
    ray_series,
    n_windows=3,
    h=14,
).to_pandas()
cv_res.head()
   unique_id  ds          RayLGBMForecast  RayXGBForecast  cutoff      y
0  id_10      2001-05-01  24.962461        22.998615       2001-04-30  31.878545
1  id_10      2001-05-02  53.219645        54.298105       2001-04-30  48.349363
2  id_10      2001-05-03  78.068732        76.111907       2001-04-30  71.607111
3  id_10      2001-05-04  103.153889       104.344135      2001-04-30  103.482107
4  id_10      2001-05-05  116.708231       115.950523      2001-04-30  124.719690

ray.shutdown()