"""Generating deployment files.

Trains three concrete-ml models (running quality, mental stress from HRV,
and an XGBoost classifier) and writes their FHE deployment artifacts.

The scikit-learn / concrete-ml imports are deferred to function scope so the
pure feature-extraction helpers (``interpolation``, ``frequency_domain``,
``abs_sum_diff``, ``statistics``) can be imported without those packages.
"""

import shutil
from pathlib import Path

import numpy as np
import pandas as pd
from scipy import signal
# NOTE: scipy.integrate.trapz was removed in SciPy 1.14; trapezoid is the
# supported name.
from scipy.integrate import trapezoid
from scipy.interpolate import interp1d


def interpolation(data, fs=4.0):
    """Resample each RR-interval series onto a uniform time grid.

    Parameters
    ----------
    data : sequence of 1-D arrays
        Each element is a series of RR intervals, presumably in
        milliseconds (the cumulative sum is divided by 1000 to get a
        time axis in seconds).
    fs : float
        Target sampling frequency in Hz.

    Returns
    -------
    list of np.ndarray
        One cubically interpolated, uniformly sampled series per input row.
    """
    rr_interpolated = []
    for rr_manual in data:
        # Cumulative time axis (seconds) of each beat.
        x = np.cumsum(rr_manual) / 1000.0
        f = interp1d(x, rr_manual, kind='cubic', fill_value="extrapolate")
        # Uniform grid from t=1s up to the last beat time, step 1/fs.
        step = 1 / fs
        xx = np.arange(1, np.max(x), step)
        rr_interpolated.append(f(xx))
    return rr_interpolated


def frequency_domain(rri, fs=4):
    '''
    Segement found frequencies in the bands
     - Very Low Frequency (VLF): 0-0.04Hz
     - Low Frequency (LF): 0.04-0.15Hz
     - High Frequency (HF): 0.15-0.4Hz

    Parameters
    ----------
    rri : 1-D array
        Uniformly resampled RR-interval series.
    fs : float
        Sampling frequency of ``rri`` in Hz.

    Returns
    -------
    np.ndarray
        [vlf, lf, hf, total_power, lf/hf, peak_vlf, peak_lf, peak_hf]
    '''
    # Estimate the spectral density using Welch's method.
    fxx, pxx = signal.welch(x=rri, fs=fs)

    cond_vlf = (fxx >= 0) & (fxx < 0.04)
    cond_lf = (fxx >= 0.04) & (fxx < 0.15)
    cond_hf = (fxx >= 0.15) & (fxx < 0.4)

    # Calculate power in each band by integrating the spectral density.
    vlf = trapezoid(pxx[cond_vlf], fxx[cond_vlf])
    lf = trapezoid(pxx[cond_lf], fxx[cond_lf])
    hf = trapezoid(pxx[cond_hf], fxx[cond_hf])

    # Sum these up to get total power.
    total_power = vlf + lf + hf

    # Find which frequency has the most power in each band.
    peak_vlf = fxx[cond_vlf][np.argmax(pxx[cond_vlf])]
    peak_lf = fxx[cond_lf][np.argmax(pxx[cond_lf])]
    peak_hf = fxx[cond_hf][np.argmax(pxx[cond_hf])]

    result = [vlf, lf, hf, total_power, lf / hf, peak_vlf, peak_lf, peak_hf]
    return np.array(result)


def abs_sum_diff(x):
    """Sum of absolute differences (SAD), a measure of signal variability."""
    return sum(np.abs(np.diff(x)))


def statistics(df):
    """Per-user summary statistics of stride length, plus one label per user.

    Parameters
    ----------
    df : pd.DataFrame
        Must contain the columns 'User ID', 'Stride Length (m)' and
        'Injury Risk Score'.

    Returns
    -------
    (list[list[float]], list)
        Feature rows [mean, var, median, max, min, SAD] in ascending
        User-ID order, and the corresponding 'Injury Risk Score' labels.
    """
    stats = []
    labels = []
    # Iterate the actual unique IDs (sorted by np.unique) instead of assuming
    # they are exactly 0..n-1; identical output for contiguous 0-based IDs.
    for user_id in np.unique(df['User ID']):
        strides = df[df['User ID'] == user_id]['Stride Length (m)']
        stats.append([
            np.mean(strides),
            np.var(strides),
            np.median(strides),
            np.max(strides),
            np.min(strides),
            abs_sum_diff(strides),
        ])
        labels.append(df[df['User ID'] == user_id]['Injury Risk Score'].iloc[0])
    return stats, labels


def _save_deployment(model, x_calib, directory):
    """Compile *model* on *x_calib*, generate keys, and save FHE deployment files.

    Shared tail of all three training entry points; overwrites *directory*
    if it already exists.
    """
    from concrete.ml.deployment import FHEModelDev

    fhe_circuit = model.compile(x_calib)
    fhe_circuit.client.keygen(force=False)

    path_to_model = Path(directory).resolve()
    if path_to_model.exists():
        shutil.rmtree(path_to_model)
    dev = FHEModelDev(path_to_model, model)
    dev.save(via_mlir=True)


def train_model_1():
    """Train the running-quality linear regression and export its deployment."""
    from sklearn.linear_model import LinearRegression as SklearnLinearRegression
    from sklearn.model_selection import train_test_split
    from concrete.ml.sklearn import LinearRegression as ConcreteLinearRegression

    # Running quality
    df = pd.read_csv('data/200_Users_Running_Dataset.csv')
    data, risk = statistics(df)
    data = pd.DataFrame(data)
    X_train, X_test, y_train, y_test = train_test_split(
        data, risk, test_size=0.2, random_state=42)

    # Plain sklearn fit kept as a float reference for the quantized model.
    sklearn_lr = SklearnLinearRegression()
    sklearn_lr.fit(X_train, y_train)

    concrete_lr = ConcreteLinearRegression(n_bits=8)
    concrete_lr.fit(X_train, y_train)

    _save_deployment(concrete_lr, X_train, "./deployment_files_model1/")


def train_model_2():
    """Train the mental-stress model on HRV features and export its deployment."""
    from sklearn.model_selection import train_test_split
    from sklearn.preprocessing import StandardScaler
    from concrete.ml.sklearn import LinearRegression as ConcreteLinearRegression

    df = pd.read_csv('data/data_mental.csv')
    labels = df.iloc[:, 1].T
    data = df.iloc[:, 2::].T
    data.dropna(how='any', inplace=True, axis=0)
    labels = labels.T
    data = data.T

    # Replace implausible RR intervals (>1000 or <600, presumably ms) with the
    # global median; binarize labels to 1 for 'stress'.
    data = np.where((data.values > 1000) | (data.values < 600),
                    np.median(data.values), data.values)
    labels = np.where((labels.values == 'stress'), 1, 0)

    rr_interpolated = interpolation(data, 4.0)
    results = [frequency_domain(rr_interpolated[i]) for i in range(len(data))]

    freq_col = ['vlf', 'lf', 'hf', 'tot_pow', 'lf_hf_ratio',
                'peak_vlf', 'peak_lf', 'peak_hf']
    freq_features = pd.DataFrame(results, columns=freq_col)

    X_train, X_test, y_train, y_test = train_test_split(
        freq_features, labels, test_size=0.3, shuffle=True, stratify=labels)

    scaling = StandardScaler()
    X_train = scaling.fit_transform(X_train)
    X_test = scaling.transform(X_test)

    concrete_lr = ConcreteLinearRegression(n_bits=16)
    concrete_lr.fit(X_train, y_train)
    # Regression output thresholded at 0.6 to obtain a stress / no-stress label.
    y_pred_q = concrete_lr.predict(X_test)
    y_pred_q = np.where((y_pred_q > 0.6), 1, 0)

    _save_deployment(concrete_lr, X_train, "./deployment_files_model2/")


def train_model_3():
    """Train the XGBoost classifier on the full dataset and export its deployment."""
    from concrete.ml.sklearn.xgb import XGBClassifier

    N_BITS = 3
    df = pd.read_csv('data/dataset_for_last_model.csv')
    X_train = df.drop('label', axis=1)  # Drop the 'label' column for features
    y_train = df['label']  # The 'label' column is the target
    print(X_train)
    print(y_train)

    model = XGBClassifier(n_jobs=1, n_bits=N_BITS)
    model.fit(X_train, y_train)

    _save_deployment(model, X_train, "./deployment_files_model3/")


if __name__ == "__main__":
    train_model_1()
    train_model_2()
    train_model_3()