import pandas as pd
import numpy as np
import os

from pandas.tseries.holiday import USFederalHolidayCalendar
from scipy.stats import shapiro
from sklearn.preprocessing import StandardScaler, RobustScaler, MinMaxScaler

from .ingest import processCSV

# https://quant.stackexchange.com/questions/64028/why-and-when-we-should-use-the-log-variable
def prepareData(parent_dir, data_dir, run_id):
    """
    Ingest and merge the raw CSV files, log the merged dataset as an artifact,
    and return the 'Date' column together with every column ending in 'Close'.
    """
    # Process and merge the raw CSV files
    merged_data = processCSV(data_folder=os.path.join(parent_dir, 'data', data_dir))
    # Dataset artifact logging
    merged_data.to_csv(os.path.join(parent_dir, 'artifacts', run_id, 'ingested_dataset.csv'))
    # Keep 'Date' plus the close-price columns
    close_df = pd.concat([pd.to_datetime(merged_data['Date']), merged_data.filter(regex=r'Close$')], axis=1)
    return close_df

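
# Example usage (illustrative sketch; the project path, data folder name, and
# run id are assumptions, not values used by this repository):
# close_df = prepareData(parent_dir='/path/to/project', data_dir='raw', run_id='run_001')
# close_df.head()
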
def createLag(data, amt=10):
    """
    Shift the 'Date' column forward by a given number of business days.

    Args:
        data (pd.DataFrame): Input dataframe.
        amt (int): Number of business days to shift by.

    Returns:
        pd.DataFrame: Copy of the dataframe with the shifted 'Date' column,
        or the original dataframe if no 'Date' column is present.
    """
    if 'Date' in data:
        # Ensure 'Date' is a datetime column
        data['Date'] = pd.to_datetime(data['Date'], errors='coerce')
        # Check for any null values after conversion
        if data['Date'].isnull().any():
            print("Warning: Some dates couldn't be converted to datetime.")

        copy = data.copy()
        # Apply the business day offset
        copy['Date'] = copy['Date'] + pd.tseries.offsets.BusinessDay(amt)
        return copy
    else:
        print("No 'Date' column found inside dataframe")
        return data

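
# Example usage (illustrative; assumes 'close_df' comes from prepareData above):
# lagged_df = createLag(close_df, amt=10)  # each 'Date' moved 10 business days forward
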
def trainTestValSplit(data, test_size, val_size):
    """
    Splits data into train-test-validation sets.

    Args:
        data (pd.DataFrame): Input dataframe.
        test_size (float): Proportion of data for the test set.
        val_size (float): Proportion of data for the validation set.

    Returns:
        Not implemented yet.
    """
    pass

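
# Sketch of how a chronological split could look once needed (an assumption
# about a possible implementation, not part of the current pipeline; the
# order-preserving slicing keeps the time series intact):
# def _chronological_split(data, test_size, val_size):
#     n = len(data)
#     train_end = int(n * (1 - test_size - val_size))
#     val_end = int(n * (1 - test_size))
#     return data.iloc[:train_end], data.iloc[train_end:val_end], data.iloc[val_end:]
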
def logReturn(data, col):
    """
    Compute log returns for a given column in the dataframe.

    Args:
        data (pd.DataFrame): The input dataframe containing the time series data.
        col (str): The column name for which to compute log returns.

    Returns:
        pd.Series: A series containing the log returns.
    """
    return np.log(data[col] / data[col].shift(1))

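
# Worked example (illustrative): for prices [100, 105], the log return of the
# second observation is ln(105 / 100) ~= 0.0488; the first observation becomes
# NaN because there is no previous value to compare against.
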
def scaleStandard(data):
    """
    Apply standard scaling to the data.

    Args:
        data (pd.Series): The input series to be scaled.

    Returns:
        pd.Series: The scaled series.
        StandardScaler: The scaler object used for transformation.
    """
    scaler = StandardScaler()
    scaled_data = scaler.fit_transform(data.values.reshape(-1, 1))
    return pd.Series(scaled_data.flatten(), index=data.index), scaler


def scaleRobust(data):
    """
    Apply robust scaling to the data, less sensitive to outliers.

    Args:
        data (pd.Series): The input series to be scaled.

    Returns:
        pd.Series: The scaled series.
        RobustScaler: The scaler object used for transformation.
    """
    scaler = RobustScaler()
    scaled_data = scaler.fit_transform(data.values.reshape(-1, 1))
    return pd.Series(scaled_data.flatten(), index=data.index), scaler


def scaleMinMax(data):
    """
    Apply Min-Max scaling to the data.

    Args:
        data (pd.Series): The input series to be scaled.

    Returns:
        pd.Series: The scaled series.
        MinMaxScaler: The scaler object used for transformation.
    """
    scaler = MinMaxScaler()
    scaled_data = scaler.fit_transform(data.values.reshape(-1, 1))
    return pd.Series(scaled_data.flatten(), index=data.index), scaler

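
# Example usage (illustrative; all three scale* helpers follow the same pattern,
# and the returned scaler can undo the transformation later, e.g. on predictions;
# the column name is taken from transformData's default y_var):
# scaled, scaler = scaleStandard(close_df['brent_futures_Close'])
# restored = pd.Series(
#     scaler.inverse_transform(scaled.values.reshape(-1, 1)).flatten(),
#     index=scaled.index,
# )
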
def cap_outliers(data, z_thresh=3):
    """
    Cap outliers by clipping values that lie more than z_thresh standard
    deviations away from the median.

    Args:
        data (pd.Series): The input series containing the data.
        z_thresh (float): The z-score threshold used to define the clipping bounds.

    Returns:
        pd.Series: The series with outliers capped at the bounds.
    """
    median = data.median()
    std_dev = data.std()
    lower_limit = median - z_thresh * std_dev
    upper_limit = median + z_thresh * std_dev
    return data.clip(lower=lower_limit, upper=upper_limit)

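
# Worked example (illustrative): with median 10, std 2 and z_thresh=3, values
# are clipped to the interval [4, 16], so an outlier of 50 becomes 16.
# capped = cap_outliers(close_df['brent_futures_Close'], z_thresh=3)
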
def calendarFeatures(df, date_col='ds'):
    """
    Extracts calendar features from a date column.

    Parameters:
        df (pd.DataFrame): DataFrame containing the date column.
        date_col (str): Name of the date column (default is 'ds').

    Returns:
        pd.DataFrame: DataFrame with additional calendar features.
    """
    # Ensure the date column is in datetime format
    df[date_col] = pd.to_datetime(df[date_col])

    # Extract basic date features
    df['day_of_week'] = df[date_col].dt.dayofweek  # Monday=0, Sunday=6
    df['is_weekend'] = df['day_of_week'].isin([5, 6]).astype(int)  # 1 if weekend, 0 otherwise
    df['month'] = df[date_col].dt.month
    df['day_of_month'] = df[date_col].dt.day
    df['quarter'] = df[date_col].dt.quarter
    df['year'] = df[date_col].dt.year

    # Extract holiday information using US Federal Holidays as an example (can be customized)
    cal = USFederalHolidayCalendar()
    holidays = cal.holidays(start=df[date_col].min(), end=df[date_col].max())
    df['is_holiday'] = df[date_col].isin(holidays).astype(int)  # 1 if holiday, 0 otherwise

    return df

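
# Example usage (illustrative; assumes a 'ds' date column per the helper's default):
# features_df = calendarFeatures(pd.DataFrame({'ds': ['2024-07-04', '2024-07-06']}))
# # 2024-07-04 -> is_holiday=1 (Independence Day); 2024-07-06 -> is_weekend=1 (Saturday)
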
def transformData(data, dir, id, y_var='brent_futures_Close', apply_log_return=False):
    """
    Transform the time series data by applying outlier capping, scaling,
    and optional log / log-return transformations.

    Args:
        data (pd.DataFrame): The input dataframe containing the time series data.
        dir (str): Directory containing the 'artifacts' folder for logging.
        id (str): Run identifier used for the artifact path.
        y_var (str): Name of the target column.
        apply_log_return (bool): Whether to apply log return transformation.

    Returns:
        data (pd.DataFrame): The transformed dataframe.
        scalers (dict): The scaler objects used for the columns.
    """
    data.sort_values(by='Date', inplace=True)
    scalers = {}

    # Apply transformations per column with checks for skewness, outliers, and distribution
    for col in data.columns:
        if col == 'Date':
            continue

        # Handle outliers by capping them rather than removing them
        try:
            data[col] = cap_outliers(data[col])
        except Exception as e:
            print(f"Error capping outliers for '{col}': {e}")

        # Scaling: apply Robust scaling for skewed data and Standard scaling otherwise
        try:
            if abs(data[col].skew()) > 1:
                data[col], scaler = scaleRobust(data[col])
            else:
                data[col], scaler = scaleStandard(data[col])
            scalers[col] = scaler
        except Exception as e:
            print(f"Error scaling '{col}': {e}")

        # Normality check using Shapiro-Wilk test and log transform if not normal
        try:
            stat, p_value = shapiro(data[col].dropna())
            if p_value < 0.05:  # Not normally distributed
                if data[col].min() <= 0:
                    data[col] += abs(data[col].min()) + 1  # Ensure positive values
                data[col] = np.log1p(data[col])
                print(f"Applied logarithm transformation to '{col}' due to non-normality.")
        except Exception as e:
            print(f"Error testing normality or applying log transformation to '{col}': {e}")

    # Apply log return transformation if specified, maintaining time series order.
    # This loop already covers y_var, so the target is not transformed a second time.
    if apply_log_return:
        for col in data.columns:
            if col != 'Date':  # Avoid log returns on the date column
                try:
                    data[col] = logReturn(data, col)
                    print(f"Applied log return to '{col}'.")
                except Exception as e:
                    print(f"Error processing log return for '{col}': {e}")

    # Dataset artifact logging
    data.to_csv(os.path.join(dir, 'artifacts', id, 'transformed_dataset.csv'))
    return data, scalers
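
# Example end-to-end usage (illustrative sketch; the project path and run id
# are assumptions):
# close_df = prepareData(parent_dir='/path/to/project', data_dir='raw', run_id='run_001')
# transformed_df, scalers = transformData(
#     close_df, dir='/path/to/project', id='run_001',
#     y_var='brent_futures_Close', apply_log_return=True,
# )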