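"""Training pipeline for neural time-series forecasting with NeuralForecast.

Prepares the dataset, runs Optuna-based hyperparameter search over several
Auto* models (univariate or multivariate), and writes the fitted models,
training data and search results to a per-run artifacts directory.
"""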
from neuralforecast.losses.pytorch import MAE
from neuralforecast.auto import AutoNHITS, AutoTSMixer, AutoiTransformer, AutoTSMixerx, AutoNBEATSx
from neuralforecast import NeuralForecast
from modules.transform import transformData, prepareData, calendarFeatures, createLag
from pytorch_lightning.loggers import CSVLogger
import uuid
from pytorch_lightning import Trainer
import os
import pandas as pd
import pickle
from pathlib import Path


def trainModel(dataset,
               artifacts_path,
               variate='univariate',
               y_var='brent_futures_Close',
               horizon_len=5,
               val_size=0.1,
               test_size=0.1,
               lag_amt=0):  # Maybe change y_var to an input? (via User Interaction)
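    """Run Optuna hyperparameter search and fit NeuralForecast Auto* models.

    Args:
        dataset: Wide dataframe with a 'Date' column and one column per series.
        artifacts_path: Directory where training data and results are written.
        variate: 'univariate' (forecast y_var only) or 'multivariate' (forecast all columns).
        y_var: Target column for the univariate setup.
        horizon_len: Forecast horizon in business days (freq='B').
        val_size: Fraction of the series used for the validation split.
        test_size: Fraction of the series used for the test split.
        lag_amt: Number of lag features (currently unused in this function).

    Returns:
        Tuple of the fitted NeuralForecast object and an Optuna trials dataframe.
    """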
    # Code for univariate time-series forecasting
    if variate == 'univariate':
        from neuralforecast.auto import AutoNHITS, AutoTSMixer, AutoTSMixerx, AutoNBEATSx
        Y_df = dataset.rename({'Date': 'ds', y_var: 'y'}, axis=1)
        Y_df['unique_id'] = 0
        # Derive validation/test split sizes from the series length
        len_data = len(Y_df.ds.unique())
        val_size = int(val_size * len_data)
        test_size = int(test_size * len_data)
        Y_df['ds'] = pd.to_datetime(Y_df['ds'])
        Y_df = Y_df[['ds', 'y', 'unique_id']]
        Y_df = calendarFeatures(Y_df)
        Y_df.to_csv(os.path.join(artifacts_path, 'train_data.csv'))
        print(f'Total length is {len_data}, with validation and test size of {val_size} for each')
        import optuna
        optuna.logging.set_verbosity(optuna.logging.WARNING)  # Disable per-trial training prints from Optuna
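        # Each config_* function below defines the Optuna search space for one model;
        # fixed entries are passed through unchanged to the underlying NeuralForecast model.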
        def config_nhits(horizon_len, trial):
            return {
                "max_steps": 1000,  # Number of SGD steps
                "input_size": trial.suggest_categorical("input_size", [horizon_len, horizon_len * 2]),  # Size of the input window
                "learning_rate": trial.suggest_float("learning_rate", 1e-5, 1e-1, log=True),  # Initial learning rate
                "n_pool_kernel_size": trial.suggest_categorical("n_pool_kernel_size", [[2, 2, 2], [16, 8, 1]]),  # MaxPool kernel sizes
                "n_freq_downsample": trial.suggest_categorical("n_freq_downsample", [[168, 24, 1], [24, 12, 1]]),  # Interpolation expressivity ratios
                "val_check_steps": 50,  # Compute validation loss every 50 steps
                "early_stop_patience_steps": 5,  # Stop early after 5 validation checks without improvement
                "random_seed": trial.suggest_int("random_seed", 1, 10),  # Random seed
            }
        def config_tsmixer(horizon_len, trial):
            return {
                "max_steps": 1000,
                "n_series": 1,
                "input_size": trial.suggest_categorical("input_size", [horizon_len, horizon_len * 2]),
                "learning_rate": trial.suggest_float("learning_rate", 1e-5, 1e-1, log=True),
                "ff_dim": trial.suggest_categorical("ff_dim", [64, 128]),
                "n_block": trial.suggest_categorical("n_block", [4, 8]),
                "val_check_steps": 50,
                "early_stop_patience_steps": 5,
                "scaler_type": 'identity',
            }
        def config_nbeatsx(horizon_len, trial):
            return {
                "max_steps": 1000,  # Number of SGD steps
                "futr_exog_list": ['day_of_week', 'is_weekend', 'month', 'day_of_month', 'quarter', 'year', 'is_holiday'],  # Calendar features as future exogenous inputs
                "input_size": trial.suggest_categorical("input_size", [horizon_len, horizon_len * 2]),  # Size of the input window
                "learning_rate": trial.suggest_float("learning_rate", 1e-5, 1e-1, log=True),  # Initial learning rate
                "val_check_steps": 50,  # Compute validation loss every 50 steps
                "early_stop_patience_steps": 5,  # Stop early after 5 validation checks without improvement
                "random_seed": trial.suggest_int("random_seed", 1, 10),  # Random seed
            }
        def config_tsmixerx(horizon_len, trial):
            return {
                "max_steps": 1000,
                "futr_exog_list": ['day_of_week', 'is_weekend', 'month', 'day_of_month', 'quarter', 'year', 'is_holiday'],  # Calendar features as future exogenous inputs
                "n_series": 1,
                "input_size": trial.suggest_categorical("input_size", [horizon_len, horizon_len * 2]),
                "learning_rate": trial.suggest_float("learning_rate", 1e-5, 1e-1, log=True),
                "ff_dim": trial.suggest_categorical("ff_dim", [64, 128]),
                "n_block": trial.suggest_categorical("n_block", [4, 8]),
                "val_check_steps": 50,
                "early_stop_patience_steps": 5,
                "scaler_type": 'identity',
            }
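        # Each Auto* wrapper below runs its own Optuna study (TPE sampler, 10 trials),
        # sampling hyperparameters from the matching config_* callable and keeping
        # the configuration with the lowest validation MAE.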
        model = [AutoNHITS(h=horizon_len,
                           loss=MAE(),
                           valid_loss=MAE(),
                           config=lambda trial: config_nhits(horizon_len, trial),
                           search_alg=optuna.samplers.TPESampler(),
                           backend='optuna',
                           num_samples=10),
                 AutoTSMixer(h=horizon_len,
                             n_series=1,
                             loss=MAE(),
                             valid_loss=MAE(),
                             config=lambda trial: config_tsmixer(horizon_len, trial),
                             search_alg=optuna.samplers.TPESampler(),
                             backend='optuna',
                             num_samples=10),
                 AutoNBEATSx(h=horizon_len,
                             loss=MAE(),
                             valid_loss=MAE(),
                             config=lambda trial: config_nbeatsx(horizon_len, trial),
                             search_alg=optuna.samplers.TPESampler(),
                             backend='optuna',
                             num_samples=10),
                 AutoTSMixerx(h=horizon_len,
                              n_series=1,
                              loss=MAE(),
                              valid_loss=MAE(),
                              config=lambda trial: config_tsmixerx(horizon_len, trial),
                              search_alg=optuna.samplers.TPESampler(),
                              backend='optuna',
                              num_samples=10)]
        # Set up a custom logger so training logs land in the run's artifacts directory
        log_dir = os.path.join(artifacts_path, 'training_logs')
        # Note: illustrative only; to take effect, the logger / root dir would need to be
        # passed through to the underlying PyTorch Lightning Trainer of each model.
        Trainer.default_root_dir = log_dir
        logger = CSVLogger(save_dir=log_dir, name='forecast_logs')
        nf = NeuralForecast(models=model, freq='B')
        nf.fit(df=Y_df, val_size=val_size)
        results = nf.models[1].results.trials_dataframe()  # Optuna trials of the second model (AutoTSMixer)
        results = results.drop(columns='user_attrs_ALL_PARAMS')
        return nf, results
    # Code for multivariate time-series forecasting
    if variate == 'multivariate':
        from neuralforecast.auto import AutoTSMixer, AutoiTransformer
        Y_df = dataset.melt(id_vars=['Date'], var_name='unique_id', value_name='y')
        Y_df = Y_df.rename({'Date': 'ds'}, axis=1)
        # Derive validation/test split sizes from the series length
        len_data = len(Y_df.ds.unique())
        val_size = int(val_size * len_data)
        test_size = int(test_size * len_data)
        Y_df['ds'] = pd.to_datetime(Y_df['ds'])
        Y_df.to_csv(os.path.join(artifacts_path, 'train_data.csv'))
        print(f'Total length is {len_data}, with validation and test size of {val_size} for each')
        import optuna
        optuna.logging.set_verbosity(optuna.logging.WARNING)  # Disable per-trial training prints from Optuna
        def config_autoitransformer(horizon_len, trial):
            return {
                "max_steps": 1000,  # Number of SGD steps
                "n_series": Y_df['unique_id'].nunique(),  # Number of series forecast jointly
                "input_size": trial.suggest_categorical("input_size", [horizon_len, horizon_len * 2]),  # Size of the input window
                "learning_rate": trial.suggest_float("learning_rate", 1e-5, 1e-1, log=True),  # Initial learning rate
                "hidden_size": trial.suggest_categorical("hidden_size", [128, 256]),  # Model hidden dimension
                "n_heads": trial.suggest_categorical("n_heads", [2, 4]),  # Number of attention heads
                "e_layers": trial.suggest_categorical("e_layers", [2, 4]),  # Number of encoder layers
                "val_check_steps": 50,  # Compute validation loss every 50 steps
                "early_stop_patience_steps": 5,  # Stop early after 5 validation checks without improvement
                "random_seed": trial.suggest_int("random_seed", 1, 10),  # Random seed
            }
        def config_tsmixer(horizon_len, trial):
            return {
                "max_steps": 1000,
                "n_series": Y_df['unique_id'].nunique(),
                "input_size": trial.suggest_categorical("input_size", [horizon_len, horizon_len * 2]),
                "learning_rate": trial.suggest_float("learning_rate", 1e-5, 1e-1, log=True),
                "ff_dim": trial.suggest_categorical("ff_dim", [64, 128]),
                "n_block": trial.suggest_categorical("n_block", [4, 8]),
                "val_check_steps": 50,
                "early_stop_patience_steps": 5,
                "scaler_type": 'identity',
            }
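        # n_series is inferred from the melted long-format frame, so both models
        # forecast every column of the original wide dataset jointly.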
        model = [AutoiTransformer(h=horizon_len,
                                  n_series=Y_df['unique_id'].nunique(),
                                  loss=MAE(),
                                  valid_loss=MAE(),
                                  config=lambda trial: config_autoitransformer(horizon_len, trial),
                                  search_alg=optuna.samplers.TPESampler(),
                                  backend='optuna',
                                  num_samples=10),
                 AutoTSMixer(h=horizon_len,
                             n_series=Y_df['unique_id'].nunique(),
                             loss=MAE(),
                             valid_loss=MAE(),
                             config=lambda trial: config_tsmixer(horizon_len, trial),
                             search_alg=optuna.samplers.TPESampler(),
                             backend='optuna',
                             num_samples=10)]
        # Set up a custom logger so training logs land in the run's artifacts directory
        log_dir = os.path.join(artifacts_path, 'training_logs')
        # Note: illustrative only; to take effect, the logger / root dir would need to be
        # passed through to the underlying PyTorch Lightning Trainer of each model.
        Trainer.default_root_dir = log_dir
        logger = CSVLogger(save_dir=log_dir, name='forecast_logs')
        nf = NeuralForecast(models=model, freq='B')
        nf.fit(df=Y_df, val_size=val_size)
        results = nf.models[1].results.trials_dataframe()  # Optuna trials of the second model (AutoTSMixer)
        results = results.drop(columns='user_attrs_ALL_PARAMS')
        return nf, results


def main():
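    """Prepare the data, run training with hyperparameter search, and persist all artifacts."""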
    import logging
    directory = Path(__file__).parent.absolute()
    logging.basicConfig(level=logging.INFO)
    data_dir = 'crude_oil'  # Should be choosable later on?
    run_id = f'{data_dir}_{uuid.uuid4()}'
    artifacts_path = os.path.join(directory, 'artifacts', run_id)
    logging.info(f'Created forecasting pipeline with id {run_id}')
    os.makedirs(artifacts_path, exist_ok=True)
    prepared_data = prepareData(parent_dir=directory, data_dir=data_dir, run_id=run_id)
    train_data, transformations = transformData(prepared_data, dir=directory, id=run_id)
    train_data.to_csv(os.path.join(artifacts_path, 'transformed_dataset.csv'))
    # Save transformations including StandardScaler objects
    with open(os.path.join(artifacts_path, 'transformations.pkl'), 'wb') as fp:
        pickle.dump(transformations, fp)
    nf, results = trainModel(dataset=train_data, variate='univariate', artifacts_path=artifacts_path)
    results.to_csv(os.path.join(artifacts_path, 'training_results.csv'))
    nf.save(path=os.path.join(artifacts_path, 'model'),
            model_index=None,
            overwrite=True,
            save_dataset=True)


if __name__ == "__main__":
    main()