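"""
Preprocessing utilities for the time-series pipeline: CSV ingestion and merging,
business-day lagging, scaling, outlier capping, log returns, calendar features,
and the transformData entry point that writes transformed datasets as run artifacts.
"""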
import pandas as pd
import numpy as np
import os
from .ingest import processCSV
from scipy.stats import shapiro
from sklearn.preprocessing import StandardScaler, RobustScaler, MinMaxScaler
from pandas.tseries.holiday import USFederalHolidayCalendar

# https://quant.stackexchange.com/questions/64028/why-and-when-we-should-use-the-log-variable

def prepareData(parent_dir, data_dir, run_id):
    """
    Ingest and merge the raw CSV files, log the merged dataset as an artifact,
    and return the 'Date' column together with every closing-price column.

    Args:
        parent_dir (str): Project root containing the 'data' and 'artifacts' folders.
        data_dir (str): Folder under 'data' holding the raw CSV files.
        run_id (str): Run identifier used as the artifact subfolder.

    Returns:
        pd.DataFrame: The parsed 'Date' column plus all columns ending in 'Close'.
    """
    # Process and merge the raw CSV files
    merged_data = processCSV(data_folder=os.path.join(parent_dir, 'data', data_dir))

    # Dataset artifact logging
    merged_data.to_csv(os.path.join(parent_dir, 'artifacts', run_id, 'ingested_dataset.csv'))

    # Keep the parsed 'Date' column plus every column ending in 'Close'
    close_df = pd.concat([pd.to_datetime(merged_data['Date']), merged_data.filter(regex=r'Close$')], axis=1)

    return close_df

def createLag(data, amt=10):
    """
    Shift the 'Date' column forward by a given number of business days.

    Args:
        data -> Pandas dataframe with a 'Date' column
        amt -> int, number of business days to shift

    Returns:
        Copy of the dataframe with shifted dates
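
    Example (illustrative):
        lagged = createLag(close_df, amt=10)  # dates shifted 10 business days forward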
    """
    if 'Date' in data:
        copy = data.copy()

        # Work on the copy so the caller's dataframe is left untouched,
        # and ensure 'Date' is a datetime column
        copy['Date'] = pd.to_datetime(copy['Date'], errors='coerce')

        # Check for any null values after conversion
        if copy['Date'].isnull().any():
            print("Warning: Some dates couldn't be converted to datetime.")

        # Apply the business day offset
        copy['Date'] = copy['Date'] + pd.tseries.offsets.BusinessDay(amt)
        return copy
    else:
        print("No 'Date' column found inside dataframe")
        return data

def trainTestValSplit(data, test_size, val_size):
    """
    Splits data into train-test-validation sets

    Args:
        data -> Pandas dataframe
        test_size -> Proportion of data for test set
        val_size -> Proportion of data for the validation set

    Returns:
        Not implemented yet.
    """
    pass

def logReturn(data, col):
    """
    Compute log returns for a given column in the dataframe.

    Args:
        data (pd.DataFrame): The input dataframe containing the time series data.
        col (str): The column name for which to compute log returns.

    Returns:
        pd.Series: A series containing the log returns.
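
    Example (illustrative):
        data['brent_log_return'] = logReturn(data, 'brent_futures_Close')
        # the first row is NaN because there is no prior observation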
    """
    return np.log(data[col] / data[col].shift(1))

def scaleStandard(data):
    """
    Apply standard scaling to the data.

    Args:
        data (pd.Series): The input series to be scaled.

    Returns:
        pd.Series: The scaled series.
        StandardScaler: The scaler object used for transformation.
    """
    scaler = StandardScaler()
    scaled_data = scaler.fit_transform(data.values.reshape(-1, 1))
    return pd.Series(scaled_data.flatten(), index=data.index), scaler

def scaleRobust(data):
    """
    Apply robust scaling to the data, less sensitive to outliers.

    Args:
        data (pd.Series): The input series to be scaled.

    Returns:
        pd.Series: The scaled series.
        RobustScaler: The scaler object used for transformation.
    """
    scaler = RobustScaler()
    scaled_data = scaler.fit_transform(data.values.reshape(-1, 1))
    return pd.Series(scaled_data.flatten(), index=data.index), scaler

def scaleMinMax(data):
    """
    Apply Min-Max scaling to the data.

    Args:
        data (pd.Series): The input series to be scaled.

    Returns:
        pd.Series: The scaled series.
        MinMaxScaler: The scaler object used for transformation.
    """
    scaler = MinMaxScaler()
    scaled_data = scaler.fit_transform(data.values.reshape(-1, 1))
    return pd.Series(scaled_data.flatten(), index=data.index), scaler

def cap_outliers(data, z_thresh=3):
    """
    Cap outliers in the data within a defined boundary.

    Args:
        data (pd.Series): The input series containing the data.
        z_thresh (float): Number of standard deviations from the median beyond which values are capped.

    Returns:
        pd.Series: The series with outliers capped.
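
    Example (illustrative):
        capped = cap_outliers(data['brent_futures_Close'], z_thresh=3)
        # values beyond median +/- 3 * std are clipped to those limits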
    """
    median = data.median()
    std_dev = data.std()
    lower_limit = median - z_thresh * std_dev
    upper_limit = median + z_thresh * std_dev
    return data.clip(lower=lower_limit, upper=upper_limit)

def calendarFeatures(df, date_col='ds'):
    """
    Extracts calendar features from a date column.

    Parameters:
        df (pd.DataFrame): DataFrame containing the date column.
        date_col (str): Name of the date column (default is 'ds').

    Returns:
        pd.DataFrame: DataFrame with additional calendar features.
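
    Example (illustrative):
        df = calendarFeatures(df, date_col='Date')
        # adds day_of_week, is_weekend, month, day_of_month, quarter, year, is_holiday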
    """
    # Ensure the date column is in datetime format
    df[date_col] = pd.to_datetime(df[date_col])

    # Extract basic date features
    df['day_of_week'] = df[date_col].dt.dayofweek  # Monday=0, Sunday=6
    df['is_weekend'] = df['day_of_week'].isin([5, 6]).astype(int)  # 1 if weekend, 0 otherwise
    df['month'] = df[date_col].dt.month
    df['day_of_month'] = df[date_col].dt.day
    df['quarter'] = df[date_col].dt.quarter
    df['year'] = df[date_col].dt.year
    # Extract holiday information using US Federal Holidays as an example (can be customized)
    cal = USFederalHolidayCalendar()
    holidays = cal.holidays(start=df[date_col].min(), end=df[date_col].max())
    df['is_holiday'] = df[date_col].isin(holidays).astype(int)  # 1 if holiday, 0 otherwise

    return df


def transformData(data, dir, id, y_var='brent_futures_Close', apply_log_return=False):
    """
    Transform the time series data by applying various scaling and transformation techniques.

    Args:
        data (pd.DataFrame): The input dataframe containing the time series data.
        dir (str): Project root containing the 'artifacts' folder.
        id (str): Run identifier used as the artifact subfolder.
        y_var (str): Name of the target column.
        apply_log_return (bool): Whether to apply the log return transformation.

    Returns:
        data (pd.DataFrame): The transformed dataframe.
        scalers (dict): Mapping from column name to the fitted scaler object.
    """

    data.sort_values(by='Date', inplace=True)
    scalers = {}

    # Apply transformations per column with checks for skewness, outliers, and distribution
    for col in data.columns:
        if col == 'Date':
            continue

        # Handle outliers by capping them rather than removing them
        try:
            data[col] = cap_outliers(data[col])
        except Exception as e:
            print(f"Error capping outliers for '{col}': {e}")

        # Scaling: robust scaling for strongly skewed columns, standard scaling otherwise
        try:
            if abs(data[col].skew()) > 1:
                data[col], scaler = scaleRobust(data[col])
            else:
                data[col], scaler = scaleStandard(data[col])
            scalers[col] = scaler

        except Exception as e:
            print(f"Error scaling '{col}': {e}")

        # Normality check using Shapiro-Wilk test and log transform if skewed
        try:
            stat, p_value = shapiro(data[col].dropna())
            if p_value < 0.05:  # Not normally distributed
                if data[col].min() <= 0:
                    data[col] += abs(data[col].min()) + 1  # Ensure positive values
                data[col] = np.log1p(data[col])
                print(f"Applied logarithm transformation to '{col}' due to skewness.")
        except Exception as e:
            print(f"Error testing normality or applying log transformation to '{col}': {e}")

    # Apply log return transformation if specified, maintaining time series order
    if apply_log_return:
        for col in data.columns:
            if col != 'Date':  # Avoid log returns on date column
                try:
                    data[col] = logReturn(data, col)
                    print(f"Applied log return to '{col}'.")
                except Exception as e:
                    print(f"Error processing log return for '{col}': {e}")

        # y_var is already covered by the loop above, so no separate transformation is needed

    # Dataset Artifact logging
    data.to_csv(os.path.join(dir, 'artifacts', id, 'transformed_dataset.csv'))

    return data, scalers
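
# Illustrative usage sketch (an assumption, not part of the original pipeline):
# chains prepareData -> createLag -> transformData. The directory names and run
# id below are placeholders, the 'data/<data_dir>' and 'artifacts/<run_id>'
# folders are assumed to exist, and because processCSV is a relative import the
# module must be run as part of its package (e.g. `python -m <package>.<module>`).
if __name__ == "__main__":
    parent_dir = os.getcwd()               # placeholder project root
    data_dir = "example_market_data"       # hypothetical folder under data/
    run_id = "local_debug_run"             # hypothetical folder under artifacts/

    close_df = prepareData(parent_dir, data_dir, run_id)
    lagged_df = createLag(close_df, amt=10)
    transformed_df, scalers = transformData(lagged_df, parent_dir, run_id, apply_log_return=False)

    print(transformed_df.head())
    print(f"Fitted scalers: {list(scalers.keys())}")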