Quick start (distributed)
Minimal example of distributed training with MLForecast
The DistributedMLForecast class is a high-level abstraction that encapsulates all the steps in the pipeline (preprocessing, fitting the model and computing predictions) and applies them in a distributed way.

In order to use DistributedMLForecast (as opposed to MLForecast):
- You need to set up a cluster. We currently support dask, ray and spark.
- Your data needs to be a distributed collection (dask, ray or spark dataframe).
- You need to use a model that implements distributed training in your framework of choice, e.g. SynapseML for LightGBM in spark.
import platform
import sys
import tempfile
import matplotlib.pyplot as plt
import fugue.api as fa
import git
import numpy as np
import pandas as pd
import s3fs
from sklearn.base import BaseEstimator
from mlforecast.distributed import DistributedMLForecast
from mlforecast.lag_transforms import ExpandingMean, ExponentiallyWeightedMean, RollingMean
from mlforecast.target_transforms import Differences
from mlforecast.utils import generate_daily_series, generate_prices_for_series
Dask
import dask.dataframe as dd
from dask.distributed import Client
Client setup
client = Client(n_workers=2, threads_per_worker=1)
Here we define a client that connects to a dask.distributed.LocalCluster; however, it could be any other kind of cluster.
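For instance, if you already have a scheduler running somewhere, you can connect to it by passing its address to Client (the address below is only a placeholder):
# connect to an existing scheduler instead of spawning a local cluster
# (placeholder address, replace it with your scheduler's address)
client = Client('tcp://my-scheduler:8786')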
Data setup
For dask, the data must be a dask.dataframe.DataFrame. You need to make sure that each time series is only in one partition and it is recommended that you have as many partitions as you have workers. If you have more partitions than workers make sure to set num_threads=1 to avoid having nested parallelism.

The required input format is the same as for MLForecast, except that it’s a dask.dataframe.DataFrame instead of a pandas.DataFrame.
series = generate_daily_series(100, n_static_features=2, equal_ends=True, static_as_categorical=False, min_length=500, max_length=1_000)
npartitions = 10
partitioned_series = dd.from_pandas(series.set_index('unique_id'), npartitions=npartitions) # make sure we split by the id_col
partitioned_series = partitioned_series.map_partitions(lambda df: df.reset_index())
partitioned_series['unique_id'] = partitioned_series['unique_id'].astype(str) # can't handle categoricals atm
partitioned_series
| | unique_id | ds | y | static_0 | static_1 |
|---|---|---|---|---|---|
| npartitions=10 | | | | | |
| id_00 | object | datetime64[ns] | float64 | int64 | int64 |
| id_10 | … | … | … | … | … |
| … | … | … | … | … | … |
| id_90 | … | … | … | … | … |
| id_99 | … | … | … | … | … |
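As an optional sanity check (this snippet isn’t part of the original pipeline), we can verify that each series ended up in a single partition by counting in how many partitions each unique_id appears:
# each id should appear in exactly one partition
ids_per_partition = partitioned_series.map_partitions(
    lambda df: df[['unique_id']].drop_duplicates()
).compute()
assert ids_per_partition['unique_id'].value_counts().max() == 1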
Models
In order to perform distributed forecasting, we need to use a model that is able to train in a distributed way using dask. The current implementations are DaskLGBMForecast and DaskXGBForecast, which are just wrappers around the native implementations.
from mlforecast.distributed.models.dask.lgb import DaskLGBMForecast
from mlforecast.distributed.models.dask.xgb import DaskXGBForecast
models = [DaskXGBForecast(random_state=0), DaskLGBMForecast(random_state=0)]
Training
Once we have our models we instantiate a DistributedMLForecast object defining our features. We can then call fit on this object passing our dask dataframe.
fcst = DistributedMLForecast(
models=models,
freq='D',
target_transforms=[Differences([7])],
lags=[7],
lag_transforms={
1: [ExpandingMean(), ExponentiallyWeightedMean(alpha=0.9)],
7: [RollingMean(window_size=14)],
},
date_features=['dayofweek', 'month'],
num_threads=1,
engine=client,
)
fcst.fit(partitioned_series)
Forecasting
Once we have our fitted models we can compute the predictions for the next 7 timesteps.
preds = fcst.predict(7).compute()
preds.head()
| | unique_id | ds | DaskXGBForecast | DaskLGBMForecast |
|---|---|---|---|---|
| 0 | id_00 | 2002-09-27 00:00:00 | 22.489947 | 21.679944 |
| 1 | id_00 | 2002-09-28 00:00:00 | 81.806826 | 84.151205 |
| 2 | id_00 | 2002-09-29 00:00:00 | 162.705641 | 164.024508 |
| 3 | id_00 | 2002-09-30 00:00:00 | 246.990386 | 246.099977 |
| 4 | id_00 | 2002-10-01 00:00:00 | 314.741463 | 315.261537 |
Saving and loading
Once you’ve trained your model you can use the DistributedMLForecast.save method to save the artifacts for inference. Keep in mind that if you’re on a remote cluster you should set a remote storage like S3 as the destination.

mlforecast uses fsspec to handle the different filesystems, so if you’re using S3, for example, you also need to install s3fs. If you’re using pip you can just include the aws extra, e.g. pip install 'mlforecast[aws,dask]', which will install the required dependencies to perform distributed training with dask and saving to S3. If you’re using conda you’ll have to manually install them (conda install dask fsspec fugue s3fs).
# define unique name for CI
def build_unique_name(engine):
    pyver = f'{sys.version_info.major}_{sys.version_info.minor}'
    repo = git.Repo(search_parent_directories=True)
    sha = repo.head.object.hexsha
    return f'{sys.platform}-{pyver}-{engine}-{sha}'

save_dir = build_unique_name('dask')
save_path = f's3://nixtla-tmp/mlf/{save_dir}'
tmpdir = tempfile.TemporaryDirectory()
try:
    s3fs.S3FileSystem().ls('s3://nixtla-tmp/')
    fcst.save(save_path)
except Exception as e:
    print(e)
    save_path = f'{tmpdir.name}/{save_dir}'
    fcst.save(save_path)
Once you’ve saved your forecast object you can then load it back by specifying the path where it was saved along with an engine, which will be used to perform the distributed computations (in this case the dask client).
fcst2 = DistributedMLForecast.load(save_path, engine=client)
We can verify that this object produces the same results.
preds = fa.as_pandas(fcst.predict(10)).sort_values(['unique_id', 'ds']).reset_index(drop=True)
preds2 = fa.as_pandas(fcst2.predict(10)).sort_values(['unique_id', 'ds']).reset_index(drop=True)
pd.testing.assert_frame_equal(preds, preds2)
Converting to local
Another option to store your distributed forecast object is to first turn it into a local one and then save it. Keep in mind that in order to do that all the stored series data has to be pulled into a single machine (the scheduler in dask, the driver in spark, etc.), so you have to be sure that it’ll fit in memory; it should consume about 2x the size of your target column (you can reduce this further by using the keep_last_n argument in the fit method).
local_fcst = fcst.to_local()
local_preds = local_fcst.predict(10)
# we don't check the dtype because sometimes these are arrow dtypes
# or different precisions of float
pd.testing.assert_frame_equal(preds, local_preds, check_dtype=False)
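If memory is a concern, keep_last_n can be passed at fit time so that only the tail of each series is stored and later transferred by to_local. A minimal sketch, assuming the dask objects defined above (the name fcst_light and the value 50 are only illustrative):
# keep only the last 50 values of each series after fitting,
# which is enough for the lag features used here
fcst_light = DistributedMLForecast(
    models=models,
    freq='D',
    lags=[7],
    num_threads=1,
    engine=client,
)
fcst_light.fit(partitioned_series, keep_last_n=50)
local_light = fcst_light.to_local()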
Cross validation
cv_res = fcst.cross_validation(
partitioned_series,
n_windows=3,
h=14,
)
cv_res.compute().head()
| | unique_id | ds | DaskXGBForecast | DaskLGBMForecast | cutoff | y |
|---|---|---|---|---|---|---|
| 17 | id_01 | 2002-08-19 00:00:00 | 224.458336 | 222.742605 | 2002-08-15 00:00:00 | 210.723139 |
| 43 | id_03 | 2002-08-17 00:00:00 | 2.235601 | 2.210624 | 2002-08-15 00:00:00 | 2.416967 |
| 44 | id_03 | 2002-08-18 00:00:00 | 3.276747 | 3.239702 | 2002-08-15 00:00:00 | 3.060194 |
| 119 | id_08 | 2002-08-23 00:00:00 | 131.261689 | 131.180289 | 2002-08-15 00:00:00 | 138.668463 |
| 131 | id_09 | 2002-08-21 00:00:00 | 27.716417 | 28.263963 | 2002-08-15 00:00:00 | 22.88374 |
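As an illustration (not part of the original pipeline), we can summarize these results with the mean absolute error of each model across the validation windows:
# mean absolute error of each model over all CV windows
cv_df = cv_res.compute()
for model in ['DaskXGBForecast', 'DaskLGBMForecast']:
    mae = (cv_df[model] - cv_df['y']).abs().mean()
    print(f'{model}: {mae:.2f}')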
client.close()
Spark
Session setup
from pyspark.sql import SparkSession
spark = (
SparkSession
.builder
.config("spark.jars.packages", "com.microsoft.azure:synapseml_2.12:0.10.2")
.config("spark.jars.repositories", "https://mmlspark.azureedge.net/maven")
.getOrCreate()
)
Data setup
For spark, the data must be a pyspark DataFrame. You need to make sure that each time series is only in one partition (which you can do using repartitionByRange, for example) and it is recommended that you have as many partitions as you have workers. If you have more partitions than workers make sure to set num_threads=1 to avoid having nested parallelism.

The required input format is the same as for MLForecast, i.e. it should have at least an id column, a time column and a target column.
numPartitions = 4
series = generate_daily_series(100, n_static_features=2, equal_ends=True, static_as_categorical=False)
spark_series = spark.createDataFrame(series).repartitionByRange(numPartitions, 'unique_id')
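To check that the repartitioning worked as intended, you can count in how many spark partitions each id appears. This is only an optional sketch, not part of the original pipeline:
from pyspark.sql import functions as F

# each unique_id should be present in exactly one partition
parts_per_id = (
    spark_series
    .withColumn('part_id', F.spark_partition_id())
    .groupBy('unique_id')
    .agg(F.countDistinct('part_id').alias('n_parts'))
)
assert parts_per_id.agg(F.max('n_parts')).first()[0] == 1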
Models
In order to perform distributed forecasting, we need to use a model that is able to train in a distributed way using spark. The current implementations are SparkLGBMForecast and SparkXGBForecast, which are just wrappers around the native implementations.
from mlforecast.distributed.models.spark.lgb import SparkLGBMForecast
from mlforecast.distributed.models.spark.xgb import SparkXGBForecast
models = [SparkLGBMForecast(seed=0), SparkXGBForecast(random_state=0)]
Training
fcst = DistributedMLForecast(
models,
freq='D',
target_transforms=[Differences([7])],
lags=[1],
lag_transforms={
1: [ExpandingMean(), ExponentiallyWeightedMean(alpha=0.9)],
},
date_features=['dayofweek'],
)
fcst.fit(
spark_series,
static_features=['static_0', 'static_1'],
)
Forecasting
preds = fcst.predict(14).toPandas()
preds.head()
| | unique_id | ds | SparkLGBMForecast | SparkXGBForecast |
|---|---|---|---|---|
| 0 | id_00 | 2001-05-15 | 430.964632 | 431.202969 |
| 1 | id_00 | 2001-05-16 | 505.411960 | 504.030227 |
| 2 | id_00 | 2001-05-17 | 9.889056 | 9.706636 |
| 3 | id_00 | 2001-05-18 | 99.359694 | 96.258271 |
| 4 | id_00 | 2001-05-19 | 196.307731 | 197.443618 |
Saving and loading
Once you’ve trained your model you can use the DistributedMLForecast.save method to save the artifacts for inference. Keep in mind that if you’re on a remote cluster you should set a remote storage like S3 as the destination.

mlforecast uses fsspec to handle the different filesystems, so if you’re using S3, for example, you also need to install s3fs. If you’re using pip you can just include the aws extra, e.g. pip install 'mlforecast[aws,spark]', which will install the required dependencies to perform distributed training with spark and saving to S3. If you’re using conda you’ll have to manually install them (conda install fsspec fugue pyspark s3fs).
save_dir = build_unique_name('spark')
save_path = f's3://nixtla-tmp/mlf/{save_dir}'
try:
    s3fs.S3FileSystem().ls('s3://nixtla-tmp/')
    fcst.save(save_path)
except Exception as e:
    print(e)
    save_path = f'{tmpdir.name}/{save_dir}'
    fcst.save(save_path)
Once you’ve saved your forecast object you can then load it back by specifying the path where it was saved along with an engine, which will be used to perform the distributed computations (in this case the spark session).
fcst2 = DistributedMLForecast.load(save_path, engine=spark)
We can verify that this object produces the same results.
preds = fa.as_pandas(fcst.predict(10)).sort_values(['unique_id', 'ds']).reset_index(drop=True)
preds2 = fa.as_pandas(fcst2.predict(10)).sort_values(['unique_id', 'ds']).reset_index(drop=True)
pd.testing.assert_frame_equal(preds, preds2)
Converting to local
Another option to store your distributed forecast object is to first turn it into a local one and then save it. Keep in mind that in order to do that all the stored series data has to be pulled into a single machine (the scheduler in dask, the driver in spark, etc.), so you have to be sure that it’ll fit in memory; it should consume about 2x the size of your target column (you can reduce this further by using the keep_last_n argument in the fit method).
local_fcst = fcst.to_local()
local_preds = local_fcst.predict(10)
# we don't check the dtype because sometimes these are arrow dtypes
# or different precisions of float
pd.testing.assert_frame_equal(preds, local_preds, check_dtype=False)
Cross validation
cv_res = fcst.cross_validation(
spark_series,
n_windows=3,
h=14,
).toPandas()
cv_res.head()
| | unique_id | ds | SparkLGBMForecast | SparkXGBForecast | cutoff | y |
|---|---|---|---|---|---|---|
| 0 | id_15 | 2001-04-04 | 88.438691 | 86.105463 | 2001-04-02 | 92.468763 |
| 1 | id_25 | 2001-04-12 | 355.712493 | 354.525400 | 2001-04-02 | 320.701359 |
| 2 | id_03 | 2001-04-08 | 257.243845 | 253.834157 | 2001-04-02 | 274.420045 |
| 3 | id_14 | 2001-04-07 | 24.925278 | 23.833504 | 2001-04-02 | 26.906679 |
| 4 | id_01 | 2001-04-16 | 89.180665 | 90.743194 | 2001-04-02 | 93.807725 |
spark.stop()
Ray
Session setup
import ray
from ray.cluster_utils import Cluster
ray_cluster = Cluster(
initialize_head=True,
head_node_args={"num_cpus": 2}
)
ray.init(address=ray_cluster.address, ignore_reinit_error=True)
# add mock node to simulate a cluster
mock_node = ray_cluster.add_node(num_cpus=2)
Data setup
For ray, the data must be a Ray Dataset. It is recommended that you have as many partitions as you have workers. If you have more partitions than workers make sure to set num_threads=1 to avoid having nested parallelism.

The required input format is the same as for MLForecast, i.e. it should have at least an id column, a time column and a target column.
series = generate_daily_series(100, n_static_features=2, equal_ends=True, static_as_categorical=False)
# we need noncategory unique_id
series['unique_id'] = series['unique_id'].astype(str)
ray_series = ray.data.from_pandas(series)
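If you want to control the number of blocks of the dataset yourself you can also repartition it directly. This is only shown as an alternative; below we instead let DistributedMLForecast handle this through its num_partitions argument:
# optionally repartition the Ray dataset into 4 blocks
ray_series = ray_series.repartition(4)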
Models
The ray integration allows using lightgbm (through RayLGBMForecast, which wraps RayLGBMRegressor) and xgboost (through RayXGBForecast, which wraps RayXGBRegressor).
from mlforecast.distributed.models.ray.lgb import RayLGBMForecast
from mlforecast.distributed.models.ray.xgb import RayXGBForecast
models = [RayLGBMForecast(random_state=0), RayXGBForecast(random_state=0)]
Training
To control the number of partitions that Ray uses we have to pass num_partitions to DistributedMLForecast.
num_partitions = 4
fcst = DistributedMLForecast(
models,
freq='D',
target_transforms=[Differences([7])],
lags=[1],
lag_transforms={
1: [ExpandingMean(), ExponentiallyWeightedMean(alpha=0.9)],
},
date_features=['dayofweek'],
num_partitions=num_partitions, # Use num_partitions to reduce overhead
)
fcst.fit(
ray_series,
static_features=['static_0', 'static_1'],
)
Forecasting
preds = fcst.predict(14).to_pandas()
preds.head()
| | unique_id | ds | RayLGBMForecast | RayXGBForecast |
|---|---|---|---|---|
| 0 | id_01 | 2001-05-15 | 118.505341 | 118.32222 |
| 1 | id_01 | 2001-05-16 | 152.321457 | 152.265915 |
| 2 | id_01 | 2001-05-17 | 181.979599 | 181.945618 |
| 3 | id_01 | 2001-05-18 | 9.530758 | 9.543224 |
| 4 | id_01 | 2001-05-19 | 40.503441 | 40.661186 |
Saving and loading
Once you’ve trained your model you can use the DistributedMLForecast.save method to save the artifacts for inference. Keep in mind that if you’re on a remote cluster you should set a remote storage like S3 as the destination.

mlforecast uses fsspec to handle the different filesystems, so if you’re using S3, for example, you also need to install s3fs. If you’re using pip you can just include the aws extra, e.g. pip install 'mlforecast[aws,ray]', which will install the required dependencies to perform distributed training with ray and saving to S3. If you’re using conda you’ll have to manually install them (conda install fsspec fugue ray s3fs).
save_dir = build_unique_name('ray')
save_path = f's3://nixtla-tmp/mlf/{save_dir}'
try:
    s3fs.S3FileSystem().ls('s3://nixtla-tmp/')
    fcst.save(save_path)
except Exception as e:
    print(e)
    save_path = f'{tmpdir.name}/{save_dir}'
    fcst.save(save_path)
Once you’ve saved your forecast object you can then load it back by specifying the path where it was saved along with an engine, which will be used to perform the distributed computations (in this case the ‘ray’ string).
fcst2 = DistributedMLForecast.load(save_path, engine='ray')
We can verify that this object produces the same results.
preds = fa.as_pandas(fcst.predict(10)).sort_values(['unique_id', 'ds']).reset_index(drop=True)
preds2 = fa.as_pandas(fcst2.predict(10)).sort_values(['unique_id', 'ds']).reset_index(drop=True)
pd.testing.assert_frame_equal(preds, preds2)
Converting to local
Another option to store your distributed forecast object is to first turn it into a local one and then save it. Keep in mind that in order to do that all the stored series data has to be pulled into a single machine (the scheduler in dask, the driver in spark, etc.), so you have to be sure that it’ll fit in memory; it should consume about 2x the size of your target column (you can reduce this further by using the keep_last_n argument in the fit method).
local_fcst = fcst.to_local()
local_preds = local_fcst.predict(10)
# we don't check the dtype because sometimes these are arrow dtypes
# or different precisions of float
pd.testing.assert_frame_equal(preds, local_preds, check_dtype=False)
Cross validation
cv_res = fcst.cross_validation(
ray_series,
n_windows=3,
h=14,
).to_pandas()
cv_res.head()
| | unique_id | ds | RayLGBMForecast | RayXGBForecast | cutoff | y |
|---|---|---|---|---|---|---|
| 0 | id_10 | 2001-05-01 | 24.767561 | 24.528799 | 2001-04-30 | 31.878545 |
| 1 | id_10 | 2001-05-07 | 1.916985 | 2.323445 | 2001-04-30 | 7.365955 |
| 2 | id_13 | 2001-05-01 | 210.900330 | 212.959320 | 2001-04-30 | 190.485236 |
| 3 | id_14 | 2001-05-01 | 196.620819 | 196.253036 | 2001-04-30 | 213.631212 |
| 4 | id_14 | 2001-05-03 | 323.323334 | 322.372894 | 2001-04-30 | 338.234837 |
ray.shutdown()