import os

import numpy as np
import pandas as pd
from pandas.tseries.holiday import USFederalHolidayCalendar
from scipy.stats import shapiro
from sklearn.preprocessing import StandardScaler, RobustScaler, MinMaxScaler

from .ingest import processCSV

# On log variables / log returns:
# https://quant.stackexchange.com/questions/64028/why-and-when-we-should-use-the-log-variable


def prepareData(parent_dir, data_dir, run_id):
    """
    Ingest the raw CSV files and keep the closing-price columns.

    Args:
        parent_dir (str): Project root containing the 'data' and 'artifacts' folders.
        data_dir (str): Sub-folder of 'data' holding the raw CSV files.
        run_id (str): Identifier of the current run, used for artifact logging.

    Returns:
        pd.DataFrame: The 'Date' column plus every column ending in 'Close'.
    """
    # Process and merge the raw CSV files
    merged_data = processCSV(data_folder=os.path.join(parent_dir, 'data', data_dir))

    # Dataset artifact logging
    merged_data.to_csv(os.path.join(parent_dir, 'artifacts', run_id, 'ingested_dataset.csv'))

    # Keep 'Date' plus every column whose name ends with 'Close'
    close_df = pd.concat(
        [pd.to_datetime(merged_data['Date']), merged_data.filter(regex=r'Close$')],
        axis=1,
    )

    return close_df


def createLag(data, amt=10):
    """
    Shift the 'Date' column forward by a number of business days.

    Args:
        data (pd.DataFrame): Input dataframe.
        amt (int): Number of business days to shift.

    Returns:
        pd.DataFrame: A copy of the dataframe with the shifted 'Date' column.
    """
    if 'Date' in data:
        # Work on a copy so the caller's dataframe is not mutated
        copy = data.copy()

        # Ensure 'Date' is a datetime column
        copy['Date'] = pd.to_datetime(copy['Date'], errors='coerce')

        # Check for any null values after conversion
        if copy['Date'].isnull().any():
            print("Warning: Some dates couldn't be converted to datetime.")

        # Apply the business day offset
        copy['Date'] = copy['Date'] + pd.tseries.offsets.BusinessDay(amt)
        return copy
    else:
        print("No 'Date' column found inside dataframe")
        return data


def trainTestValSplit(data, test_size, val_size):
    """
    Split data into train, test and validation sets.

    Args:
        data (pd.DataFrame): Input dataframe.
        test_size (float): Proportion of data for the test set.
        val_size (float): Proportion of data for the validation set.

    Returns:
        Not implemented yet; the split is not needed at this stage.
    """
    pass


def logReturn(data, col):
    """
    Compute log returns for a given column in the dataframe.

    Args:
        data (pd.DataFrame): The input dataframe containing the time series data.
        col (str): The column name for which to compute log returns.

    Returns:
        pd.Series: A series containing the log returns (the first value is NaN).
    """
    return np.log(data[col] / data[col].shift(1))


def scaleStandard(data):
    """
    Apply standard scaling to the data.

    Args:
        data (pd.Series): The input series to be scaled.

    Returns:
        pd.Series: The scaled series.
        StandardScaler: The scaler object used for transformation.
    """
    scaler = StandardScaler()
    scaled_data = scaler.fit_transform(data.values.reshape(-1, 1))
    return pd.Series(scaled_data.flatten(), index=data.index), scaler


def scaleRobust(data):
    """
    Apply robust scaling to the data, less sensitive to outliers.

    Args:
        data (pd.Series): The input series to be scaled.

    Returns:
        pd.Series: The scaled series.
        RobustScaler: The scaler object used for transformation.
    """
    scaler = RobustScaler()
    scaled_data = scaler.fit_transform(data.values.reshape(-1, 1))
    return pd.Series(scaled_data.flatten(), index=data.index), scaler


def scaleMinMax(data):
    """
    Apply Min-Max scaling to the data.

    Args:
        data (pd.Series): The input series to be scaled.

    Returns:
        pd.Series: The scaled series.
        MinMaxScaler: The scaler object used for transformation.
    """
    scaler = MinMaxScaler()
    scaled_data = scaler.fit_transform(data.values.reshape(-1, 1))
    return pd.Series(scaled_data.flatten(), index=data.index), scaler
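
# Illustrative helper (an addition for clarity, not part of the original pipeline):
# shows how a scaler returned by scaleStandard/scaleRobust/scaleMinMax can be used to
# map a scaled series back to its original units, e.g. after producing forecasts.
def inverseScale(data, scaler):
    """
    Invert a previously fitted scaler on a series.

    Args:
        data (pd.Series): Scaled series.
        scaler: A fitted StandardScaler, RobustScaler or MinMaxScaler.

    Returns:
        pd.Series: Series mapped back to the original scale.
    """
    unscaled = scaler.inverse_transform(data.values.reshape(-1, 1))
    return pd.Series(unscaled.flatten(), index=data.index)
    # Example: scaled, scaler = scaleRobust(series); original = inverseScale(scaled, scaler)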
""" median = data.median() std_dev = data.std() lower_limit = median - z_thresh * std_dev upper_limit = median + z_thresh * std_dev return data.clip(lower=lower_limit, upper=upper_limit) import pandas as pd from pandas.tseries.holiday import USFederalHolidayCalendar def calendarFeatures(df, date_col='ds'): """ Extracts calendar features from a date column. Parameters: df (pd.DataFrame): DataFrame containing the date column. date_col (str): Name of the date column (default is 'ds'). Returns: pd.DataFrame: DataFrame with additional calendar features. """ # Ensure the date column is in datetime format df[date_col] = pd.to_datetime(df[date_col]) # Extract basic date features df['day_of_week'] = df[date_col].dt.dayofweek # Monday=0, Sunday=6 df['is_weekend'] = df['day_of_week'].isin([5, 6]).astype(int) # 1 if weekend, 0 otherwise df['month'] = df[date_col].dt.month df['day_of_month'] = df[date_col].dt.day df['quarter'] = df[date_col].dt.quarter df['year'] = df[date_col].dt.year # Extract holiday information using US Federal Holidays as an example (can be customized) cal = USFederalHolidayCalendar() holidays = cal.holidays(start=df[date_col].min(), end=df[date_col].max()) df['is_holiday'] = df[date_col].isin(holidays).astype(int) # 1 if holiday, 0 otherwise return df def transformData(data, dir, id, y_var='brent_futures_Close', apply_log_return=False): """ Transform the time series data by applying various scaling and transformation techniques. Args: data (pd.DataFrame): The input dataframe containing the time series data. apply_log_return (bool): Whether to apply log return transformation. Returns: data (pd.DataFrame): The transformed dataframe. scalers (dict): The scaler object used for the columns """ data.sort_values(by='Date', inplace=True) scalers = {} # Apply transformations per column with checks for skewness, outliers, and distribution for col in data.columns: if col == 'Date': continue # Handle outliers by capping them rather than removing them try: data[col] = cap_outliers(data[col]) except Exception as e: print(f"Error capping outliers for '{col}': {e}") # Scaling: Apply Robust scaling for skewed data and Standard scaling otherwise try: if data[col].skew() > 1 or data[col].skew() < -1: data[col], scaler = scaleRobust(data[col]) else: data[col], scaler = scaleStandard(data[col]) scalers[col] = scaler except Exception as e: print(f"Error scaling '{col}': {e}") # Normality check using Shapiro-Wilk test and log transform if skewed try: stat, p_value = shapiro(data[col].dropna()) if p_value < 0.05: # Not normally distributed if data[col].min() <= 0: data[col] += abs(data[col].min()) + 1 # Ensure positive values data[col] = np.log1p(data[col]) print(f"Applied logarithm transformation to '{col}' due to skewness.") except Exception as e: print(f"Error testing normality or applying log transformation to '{col}': {e}") # Apply log return transformation if specified, maintaining time series order if apply_log_return: for col in data.columns: if col != 'Date': # Avoid log returns on date column try: data[col] = logReturn(data, col) print(f"Applied log return to '{col}'.") except Exception as e: print(f"Error processing log return for '{col}': {e}") # Specifically handle y_var if log return transformation is requested if y_var in data.columns and apply_log_return: try: data[y_var] = logReturn(data, y_var) except Exception as e: print(f"Error processing log return for y_var: {e}") # Dataset Artifact logging data.to_csv(os.path.join(dir, 'artifacts', id, 'transformed_dataset.csv')) return data, 
scalers
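

# Minimal usage sketch (illustrative assumptions: the project layout expected by
# prepareData, i.e. <parent_dir>/data/<data_dir> with raw CSVs and an existing
# <parent_dir>/artifacts/<run_id>/ folder; the 'raw' folder name and run id are
# placeholders). Because processCSV is a relative import, run this module as part
# of its package, e.g. `python -m <package>.<this_module>`.
if __name__ == '__main__':
    parent_dir = os.getcwd()
    run_id = 'example_run'

    # Ingest raw CSVs and keep the closing-price columns
    close_df = prepareData(parent_dir, data_dir='raw', run_id=run_id)

    # Shift dates forward by 10 business days
    lagged_df = createLag(close_df, amt=10)

    # Cap outliers, scale each column and optionally switch to log returns
    transformed_df, fitted_scalers = transformData(
        lagged_df, parent_dir, run_id, apply_log_return=True
    )
    print(transformed_df.head())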