Using Large Datasets
Tutorial on how to train neuralforecast models on datasets that cannot fit into memory
The standard DataLoader class used by NeuralForecast expects the dataset to be represented by a single DataFrame, which is loaded entirely into memory when fitting the model. When the dataset is too large for this, we can instead use the custom large-scale DataLoader. This custom loader assumes that each timeseries is stored in its own collection of Parquet files, and ensures that only one batch is loaded into memory at any given time.
In this notebook, we demonstrate the expected format of these files, how to train the model, and how to perform inference using this large-scale DataLoader.
Load libraries
import logging
import os
import tempfile
import numpy as np
import pandas as pd
from neuralforecast import NeuralForecast
from neuralforecast.models import NHITS
from utilsforecast.evaluation import evaluate
from utilsforecast.losses import mae, rmse, smape
from neuralforecast.utils import AirPassengersPanel, AirPassengersStatic
logging.getLogger('pytorch_lightning').setLevel(logging.ERROR)
os.environ['NIXTLA_ID_AS_COL'] = '1'
Data
Each timeseries should be stored in a directory named unique_id=timeseries_id. Within this directory, the timeseries can be entirely contained in a single Parquet file or split across multiple Parquet files. Regardless of the format, the timeseries must be ordered by time.
For example, the following code splits the AirPassengers DataFrame (in which each timeseries is already sorted by time) into the format below:
> data
> unique_id=Airline1
- a59945617fdb40d1bc6caa4aadad881c-0.parquet
> unique_id=Airline2
- a59945617fdb40d1bc6caa4aadad881c-0.parquet
We then simply input a list of the paths to these directories.
Y_df = AirPassengersPanel.copy()
Y_df
 | unique_id | ds | y | trend | y_[lag12] |
---|---|---|---|---|---|
0 | Airline1 | 1949-01-31 | 112.0 | 0 | 112.0 |
1 | Airline1 | 1949-02-28 | 118.0 | 1 | 118.0 |
2 | Airline1 | 1949-03-31 | 132.0 | 2 | 132.0 |
3 | Airline1 | 1949-04-30 | 129.0 | 3 | 129.0 |
4 | Airline1 | 1949-05-31 | 121.0 | 4 | 121.0 |
… | … | … | … | … | … |
283 | Airline2 | 1960-08-31 | 906.0 | 283 | 859.0 |
284 | Airline2 | 1960-09-30 | 808.0 | 284 | 763.0 |
285 | Airline2 | 1960-10-31 | 761.0 | 285 | 707.0 |
286 | Airline2 | 1960-11-30 | 690.0 | 286 | 662.0 |
287 | Airline2 | 1960-12-31 | 732.0 | 287 | 705.0 |
valid = Y_df.groupby('unique_id').tail(72)
# From now on we use id_col as the unique identifier of each timeseries, because the unique_id column is used to partition the data into Parquet files.
valid = valid.rename(columns={'unique_id': 'id_col'})
train = Y_df.drop(valid.index)
train['id_col'] = train['unique_id'].copy()
# we generate the files using a temporary directory here to demonstrate the expected file structure
tmpdir = tempfile.TemporaryDirectory()
train.to_parquet(tmpdir.name, partition_cols=['unique_id'], index=False)
files_list = [os.path.join(tmpdir.name, name) for name in os.listdir(tmpdir.name)]
files_list
['/tmp/tmp9cy2qa36/unique_id=Airline2', '/tmp/tmp9cy2qa36/unique_id=Airline1']
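Before passing such a list to fit, it can help to check that every entry follows the unique_id=timeseries_id naming convention and actually contains Parquet files. The sketch below is one way to do that using only the standard library. The helper name `list_series_dirs` is hypothetical and not part of NeuralForecast, and the demo uses empty placeholder files instead of real Parquet data.

```python
import os
import tempfile


def list_series_dirs(root, partition_col="unique_id"):
    """Map each series id to its partition directory under `root`.

    Hypothetical helper (not a NeuralForecast API): only directories named
    `<partition_col>=<id>` that contain at least one .parquet file are kept.
    """
    prefix = f"{partition_col}="
    series_dirs = {}
    for name in sorted(os.listdir(root)):
        path = os.path.join(root, name)
        if not (os.path.isdir(path) and name.startswith(prefix)):
            continue  # skip stray files and non-partition directories
        if any(f.endswith(".parquet") for f in os.listdir(path)):
            series_dirs[name[len(prefix):]] = path
    return series_dirs


# Demo on a mock layout: the .parquet files here are empty placeholders.
with tempfile.TemporaryDirectory() as demo_root:
    for series_id in ["Airline1", "Airline2"]:
        part_dir = os.path.join(demo_root, f"unique_id={series_id}")
        os.makedirs(part_dir)
        open(os.path.join(part_dir, "part-0.parquet"), "w").close()
    os.makedirs(os.path.join(demo_root, "_logs"))  # ignored: not a partition
    demo_dirs = list_series_dirs(demo_root)
    demo_ids = list(demo_dirs)
```

The values of the returned dictionary could then be used as the list of paths, and the keys compared against the ids present in the static DataFrame.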
You can also create this directory structure from a Spark DataFrame using the following:
"""
spark.conf.set("spark.sql.parquet.outputTimestampType", "TIMESTAMP_MICROS")
(
spark_df
.repartition(id_col)
.sortWithinPartitions(id_col, time_col)
.write
.partitionBy(id_col)
.parquet(out_dir)
)
"""
The DataLoader class still expects the static data to be passed in as a single DataFrame with one row per timeseries.
static = AirPassengersStatic.rename(columns={'unique_id': 'id_col'})
static
 | id_col | airline1 | airline2 |
---|---|---|---|
0 | Airline1 | 0 | 1 |
1 | Airline2 | 1 | 0 |
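Since the static DataFrame must contain one row per timeseries, a quick consistency check against the ids on disk can catch mismatches early. The following is a minimal sketch using pandas. `check_static` is a hypothetical helper, and the toy frame only mimics the shape of the static table above.

```python
import pandas as pd


def check_static(static_df, series_ids, id_col="id_col"):
    """Raise ValueError unless static_df has exactly one row per series id.

    Hypothetical helper for illustration; not part of NeuralForecast.
    """
    counts = static_df[id_col].value_counts()
    duplicated = sorted(counts[counts > 1].index)
    missing = sorted(set(series_ids) - set(counts.index))
    if duplicated or missing:
        raise ValueError(f"duplicated ids: {duplicated}; missing ids: {missing}")
    return True


# Toy static table with the same columns as the one shown above.
toy_static = pd.DataFrame(
    {"id_col": ["Airline1", "Airline2"], "airline1": [0, 1], "airline2": [1, 0]}
)
static_ok = check_static(toy_static, ["Airline1", "Airline2"])
```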
Model training
We now train an NHITS model on the above dataset. Note that NeuralForecast currently does not support scaling when using this DataLoader. If you want to scale the timeseries, this should be done before passing them in to the fit method.
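Because scaling has to happen outside the library in this setting, one option is to standardize each series yourself before writing the Parquet files, and to invert the transform on the forecasts afterwards. The sketch below assumes per-series standardization of the target only, with statistics computed on the training data. The function names are hypothetical, and exogenous columns would need similar treatment if they are to be scaled.

```python
import pandas as pd


def fit_scaler(train_df, id_col="id_col", target_col="y"):
    """Per-series mean and sample standard deviation of the target."""
    return train_df.groupby(id_col)[target_col].agg(["mean", "std"])


def scale_target(df, stats, id_col="id_col", target_col="y"):
    """Return a copy of df with the target standardized per series."""
    out = df.copy()
    mean = out[id_col].map(stats["mean"])
    std = out[id_col].map(stats["std"])
    out[target_col] = (out[target_col] - mean) / std
    return out


def unscale_values(values, ids, stats):
    """Map standardized values (e.g. forecasts) back to the original scale."""
    return values * ids.map(stats["std"]) + ids.map(stats["mean"])


# Toy data: two short series on very different scales.
toy = pd.DataFrame(
    {
        "id_col": ["a", "a", "a", "b", "b", "b"],
        "y": [1.0, 2.0, 3.0, 10.0, 20.0, 30.0],
    }
)
toy_stats = fit_scaler(toy)
toy_scaled = scale_target(toy, toy_stats)
toy_restored = unscale_values(toy_scaled["y"], toy_scaled["id_col"], toy_stats)
```

A constant series has zero standard deviation and would need special handling (for example, skipping the division) before using this approach.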
horizon = 12
stacks = 3
models = [NHITS(input_size=5 * horizon,
h=horizon,
futr_exog_list=['trend', 'y_[lag12]'],
stat_exog_list=['airline1', 'airline2'],
max_steps=100,
stack_types = stacks*['identity'],
n_blocks = stacks*[1],
mlp_units = [[256,256] for _ in range(stacks)],
n_pool_kernel_size = stacks*[1])]
nf = NeuralForecast(models=models, freq='ME')
nf.fit(df=files_list, static_df=static, id_col='id_col')
Seed set to 1
Sanity Checking: | | 0/? [00:00<?, ?it/s]
Training: | | 0/? [00:00<?, ?it/s]
Validation: | | 0/? [00:00<?, ?it/s]
Forecasting
When working with large datasets, we need to provide a single DataFrame containing the input timesteps of all the timeseries for which we wish to generate predictions. If we have future exogenous features, we should also include the future values of these features in the separate futr_df DataFrame.
For the prediction below, we assume that we only want to predict the next 12 timesteps for Airline2.
valid_df = valid[valid['id_col'] == 'Airline2']
# we set input_size=60 and horizon=12 when fitting the model
pred_df = valid_df[:60]
futr_df = valid_df[60:72]
futr_df = futr_df.drop(["y"], axis=1)
predictions = nf.predict(df=pred_df, futr_df=futr_df, static_df=static)
Predicting: | | 0/? [00:00<?, ?it/s]
predictions
 | id_col | ds | NHITS |
---|---|---|---|
0 | Airline2 | 1960-01-31 | 710.602417 |
1 | Airline2 | 1960-02-29 | 688.900879 |
2 | Airline2 | 1960-03-31 | 758.637573 |
3 | Airline2 | 1960-04-30 | 748.974365 |
4 | Airline2 | 1960-05-31 | 753.558655 |
5 | Airline2 | 1960-06-30 | 801.517822 |
6 | Airline2 | 1960-07-31 | 863.835449 |
7 | Airline2 | 1960-08-31 | 847.854980 |
8 | Airline2 | 1960-09-30 | 797.115845 |
9 | Airline2 | 1960-10-31 | 748.879761 |
10 | Airline2 | 1960-11-30 | 707.076233 |
11 | Airline2 | 1960-12-31 | 747.851685 |
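The manual slicing used above for a single series can be generalized to several series at once. The sketch below is a hypothetical helper (not a NeuralForecast function) that takes the last input_size + h rows of each series, returns the first input_size rows as the history frame and the final h rows, without the target, as the future-exogenous frame. It assumes every series has at least input_size + h rows.

```python
import pandas as pd


def split_history_future(df, input_size, h, id_col="id_col",
                         time_col="ds", target_col="y"):
    """Split each series into a history window and a future-exogenous window.

    Hypothetical helper; assumes each series has >= input_size + h rows.
    """
    df = df.sort_values([id_col, time_col])
    window = df.groupby(id_col).tail(input_size + h)
    position = window.groupby(id_col).cumcount()  # 0-based position per series
    history = window[position < input_size]
    future = window[position >= input_size].drop(columns=[target_col])
    return history, future


# Toy panel: two series with five daily observations each.
toy_panel = pd.DataFrame(
    {
        "id_col": ["a"] * 5 + ["b"] * 5,
        "ds": list(pd.date_range("2020-01-01", periods=5, freq="D")) * 2,
        "y": [float(i) for i in range(10)],
        "trend": list(range(5)) * 2,
    }
)
toy_history, toy_future = split_history_future(toy_panel, input_size=3, h=2)
```

With the tutorial's settings this would correspond to input_size=60 and h=12, and the two returned frames would play the roles of pred_df and futr_df.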
Evaluation
target = valid_df[60:72]
evaluate(
predictions.merge(target.drop(["trend", "y_[lag12]"], axis=1), on=['id_col', 'ds']),
metrics=[mae, rmse, smape],
id_col='id_col',
agg_fn='mean',
)
 | metric | NHITS |
---|---|---|
0 | mae | 23.693777 |
1 | rmse | 29.992256 |
2 | smape | 0.014734 |