The standard DataLoader class used by NeuralForecast expects the dataset to be represented by a single DataFrame, which is entirely loaded into memory when fitting the model. However, when the dataset is too large for this, we can instead use the custom large-scale DataLoader. This custom loader assumes that each timeseries is split across a collection of Parquet files, and ensure that only one batch is ever loaded into memory at a given time.

In this notebook, we will demonstrate the expected format of these files, how to train the model and and how to perform inference using this large-scale DataLoader.

Load libraries

import logging
import os
import tempfile

import numpy as nppackage_name
import pandas as pd

from neuralforecast import NeuralForecast
from neuralforecast.models import NHITS
from utilsforecast.evaluation import evaluate
from utilsforecast.losses import mae, rmse, smape
from neuralforecast.utils import AirPassengersPanel, AirPassengersStatic
logging.getLogger('pytorch_lightning').setLevel(logging.ERROR)
os.environ['NIXTLA_ID_AS_COL'] = '1'

Data

Each timeseries should be stored in a directory named unique_id=timeseries_id. Within this directory, the timeseries can be entirely contained in a single Parquet file or split across multiple Parquet files. Regardless of the format, the timeseries must be ordered by time.

For example, the following code splits the AirPassengers DataFrame (of which each timeseries is already sorted by time) into the below format:



>  data
    >  unique_id=Airline1
         -  a59945617fdb40d1bc6caa4aadad881c-0.parquet
    >  unique_id=Airline2
         -  a59945617fdb40d1bc6caa4aadad881c-0.parquet


We then simply input a list of the paths to these directories.

Y_df = AirPassengersPanel.copy()
Y_df
unique_iddsytrendy_[lag12]
0Airline11949-01-31112.00112.0
1Airline11949-02-28118.01118.0
2Airline11949-03-31132.02132.0
3Airline11949-04-30129.03129.0
4Airline11949-05-31121.04121.0
283Airline21960-08-31906.0283859.0
284Airline21960-09-30808.0284763.0
285Airline21960-10-31761.0285707.0
286Airline21960-11-30690.0286662.0
287Airline21960-12-31732.0287705.0
valid = Y_df.groupby('unique_id').tail(72)
# from now on we will use the id_col as the unique identifier for the timeseries (this is because we are using the unique_id column to partition the data into parquet files)
valid = valid.rename(columns={'unique_id': 'id_col'})

train = Y_df.drop(valid.index)
train['id_col'] = train['unique_id'].copy()

# we generate the files using a temporary directory here to demonstrate the expected file structure
tmpdir = tempfile.TemporaryDirectory()
train.to_parquet(tmpdir.name, partition_cols=['unique_id'], index=False)
files_list = [f"{tmpdir.name}/{dir}" for dir in os.listdir(tmpdir.name)]
files_list
['/tmp/tmp9cy2qa36/unique_id=Airline2', '/tmp/tmp9cy2qa36/unique_id=Airline1']

You can also create this directory structure with a spark dataframe using the following:

"""
spark.conf.set("spark.sql.parquet.outputTimestampType", "TIMESTAMP_MICROS")
(
  spark_df
  .repartition(id_col)
  .sortWithinPartitions(id_col, time_col)
  .write
  .partitionBy(id_col)
  .parquet(out_dir)
)
"""

The DataLoader class still expects the static data to be passed in as a single DataFrame with one row per timeseries.

static = AirPassengersStatic.rename(columns={'unique_id': 'id_col'})
static
id_colairline1airline2
0Airline101
1Airline210

Model training

We now train a NHITS model on the above dataset. It is worth noting that NeuralForecast currently does not support scaling when using this DataLoader. If you want to scale the timeseries this should be done before passing it in to the fit method.

horizon = 12
stacks = 3
models = [NHITS(input_size=5 * horizon,
                h=horizon,
                futr_exog_list=['trend', 'y_[lag12]'],
                stat_exog_list=['airline1', 'airline2'],
                max_steps=100,
                stack_types = stacks*['identity'],
                n_blocks = stacks*[1],
                mlp_units = [[256,256] for _ in range(stacks)],
                n_pool_kernel_size = stacks*[1])]
nf = NeuralForecast(models=models, freq='ME')
nf.fit(df=files_list, static_df=static, id_col='id_col')
Seed set to 1
2024-07-23 11:46:22.369481: I tensorflow/core/util/port.cc:113] oneDNN custom operations are on. You may see slightly different numerical results due to floating-point round-off errors from different computation orders. To turn them off, set the environment variable `TF_ENABLE_ONEDNN_OPTS=0`.
2024-07-23 11:46:22.402269: I tensorflow/core/platform/cpu_feature_guard.cc:210] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 AVX512F AVX512_VNNI AVX512_BF16 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.
2024-07-23 11:46:22.934767: W tensorflow/compiler/tf2tensorrt/utils/py_utils.cc:38] TF-TRT Warning: Could not find TensorRT
Sanity Checking: |          | 0/? [00:00<?, ?it/s]
Training: |          | 0/? [00:00<?, ?it/s]
Validation: |          | 0/? [00:00<?, ?it/s]

Forecasting

When working with large datasets, we need to provide a single DataFrame containing the input timesteps of all the timeseries for which wish to generate predictions. If we have future exogenous features, we should also include the future values of these features in the separate futr_df DataFrame.

For the below prediction we are assuming we only want to predict the next 12 timesteps for Airline2.

valid_df = valid[valid['id_col'] == 'Airline2']
# we set input_size=60 and horizon=12 when fitting the model
pred_df = valid_df[:60]
futr_df = valid_df[60:72]
futr_df = futr_df.drop(["y"], axis=1)

predictions = nf.predict(df=pred_df, futr_df=futr_df, static_df=static)
Predicting: |          | 0/? [00:00<?, ?it/s]
predictions
id_coldsNHITS
0Airline21960-01-31710.602417
1Airline21960-02-29688.900879
2Airline21960-03-31758.637573
3Airline21960-04-30748.974365
4Airline21960-05-31753.558655
5Airline21960-06-30801.517822
6Airline21960-07-31863.835449
7Airline21960-08-31847.854980
8Airline21960-09-30797.115845
9Airline21960-10-31748.879761
10Airline21960-11-30707.076233
11Airline21960-12-31747.851685

Evaluation

target = valid_df[60:72]
evaluate(
    predictions.merge(target.drop(["trend", "y_[lag12]"], axis=1), on=['id_col', 'ds']),
    metrics=[mae, rmse, smape],
    id_col='id_col',
    agg_fn='mean',
)
metricNHITS
0mae23.693777
1rmse29.992256
2smape0.014734