The standard DataLoader class used by NeuralForecast expects the dataset to be represented by a single DataFrame, which is loaded into memory in its entirety when fitting the model. When the dataset is too large for this, we can instead use the custom large-scale DataLoader. This loader assumes that each timeseries is split across a collection of Parquet files, and ensures that only one batch is ever loaded into memory at a given time.

In this notebook, we will demonstrate the expected format of these files, how to train a model, and how to perform inference using this large-scale DataLoader.

Load libraries

import logging
import os
import tempfile

import pandas as pd

from neuralforecast import NeuralForecast
from neuralforecast.models import NHITS
from neuralforecast.utils import AirPassengersPanel, AirPassengersStatic
from utilsforecast.evaluation import evaluate
from utilsforecast.losses import mae, rmse, smape

logging.getLogger('pytorch_lightning').setLevel(logging.ERROR)

Data

Each timeseries should be stored in a directory named unique_id=timeseries_id. Within this directory, the timeseries can be entirely contained in a single Parquet file or split across multiple Parquet files. Regardless of the format, the timeseries must be ordered by time.

For example, the following code splits the AirPassengers DataFrame (in which each timeseries is already sorted by time) into the format below:



>  data
    >  unique_id=Airline1
         -  a59945617fdb40d1bc6caa4aadad881c-0.parquet
    >  unique_id=Airline2
         -  a59945617fdb40d1bc6caa4aadad881c-0.parquet


We then simply input a list of the paths to these directories.

Y_df = AirPassengersPanel.copy()
Y_df
    unique_id         ds      y  trend  y_[lag12]
0    Airline1 1949-01-31  112.0      0      112.0
1    Airline1 1949-02-28  118.0      1      118.0
2    Airline1 1949-03-31  132.0      2      132.0
3    Airline1 1949-04-30  129.0      3      129.0
4    Airline1 1949-05-31  121.0      4      121.0
..        ...        ...    ...    ...        ...
283  Airline2 1960-08-31  906.0    283      859.0
284  Airline2 1960-09-30  808.0    284      763.0
285  Airline2 1960-10-31  761.0    285      707.0
286  Airline2 1960-11-30  690.0    286      662.0
287  Airline2 1960-12-31  732.0    287      705.0
valid = Y_df.groupby('unique_id').tail(72)
# from now on we will use id_col as the unique identifier for each timeseries
# (since the unique_id column is used to partition the data into Parquet files)
valid = valid.rename(columns={'unique_id': 'id_col'})

train = Y_df.drop(valid.index)
train['id_col'] = train['unique_id'].copy()

# we generate the files using a temporary directory here to demonstrate the expected file structure
tmpdir = tempfile.TemporaryDirectory()
train.to_parquet(tmpdir.name, partition_cols=['unique_id'], index=False)
files_list = [f"{tmpdir.name}/{dir}" for dir in os.listdir(tmpdir.name)]
files_list
['C:\\Users\\ospra\\AppData\\Local\\Temp\\tmpxe__gjoo/unique_id=Airline1',
 'C:\\Users\\ospra\\AppData\\Local\\Temp\\tmpxe__gjoo/unique_id=Airline2']
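As a quick sanity check, we can read one of these partitions back into pandas. Note that the unique_id partition column is encoded in the directory name rather than stored inside the files, and that the rows must come back ordered by time:

check = pd.read_parquet(files_list[0])
assert check['ds'].is_monotonic_increasing  # the loader requires each timeseries to be ordered by time
check.head()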

You can also create this directory structure from a Spark DataFrame with the following, where sortWithinPartitions ensures that each partition is ordered by time, as the loader requires:

spark.conf.set("spark.sql.parquet.outputTimestampType", "TIMESTAMP_MICROS")
(
  spark_df
  .repartition(id_col)
  .sortWithinPartitions(id_col, time_col)
  .write
  .partitionBy(id_col)
  .parquet(out_dir)
)

The DataLoader class still expects the static data to be passed in as a single DataFrame with one row per timeseries.

static = AirPassengersStatic.rename(columns={'unique_id': 'id_col'})
static
     id_col  airline1  airline2
0  Airline1         0         1
1  Airline2         1         0

Model training

We now train an NHITS model on the above dataset. Note that NeuralForecast currently does not support scaling when using this DataLoader; if you want to scale the timeseries, this must be done before the data is passed to the fit method, as sketched below.
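For illustration, a minimal sketch of standardizing each timeseries per unique_id before writing the partitioned files (we skip this step in this notebook; scaled_dir is a hypothetical output directory, and the saved statistics would be needed to invert the scaling on the predictions):

scaled = train.copy()
grouped = scaled.groupby('unique_id')['y']
scaler_stats = grouped.agg(['mean', 'std'])  # keep these to un-scale the predictions later
scaled['y'] = (scaled['y'] - grouped.transform('mean')) / grouped.transform('std')
# scaled_dir is a hypothetical directory for the scaled, partitioned files
scaled.to_parquet(scaled_dir, partition_cols=['unique_id'], index=False)

We now define and fit the model: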

horizon = 12
stacks = 3
models = [NHITS(input_size=5 * horizon,
                h=horizon,
                futr_exog_list=['trend', 'y_[lag12]'],
                stat_exog_list=['airline1', 'airline2'],
                max_steps=100,
                stack_types=stacks * ['identity'],
                n_blocks=stacks * [1],
                mlp_units=[[256, 256] for _ in range(stacks)],
                n_pool_kernel_size=stacks * [1],
                interpolation_mode="nearest")]
nf = NeuralForecast(models=models, freq='ME')
nf.fit(df=files_list, static_df=static, id_col='id_col')
Seed set to 1
Sanity Checking: |          | 0/? [00:00<?, ?it/s]
Training: |          | 0/? [00:00<?, ?it/s]
Validation: |          | 0/? [00:00<?, ?it/s]

Forecasting

When working with large datasets, we need to provide a single DataFrame containing the input timesteps of all the timeseries for which we wish to generate predictions. If the model uses future exogenous features, we should also include their future values in a separate futr_df DataFrame.

For the prediction below, we assume we only want to predict the next 12 timesteps for Airline2.

valid_df = valid[valid['id_col'] == 'Airline2']
# we set input_size=60 and horizon=12 when fitting the model
pred_df = valid_df[:60]
futr_df = valid_df[60:72]
futr_df = futr_df.drop(["y"], axis=1)

predictions = nf.predict(df=pred_df, futr_df=futr_df, static_df=static)
Predicting: |          | 0/? [00:00<?, ?it/s]
predictions
      id_col         ds       NHITS
0   Airline2 1960-01-31  713.441406
1   Airline2 1960-02-29  688.176880
2   Airline2 1960-03-31  763.382935
3   Airline2 1960-04-30  745.478027
4   Airline2 1960-05-31  758.036438
5   Airline2 1960-06-30  806.288574
6   Airline2 1960-07-31  869.563782
7   Airline2 1960-08-31  858.105896
8   Airline2 1960-09-30  803.531555
9   Airline2 1960-10-31  751.093079
10  Airline2 1960-11-30  700.435852
11  Airline2 1960-12-31  746.640259

Evaluation

target = valid_df[60:72]
evaluate(
    predictions.merge(target.drop(["trend", "y_[lag12]"], axis=1), on=['id_col', 'ds']),
    metrics=[mae, rmse, smape],
    id_col='id_col',
    agg_fn='mean',
)
  metric      NHITS
0    mae  20.728617
1   rmse  26.980698
2  smape   0.012879
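Finally, since we wrote the partitioned Parquet files to a temporary directory, we can remove them once we are done:

tmpdir.cleanup()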