from neuralforecast.losses.pytorch import MAE
from neuralforecast.auto import AutoNHITS, AutoTSMixer, AutoiTransformer, AutoTSMixerx, AutoNBEATSx
from neuralforecast import NeuralForecast
from modules.transform import transformData, prepareData, calendarFeatures, createLag
from pytorch_lightning.loggers import CSVLogger
import uuid
from pytorch_lightning import Trainer
import os
import pandas as pd
import pickle
from pathlib import Path


def trainModel(dataset, artifacts_path, variate='univariate', y_var='brent_futures_Close',
               horizon_len=5, val_size=0.1, test_size=0.1, lag_amt=0):
    # Maybe change y_var to an input? (via User Interaction)

    # Code for univariate time-series forecasting
    if variate == 'univariate':
        from neuralforecast.auto import AutoNHITS, AutoTSMixer, AutoTSMixerx, AutoNBEATSx

        Y_df = dataset.rename({'Date': 'ds', y_var: 'y'}, axis=1)
        Y_df['unique_id'] = 0

        # Validation/test split sizes as fractions of the series length
        len_data = len(Y_df.ds.unique())
        val_size = int(val_size * len_data)
        test_size = int(test_size * len_data)

        Y_df['ds'] = pd.to_datetime(Y_df['ds'])
        Y_df = Y_df[['ds', 'y', 'unique_id']]
        Y_df = calendarFeatures(Y_df)
        Y_df.to_csv(os.path.join(artifacts_path, 'train_data.csv'))
        print(f'Total length is {len_data}, with validation and test size of {val_size} each')

        import optuna
        optuna.logging.set_verbosity(optuna.logging.WARNING)  # Disable per-trial prints from optuna

        def config_nhits(horizon_len, trial):
            return {
                "max_steps": 1000,  # Number of SGD steps
                "input_size": trial.suggest_categorical("input_size", [horizon_len, horizon_len * 2]),  # Size of input window
                "learning_rate": trial.suggest_loguniform("learning_rate", 1e-5, 1e-1),  # Initial learning rate
                "n_pool_kernel_size": trial.suggest_categorical("n_pool_kernel_size", [[2, 2, 2], [16, 8, 1]]),  # MaxPool's kernel size
                "n_freq_downsample": trial.suggest_categorical("n_freq_downsample", [[168, 24, 1], [24, 12, 1]]),  # Interpolation expressivity ratios
                "val_check_steps": 50,  # Compute validation every 50 steps
                "early_stop_patience_steps": 5,  # Stop after 5 validation checks without improvement
                "random_seed": trial.suggest_int("random_seed", 1, 10),  # Random seed
            }

        def config_tsmixer(horizon_len, trial):
            return {
                "max_steps": 1000,
                "n_series": 1,
                "input_size": trial.suggest_categorical("input_size", [horizon_len, horizon_len * 2]),
                "learning_rate": trial.suggest_loguniform("learning_rate", 1e-5, 1e-1),
                "ff_dim": trial.suggest_categorical("ff_dim", [64, 128]),
                "n_block": trial.suggest_categorical("n_block", [4, 8]),
                "val_check_steps": 50,
                "early_stop_patience_steps": 5,
                "scaler_type": 'identity',
            }

        def config_nbeatsx(horizon_len, trial):
            return {
                "max_steps": 1000,  # Number of SGD steps
                "futr_exog_list": ['day_of_week', 'is_weekend', 'month', 'day_of_month',
                                   'quarter', 'year', 'is_holiday'],  # Calendar features added by calendarFeatures()
                "input_size": trial.suggest_categorical("input_size", [horizon_len, horizon_len * 2]),  # Size of input window
                "learning_rate": trial.suggest_loguniform("learning_rate", 1e-5, 1e-1),  # Initial learning rate
                "val_check_steps": 50,  # Compute validation every 50 steps
                "early_stop_patience_steps": 5,  # Stop after 5 validation checks without improvement
                "random_seed": trial.suggest_int("random_seed", 1, 10),  # Random seed
            }

        def config_tsmixerx(horizon_len, trial):
            return {
                "max_steps": 1000,
                "futr_exog_list": ['day_of_week', 'is_weekend', 'month', 'day_of_month',
                                   'quarter', 'year', 'is_holiday'],
                "n_series": 1,
                "input_size": trial.suggest_categorical("input_size", [horizon_len, horizon_len * 2]),
                "learning_rate": trial.suggest_loguniform("learning_rate", 1e-5, 1e-1),
                "ff_dim": trial.suggest_categorical("ff_dim", [64, 128]),
                "n_block": trial.suggest_categorical("n_block", [4, 8]),
                "val_check_steps": 50,
                "early_stop_patience_steps": 5,
                "scaler_type": 'identity',
            }

        model = [AutoNHITS(h=horizon_len, loss=MAE(), valid_loss=MAE(),
                           config=lambda trial: config_nhits(horizon_len, trial),
                           search_alg=optuna.samplers.TPESampler(), backend='optuna', num_samples=10),
                 AutoTSMixer(h=horizon_len, n_series=1, loss=MAE(), valid_loss=MAE(),
                             config=lambda trial: config_tsmixer(horizon_len, trial),
                             search_alg=optuna.samplers.TPESampler(), backend='optuna', num_samples=10),
                 AutoNBEATSx(h=horizon_len, loss=MAE(), valid_loss=MAE(),
                             config=lambda trial: config_nbeatsx(horizon_len, trial),
                             search_alg=optuna.samplers.TPESampler(), backend='optuna', num_samples=10),
                 AutoTSMixerx(h=horizon_len, n_series=1, loss=MAE(), valid_loss=MAE(),
                              config=lambda trial: config_tsmixerx(horizon_len, trial),
                              search_alg=optuna.samplers.TPESampler(), backend='optuna', num_samples=10)]

        # Set up a custom logging directory for training artifacts
        log_dir = os.path.join(artifacts_path, 'training_logs')
        # Note: the CSVLogger is created for illustrative purposes only; it is not passed
        # to the models, so NeuralForecast falls back to its default Lightning logging.
        Trainer.default_root_dir = log_dir
        logger = CSVLogger(save_dir=log_dir, name='forecast_logs')

        nf = NeuralForecast(models=model, freq='B')
        nf.fit(df=Y_df, val_size=val_size)

        # Optuna trial results from the AutoTSMixer model (index 1 in the model list)
        results = nf.models[1].results.trials_dataframe()
        results = results.drop(columns='user_attrs_ALL_PARAMS')
        return nf, results

    # Code for multivariate time-series forecasting
    if variate == 'multivariate':
        from neuralforecast.auto import AutoTSMixer, AutoiTransformer

        # Long format: one row per (date, series), with the original column name as unique_id
        Y_df = dataset.melt(id_vars=['Date'], var_name='unique_id', value_name='y')
        Y_df = Y_df.rename({'Date': 'ds'}, axis=1)

        # Validation/test split sizes as fractions of the series length
        len_data = len(Y_df.ds.unique())
        val_size = int(val_size * len_data)
        test_size = int(test_size * len_data)

        Y_df['ds'] = pd.to_datetime(Y_df['ds'])
        Y_df.to_csv(os.path.join(artifacts_path, 'train_data.csv'))
        print(f'Total length is {len_data}, with validation and test size of {val_size} each')

        import optuna
        optuna.logging.set_verbosity(optuna.logging.WARNING)  # Disable per-trial prints from optuna

        def config_autoitransformer(horizon_len, trial):
            return {
                "max_steps": 1000,  # Number of SGD steps
                "n_series": Y_df['unique_id'].nunique(),
                "input_size": trial.suggest_categorical("input_size", [horizon_len, horizon_len * 2]),  # Size of input window
                "learning_rate": trial.suggest_loguniform("learning_rate", 1e-5, 1e-1),  # Initial learning rate
                "hidden_size": trial.suggest_categorical("hidden_size", [128, 256]),  # Model hidden dimension
                "n_heads": trial.suggest_categorical("n_heads", [2, 4]),  # Number of attention heads
                "e_layers": trial.suggest_categorical("e_layers", [2, 4]),  # Number of encoder layers
                "val_check_steps": 50,  # Compute validation every 50 steps
                "early_stop_patience_steps": 5,  # Stop after 5 validation checks without improvement
                "random_seed": trial.suggest_int("random_seed", 1, 10),  # Random seed
            }

        def config_tsmixer(horizon_len, trial):
            return {
                "max_steps": 1000,
                "n_series": Y_df['unique_id'].nunique(),
                "input_size": trial.suggest_categorical("input_size", [horizon_len, horizon_len * 2]),
                "learning_rate": trial.suggest_loguniform("learning_rate", 1e-5, 1e-1),
                "ff_dim": trial.suggest_categorical("ff_dim", [64, 128]),
                "n_block": trial.suggest_categorical("n_block", [4, 8]),
                "val_check_steps": 50,
                "early_stop_patience_steps": 5,
                "scaler_type": 'identity',
            }

        model = [AutoiTransformer(h=horizon_len, n_series=Y_df['unique_id'].nunique(),
                                  loss=MAE(), valid_loss=MAE(),
                                  config=lambda trial: config_autoitransformer(horizon_len, trial),
                                  search_alg=optuna.samplers.TPESampler(), backend='optuna', num_samples=10),
                 AutoTSMixer(h=horizon_len, n_series=Y_df['unique_id'].nunique(),
                             loss=MAE(), valid_loss=MAE(),
                             config=lambda trial: config_tsmixer(horizon_len, trial),
                             search_alg=optuna.samplers.TPESampler(), backend='optuna', num_samples=10)]

        # Set up a custom logging directory for training artifacts
        log_dir = os.path.join(artifacts_path, 'training_logs')
        # Note: the CSVLogger is created for illustrative purposes only; it is not passed
        # to the models, so NeuralForecast falls back to its default Lightning logging.
        Trainer.default_root_dir = log_dir
        logger = CSVLogger(save_dir=log_dir, name='forecast_logs')

        nf = NeuralForecast(models=model, freq='B')
        nf.fit(df=Y_df, val_size=val_size)

        # Optuna trial results from the AutoTSMixer model (index 1 in the model list)
        results = nf.models[1].results.trials_dataframe()
        results = results.drop(columns='user_attrs_ALL_PARAMS')
        return nf, results
def main():
    """Run the full forecasting pipeline: prepare and transform the data,
    train the models, and save all artifacts under artifacts/<run_id>."""
    import logging

    directory = Path(__file__).parent.absolute()
    logging.basicConfig(level=logging.INFO)

    data_dir = 'crude_oil'  # Should be choosable later on?
    run_id = f'{data_dir}_{uuid.uuid4()}'
    artifacts_path = os.path.join(directory, 'artifacts', run_id)
    logging.info(f'Created forecasting pipeline with id {run_id}')
    os.makedirs(artifacts_path)  # also creates the parent 'artifacts' directory if missing

    prepared_data = prepareData(parent_dir=directory, data_dir=data_dir, run_id=run_id)
    train_data, transformations = transformData(prepared_data, dir=directory, id=run_id)
    train_data.to_csv(os.path.join(artifacts_path, 'transformed_dataset.csv'))

    # Save transformations including StandardScaler objects
    with open(os.path.join(artifacts_path, 'transformations.pkl'), 'wb') as fp:
        pickle.dump(transformations, fp)

    nf, results = trainModel(dataset=train_data, variate='univariate', artifacts_path=artifacts_path)
    results.to_csv(os.path.join(artifacts_path, 'training_results.csv'))
    nf.save(path=os.path.join(artifacts_path, 'model'), model_index=None, overwrite=True, save_dataset=True)


if __name__ == "__main__":
    main()
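
# ---------------------------------------------------------------------------
# Minimal inference sketch, appended after the entry point purely as an
# illustration; it is not called by the pipeline above. It assumes a run
# directory produced by main(), e.g. artifacts/<run_id>, containing the 'model'
# folder written by nf.save(). NeuralForecast.load restores the fitted models
# and predict() returns the horizon_len-step forecast; futr_df is only needed
# when future exogenous features (the calendar columns used above) were part of
# the training config, as with AutoNBEATSx / AutoTSMixerx. The helper name
# loadAndForecast is hypothetical and not part of modules.transform.
def loadAndForecast(artifacts_path, futr_df=None):
    # Restore the fitted NeuralForecast object (and its dataset, since save_dataset=True)
    nf = NeuralForecast.load(path=os.path.join(artifacts_path, 'model'))
    if futr_df is not None:
        # futr_df must contain ds/unique_id plus the calendar columns for the future dates
        return nf.predict(futr_df=futr_df)
    return nf.predict()

# Example usage (hypothetical path):
#   preds = loadAndForecast(os.path.join('artifacts', '<run_id>'))
#   print(preds.head())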