from itertools import chain
from pathlib import Path
from typing import List, Optional

import neuralforecast as nf
import numpy as np
import pandas as pd
import pytorch_lightning as pl
from datasetsforecast.utils import download_file
from hyperopt import hp
from neuralforecast.auto import NHITS as autoNHITS
from neuralforecast.data.tsdataset import WindowsDataset
from neuralforecast.data.tsloader import TimeSeriesLoader
from neuralforecast.models.mqnhits.mqnhits import MQNHITS
from neuralforecast.models.nhits.nhits import NHITS

# GLOBAL PARAMETERS
DEFAULT_HORIZON = 30
HYPEROPT_STEPS = 10
MAX_STEPS = 1000
N_TS_VAL = 2 * 30

MODELS = {
    "Pretrained N-HiTS M4 Hourly": {
        "card": "nhitsh",
        "max_steps": 0,
        "model": "nhits_m4_hourly",
    },
    "Pretrained N-HiTS M4 Hourly (Tiny)": {
        "card": "nhitsh",
        "max_steps": 0,
        "model": "nhits_m4_hourly_tiny",
    },
    "Pretrained N-HiTS M4 Daily": {
        "card": "nhitsd",
        "max_steps": 0,
        "model": "nhits_m4_daily",
    },
    "Pretrained N-HiTS M4 Monthly": {
        "card": "nhitsm",
        "max_steps": 0,
        "model": "nhits_m4_monthly",
    },
    "Pretrained N-HiTS M4 Yearly": {
        "card": "nhitsy",
        "max_steps": 0,
        "model": "nhits_m4_yearly",
    },
    "Pretrained N-BEATS M4 Hourly": {
        "card": "nbeatsh",
        "max_steps": 0,
        "model": "nbeats_m4_hourly",
    },
    "Pretrained N-BEATS M4 Daily": {
        "card": "nbeatsd",
        "max_steps": 0,
        "model": "nbeats_m4_daily",
    },
    "Pretrained N-BEATS M4 Weekly": {
        "card": "nbeatsw",
        "max_steps": 0,
        "model": "nbeats_m4_weekly",
    },
    "Pretrained N-BEATS M4 Monthly": {
        "card": "nbeatsm",
        "max_steps": 0,
        "model": "nbeats_m4_monthly",
    },
    "Pretrained N-BEATS M4 Yearly": {
        "card": "nbeatsy",
        "max_steps": 0,
        "model": "nbeats_m4_yearly",
    },
}

def download_models():
    # Fetch each pretrained checkpoint unless it is already cached in ./models/.
    for _, meta in MODELS.items():
        if not Path(f'./models/{meta["model"]}.ckpt').is_file():
            download_file(
                "./models/",
                f'https://nixtla-public.s3.amazonaws.com/transfer/pretrained_models/{meta["model"]}.ckpt',
            )


download_models()

class StandardScaler:
    """Standardizes a long-format dataframe containing multiple time series, one scaler per unique_id."""

    def __init__(self):
        self.norm: pd.DataFrame

    def fit(self, X: pd.DataFrame) -> "StandardScaler":
        # Per-series mean and standard deviation of the target column.
        self.norm = X.groupby("unique_id").agg({"y": [np.mean, np.std]})
        self.norm = self.norm.droplevel(0, 1).reset_index()
        return self

    def transform(self, X: pd.DataFrame) -> pd.DataFrame:
        transformed = X.merge(self.norm, how="left", on=["unique_id"])
        transformed["y"] = (transformed["y"] - transformed["mean"]) / transformed["std"]
        return transformed[["unique_id", "ds", "y"]]

    def inverse_transform(self, X: pd.DataFrame, cols: List[str]) -> pd.DataFrame:
        transformed = X.merge(self.norm, how="left", on=["unique_id"])
        for col in cols:
            transformed[col] = (
                transformed[col] * transformed["std"] + transformed["mean"]
            )
        return transformed[["unique_id", "ds"] + cols]
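
# Illustrative sketch (not part of the original script): per-series scaling
# round-trips, so inverse_transform recovers the original values. The toy frame
# below is a hypothetical example.
#
#   toy = pd.DataFrame({
#       "unique_id": ["ts1"] * 4,
#       "ds": pd.date_range("2022-01-01", periods=4, freq="D"),
#       "y": [1.0, 2.0, 3.0, 4.0],
#   })
#   scaler = StandardScaler().fit(toy)
#   z = scaler.transform(toy)                       # standardized per unique_id
#   back = scaler.inverse_transform(z, cols=["y"])  # back["y"] matches toy["y"]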

def compute_ds_future(Y_df, fh):
    """Builds the `fh` future timestamps (as strings) for each series in `Y_df`."""
    if Y_df["unique_id"].nunique() == 1:
        ds_ = pd.to_datetime(Y_df["ds"].values)
        try:
            freq = pd.infer_freq(ds_)
        except ValueError:
            freq = None
        if freq is not None:
            ds_future = pd.date_range(ds_[-1], periods=fh + 1, freq=freq)[1:]
        else:
            # Fall back to the spacing between the last two observations.
            freq = ds_[-1] - ds_[-2]
            ds_future = [ds_[-1] + (i + 1) * freq for i in range(fh)]
        ds_future = list(map(str, ds_future))
        return ds_future
    else:
        # Recurse per series and concatenate the results.
        ds_future = chain(
            *[compute_ds_future(df, fh) for _, df in Y_df.groupby("unique_id")]
        )
        return list(ds_future)
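
# Illustrative example (assuming a single hourly series): if the last observed
# timestamp is 2022-01-01 23:00, compute_ds_future(Y_df, fh=3) returns
# ['2022-01-02 00:00:00', '2022-01-02 01:00:00', '2022-01-02 02:00:00'].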

def forecast_pretrained_model(
    Y_df: pd.DataFrame, model: str, fh: int, max_steps: int = 0
):
    if "unique_id" not in Y_df:
        Y_df.insert(0, "unique_id", "ts_1")

    # Standardize each series before feeding it to the pretrained model.
    scaler = StandardScaler()
    scaler.fit(Y_df)
    Y_df = scaler.transform(Y_df)

    # Model
    file_ = f"./models/{model}.ckpt"
    mqnhits = MQNHITS.load_from_checkpoint(file_)

    # Fine-tune only when max_steps > 0; otherwise use the checkpoint as-is.
    if max_steps > 0:
        train_dataset = WindowsDataset(
            Y_df=Y_df,
            X_df=None,
            S_df=None,
            mask_df=None,
            f_cols=[],
            input_size=mqnhits.n_time_in,
            output_size=mqnhits.n_time_out,
            sample_freq=1,
            complete_windows=True,
            verbose=False,
        )

        train_loader = TimeSeriesLoader(
            dataset=train_dataset, batch_size=1, n_windows=32, shuffle=True
        )

        trainer = pl.Trainer(
            max_epochs=None,
            checkpoint_callback=False,
            logger=False,
            max_steps=max_steps,
            gradient_clip_val=1.0,
            progress_bar_refresh_rate=1,
            log_every_n_steps=1,
        )

        trainer.fit(mqnhits, train_loader)

    # Forecast and map the quantile columns back to the original scale.
    forecast_df = mqnhits.forecast(Y_df=Y_df)
    forecast_df = scaler.inverse_transform(forecast_df, cols=["y_5", "y_50", "y_95"])

    # Adjust the forecast length to the requested horizon: pad by repeating the
    # forecast window when the model's output is shorter than fh, truncate otherwise.
    n_ts = forecast_df["unique_id"].nunique()
    if fh * n_ts > len(forecast_df):
        forecast_df = (
            forecast_df.groupby("unique_id")
            .apply(lambda df: pd.concat([df] * fh).head(fh))
            .reset_index(drop=True)
        )
    else:
        forecast_df = forecast_df.groupby("unique_id").head(fh)

    forecast_df["ds"] = compute_ds_future(Y_df, fh)

    return forecast_df
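
# Illustrative usage (assumes the checkpoints above have been downloaded; y_5,
# y_50 and y_95 are presumably the 5th, 50th and 95th percentile forecasts):
#
#   fcst = forecast_pretrained_model(df, model="nhits_m4_hourly", fh=24)
#   # fcst has columns ['unique_id', 'ds', 'y_5', 'y_50', 'y_95']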

if __name__ == "__main__":
    # Smoke tests on the ERCOT COAST series from the transfer-learning-time-series repo.
    df = pd.read_csv(
        "https://raw.githubusercontent.com/Nixtla/transfer-learning-time-series/main/datasets/ercot_COAST.csv"
    )
    df.columns = ["ds", "y"]
    multi_df = pd.concat([df.assign(unique_id=f"ts{i}") for i in range(2)])
    assert len(compute_ds_future(multi_df, 80)) == 2 * 80

    for _, meta in MODELS.items():
        # Test a single series (without a unique_id column).
        forecast = forecast_pretrained_model(df, model=meta["model"], fh=80)
        assert forecast.shape == (80, 5)
        # Test multiple series.
        multi_forecast = forecast_pretrained_model(multi_df, model=meta["model"], fh=80)
        assert multi_forecast.shape == (80 * 2, 5)