Ray is an open-source unified compute framework for scaling Python workloads. In this guide, we explain how to use TimeGPT on top of Ray.

Outline:

  1. Installation

  2. Load Your Data

  3. Initialize Ray

  4. Use TimeGPT on Ray

  5. Shutdown Ray

1. Installation

Install Ray through Fugue. Fugue provides an easy-to-use interface for distributed computing that lets users execute Python code on top of several distributed computing frameworks, including Ray.

If executing on a distributed Ray cluster, ensure that the nixtla library is installed across all the workers.
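For example, in a pip-based environment (Fugue's "ray" extra pulls in Ray alongside Fugue):

```shell
# Install the nixtla client and Fugue with its Ray integration.
pip install nixtla "fugue[ray]"
```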

2. Load Your Data

You can load your data as a pandas DataFrame. In this tutorial, we will use a dataset that contains hourly electricity prices from different markets.

import pandas as pd 

df = pd.read_csv('https://raw.githubusercontent.com/Nixtla/transfer-learning-time-series/main/datasets/electricity-short.csv') 
df.head()
  unique_id                   ds      y
0        BE  2016-10-22 00:00:00  70.00
1        BE  2016-10-22 01:00:00  37.10
2        BE  2016-10-22 02:00:00  37.10
3        BE  2016-10-22 03:00:00  44.75
4        BE  2016-10-22 04:00:00  37.10
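The data is in the long format TimeGPT expects: one row per series and timestamp, with a unique_id column identifying the series, a ds column with the timestamp, and a y column with the target value. A minimal sketch of that layout, built with synthetic values mirroring the first rows of the dataset:

```python
import pandas as pd

# Long format expected by TimeGPT: one row per (series, timestamp).
df = pd.DataFrame({
    "unique_id": ["BE"] * 5,  # series identifier
    "ds": pd.to_datetime([    # timestamp
        "2016-10-22 00:00:00",
        "2016-10-22 01:00:00",
        "2016-10-22 02:00:00",
        "2016-10-22 03:00:00",
        "2016-10-22 04:00:00",
    ]),
    "y": [70.00, 37.10, 37.10, 44.75, 37.10],  # target value
})
print(df.shape)          # (5, 3)
print(list(df.columns))  # ['unique_id', 'ds', 'y']
```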

3. Initialize Ray

Initialize Ray and convert the pandas DataFrame to a Ray DataFrame.

import ray
from ray.cluster_utils import Cluster

ray_cluster = Cluster(
    initialize_head=True,
    head_node_args={"num_cpus": 2}
)
ray.init(address=ray_cluster.address, ignore_reinit_error=True)
2024-04-26 14:47:28,636 INFO worker.py:1431 -- Connecting to existing Ray cluster at address: 127.0.0.1:60031...
2024-04-26 14:47:28,647 INFO worker.py:1621 -- Connected to Ray cluster.
Python version: 3.10.14
Ray version: 2.6.2
ray_df = ray.data.from_pandas(df)
ray_df
MaterializedDataset(
   num_blocks=1,
   num_rows=8400,
   schema={unique_id: object, ds: object, y: float64}
)

4. Use TimeGPT on Ray

Using TimeGPT on top of Ray is almost identical to the non-distributed case. The only difference is that you need to use a Ray DataFrame.

First, instantiate the NixtlaClient class.

from nixtla import NixtlaClient
nixtla_client = NixtlaClient(
    # defaults to os.environ.get("NIXTLA_API_KEY")
    api_key = 'my_api_key_provided_by_nixtla'
)

Then use any method from the NixtlaClient class such as forecast or cross_validation.

fcst_df = nixtla_client.forecast(ray_df, h=12)

To visualize the result, use the to_pandas method to convert the output of Ray to a pandas DataFrame.

fcst_df.to_pandas().tail()
   unique_id                   ds    TimeGPT
55        NP  2018-12-24 07:00:00  53.784847
56        NP  2018-12-24 08:00:00  54.437321
57        NP  2018-12-24 09:00:00  54.660770
58        NP  2018-12-24 10:00:00  54.744473
59        NP  2018-12-24 11:00:00  54.737762
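Once converted back to pandas, the forecast frame can be combined with the history for plotting or export. A sketch using synthetic stand-ins shaped like the tutorial's frames (the values below are illustrative, not from the dataset):

```python
import pandas as pd

# Synthetic stand-in for the tail of the historical data.
history = pd.DataFrame({
    "unique_id": ["NP", "NP"],
    "ds": pd.to_datetime(["2018-12-23 22:00:00", "2018-12-23 23:00:00"]),
    "y": [50.1, 49.9],
})

# Synthetic stand-in for the forecast output (unique_id, ds, TimeGPT).
fcst = pd.DataFrame({
    "unique_id": ["NP", "NP"],
    "ds": pd.to_datetime(["2018-12-24 00:00:00", "2018-12-24 01:00:00"]),
    "TimeGPT": [50.5, 51.0],
})

# Stack history and forecast into one frame; y and TimeGPT are each
# NaN outside their own range.
combined = pd.concat([history, fcst], ignore_index=True).sort_values(["unique_id", "ds"])
print(combined.shape)  # (4, 4)
```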
You can also run cross-validation distributed on Ray:

cv_df = nixtla_client.cross_validation(ray_df, h=12, freq='H', n_windows=5, step_size=2)
cv_df.to_pandas().tail()
    unique_id                   ds               cutoff    TimeGPT
295        NP  2018-12-23 19:00:00  2018-12-23 11:00:00  53.441555
296        NP  2018-12-23 20:00:00  2018-12-23 11:00:00  52.649628
297        NP  2018-12-23 21:00:00  2018-12-23 11:00:00  51.753975
298        NP  2018-12-23 22:00:00  2018-12-23 11:00:00  50.681946
299        NP  2018-12-23 23:00:00  2018-12-23 11:00:00  49.716431
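The cutoff column follows directly from the cross-validation parameters: each of the n_windows=5 windows holds out h=12 hours, and consecutive cutoffs are step_size=2 hours apart. A minimal sketch of that arithmetic (assuming, as the forecast output implies, the last observation is 2018-12-23 23:00:00):

```python
from datetime import datetime, timedelta

# Last observed timestamp per series (inferred from the forecast output above).
last_obs = datetime(2018, 12, 23, 23)
h, n_windows, step_size = 12, 5, 2

# Window i ends h hours plus (n_windows - 1 - i) * step_size hours
# before the last observation.
cutoffs = [
    last_obs - timedelta(hours=h + (n_windows - 1 - i) * step_size)
    for i in range(n_windows)
]
print([c.strftime("%Y-%m-%d %H:%M:%S") for c in cutoffs])
# The final cutoff is 2018-12-23 11:00:00, matching the table.
```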

You can also use exogenous variables with TimeGPT on top of Ray. To do so, refer to the Exogenous Variables tutorial, keeping in mind that you need to use a Ray DataFrame instead of a pandas DataFrame.
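As a sketch of the data preparation involved (the column name Exogenous1 is hypothetical, and the commented-out call assumes the forecast method's X_df parameter as described in the Exogenous Variables tutorial): exogenous features are extra columns alongside y in the historical frame, plus a future frame covering the forecast horizon.

```python
import pandas as pd

# Historical frame: unique_id, ds, y plus a hypothetical exogenous column.
hist = pd.DataFrame({
    "unique_id": ["BE"] * 24,
    "ds": pd.to_datetime("2016-10-22") + pd.to_timedelta(range(24), unit="h"),
    "y": [50.0 + i for i in range(24)],
    "Exogenous1": [1.0] * 24,
})

# Future values of the exogenous feature must cover the horizon (h=12).
future_X = pd.DataFrame({
    "unique_id": ["BE"] * 12,
    "ds": hist["ds"].iloc[-1] + pd.to_timedelta(range(1, 13), unit="h"),
    "Exogenous1": [1.0] * 12,
})

# With Ray, both frames would be converted before the call, e.g.:
# ray_hist = ray.data.from_pandas(hist)
# fcst = nixtla_client.forecast(ray_hist, h=12, X_df=ray.data.from_pandas(future_X))
print(hist.shape, future_X.shape)  # (24, 4) (12, 3)
```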

5. Shutdown Ray

When you are done, shut down the Ray session.

ray.shutdown()