# Using Large Datasets

> Tutorial on how to train neuralforecast models on datasets that cannot
> fit into memory

The standard DataLoader class used by NeuralForecast expects the dataset
to be represented by a single DataFrame, which is loaded entirely into
memory when fitting the model. When the dataset is too large for this,
we can instead use the custom large-scale DataLoader. This custom loader
assumes that each timeseries is stored as one or more Parquet files, and
ensures that only one batch is loaded into memory at any given time.

In this notebook, we will demonstrate the expected format of these
files, how to train the model, and how to perform inference using this
large-scale DataLoader.

## Load libraries

```python theme={null}
import logging
import os
import tempfile

import pandas as pd

from neuralforecast import NeuralForecast
from neuralforecast.models import NHITS
from utilsforecast.evaluation import evaluate
from utilsforecast.losses import mae, rmse, smape
from neuralforecast.utils import AirPassengersPanel, AirPassengersStatic
```

```python theme={null}
logging.getLogger('pytorch_lightning').setLevel(logging.ERROR)
```

## Data

Each timeseries should be stored in a directory named
**unique\_id=timeseries\_id**. Within this directory, the timeseries can
be entirely contained in a single Parquet file or split across multiple
Parquet files. Regardless of the format, the timeseries must be ordered
by time.

For example, the following code splits the AirPassengers DataFrame (in
which each timeseries is already sorted by time) into the layout below:

<br />

<br />

**>**  data\
    **>**  unique\_id=Airline1\
         -  a59945617fdb40d1bc6caa4aadad881c-0.parquet\
    **>**  unique\_id=Airline2\
         -  a59945617fdb40d1bc6caa4aadad881c-0.parquet

We then simply pass a list of the paths to these directories as the `df`
argument of `fit`.

```python theme={null}
Y_df = AirPassengersPanel.copy()
Y_df
```

|     | unique\_id | ds         | y     | trend | y\_\[lag12] |
| --- | ---------- | ---------- | ----- | ----- | ----------- |
| 0   | Airline1   | 1949-01-31 | 112.0 | 0     | 112.0       |
| 1   | Airline1   | 1949-02-28 | 118.0 | 1     | 118.0       |
| 2   | Airline1   | 1949-03-31 | 132.0 | 2     | 132.0       |
| 3   | Airline1   | 1949-04-30 | 129.0 | 3     | 129.0       |
| 4   | Airline1   | 1949-05-31 | 121.0 | 4     | 121.0       |
| ... | ...        | ...        | ...   | ...   | ...         |
| 283 | Airline2   | 1960-08-31 | 906.0 | 283   | 859.0       |
| 284 | Airline2   | 1960-09-30 | 808.0 | 284   | 763.0       |
| 285 | Airline2   | 1960-10-31 | 761.0 | 285   | 707.0       |
| 286 | Airline2   | 1960-11-30 | 690.0 | 286   | 662.0       |
| 287 | Airline2   | 1960-12-31 | 732.0 | 287   | 705.0       |

```python theme={null}
valid = Y_df.groupby('unique_id').tail(72)
# From now on, id_col is the unique identifier of each timeseries, because the
# unique_id column is used to partition the data into Parquet files.
valid = valid.rename(columns={'unique_id': 'id_col'})

train = Y_df.drop(valid.index)
train['id_col'] = train['unique_id'].copy()

# we generate the files using a temporary directory here to demonstrate the expected file structure
tmpdir = tempfile.TemporaryDirectory()
train.to_parquet(tmpdir.name, partition_cols=['unique_id'], index=False)
files_list = [f"{tmpdir.name}/{part_dir}" for part_dir in os.listdir(tmpdir.name)]
files_list
```

```text theme={null}
['C:\\Users\\ospra\\AppData\\Local\\Temp\\tmpxe__gjoo/unique_id=Airline1',
 'C:\\Users\\ospra\\AppData\\Local\\Temp\\tmpxe__gjoo/unique_id=Airline2']
```

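You can also create this directory structure from a Spark DataFrame
using the following, where `spark_df`, `id_col`, `time_col` and
`out_dir` stand for your own DataFrame, column names and output
directory: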

```python theme={null}
spark.conf.set("spark.sql.parquet.outputTimestampType", "TIMESTAMP_MICROS")
(
  spark_df
  .repartition(id_col)
  .sortWithinPartitions(id_col, time_col)
  .write
  .partitionBy(id_col)
  .parquet(out_dir)
)
```
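Once the partitioned files exist on disk, the list of directories can be collected without loading any data. The following is a minimal sketch using only the standard library; `list_series_dirs` is a hypothetical helper, not part of NeuralForecast. It keeps only directories whose names start with the partition column, so marker files such as the `_SUCCESS` file that Spark writes by default are skipped.

```python
import tempfile
from pathlib import Path


def list_series_dirs(root, partition_col="unique_id"):
    """Return the sorted paths of the `<partition_col>=<value>` directories under root."""
    prefix = f"{partition_col}="
    return sorted(
        str(path)
        for path in Path(root).iterdir()
        if path.is_dir() and path.name.startswith(prefix)
    )


# Demo on a throwaway directory that mimics the layout described above.
with tempfile.TemporaryDirectory() as demo_root:
    for name in ("unique_id=Airline2", "unique_id=Airline1"):
        (Path(demo_root) / name).mkdir()
    (Path(demo_root) / "_SUCCESS").touch()  # marker file that should be ignored
    demo_dirs = list_series_dirs(demo_root)
    demo_names = [Path(d).name for d in demo_dirs]

print(demo_names)
```

Sorting the result makes the order of the series deterministic, which `os.listdir` alone does not guarantee.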

The DataLoader class still expects the static data to be passed in as a
single DataFrame with one row per timeseries.

```python theme={null}
static = AirPassengersStatic.rename(columns={'unique_id': 'id_col'})
static
```

|   | id\_col  | airline1 | airline2 |
| - | -------- | -------- | -------- |
| 0 | Airline1 | 0        | 1        |
| 1 | Airline2 | 1        | 0        |
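Because the static DataFrame must contain one row per timeseries, it can be worth checking this before training. The sketch below assumes the series identifiers are already known as a plain list; `check_static` is a hypothetical helper and not a NeuralForecast function.

```python
import pandas as pd


def check_static(static_df, series_ids, id_col="unique_id"):
    """Raise ValueError unless static_df has exactly one row for each id in series_ids."""
    counts = static_df[id_col].value_counts()
    duplicated = sorted(counts[counts > 1].index)
    missing = sorted(set(series_ids) - set(counts.index))
    if duplicated or missing:
        raise ValueError(f"duplicated ids: {duplicated}; missing ids: {missing}")


# Toy static frame with the same columns as the table above.
static_ok = pd.DataFrame(
    {"id_col": ["Airline1", "Airline2"], "airline1": [0, 1], "airline2": [1, 0]}
)
check_static(static_ok, ["Airline1", "Airline2"], id_col="id_col")  # passes silently
```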

## Model training

We now train an NHITS model on the above dataset. Note that
NeuralForecast currently does not support scaling when using this
DataLoader. If you want to scale the timeseries, do so before passing
the data to the `fit` method.
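One way to scale the data yourself is a per-series standardization applied before the Parquet files are written. The following is a minimal sketch under that assumption; the helper names `fit_series_stats`, `scale_column` and `unscale_column` are hypothetical and not part of NeuralForecast. The statistics are computed on the training rows only and kept so that forecasts can be mapped back to the original scale.

```python
import pandas as pd


def fit_series_stats(df, id_col, target_col="y"):
    """Per-series mean and standard deviation of the target column."""
    stats = df.groupby(id_col)[target_col].agg(["mean", "std"])
    # Single-row series give NaN and constant series give 0; fall back to 1.
    stats["std"] = stats["std"].fillna(1.0).replace(0.0, 1.0)
    return stats


def scale_column(df, stats, column, id_col):
    """Standardize `column` with the statistics of the series each row belongs to."""
    out = df.copy()
    mean = out[id_col].map(stats["mean"])
    std = out[id_col].map(stats["std"])
    out[column] = (out[column] - mean) / std
    return out


def unscale_column(df, stats, column, id_col):
    """Invert scale_column, e.g. to bring forecasts back to the original scale."""
    out = df.copy()
    mean = out[id_col].map(stats["mean"])
    std = out[id_col].map(stats["std"])
    out[column] = out[column] * std + mean
    return out


# Toy data with two series on very different scales.
toy = pd.DataFrame(
    {"id_col": ["A", "A", "A", "B", "B", "B"], "y": [1.0, 2.0, 3.0, 10.0, 20.0, 30.0]}
)
stats = fit_series_stats(toy, id_col="id_col")
scaled = scale_column(toy, stats, "y", "id_col")
restored = unscale_column(scaled, stats, "y", "id_col")
```

In the setting of this tutorial, the same statistics would be applied to the prediction inputs, and `unscale_column` would be called on the `NHITS` column of the forecasts. Exogenous columns that live on the target's scale, such as `y_[lag12]`, would likely need the same treatment.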

```python theme={null}
horizon = 12
stacks = 3
models = [NHITS(input_size=5 * horizon,
                h=horizon,
                futr_exog_list=['trend', 'y_[lag12]'],
                stat_exog_list=['airline1', 'airline2'],
                max_steps=100,
                stack_types = stacks*['identity'],
                n_blocks = stacks*[1],
                mlp_units = [[256,256] for _ in range(stacks)],
                n_pool_kernel_size = stacks*[1],
                interpolation_mode="nearest")]
nf = NeuralForecast(models=models, freq='ME')
nf.fit(df=files_list, static_df=static, id_col='id_col')
```

```text theme={null}
Seed set to 1
```

## Forecasting

When working with large datasets, we need to provide a single DataFrame
containing the input timesteps of all the timeseries for which we wish
to generate predictions. If we have future exogenous features, we should
also include the future values of these features in the separate
`futr_df` DataFrame.

In the prediction below, we assume that we only want to forecast the
next 12 timesteps for Airline2.

```python theme={null}
valid_df = valid[valid['id_col'] == 'Airline2']
# The model was fit with input_size=60 and horizon=12, so the first 60 rows form
# the input window and the last 12 rows supply the future exogenous values.
pred_df = valid_df[:60]
futr_df = valid_df[60:72]
futr_df = futr_df.drop(["y"], axis=1)

predictions = nf.predict(df=pred_df, futr_df=futr_df, static_df=static)
```

```python theme={null}
predictions
```

|    | id\_col  | ds         | NHITS      |
| -- | -------- | ---------- | ---------- |
| 0  | Airline2 | 1960-01-31 | 713.441406 |
| 1  | Airline2 | 1960-02-29 | 688.176880 |
| 2  | Airline2 | 1960-03-31 | 763.382935 |
| 3  | Airline2 | 1960-04-30 | 745.478027 |
| 4  | Airline2 | 1960-05-31 | 758.036438 |
| 5  | Airline2 | 1960-06-30 | 806.288574 |
| 6  | Airline2 | 1960-07-31 | 869.563782 |
| 7  | Airline2 | 1960-08-31 | 858.105896 |
| 8  | Airline2 | 1960-09-30 | 803.531555 |
| 9  | Airline2 | 1960-10-31 | 751.093079 |
| 10 | Airline2 | 1960-11-30 | 700.435852 |
| 11 | Airline2 | 1960-12-31 | 746.640259 |
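The positional slicing used above works for a single series. When several series are forecast at once, the same split can be done per series. The sketch below is one possible way to do it with pandas; `split_history_future` is a hypothetical helper, and it assumes every series has at least `input_size + h` rows.

```python
import pandas as pd


def split_history_future(df, h, input_size, id_col="unique_id", time_col="ds", target_col="y"):
    """Split each series into its last `input_size` input rows and `h` future rows."""
    df = df.sort_values([id_col, time_col]).reset_index(drop=True)
    # The last h rows of each series hold the future exogenous values.
    futr_df = df.groupby(id_col).tail(h).drop(columns=[target_col])
    # Everything before them is history; keep only the model's input window.
    history = df.drop(futr_df.index)
    pred_df = history.groupby(id_col).tail(input_size)
    return pred_df, futr_df


# Toy data: two series with six daily observations each.
dates = list(pd.date_range("2020-01-01", periods=6, freq="D"))
toy = pd.DataFrame(
    {"id_col": ["A"] * 6 + ["B"] * 6, "ds": dates * 2, "y": list(range(12))}
)
pred_df, futr_df = split_history_future(toy, h=2, input_size=3, id_col="id_col")
```

With the data of this tutorial, the call would use `h=12` and `input_size=60`, and the two results would be passed as `df` and `futr_df` to `nf.predict`.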

## Evaluation

```python theme={null}
target = valid_df[60:72]
```

```python theme={null}
evaluate(
    predictions.merge(target.drop(["trend", "y_[lag12]"], axis=1), on=['id_col', 'ds']),
    metrics=[mae, rmse, smape],
    id_col='id_col',
    agg_fn='mean',
)
```

|   | metric | NHITS     |
| - | ------ | --------- |
| 0 | mae    | 20.728617 |
| 1 | rmse   | 26.980698 |
| 2 | smape  | 0.012879  |
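To make the three metrics concrete, here is a plain-Python sketch of what each one measures for a single series. It is an illustration, not the `utilsforecast` implementation. In particular, it assumes sMAPE is defined as the mean of |y - ŷ| / (|y| + |ŷ|), which is bounded between 0 and 1 and is consistent with the scale of the value reported above.

```python
import math


def mae_sketch(y, y_hat):
    """Mean absolute error."""
    return sum(abs(a - b) for a, b in zip(y, y_hat)) / len(y)


def rmse_sketch(y, y_hat):
    """Root mean squared error."""
    return math.sqrt(sum((a - b) ** 2 for a, b in zip(y, y_hat)) / len(y))


def smape_sketch(y, y_hat):
    """Symmetric MAPE, assumed here to be mean(|y - y_hat| / (|y| + |y_hat|))."""
    return sum(abs(a - b) / (abs(a) + abs(b)) for a, b in zip(y, y_hat)) / len(y)


# Two made-up observations and forecasts, for illustration only.
y_true = [100.0, 200.0]
y_pred = [110.0, 180.0]
toy_mae = mae_sketch(y_true, y_pred)      # (10 + 20) / 2
toy_rmse = rmse_sketch(y_true, y_pred)    # sqrt((100 + 400) / 2)
toy_smape = smape_sketch(y_true, y_pred)  # (10/210 + 20/380) / 2
```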
