import pandas as pd
import numpy as np
import os
from .ingest import processCSV
from scipy.stats import shapiro
from sklearn.preprocessing import StandardScaler, RobustScaler, MinMaxScaler
from pandas.tseries.holiday import USFederalHolidayCalendar

# https://quant.stackexchange.com/questions/64028/why-and-when-we-should-use-the-log-variable
def prepareData(parent_dir, data_dir, run_id):
    # Run the function to process and merge CSV files
    merged_data = processCSV(data_folder=os.path.join(parent_dir, 'data', data_dir))

    # Dataset artifact logging
    merged_data.to_csv(os.path.join(parent_dir, 'artifacts', run_id, 'ingested_dataset.csv'))

    # Keep the 'Date' column plus every column whose name ends with 'Close'
    close_df = pd.concat([pd.to_datetime(merged_data['Date']), merged_data.filter(regex=r'Close$')], axis=1)
    return close_df

def createLag(data, amt=10):
    """
    Create a lag inside the dataframe, in business days.

    Input:
        data -> pandas DataFrame
        amt  -> int, number of business days to shift
    Output:
        Copy of the pandas DataFrame with 'Date' shifted forward
    """
    if 'Date' in data:
        # Ensure 'Date' is a datetime column
        data['Date'] = pd.to_datetime(data['Date'], errors='coerce')

        # Check for any null values after conversion
        if data['Date'].isnull().any():
            print("Warning: Some dates couldn't be converted to datetime.")

        copy = data.copy()

        # Apply the business day offset
        copy['Date'] = copy['Date'] + pd.tseries.offsets.BusinessDay(amt)
        return copy
    else:
        print("No 'Date' column found inside dataframe")
        return data
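
# A minimal sketch of what createLag does, using a tiny hypothetical frame:
# shifting a Friday by one business day lands on the following Monday.
def _example_create_lag():
    demo = pd.DataFrame({'Date': ['2024-01-05'], 'Close': [80.0]})  # 2024-01-05 is a Friday
    lagged = createLag(demo, amt=1)
    print(lagged['Date'].iloc[0])  # 2024-01-08, the next business day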

def trainTestValSplit(data, test_size, val_size):
    """
    Splits data into train-test-validation sets.

    Args:
        data -> pandas DataFrame
        test_size -> Proportion of data for the test set
        val_size -> Proportion of data for the validation set
    Returns:
        Not implemented yet.
    """
    pass

def logReturn(data, col):
    """
    Compute log returns for a given column in the dataframe.

    Args:
        data (pd.DataFrame): The input dataframe containing the time series data.
        col (str): The column name for which to compute log returns.
    Returns:
        pd.Series: A series containing the log returns.
    """
    return np.log(data[col] / data[col].shift(1))
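
# Worked sketch: the log return at time t is ln(P_t / P_{t-1}), so the first
# value is always NaN. The prices below are made-up illustrative numbers.
def _example_log_return():
    demo = pd.DataFrame({'price': [100.0, 105.0, 102.0]})
    returns = logReturn(demo, 'price')
    print(returns.round(4).tolist())  # [nan, 0.0488, -0.029]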

def scaleStandard(data):
    """
    Apply standard scaling to the data.

    Args:
        data (pd.Series): The input series to be scaled.
    Returns:
        pd.Series: The scaled series.
        StandardScaler: The scaler object used for transformation.
    """
    scaler = StandardScaler()
    scaled_data = scaler.fit_transform(data.values.reshape(-1, 1))
    return pd.Series(scaled_data.flatten(), index=data.index), scaler


def scaleRobust(data):
    """
    Apply robust scaling to the data, less sensitive to outliers.

    Args:
        data (pd.Series): The input series to be scaled.
    Returns:
        pd.Series: The scaled series.
        RobustScaler: The scaler object used for transformation.
    """
    scaler = RobustScaler()
    scaled_data = scaler.fit_transform(data.values.reshape(-1, 1))
    return pd.Series(scaled_data.flatten(), index=data.index), scaler


def scaleMinMax(data):
    """
    Apply Min-Max scaling to the data.

    Args:
        data (pd.Series): The input series to be scaled.
    Returns:
        pd.Series: The scaled series.
        MinMaxScaler: The scaler object used for transformation.
    """
    scaler = MinMaxScaler()
    scaled_data = scaler.fit_transform(data.values.reshape(-1, 1))
    return pd.Series(scaled_data.flatten(), index=data.index), scaler
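
# Illustrative round trip: each scale* helper returns both the scaled series
# and the fitted scaler, so the original units can be recovered later with
# inverse_transform. The series below is a hypothetical example.
def _example_scale_round_trip():
    demo = pd.Series([10.0, 20.0, 30.0])
    scaled, scaler = scaleMinMax(demo)
    print(scaled.tolist())  # [0.0, 0.5, 1.0]
    restored = scaler.inverse_transform(scaled.values.reshape(-1, 1)).flatten()
    print(restored.round(6).tolist())  # [10.0, 20.0, 30.0]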

def cap_outliers(data, z_thresh=3):
    """
    Cap outliers in the data within a defined boundary.

    Args:
        data (pd.Series): The input series containing the data.
        z_thresh (float): Number of standard deviations around the median used
            to define the capping limits.
    Returns:
        pd.Series: The series with outliers capped.
    """
    median = data.median()
    std_dev = data.std()
    lower_limit = median - z_thresh * std_dev
    upper_limit = median + z_thresh * std_dev
    return data.clip(lower=lower_limit, upper=upper_limit)
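
# Quick sketch of the capping behaviour on made-up data: values beyond
# median ± z_thresh * std are clipped to the boundary instead of dropped.
def _example_cap_outliers():
    demo = pd.Series([1.0, 2.0, 3.0, 4.0, 100.0])
    capped = cap_outliers(demo, z_thresh=1)
    print(capped.tolist())  # the extreme 100.0 is pulled down to the upper limit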

def calendarFeatures(df, date_col='ds'):
    """
    Extracts calendar features from a date column.

    Parameters:
        df (pd.DataFrame): DataFrame containing the date column.
        date_col (str): Name of the date column (default is 'ds').
    Returns:
        pd.DataFrame: DataFrame with additional calendar features.
    """
    # Ensure the date column is in datetime format
    df[date_col] = pd.to_datetime(df[date_col])

    # Extract basic date features
    df['day_of_week'] = df[date_col].dt.dayofweek  # Monday=0, Sunday=6
    df['is_weekend'] = df['day_of_week'].isin([5, 6]).astype(int)  # 1 if weekend, 0 otherwise
    df['month'] = df[date_col].dt.month
    df['day_of_month'] = df[date_col].dt.day
    df['quarter'] = df[date_col].dt.quarter
    df['year'] = df[date_col].dt.year

    # Extract holiday information using US Federal Holidays as an example (can be customized)
    cal = USFederalHolidayCalendar()
    holidays = cal.holidays(start=df[date_col].min(), end=df[date_col].max())
    df['is_holiday'] = df[date_col].isin(holidays).astype(int)  # 1 if holiday, 0 otherwise

    return df
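
# Small illustrative run of calendarFeatures on a hypothetical two-row frame.
# 2024-07-04 (Independence Day, a Thursday) should be flagged as a holiday,
# while 2024-07-06 is a Saturday and should be flagged as a weekend.
def _example_calendar_features():
    demo = pd.DataFrame({'ds': ['2024-07-04', '2024-07-06']})
    out = calendarFeatures(demo)
    print(out[['ds', 'day_of_week', 'is_weekend', 'is_holiday']])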

def transformData(data, dir, id, y_var='brent_futures_Close', apply_log_return=False):
    """
    Transform the time series data by applying various scaling and transformation techniques.

    Args:
        data (pd.DataFrame): The input dataframe containing the time series data.
        dir (str): Parent directory used for artifact logging.
        id (str): Run id; the transformed dataset is written to 'artifacts/<id>'.
        y_var (str): Name of the target column.
        apply_log_return (bool): Whether to apply log return transformation.
    Returns:
        data (pd.DataFrame): The transformed dataframe.
        scalers (dict): The scaler objects used for the columns.
    """
    data.sort_values(by='Date', inplace=True)
    scalers = {}

    # Apply transformations per column with checks for skewness, outliers, and distribution
    for col in data.columns:
        if col == 'Date':
            continue

        # Handle outliers by capping them rather than removing them
        try:
            data[col] = cap_outliers(data[col])
        except Exception as e:
            print(f"Error capping outliers for '{col}': {e}")

        # Scaling: apply Robust scaling for skewed data and Standard scaling otherwise
        try:
            if abs(data[col].skew()) > 1:
                data[col], scaler = scaleRobust(data[col])
            else:
                data[col], scaler = scaleStandard(data[col])
            scalers[col] = scaler
        except Exception as e:
            print(f"Error scaling '{col}': {e}")

        # Normality check using the Shapiro-Wilk test; log-transform if not normal
        try:
            stat, p_value = shapiro(data[col].dropna())
            if p_value < 0.05:  # Not normally distributed
                if data[col].min() <= 0:
                    data[col] += abs(data[col].min()) + 1  # Ensure positive values
                data[col] = np.log1p(data[col])
                print(f"Applied logarithm transformation to '{col}' due to skewness.")
        except Exception as e:
            print(f"Error testing normality or applying log transformation to '{col}': {e}")

    # Apply log return transformation if specified, maintaining time series order.
    # This loop already covers y_var, so the target is not transformed a second time.
    if apply_log_return:
        for col in data.columns:
            if col != 'Date':  # Avoid log returns on the date column
                try:
                    data[col] = logReturn(data, col)
                    print(f"Applied log return to '{col}'.")
                except Exception as e:
                    print(f"Error processing log return for '{col}': {e}")

    # Dataset artifact logging
    data.to_csv(os.path.join(dir, 'artifacts', id, 'transformed_dataset.csv'))
    return data, scalers
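
# End-to-end sketch tying the helpers together (illustrative only; not called
# anywhere). Directory names, the run id, and the lag amount are hypothetical
# placeholders, and the 'data' and 'artifacts' folders are assumed to exist.
def _example_pipeline(parent_dir='.', data_dir='raw', run_id='run_001'):
    close_df = prepareData(parent_dir, data_dir, run_id)
    lagged_df = createLag(close_df, amt=10)
    transformed_df, fitted_scalers = transformData(
        lagged_df, parent_dir, run_id, apply_log_return=True
    )
    return transformed_df, fitted_scalers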