Torch Time Series Dataset

TimeSeriesLoader

```python
TimeSeriesLoader(dataset, **kwargs)
```

Bases: `DataLoader`

A small modification of PyTorch's `DataLoader`. It combines a dataset and a sampler, and provides an iterable over the given dataset. Like `torch.utils.data.DataLoader`, it supports both map-style and iterable-style datasets with single- or multi-process loading, customizable loading order, and optional automatic batching (collation) and memory pinning.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `dataset` | | Dataset to load data from. | required |
| `batch_size` | `int` | How many samples per batch to load. | `1` |
| `shuffle` | `bool` | Set to `True` to have the data reshuffled at every epoch. | `False` |
| `sampler` | `Sampler` or `Iterable` | Defines the strategy to draw samples from the dataset. | `None` |
| `drop_last` | `bool` | Set to `True` to drop the last incomplete batch. | `False` |
| `**kwargs` | | Additional keyword arguments passed to `DataLoader`. | |
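As a rough usage sketch (the import path and the contents of `dataset` are assumptions, not part of this reference), a `TimeSeriesLoader` is built and iterated like a regular `DataLoader`:

```python
# Minimal sketch, assuming the classes are importable from neuralforecast.tsdataset
# and that `dataset` is a map-style dataset such as the TimeSeriesDataset documented below.
from neuralforecast.tsdataset import TimeSeriesLoader

loader = TimeSeriesLoader(
    dataset,        # e.g. a TimeSeriesDataset
    batch_size=32,  # forwarded to torch.utils.data.DataLoader
    shuffle=True,
    drop_last=False,
)

for batch in loader:
    ...  # consume batches as produced by the dataset's collation
```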

BaseTimeSeriesDataset

```python
BaseTimeSeriesDataset(
    temporal_cols, max_size, min_size, y_idx, static=None, static_cols=None
)
```

Bases: `Dataset`

Base class for time series datasets.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `temporal_cols` | | Column names for temporal features. | required |
| `max_size` | `int` | Maximum size of time series. | required |
| `min_size` | `int` | Minimum size of time series. | required |
| `y_idx` | `int` | Index of target variable. | required |
| `static` | `Optional` | Static features array. | `None` |
| `static_cols` | `Optional` | Column names for static features. | `None` |

LocalFilesTimeSeriesDataset

```python
LocalFilesTimeSeriesDataset(
    files_ds,
    temporal_cols,
    id_col,
    time_col,
    target_col,
    last_times,
    indices,
    max_size,
    min_size,
    y_idx,
    static=None,
    static_cols=None,
)
```

Bases: `BaseTimeSeriesDataset`

Time series dataset that loads data from local files.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `files_ds` | `List[str]` | List of file paths. | required |
| `temporal_cols` | | Column names for temporal features. | required |
| `id_col` | `str` | Name of ID column. | required |
| `time_col` | `str` | Name of time column. | required |
| `target_col` | `str` | Name of target column. | required |
| `last_times` | | Last time for each time series. | required |
| `indices` | | Series indices. | required |
| `max_size` | `int` | Maximum size of time series. | required |
| `min_size` | `int` | Minimum size of time series. | required |
| `y_idx` | `int` | Index of target variable. | required |
| `static` | `Optional` | Static features array. | `None` |
| `static_cols` | `Optional` | Column names for static features. | `None` |

LocalFilesTimeSeriesDataset.from_data_directories

```python
from_data_directories(
    directories,
    static_df=None,
    exogs=[],
    id_col="unique_id",
    time_col="ds",
    target_col="y",
)
```

Create a dataset from data directories. Expects `directories` to be a list of directories of the form `[unique_id=id_0, unique_id=id_1, ...]`. Each directory should contain the time series corresponding to that `unique_id`, stored as parquet readable into a pandas or polars DataFrame. A time series can be contained entirely in one parquet file or split across several, but within each parquet file the rows should be sorted by time. A layout sketch follows the tables below.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `directories` | | List of directory paths. | required |
| `static_df` | `Optional` | Static features DataFrame. | `None` |
| `exogs` | `List` | List of exogenous variable names. | `[]` |
| `id_col` | `str` | Name of ID column. | `'unique_id'` |
| `time_col` | `str` | Name of time column. | `'ds'` |
| `target_col` | `str` | Name of target column. | `'y'` |

Returns:

| Name | Type | Description |
| --- | --- | --- |
| | `LocalFilesTimeSeriesDataset` | Dataset created from directories. |
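A minimal sketch of the expected layout and call, assuming pandas is used to write the parquet files; the directory names, column values, and import path are illustrative assumptions, not prescribed by this reference:

```python
import os

import pandas as pd

from neuralforecast.tsdataset import LocalFilesTimeSeriesDataset  # assumed import path

# Expected layout: one directory per series, named `unique_id=<id>`,
# each holding parquet files whose rows are sorted by the time column.
#
# data/
# ├── unique_id=id_0/part-0.parquet
# └── unique_id=id_1/part-0.parquet
for uid, offset in [("id_0", 0.0), ("id_1", 100.0)]:
    path = f"data/unique_id={uid}"
    os.makedirs(path, exist_ok=True)
    pd.DataFrame(
        {
            "ds": pd.date_range("2020-01-01", periods=48, freq="MS"),
            "y": [offset + i for i in range(48)],
        }
    ).to_parquet(f"{path}/part-0.parquet")

dataset = LocalFilesTimeSeriesDataset.from_data_directories(
    directories=["data/unique_id=id_0", "data/unique_id=id_1"],
    id_col="unique_id",
    time_col="ds",
    target_col="y",
)
```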

TimeSeriesDataset

```python
TimeSeriesDataset(
    temporal, temporal_cols, indptr, y_idx, static=None, static_cols=None
)
```

Bases: `BaseTimeSeriesDataset`

Time series dataset implementation.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `temporal` | | Temporal data array. | required |
| `temporal_cols` | | Column names for temporal features. | required |
| `indptr` | | Index pointers for time series grouping. | required |
| `y_idx` | `int` | Index of target variable. | required |
| `static` | `Optional` | Static features array. | `None` |
| `static_cols` | `Optional` | Column names for static features. | `None` |
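A minimal construction sketch, assuming `temporal` stacks all observations row-wise across series and `indptr` marks CSR-style series boundaries (series `i` occupies `temporal[indptr[i]:indptr[i+1]]`); the import path and expected dtypes are assumptions:

```python
import numpy as np
import pandas as pd

from neuralforecast.tsdataset import TimeSeriesDataset  # assumed import path

# Two series with lengths 4 and 3, one temporal column ("y", the target).
temporal = np.arange(7, dtype=np.float32).reshape(-1, 1)
temporal_cols = pd.Index(["y"])
indptr = np.array([0, 4, 7])

dataset = TimeSeriesDataset(
    temporal=temporal,
    temporal_cols=temporal_cols,
    indptr=indptr,
    y_idx=0,  # position of the target within temporal_cols
)
```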

TimeSeriesDataset.append

```python
append(futr_dataset)
```

Add future observations to the dataset.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `futr_dataset` | `TimeSeriesDataset` | Future dataset to append. | required |

Returns:

| Name | Type | Description |
| --- | --- | --- |
| | `TimeSeriesDataset` | Copy of dataset with future observations appended. |

Raises:

| Type | Description |
| --- | --- |
| `ValueError` | If datasets have different number of groups. |
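Continuing the construction sketch above (an assumption-laden illustration, not prescribed usage), a future dataset covering the same two series can be appended:

```python
# `dataset` is the two-series TimeSeriesDataset built in the previous sketch.
# The future part must cover the same groups: here, 2 future rows per series.
futr = TimeSeriesDataset(
    temporal=np.zeros((4, 1), dtype=np.float32),
    temporal_cols=temporal_cols,
    indptr=np.array([0, 2, 4]),
    y_idx=0,
)
extended = dataset.append(futr)  # returns a copy; raises ValueError on a group-count mismatch
```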

TimeSeriesDataset.trim_dataset

```python
trim_dataset(dataset, left_trim=0, right_trim=0)
```

Trim temporal information from a dataset. Returns the temporal indexes `[t+left : t-right]` for all series.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `dataset` | | Dataset to trim. | required |
| `left_trim` | `int` | Number of observations to trim from the left. | `0` |
| `right_trim` | `int` | Number of observations to trim from the right. | `0` |

Returns:

| Name | Type | Description |
| --- | --- | --- |
| | `TimeSeriesDataset` | Trimmed dataset. |

Raises:

| Type | Description |
| --- | --- |
| `Exception` | If trim size exceeds minimum series length. |
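Again continuing the sketch above, this drops one observation from each end of every series (the combined trim must stay below the shortest series length):

```python
# Keeps temporal rows [1 : length - 1] of every series in `dataset`.
trimmed = TimeSeriesDataset.trim_dataset(dataset, left_trim=1, right_trim=1)
```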

TimeSeriesDataModule

```python
TimeSeriesDataModule(
    dataset,
    batch_size=32,
    valid_batch_size=1024,
    drop_last=False,
    shuffle_train=True,
    **dataloaders_kwargs
)
```

Bases: `LightningDataModule`

PyTorch Lightning data module for time series datasets.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `dataset` | `BaseTimeSeriesDataset` | Time series dataset. | required |
| `batch_size` | `int` | Batch size for training. | `32` |
| `valid_batch_size` | `int` | Batch size for validation. | `1024` |
| `drop_last` | `bool` | Whether to drop the last incomplete batch. | `False` |
| `shuffle_train` | `bool` | Whether to shuffle training data. | `True` |
| `**dataloaders_kwargs` | | Additional keyword arguments for the data loaders. | |
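As a brief sketch (assuming the import path and that the standard `LightningDataModule` dataloader hooks are available as documented), the module can wrap the dataset built earlier and serve training batches:

```python
from neuralforecast.tsdataset import TimeSeriesDataModule  # assumed import path

# `dataset` is the TimeSeriesDataset from the sketches above.
datamodule = TimeSeriesDataModule(dataset, batch_size=2, shuffle_train=True)
train_loader = datamodule.train_dataloader()
batch = next(iter(train_loader))
```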

Example

```python
import lightning.pytorch as L
import torch
import torch.utils.data as data
from lightning.pytorch.demos.boring_classes import RandomDataset


class MyDataModule(L.LightningDataModule):
    def prepare_data(self):
        # download, IO, etc. Useful with shared filesystems
        # only called on 1 GPU/TPU in distributed
        ...

    def setup(self, stage):
        # make assignments here (val/train/test split)
        # called on every process in DDP
        dataset = RandomDataset(1, 100)
        self.train, self.val, self.test = data.random_split(
            dataset, [80, 10, 10], generator=torch.Generator().manual_seed(42)
        )

    def train_dataloader(self):
        return data.DataLoader(self.train)

    def val_dataloader(self):
        return data.DataLoader(self.val)

    def test_dataloader(self):
        return data.DataLoader(self.test)

    def on_exception(self, exception):
        # clean up state after the trainer faced an exception
        ...

    def teardown(self, stage):
        # clean up state after the trainer stops, delete files...
        # called on every process in DDP
        ...
```