> ## Documentation Index
> Fetch the complete documentation index at: https://nixtlaverse.nixtla.io/llms.txt
> Use this file to discover all available pages before exploring further.

# Quick start (distributed)

> Minimal example of distributed training with MLForecast

The `DistributedMLForecast` class is a high level abstraction that
encapsulates all the steps in the pipeline (preprocessing, fitting the
model and computing predictions) and applies them in a distributed way.

The different things that you need to use `DistributedMLForecast` (as
opposed to `MLForecast`) are:

1. You need to set up a cluster. We currently support dask, ray and
   spark.
2. Your data needs to be a distributed collection (dask, ray or spark
   dataframe).
3. You need to use a model that implements distributed training in your
   framework of choice, e.g. SynapseML for LightGBM in spark.

```python theme={null}
import platform
import sys
import tempfile

import matplotlib.pyplot as plt
import git
import numpy as np
import pandas as pd
import s3fs
from sklearn.base import BaseEstimator
from utilsforecast.feature_engineering import fourier

from mlforecast.distributed import DistributedMLForecast
from mlforecast.lag_transforms import ExpandingMean, ExponentiallyWeightedMean, RollingMean
from mlforecast.target_transforms import Differences
from mlforecast.utils import generate_daily_series, generate_prices_for_series
```

## Dask

```python theme={null}
import dask.dataframe as dd
from dask.distributed import Client
```

### Client setup

```python theme={null}
client = Client(n_workers=2, threads_per_worker=1)
```

Here we define a client that connects to a
`dask.distributed.LocalCluster`, however it could be any other kind of
cluster.

### Data setup

For dask, the data must be a `dask.dataframe.DataFrame`. You need to
make sure that each time series is only in one partition and it is
recommended that you have as many partitions as you have workers. If you
have more partitions than workers make sure to set `num_threads=1` to
avoid having nested parallelism.

The required input format is the same as for `MLForecast`, except that
it’s a `dask.dataframe.DataFrame` instead of a `pandas.Dataframe`.

```python theme={null}
series = generate_daily_series(100, n_static_features=2, equal_ends=True, static_as_categorical=False, min_length=500, max_length=1_000)
train, future = fourier(series, freq='d', season_length=7, k=2, h=7)
npartitions = 10
partitioned_series = dd.from_pandas(train.set_index('unique_id'), npartitions=npartitions)  # make sure we split by the id_col
partitioned_series = partitioned_series.map_partitions(lambda df: df.reset_index())
partitioned_series['unique_id'] = partitioned_series['unique_id'].astype(str)  # can't handle categoricals atm
partitioned_series
```

|                | unique\_id | ds              | y       | static\_0 | static\_1 | sin1\_7 | sin2\_7 | cos1\_7 | cos2\_7 |
| -------------- | ---------- | --------------- | ------- | --------- | --------- | ------- | ------- | ------- | ------- |
| npartitions=10 |            |                 |         |           |           |         |         |         |         |
| id\_00         | object     | datetime64\[ns] | float64 | int64     | int64     | float32 | float32 | float32 | float32 |
| id\_10         | ...        | ...             | ...     | ...       | ...       | ...     | ...     | ...     | ...     |
| ...            | ...        | ...             | ...     | ...       | ...       | ...     | ...     | ...     | ...     |
| id\_90         | ...        | ...             | ...     | ...       | ...       | ...     | ...     | ...     | ...     |
| id\_99         | ...        | ...             | ...     | ...       | ...       | ...     | ...     | ...     | ...     |

### Models

In order to perform distributed forecasting, we need to use a model that
is able to train in a distributed way using `dask`. The current
implementations are in `DaskLGBMForecast` and `DaskXGBForecast` which
are just wrappers around the native implementations.

```python theme={null}
from mlforecast.distributed.models.dask.lgb import DaskLGBMForecast
from mlforecast.distributed.models.dask.xgb import DaskXGBForecast
```

```python theme={null}
models = [
    DaskXGBForecast(random_state=0),
    DaskLGBMForecast(random_state=0, verbosity=-1),
]
```

### Training

Once we have our models we instantiate a `DistributedMLForecast` object
defining our features. We can then call `fit` on this object passing our
dask dataframe.

```python theme={null}
fcst = DistributedMLForecast(
    models=models,
    freq='D',
    target_transforms=[Differences([7])],
    lags=[7],
    lag_transforms={
        1: [ExpandingMean(), ExponentiallyWeightedMean(alpha=0.9)],
        7: [RollingMean(window_size=14)],
    },
    date_features=['dayofweek', 'month'],
    num_threads=1,
    engine=client,
)
fcst.fit(partitioned_series, static_features=['static_0', 'static_1'])
```

Once we have our fitted models we can compute the predictions for the
next 7 timesteps.

### Forecasting

```python theme={null}
preds = fcst.predict(7, X_df=future).compute()
preds.head()
```

|   | unique\_id | ds                  | DaskXGBForecast | DaskLGBMForecast |
| - | ---------- | ------------------- | --------------- | ---------------- |
| 0 | id\_00     | 2002-09-27 00:00:00 | 21.722841       | 21.725511        |
| 1 | id\_00     | 2002-09-28 00:00:00 | 84.918194       | 84.606362        |
| 2 | id\_00     | 2002-09-29 00:00:00 | 162.067624      | 163.36802        |
| 3 | id\_00     | 2002-09-30 00:00:00 | 249.001477      | 246.422894       |
| 4 | id\_00     | 2002-10-01 00:00:00 | 317.149512      | 315.538403       |

```python theme={null}
preds2 = fcst.predict(7, X_df=future).compute()
preds3 = fcst.predict(7, new_df=partitioned_series, X_df=future).compute()
pd.testing.assert_frame_equal(preds, preds2)
pd.testing.assert_frame_equal(preds, preds3)
```

### Saving and loading

Once you’ve trained your model you can use the
`DistributedMLForecast.save` method to save the artifacts for inference.
Keep in mind that if you’re on a remote cluster you should set a remote
storage like S3 as the destination.

mlforecast uses
[fsspec](https://filesystem-spec.readthedocs.io/en/latest/) to handle
the different filesystems, so if you’re using s3 for example you also
need to install [s3fs](https://s3fs.readthedocs.io/en/latest/). If
you’re using pip you can just include the aws extra,
e.g. `pip install 'mlforecast[aws,dask]'`, which will install the
required dependencies to perform distributed training with dask and
saving to S3. If you’re using conda you’ll have to manually install them
(`conda install dask fsspec fugue s3fs`).

```python theme={null}
# define unique name for CI
def build_unique_name(engine):
    pyver = f'{sys.version_info.major}_{sys.version_info.minor}'
    repo = git.Repo(search_parent_directories=True)
    sha = repo.head.object.hexsha
    return f'{sys.platform}-{pyver}-{engine}-{sha}'
```

```python theme={null}
save_dir = build_unique_name('dask')
save_path = f's3://nixtla-tmp/mlf/{save_dir}'
tmpdir = tempfile.TemporaryDirectory()
try:
    s3fs.S3FileSystem().ls('s3://nixtla-tmp/')
    fcst.save(save_path)
except Exception as e:
    print(e)
    save_path = f'{tmpdir.name}/{save_dir}'
    fcst.save(save_path)
```

Once you’ve saved your forecast object you can then load it back by
specifying the path where it was saved along with an engine, which will
be used to perform the distributed computations (in this case the dask
client).

```python theme={null}
fcst2 = DistributedMLForecast.load(save_path, engine=client)
```

We can verify that this object produces the same results.

```python theme={null}
preds = fa.as_pandas(fcst.predict(7, X_df=future)).sort_values(['unique_id', 'ds']).reset_index(drop=True)
preds2 = fa.as_pandas(fcst2.predict(7, X_df=future)).sort_values(['unique_id', 'ds']).reset_index(drop=True)
pd.testing.assert_frame_equal(preds, preds2)
```

### Converting to local

Another option to store your distributed forecast object is to first
turn it into a local one and then save it. Keep in mind that in order to
do that all the remote data that is stored from the series will have to
be pulled into a single machine (the scheduler in dask, driver in spark,
etc.), so you have to be sure that it’ll fit in memory, it should
consume about 2x the size of your target column (you can reduce this
further by using the `keep_last_n` argument in the `fit` method).

```python theme={null}
local_fcst = fcst.to_local()
local_preds = local_fcst.predict(7, X_df=future)
# we don't check the dtype because sometimes these are arrow dtypes
# or different precisions of float
pd.testing.assert_frame_equal(preds, local_preds, check_dtype=False)
```

### Cross validation

```python theme={null}
cv_res = fcst.cross_validation(
    partitioned_series,
    n_windows=3,
    h=14,
    static_features=['static_0', 'static_1'],
)
```

```python theme={null}
cv_res.compute().head()
```

|     | unique\_id | ds                  | DaskXGBForecast | DaskLGBMForecast | cutoff              | y          |
| --- | ---------- | ------------------- | --------------- | ---------------- | ------------------- | ---------- |
| 61  | id\_04     | 2002-08-21 00:00:00 | 68.3418         | 68.944539        | 2002-08-15 00:00:00 | 69.699857  |
| 83  | id\_15     | 2002-08-29 00:00:00 | 199.315403      | 199.663555       | 2002-08-15 00:00:00 | 206.082864 |
| 103 | id\_17     | 2002-08-21 00:00:00 | 156.822598      | 158.018246       | 2002-08-15 00:00:00 | 152.227984 |
| 61  | id\_24     | 2002-08-21 00:00:00 | 136.598356      | 136.576865       | 2002-08-15 00:00:00 | 138.559945 |
| 36  | id\_33     | 2002-08-24 00:00:00 | 95.6072         | 96.249354        | 2002-08-15 00:00:00 | 102.068997 |

```python theme={null}
non_std_series = partitioned_series.copy()
non_std_series = non_std_series.rename(columns={'ds': 'time', 'y': 'value', 'unique_id': 'some_id'})
flow_params = dict(
    models=[DaskXGBForecast(random_state=0)],
    target_transforms=[Differences([7])],    
    lags=[7],
    lag_transforms={
        1: [ExpandingMean()],
        7: [RollingMean(window_size=14)]
    },
    num_threads=1,
)
fcst = DistributedMLForecast(freq='D', **flow_params)
fcst.fit(partitioned_series, static_features=['static_0', 'static_1'])
preds = fcst.predict(7, X_df=future).compute()
fcst2 = DistributedMLForecast(freq='D', **flow_params)
fcst2.preprocess(
    non_std_series,
    id_col='some_id',
    time_col='time',
    target_col='value',
    static_features=['static_0', 'static_1'],
)
fcst2.models_ = fcst.models_  # distributed training can end up with different fits
non_std_preds = fcst2.predict(7, X_df=future.rename(columns={'ds': 'time', 'unique_id': 'some_id'})).compute()
pd.testing.assert_frame_equal(
    preds.drop(columns='ds'),
    non_std_preds.drop(columns='time').rename(columns={'some_id': 'unique_id'})
)
```

```python theme={null}
client.close()
```

## Spark

### Session setup

```python theme={null}
from pyspark.sql import SparkSession
```

```python theme={null}
spark = (
    SparkSession
    .builder
    .config("spark.jars.packages", "com.microsoft.azure:synapseml_2.12:0.10.2")
    .config("spark.jars.repositories", "https://mmlspark.azureedge.net/maven")
    .getOrCreate()
)
```

### Data setup

For spark, the data must be a `pyspark DataFrame`. You need to make sure
that each time series is only in one partition (which you can do using
`repartitionByRange`, for example) and it is recommended that you have
as many partitions as you have workers. If you have more partitions than
workers make sure to set `num_threads=1` to avoid having nested
parallelism.

The required input format is the same as for `MLForecast`, i.e. it
should have at least an id column, a time column and a target column.

```python theme={null}
series = generate_daily_series(100, n_static_features=2, equal_ends=True, static_as_categorical=False, min_length=500, max_length=1_000)
series['unique_id'] = series['unique_id'].astype(str)  # can't handle categoricals atm
train, future = fourier(series, freq='d', season_length=7, k=2, h=7)
numPartitions = 4
spark_series = spark.createDataFrame(train).repartitionByRange(numPartitions, 'unique_id')
```

### Models

In order to perform distributed forecasting, we need to use a model that
is able to train in a distributed way using `spark`. The current
implementations are in `SparkLGBMForecast` and `SparkXGBForecast` which
are just wrappers around the native implementations.

```python theme={null}
from mlforecast.distributed.models.spark.lgb import SparkLGBMForecast
from mlforecast.distributed.models.spark.xgb import SparkXGBForecast
```

```python theme={null}
models = [
    SparkLGBMForecast(seed=0, verbosity=-1),
    SparkXGBForecast(random_state=0),
]
```

### Training

```python theme={null}
fcst = DistributedMLForecast(
    models,
    freq='D',
    target_transforms=[Differences([7])],    
    lags=[1],
    lag_transforms={
        1: [ExpandingMean(), ExponentiallyWeightedMean(alpha=0.9)],
    },
    date_features=['dayofweek'],
)
fcst.fit(
    spark_series,
    static_features=['static_0', 'static_1'],
)
```

### Forecasting

```python theme={null}
preds = fcst.predict(7, X_df=future).toPandas()
```

```text theme={null}
                                                                                
```

```python theme={null}
preds.head()
```

|   | unique\_id | ds         | SparkLGBMForecast | SparkXGBForecast |
| - | ---------- | ---------- | ----------------- | ---------------- |
| 0 | id\_00     | 2002-09-27 | 15.053577         | 18.631477        |
| 1 | id\_00     | 2002-09-28 | 93.010037         | 93.796269        |
| 2 | id\_00     | 2002-09-29 | 160.120148        | 159.582315       |
| 3 | id\_00     | 2002-09-30 | 250.445885        | 250.861651       |
| 4 | id\_00     | 2002-10-01 | 323.335956        | 321.564089       |

### Saving and loading

Once you’ve trained your model you can use the
`DistributedMLForecast.save` method to save the artifacts for inference.
Keep in mind that if you’re on a remote cluster you should set a remote
storage like S3 as the destination.

mlforecast uses
[fsspec](https://filesystem-spec.readthedocs.io/en/latest/) to handle
the different filesystems, so if you’re using s3 for example you also
need to install [s3fs](https://s3fs.readthedocs.io/en/latest/). If
you’re using pip you can just include the aws extra,
e.g. `pip install 'mlforecast[aws,spark]'`, which will install the
required dependencies to perform distributed training with spark and
saving to S3. If you’re using conda you’ll have to manually install them
(`conda install fsspec fugue pyspark s3fs`).

```python theme={null}
save_dir = build_unique_name('spark')
save_path = f's3://nixtla-tmp/mlf/{save_dir}'
try:
    s3fs.S3FileSystem().ls('s3://nixtla-tmp/')
    fcst.save(save_path)
except Exception as e:
    print(e)
    save_path = f'{tmpdir.name}/{save_dir}'
    fcst.save(save_path)
```

```text theme={null}
                                                                                
```

Once you’ve saved your forecast object you can then load it back by
specifying the path where it was saved along with an engine, which will
be used to perform the distributed computations (in this case the spark
session).

```python theme={null}
fcst2 = DistributedMLForecast.load(save_path, engine=spark)
```

```text theme={null}
                                                                                
```

We can verify that this object produces the same results.

```python theme={null}
preds = fa.as_pandas(fcst.predict(7, X_df=future)).sort_values(['unique_id', 'ds']).reset_index(drop=True)
preds2 = fa.as_pandas(fcst2.predict(7, X_df=future)).sort_values(['unique_id', 'ds']).reset_index(drop=True)
pd.testing.assert_frame_equal(preds, preds2)
```

```text theme={null}
                                                                                
```

### Converting to local

Another option to store your distributed forecast object is to first
turn it into a local one and then save it. Keep in mind that in order to
do that all the remote data that is stored from the series will have to
be pulled into a single machine (the scheduler in dask, driver in spark,
etc.), so you have to be sure that it’ll fit in memory, it should
consume about 2x the size of your target column (you can reduce this
further by using the `keep_last_n` argument in the `fit` method).

```python theme={null}
local_fcst = fcst.to_local()
local_preds = local_fcst.predict(7, X_df=future)
# we don't check the dtype because sometimes these are arrow dtypes
# or different precisions of float
pd.testing.assert_frame_equal(preds, local_preds, check_dtype=False)
```

### Cross validation

```python theme={null}
cv_res = fcst.cross_validation(
    spark_series,
    n_windows=3,
    h=14,
    static_features=['static_0', 'static_1'],
).toPandas()
```

```python theme={null}
cv_res.head()
```

|   | unique\_id | ds         | SparkLGBMForecast | SparkXGBForecast | cutoff     | y          |
| - | ---------- | ---------- | ----------------- | ---------------- | ---------- | ---------- |
| 0 | id\_03     | 2002-08-18 | 3.272922          | 3.348874         | 2002-08-15 | 3.060194   |
| 1 | id\_09     | 2002-08-20 | 402.718091        | 402.622501       | 2002-08-15 | 398.784459 |
| 2 | id\_25     | 2002-08-22 | 87.189811         | 86.891632        | 2002-08-15 | 82.731377  |
| 3 | id\_06     | 2002-08-21 | 20.416790         | 20.478502        | 2002-08-15 | 19.196394  |
| 4 | id\_22     | 2002-08-23 | 357.718513        | 360.502024       | 2002-08-15 | 394.770699 |

```python theme={null}
spark.stop()
```

## Ray

### Session setup

```python theme={null}
# ray
import ray
from ray.cluster_utils import Cluster
```

```python theme={null}
# ray
ray_cluster = Cluster(
    initialize_head=True,
    head_node_args={"num_cpus": 2}
)
ray.init(address=ray_cluster.address, ignore_reinit_error=True)
# add mock node to simulate a cluster
mock_node = ray_cluster.add_node(num_cpus=2)
```

### Data setup

For ray, the data must be a `ray DataFrame`. It is recommended that you
have as many partitions as you have workers. If you have more partitions
than workers make sure to set `num_threads=1` to avoid having nested
parallelism.

The required input format is the same as for `MLForecast`, i.e. it
should have at least an id column, a time column and a target column.

```python theme={null}
# ray
series = generate_daily_series(100, n_static_features=2, equal_ends=True, static_as_categorical=False, min_length=500, max_length=1_000)
series['unique_id'] = series['unique_id'].astype(str)  # can't handle categoricals atm
train, future = fourier(series, freq='d', season_length=7, k=2, h=7)
ray_series = ray.data.from_pandas(train)
```

### Models

The ray integration allows to include `lightgbm` (`RayLGBMRegressor`),
and `xgboost` (`RayXGBRegressor`).

```python theme={null}
# ray
from mlforecast.distributed.models.ray.lgb import RayLGBMForecast
from mlforecast.distributed.models.ray.xgb import RayXGBForecast
```

```python theme={null}
# ray
models = [
    RayLGBMForecast(random_state=0, verbosity=-1),
    RayXGBForecast(random_state=0),
]
```

### Training

To control the number of partitions to use using Ray, we have to include
`num_partitions` to `DistributedMLForecast`.

```python theme={null}
# ray
num_partitions = 4
fcst = DistributedMLForecast(
    models,
    freq='D',
    target_transforms=[Differences([7])],
    lags=[1],
    lag_transforms={
        1: [ExpandingMean(), ExponentiallyWeightedMean(alpha=0.9)],
    },
    date_features=['dayofweek'],
    num_partitions=num_partitions, # Use num_partitions to reduce overhead
)
fcst.fit(
    ray_series,
    static_features=['static_0', 'static_1'],
)
```

### Forecasting

```python theme={null}
# ray
preds = fcst.predict(7, X_df=future).to_pandas()
```

```python theme={null}
# ray
preds.head()
```

|   | unique\_id | ds         | RayLGBMForecast | RayXGBForecast |
| - | ---------- | ---------- | --------------- | -------------- |
| 0 | id\_00     | 2002-09-27 | 15.232455       | 10.38301       |
| 1 | id\_00     | 2002-09-28 | 92.288994       | 92.531502      |
| 2 | id\_00     | 2002-09-29 | 160.043472      | 160.722885     |
| 3 | id\_00     | 2002-09-30 | 250.03212       | 252.821899     |
| 4 | id\_00     | 2002-10-01 | 322.905182      | 324.387695     |

### Saving and loading

Once you’ve trained your model you can use the
`DistributedMLForecast.save` method to save the artifacts for inference.
Keep in mind that if you’re on a remote cluster you should set a remote
storage like S3 as the destination.

mlforecast uses
[fsspec](https://filesystem-spec.readthedocs.io/en/latest/) to handle
the different filesystems, so if you’re using s3 for example you also
need to install [s3fs](https://s3fs.readthedocs.io/en/latest/). If
you’re using pip you can just include the aws extra,
e.g. `pip install 'mlforecast[aws,ray]'`, which will install the
required dependencies to perform distributed training with ray and
saving to S3. If you’re using conda you’ll have to manually install them
(`conda install fsspec fugue ray s3fs`).

```python theme={null}
# ray
save_dir = build_unique_name('ray')
save_path = f's3://nixtla-tmp/mlf/{save_dir}'
try:
    s3fs.S3FileSystem().ls('s3://nixtla-tmp/')
    fcst.save(save_path)
except Exception as e:
    print(e)
    save_path = f'{tmpdir.name}/{save_dir}'
    fcst.save(save_path)
```

Once you’ve saved your forecast object you can then load it back by
specifying the path where it was saved along with an engine, which will
be used to perform the distributed computations (in this case the ‘ray’
string).

```python theme={null}
# ray
fcst2 = DistributedMLForecast.load(save_path, engine='ray')
```

We can verify that this object produces the same results.

```python theme={null}
# ray
preds = fa.as_pandas(fcst.predict(7, X_df=future)).sort_values(['unique_id', 'ds']).reset_index(drop=True)
preds2 = fa.as_pandas(fcst2.predict(7, X_df=future)).sort_values(['unique_id', 'ds']).reset_index(drop=True)
pd.testing.assert_frame_equal(preds, preds2)
```

### Converting to local

Another option to store your distributed forecast object is to first
turn it into a local one and then save it. Keep in mind that in order to
do that all the remote data that is stored from the series will have to
be pulled into a single machine (the scheduler in dask, driver in spark,
etc.), so you have to be sure that it’ll fit in memory, it should
consume about 2x the size of your target column (you can reduce this
further by using the `keep_last_n` argument in the `fit` method).

```python theme={null}
# ray
local_fcst = fcst.to_local()
local_preds = local_fcst.predict(7, X_df=future)
# we don't check the dtype because sometimes these are arrow dtypes
# or different precisions of float
pd.testing.assert_frame_equal(preds, local_preds, check_dtype=False)
```

### Cross validation

```python theme={null}
# ray
cv_res = fcst.cross_validation(
    ray_series,
    n_windows=3,
    h=14,
    static_features=['static_0', 'static_1'],
).to_pandas()
```

```python theme={null}
# ray
cv_res.head()
```

|   | unique\_id | ds         | RayLGBMForecast | RayXGBForecast | cutoff     | y          |
| - | ---------- | ---------- | --------------- | -------------- | ---------- | ---------- |
| 0 | id\_05     | 2002-09-21 | 108.285187      | 108.619698     | 2002-09-12 | 108.726387 |
| 1 | id\_08     | 2002-09-16 | 26.287956       | 26.589603      | 2002-09-12 | 27.980670  |
| 2 | id\_08     | 2002-09-25 | 83.210945       | 84.194962      | 2002-09-12 | 86.344885  |
| 3 | id\_11     | 2002-09-22 | 416.994843      | 417.106506     | 2002-09-12 | 425.434661 |
| 4 | id\_16     | 2002-09-14 | 377.916382      | 375.421600     | 2002-09-12 | 400.361977 |

```python theme={null}
# ray
ray.shutdown()
```
