Minimal example of distributed training with MLForecast
The `DistributedMLForecast` class is a high-level abstraction that encapsulates all the steps in the pipeline (preprocessing, fitting the model and computing predictions) and applies them in a distributed way.

The different things that you need in order to use `DistributedMLForecast` (as opposed to `MLForecast`) are:
- A cluster. Here we use a `dask.distributed.LocalCluster`, however it could be any other kind of cluster.
- A distributed dataframe, in this case a `dask.dataframe.DataFrame`. You need to make sure that each time series is in only one partition, and it is recommended that you have as many partitions as you have workers. If you have more partitions than workers, make sure to set `num_threads=1` to avoid nested parallelism.
- A model that supports distributed training (described below).
The required input format is the same as for `MLForecast`, except that it's a `dask.dataframe.DataFrame` instead of a `pandas.DataFrame`.
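A minimal sketch of that setup, assuming `series` is a pandas DataFrame in the required format (the worker and partition counts are illustrative):

```python
import dask.dataframe as dd
from dask.distributed import Client, LocalCluster

# Start a local cluster; any other kind of dask cluster would work as well.
cluster = LocalCluster(n_workers=2)
client = Client(cluster)

# Setting the index to the id column before partitioning ensures that
# every time series ends up in exactly one partition.
partitioned_series = dd.from_pandas(series.set_index('unique_id'), npartitions=10)
# Bring unique_id back as a regular column, since it's expected as a column.
partitioned_series = partitioned_series.map_partitions(lambda part: part.reset_index())
```

Displaying `partitioned_series` then shows the structure of the dask dataframe (the column dtypes, with the partition divisions as the index):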
|  | unique_id | ds | y | static_0 | static_1 | sin1_7 | sin2_7 | cos1_7 | cos2_7 |
|---|---|---|---|---|---|---|---|---|---|
| npartitions=10 |  |  |  |  |  |  |  |  |  |
| id_00 | object | datetime64[ns] | float64 | int64 | int64 | float32 | float32 | float32 | float32 |
| id_10 | … | … | … | … | … | … | … | … | … |
| … | … | … | … | … | … | … | … | … | … |
| id_90 | … | … | … | … | … | … | … | … | … |
| id_99 | … | … | … | … | … | … | … | … | … |
In order to perform distributed forecasting we need a model that can be trained in a distributed way using dask. The current implementations are `DaskLGBMForecast` and `DaskXGBForecast`, which are just wrappers around the native distributed implementations.
We can now instantiate a `DistributedMLForecast` object defining our features and call `fit` on it, passing our dask dataframe. Once the models are trained we compute the forecasts with `predict`.
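A sketch of what this looks like, reusing `partitioned_series` from above (the frequency and feature configuration are illustrative, not prescribed by the library):

```python
from mlforecast.distributed import DistributedMLForecast
from mlforecast.distributed.models.dask.lgb import DaskLGBMForecast
from mlforecast.distributed.models.dask.xgb import DaskXGBForecast

fcst = DistributedMLForecast(
    models=[DaskXGBForecast(), DaskLGBMForecast()],
    freq='D',                     # daily frequency (assumed for this dataset)
    lags=[7],                     # illustrative feature configuration
    date_features=['dayofweek'],
)
fcst.fit(partitioned_series)

# predict returns a dask dataframe; compute() collects it into pandas.
preds = fcst.predict(7)
preds.compute().head()
```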
|  | unique_id | ds | DaskXGBForecast | DaskLGBMForecast |
|---|---|---|---|---|
| 0 | id_00 | 2002-09-27 00:00:00 | 21.722841 | 21.725511 |
| 1 | id_00 | 2002-09-28 00:00:00 | 84.918194 | 84.606362 |
| 2 | id_00 | 2002-09-29 00:00:00 | 162.067624 | 163.36802 |
| 3 | id_00 | 2002-09-30 00:00:00 | 249.001477 | 246.422894 |
| 4 | id_00 | 2002-10-01 00:00:00 | 317.149512 | 315.538403 |
Once we have our trained models we can use the `DistributedMLForecast.save` method to save the artifacts for inference. Keep in mind that if you're on a remote cluster you should set a remote storage like S3 as the destination.

mlforecast uses fsspec to handle the different filesystems, so if you're using S3, for example, you also need to install s3fs. If you're using pip you can just include the aws extra, e.g. `pip install 'mlforecast[aws,dask]'`, which will install the required dependencies to perform distributed training with dask and save to S3. If you're using conda you'll have to install them manually (`conda install dask fsspec fugue s3fs`).
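A sketch, assuming the artifacts are written to a local directory (on a remote cluster you'd pass a remote URI instead) and that loading takes the dask client as the engine:

```python
# Save the artifacts (use something like 's3://bucket/path' on a remote cluster).
fcst.save('distributed_artifacts')

# Restore them later, attaching the object to the cluster through its client.
fcst2 = DistributedMLForecast.load('distributed_artifacts', engine=client)
```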
If your series are long you can limit the amount of history stored for computing the feature updates (the `keep_last_n` argument in the `fit` method).

We can also evaluate our models through distributed cross validation.
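A sketch of a distributed cross validation run (the number of windows and the horizon are illustrative):

```python
# Each window trains on the data before its cutoff and evaluates h steps ahead.
cv_res = fcst.cross_validation(
    partitioned_series,
    n_windows=2,
    h=14,
)
cv_res.compute().head()
```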
|  | unique_id | ds | DaskXGBForecast | DaskLGBMForecast | cutoff | y |
|---|---|---|---|---|---|---|
| 61 | id_04 | 2002-08-21 00:00:00 | 68.3418 | 68.944539 | 2002-08-15 00:00:00 | 69.699857 |
| 83 | id_15 | 2002-08-29 00:00:00 | 199.315403 | 199.663555 | 2002-08-15 00:00:00 | 206.082864 |
| 103 | id_17 | 2002-08-21 00:00:00 | 156.822598 | 158.018246 | 2002-08-15 00:00:00 | 152.227984 |
| 61 | id_24 | 2002-08-21 00:00:00 | 136.598356 | 136.576865 | 2002-08-15 00:00:00 | 138.559945 |
| 36 | id_33 | 2002-08-24 00:00:00 | 95.6072 | 96.249354 | 2002-08-15 00:00:00 | 102.068997 |
To use Spark, the data needs to be a pyspark DataFrame. You need to make sure that each time series is in only one partition (which you can achieve using `repartitionByRange`, for example), and it is recommended that you have as many partitions as you have workers. If you have more partitions than workers, make sure to set `num_threads=1` to avoid nested parallelism.

The required input format is the same as for `MLForecast`, i.e. it should have at least an id column, a time column and a target column.
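A minimal sketch, again assuming `series` is a pandas DataFrame in the required format (the partition count is illustrative):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
num_partitions = 4

# Range-partitioning on the id column keeps each serie in a single partition.
spark_series = (
    spark.createDataFrame(series)
    .repartitionByRange(num_partitions, 'unique_id')
)
```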
In order to perform distributed forecasting we need a model that can be trained in a distributed way using spark. The current implementations are `SparkLGBMForecast` and `SparkXGBForecast`, which are just wrappers around the native distributed implementations.
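A sketch of fitting and predicting with these wrappers, reusing `spark_series` from above (the frequency and feature configuration are illustrative):

```python
from mlforecast.distributed import DistributedMLForecast
from mlforecast.distributed.models.spark.lgb import SparkLGBMForecast
from mlforecast.distributed.models.spark.xgb import SparkXGBForecast

fcst = DistributedMLForecast(
    models=[SparkLGBMForecast(), SparkXGBForecast()],
    freq='D',   # daily frequency (assumed for this dataset)
    lags=[7],   # illustrative feature configuration
)
fcst.fit(spark_series)

# predict returns a spark dataframe; toPandas() collects it to the driver.
preds = fcst.predict(7)
preds.toPandas().head()
```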
|  | unique_id | ds | SparkLGBMForecast | SparkXGBForecast |
|---|---|---|---|---|
| 0 | id_00 | 2002-09-27 | 15.053577 | 18.631477 |
| 1 | id_00 | 2002-09-28 | 93.010037 | 93.796269 |
| 2 | id_00 | 2002-09-29 | 160.120148 | 159.582315 |
| 3 | id_00 | 2002-09-30 | 250.445885 | 250.861651 |
| 4 | id_00 | 2002-10-01 | 323.335956 | 321.564089 |
Once we have our trained models we can use the `DistributedMLForecast.save` method to save the artifacts for inference. Keep in mind that if you're on a remote cluster you should set a remote storage like S3 as the destination.

mlforecast uses fsspec to handle the different filesystems, so if you're using S3, for example, you also need to install s3fs. If you're using pip you can just include the aws extra, e.g. `pip install 'mlforecast[aws,spark]'`, which will install the required dependencies to perform distributed training with spark and save to S3. If you're using conda you'll have to install them manually (`conda install fsspec fugue pyspark s3fs`).
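For example, with a placeholder bucket name:

```python
# With s3fs installed, fsspec resolves the s3:// URI transparently.
fcst.save('s3://my-bucket/distributed_artifacts')
fcst2 = DistributedMLForecast.load('s3://my-bucket/distributed_artifacts', engine=spark)
```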
As before, you can limit the stored history with the `keep_last_n` argument in the `fit` method. Cross validation works in a distributed way here as well:
|  | unique_id | ds | SparkLGBMForecast | SparkXGBForecast | cutoff | y |
|---|---|---|---|---|---|---|
| 0 | id_03 | 2002-08-18 | 3.272922 | 3.348874 | 2002-08-15 | 3.060194 |
| 1 | id_09 | 2002-08-20 | 402.718091 | 402.622501 | 2002-08-15 | 398.784459 |
| 2 | id_25 | 2002-08-22 | 87.189811 | 86.891632 | 2002-08-15 | 82.731377 |
| 3 | id_06 | 2002-08-21 | 20.416790 | 20.478502 | 2002-08-15 | 19.196394 |
| 4 | id_22 | 2002-08-23 | 357.718513 | 360.502024 | 2002-08-15 | 394.770699 |
To use Ray, the data needs to be a ray DataFrame. It is recommended that you have as many partitions as you have workers. If you have more partitions than workers, make sure to set `num_threads=1` to avoid nested parallelism.

The required input format is the same as for `MLForecast`, i.e. it should have at least an id column, a time column and a target column.
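For example, assuming `series` is a pandas DataFrame in the required format:

```python
import ray

# Convert the pandas DataFrame into a ray dataset.
ray_series = ray.data.from_pandas(series)
```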
In order to perform distributed forecasting we need a model that can be trained in a distributed way using ray. The current implementations are `RayLGBMForecast` and `RayXGBForecast`, which wrap the native distributed implementations for lightgbm (`RayLGBMRegressor`) and xgboost (`RayXGBRegressor`).
You can control how many partitions the series are split into by passing `num_partitions` to `DistributedMLForecast`.
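A sketch putting this together, reusing `ray_series` from above (the frequency, feature configuration and partition count are illustrative):

```python
from mlforecast.distributed import DistributedMLForecast
from mlforecast.distributed.models.ray.lgb import RayLGBMForecast
from mlforecast.distributed.models.ray.xgb import RayXGBForecast

fcst = DistributedMLForecast(
    models=[RayLGBMForecast(), RayXGBForecast()],
    freq='D',          # daily frequency (assumed for this dataset)
    lags=[7],          # illustrative feature configuration
    num_partitions=4,  # how many partitions to split the series into
)
fcst.fit(ray_series)

# predict returns a ray dataset; to_pandas() collects it locally.
preds = fcst.predict(7)
preds.to_pandas().head()
```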
|  | unique_id | ds | RayLGBMForecast | RayXGBForecast |
|---|---|---|---|---|
| 0 | id_00 | 2002-09-27 | 15.232455 | 10.38301 |
| 1 | id_00 | 2002-09-28 | 92.288994 | 92.531502 |
| 2 | id_00 | 2002-09-29 | 160.043472 | 160.722885 |
| 3 | id_00 | 2002-09-30 | 250.03212 | 252.821899 |
| 4 | id_00 | 2002-10-01 | 322.905182 | 324.387695 |
Once we have our trained models we can use the `DistributedMLForecast.save` method to save the artifacts for inference. Keep in mind that if you're on a remote cluster you should set a remote storage like S3 as the destination.

mlforecast uses fsspec to handle the different filesystems, so if you're using S3, for example, you also need to install s3fs. If you're using pip you can just include the aws extra, e.g. `pip install 'mlforecast[aws,ray]'`, which will install the required dependencies to perform distributed training with ray and save to S3. If you're using conda you'll have to install them manually (`conda install fsspec fugue ray s3fs`).
As with the other backends, you can limit the stored history with the `keep_last_n` argument in the `fit` method. Cross validation works in a distributed way here as well:
|  | unique_id | ds | RayLGBMForecast | RayXGBForecast | cutoff | y |
|---|---|---|---|---|---|---|
| 0 | id_05 | 2002-09-21 | 108.285187 | 108.619698 | 2002-09-12 | 108.726387 |
| 1 | id_08 | 2002-09-16 | 26.287956 | 26.589603 | 2002-09-12 | 27.980670 |
| 2 | id_08 | 2002-09-25 | 83.210945 | 84.194962 | 2002-09-12 | 86.344885 |
| 3 | id_11 | 2002-09-22 | 416.994843 | 417.106506 | 2002-09-12 | 425.434661 |
| 4 | id_16 | 2002-09-14 | 377.916382 | 375.421600 | 2002-09-12 | 400.361977 |