import pandas as pd
import numpy as np
import os
from .ingest import processCSV
from scipy.stats import shapiro
from sklearn.preprocessing import StandardScaler, RobustScaler, MinMaxScaler
from pandas.tseries.holiday import USFederalHolidayCalendar

# https://quant.stackexchange.com/questions/64028/why-and-when-we-should-use-the-log-variable

def prepareData(parent_dir, data_dir, run_id):
    """
    Process and merge the raw CSV files, log the merged dataset as an artifact,
    and return the date column together with all closing-price columns.
    """
    # Run the function to process and merge CSV files
    merged_data = processCSV(data_folder=os.path.join(parent_dir, 'data', data_dir))
    # Dataset artifact logging
    merged_data.to_csv(os.path.join(parent_dir, 'artifacts', run_id, 'ingested_dataset.csv'))
    # Keep the 'Date' column (as datetime) together with every column ending in 'Close'
    close_df = pd.concat([pd.to_datetime(merged_data['Date']), merged_data.filter(regex=r'Close$')], axis=1)
    return close_df
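
# Minimal sketch of the column selection prepareData performs, using a synthetic
# frame in place of the processCSV() output (the real pipeline reads CSVs from
# data_dir and writes the merged artifact to disk); the Volume column is hypothetical.
if __name__ == "__main__":
    _demo = pd.DataFrame({
        'Date': ['2024-01-02', '2024-01-03'],
        'brent_futures_Close': [77.1, 78.4],
        'brent_futures_Volume': [1000, 1200],
    })
    _close = pd.concat([pd.to_datetime(_demo['Date']), _demo.filter(regex=r'Close$')], axis=1)
    print(_close)  # keeps 'Date' plus every column ending in 'Close'
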
def createLag(data, amt=10):
    """
    Shift the 'Date' column forward by a number of business days.
    Input:
        data -> Pandas dataframe
        amt -> int, number of business days to shift
    Output:
        Copy of the pandas dataframe with the shifted 'Date' column
    """
    if 'Date' in data:
        # Work on a copy so the caller's dataframe is not mutated
        copy = data.copy()
        # Ensure 'Date' is a datetime column
        copy['Date'] = pd.to_datetime(copy['Date'], errors='coerce')
        # Check for any null values after conversion
        if copy['Date'].isnull().any():
            print("Warning: Some dates couldn't be converted to datetime.")
        # Apply the business day offset
        copy['Date'] = copy['Date'] + pd.tseries.offsets.BusinessDay(amt)
        return copy
    else:
        print("No 'Date' column found inside dataframe")
        return data
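
# Hedged usage sketch for createLag: shifting a toy 'Date' column forward by
# five business days (the BusinessDay offset skips weekends).
if __name__ == "__main__":
    _lag_demo = pd.DataFrame({'Date': ['2024-01-05', '2024-01-08'], 'value': [1.0, 2.0]})
    print(createLag(_lag_demo, amt=5)['Date'])  # 2024-01-05 (a Friday) -> 2024-01-12
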
def trainTestValSplit(data, test_size, val_size):
    """
    Splits data into train-test-validation sets.
    Args:
        data -> Pandas dataframe
        test_size -> Proportion of data for the test set
        val_size -> Proportion of data for the validation set
    Returns:
        Not implemented yet.
    """
    pass
def logReturn(data, col):
    """
    Compute log returns for a given column in the dataframe.
    Args:
        data (pd.DataFrame): The input dataframe containing the time series data.
        col (str): The column name for which to compute log returns.
    Returns:
        pd.Series: A series containing the log returns.
    """
    return np.log(data[col] / data[col].shift(1))
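
# Quick sketch of logReturn on a toy price column; the first value is NaN
# because there is no previous observation to compare against.
if __name__ == "__main__":
    _prices = pd.DataFrame({'p': [100.0, 105.0, 102.0]})
    print(logReturn(_prices, 'p'))  # NaN, log(105/100), log(102/105)
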
def scaleStandard(data):
    """
    Apply standard scaling to the data.
    Args:
        data (pd.Series): The input series to be scaled.
    Returns:
        pd.Series: The scaled series.
        StandardScaler: The scaler object used for transformation.
    """
    scaler = StandardScaler()
    scaled_data = scaler.fit_transform(data.values.reshape(-1, 1))
    return pd.Series(scaled_data.flatten(), index=data.index), scaler
def scaleRobust(data):
    """
    Apply robust scaling to the data, less sensitive to outliers.
    Args:
        data (pd.Series): The input series to be scaled.
    Returns:
        pd.Series: The scaled series.
        RobustScaler: The scaler object used for transformation.
    """
    scaler = RobustScaler()
    scaled_data = scaler.fit_transform(data.values.reshape(-1, 1))
    return pd.Series(scaled_data.flatten(), index=data.index), scaler
def scaleMinMax(data):
    """
    Apply Min-Max scaling to the data.
    Args:
        data (pd.Series): The input series to be scaled.
    Returns:
        pd.Series: The scaled series.
        MinMaxScaler: The scaler object used for transformation.
    """
    scaler = MinMaxScaler()
    scaled_data = scaler.fit_transform(data.values.reshape(-1, 1))
    return pd.Series(scaled_data.flatten(), index=data.index), scaler
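
# Sketch showing why the fitted scaler is returned alongside the scaled series:
# it lets callers map scaled values (e.g. model outputs) back to the original units.
if __name__ == "__main__":
    _series = pd.Series([1.0, 2.0, 3.0, 4.0])
    _scaled, _scaler = scaleStandard(_series)
    print(_scaler.inverse_transform(_scaled.values.reshape(-1, 1)).flatten())  # original values
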
def cap_outliers(data, z_thresh=3):
    """
    Cap outliers in the data within a defined boundary.
    Values are clipped to [median - z_thresh * std, median + z_thresh * std].
    Args:
        data (pd.Series): The input series containing the data.
        z_thresh (float): Number of standard deviations around the median used as the cap.
    Returns:
        pd.Series: The series with outliers capped.
    """
    median = data.median()
    std_dev = data.std()
    lower_limit = median - z_thresh * std_dev
    upper_limit = median + z_thresh * std_dev
    return data.clip(lower=lower_limit, upper=upper_limit)
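
# Sketch of cap_outliers on a series with one extreme value; the spike is
# clipped to the median-based boundary rather than dropped from the series.
if __name__ == "__main__":
    _s = pd.Series([1.0, 1.1, 0.9, 1.2, 50.0])
    print(cap_outliers(_s, z_thresh=1))  # the 50.0 observation is capped
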
def calendarFeatures(df, date_col='ds'):
    """
    Extracts calendar features from a date column.
    Args:
        df (pd.DataFrame): DataFrame containing the date column.
        date_col (str): Name of the date column (default is 'ds').
    Returns:
        pd.DataFrame: DataFrame with additional calendar features.
    """
    # Ensure the date column is in datetime format
    df[date_col] = pd.to_datetime(df[date_col])
    # Extract basic date features
    df['day_of_week'] = df[date_col].dt.dayofweek  # Monday=0, Sunday=6
    df['is_weekend'] = df['day_of_week'].isin([5, 6]).astype(int)  # 1 if weekend, 0 otherwise
    df['month'] = df[date_col].dt.month
    df['day_of_month'] = df[date_col].dt.day
    df['quarter'] = df[date_col].dt.quarter
    df['year'] = df[date_col].dt.year
    # Extract holiday information using US Federal Holidays as an example (can be customized)
    cal = USFederalHolidayCalendar()
    holidays = cal.holidays(start=df[date_col].min(), end=df[date_col].max())
    df['is_holiday'] = df[date_col].isin(holidays).astype(int)  # 1 if holiday, 0 otherwise
    return df
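
# Hedged example of calendarFeatures on a small date range spanning a US federal
# holiday (New Year's Day 2024), using the default 'ds' column name.
if __name__ == "__main__":
    _cal = pd.DataFrame({'ds': pd.date_range('2023-12-30', '2024-01-02')})
    print(calendarFeatures(_cal)[['ds', 'day_of_week', 'is_weekend', 'is_holiday']])
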
def transformData(data, dir, id, y_var='brent_futures_Close', apply_log_return=False):
    """
    Transform the time series data by applying various scaling and transformation techniques.
    Args:
        data (pd.DataFrame): The input dataframe containing the time series data.
        dir (str): Project root directory used for artifact logging.
        id (str): Run identifier used for the artifact path.
        y_var (str): Name of the target column.
        apply_log_return (bool): Whether to apply log return transformation.
    Returns:
        data (pd.DataFrame): The transformed dataframe.
        scalers (dict): The scaler objects used for the columns.
    """
    data.sort_values(by='Date', inplace=True)
    scalers = {}
    # Apply transformations per column with checks for skewness, outliers, and distribution
    for col in data.columns:
        if col == 'Date':
            continue
        # Handle outliers by capping them rather than removing them
        try:
            data[col] = cap_outliers(data[col])
        except Exception as e:
            print(f"Error capping outliers for '{col}': {e}")
        # Scaling: Apply Robust scaling for skewed data and Standard scaling otherwise
        try:
            if data[col].skew() > 1 or data[col].skew() < -1:
                data[col], scaler = scaleRobust(data[col])
            else:
                data[col], scaler = scaleStandard(data[col])
            scalers[col] = scaler
        except Exception as e:
            print(f"Error scaling '{col}': {e}")
        # Normality check using Shapiro-Wilk test and log transform if skewed
        try:
            stat, p_value = shapiro(data[col].dropna())
            if p_value < 0.05:  # Not normally distributed
                if data[col].min() <= 0:
                    data[col] += abs(data[col].min()) + 1  # Ensure positive values
                data[col] = np.log1p(data[col])
                print(f"Applied logarithm transformation to '{col}' due to skewness.")
        except Exception as e:
            print(f"Error testing normality or applying log transformation to '{col}': {e}")
    # Apply log return transformation if specified, maintaining time series order;
    # this loop already covers y_var, so no separate second pass is needed
    if apply_log_return:
        for col in data.columns:
            if col != 'Date':  # Avoid log returns on the date column
                try:
                    data[col] = logReturn(data, col)
                    print(f"Applied log return to '{col}'.")
                except Exception as e:
                    print(f"Error processing log return for '{col}': {e}")
    # Dataset artifact logging
    data.to_csv(os.path.join(dir, 'artifacts', id, 'transformed_dataset.csv'))
    return data, scalers
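
# End-to-end sketch of transformData on a synthetic frame. The artifacts/<id>/
# directory is created inside a temporary folder here; in the real pipeline the
# run directory is expected to exist already. The run id and data are made up.
if __name__ == "__main__":
    import tempfile

    _tmp = tempfile.mkdtemp()
    os.makedirs(os.path.join(_tmp, 'artifacts', 'demo_run'), exist_ok=True)
    _df = pd.DataFrame({
        'Date': pd.date_range('2024-01-01', periods=30, freq='B'),
        'brent_futures_Close': np.linspace(75.0, 85.0, 30) + np.random.normal(0, 0.5, 30),
    })
    _out, _scalers = transformData(_df, _tmp, 'demo_run', apply_log_return=True)
    print(_out.head())
    print(list(_scalers.keys()))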