# team1_Dhiria / dev_dhiria.py
# lucacolombo97's picture
# New
# 718b812 verified
"""Generating deployment files."""
import shutil
import numpy as np
from pathlib import Path
from scipy import signal
from scipy.ndimage import label
from scipy.stats import zscore
from scipy.interpolate import interp1d
from scipy.integrate import trapz
from scipy.interpolate import interp1d
import pandas as pd
from sklearn.decomposition import PCA
from sklearn.linear_model import LinearRegression as SklearnLinearRegression
from concrete.ml.sklearn import LinearRegression as ConcreteLinearRegression
from sklearn.model_selection import GridSearchCV, train_test_split
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from concrete.ml.sklearn.xgb import XGBClassifier
from concrete.ml.sklearn import LogisticRegression as ConcreteLogisticRegression
from concrete.ml.deployment import FHEModelDev
def interpolation(data, fs = 4.0):
    """Resample every series in ``data`` onto an evenly spaced time grid.

    For each ``data[i]`` the running total of the values divided by 1000 is
    used as the time axis (presumably RR intervals in milliseconds turned
    into seconds -- confirm against the caller). A cubic ``interp1d`` is
    fitted on those points and evaluated every ``1 / fs`` starting at 1 and
    stopping before the largest time stamp.

    Args:
        data: Indexable collection of numeric series; entry ``i`` is read
            as ``data[i]`` for ``i`` in ``range(len(data))``.
        fs: Sampling rate of the output grid.

    Returns:
        A list with one resampled array per input series. The arrays may
        have different lengths because each grid depends on that series'
        own total duration.
    """
    def _resample(intervals):
        # Time stamp of every sample: cumulative sum scaled by 1/1000.
        beat_times = np.cumsum(intervals) / 1000.0
        spline = interp1d(
            beat_times,
            intervals,
            kind='cubic',
            fill_value="extrapolate",
        )
        # Uniform grid from 1 up to (not including) the last time stamp.
        grid = np.arange(1, np.max(beat_times), 1 / fs)
        return spline(grid)

    return [_resample(data[idx]) for idx in range(len(data))]
def frequency_domain(rri, fs=4):
    """Compute band powers and peak frequencies of a resampled series.

    The power spectral density is estimated with Welch's method and split
    into three bands:

    - Very Low Frequency (VLF): 0-0.04Hz
    - Low Frequency (LF): 0.04-0.15Hz
    - High Frequency (HF): 0.15-0.4Hz

    Args:
        rri: Evenly sampled series to analyse (assumed one-dimensional --
            confirm against the caller).
        fs: Sampling frequency of ``rri`` in Hz.

    Returns:
        ``np.ndarray`` of eight values in this order:
        ``[vlf, lf, hf, total_power, lf / hf, peak_vlf, peak_lf, peak_hf]``
        where the first three are the integrated band powers,
        ``total_power`` is their sum, and each ``peak_*`` is the frequency
        with the largest spectral density inside that band. ``lf / hf`` is
        ``inf``/``nan`` when the HF power is zero.

    Raises:
        ValueError: If the spectrum has no frequency bin inside one of the
            bands (the input is too short for the requested resolution).
    """
    # Imported locally so this function does not depend on the legacy
    # ``scipy.integrate.trapz`` alias, which was removed in SciPy 1.14.
    from scipy.integrate import trapezoid

    # Estimate the spectral density using Welch's method
    fxx, pxx = signal.welch(x=rri, fs=fs)

    bands = (
        ("VLF", 0.0, 0.04),
        ("LF", 0.04, 0.15),
        ("HF", 0.15, 0.4),
    )
    powers = []
    peaks = []
    for band_name, low, high in bands:
        in_band = (fxx >= low) & (fxx < high)
        if not in_band.any():
            raise ValueError(
                f"no frequency bins in the {band_name} band "
                f"[{low}, {high}) Hz; input series is too short"
            )
        band_freqs = fxx[in_band]
        band_density = pxx[in_band]
        # Band power: integrate the spectral density over the band.
        powers.append(trapezoid(band_density, band_freqs))
        # Peak: frequency carrying the most power within the band.
        peaks.append(band_freqs[np.argmax(band_density)])

    vlf, lf, hf = powers
    total_power = vlf + lf + hf
    return np.array([vlf, lf, hf, total_power, lf / hf, *peaks])
def abs_sum_diff(x):
    """Return the sum of absolute differences between consecutive samples.

    This is the total amount by which the series changes from one sample
    to the next; a series with fewer than two samples yields 0.

    Args:
        x: Array-like numeric series.

    Returns:
        The summed absolute first differences. For one-dimensional input
        this is a scalar.
    """
    # Vectorized reduction instead of the built-in ``sum``, which would
    # iterate the array element by element in the interpreter. ``axis=0``
    # keeps the built-in's behaviour of reducing over the leading axis.
    return np.abs(np.diff(x)).sum(axis=0)
def statistics(df):
    """Build per-user stride-length features and injury-risk labels.

    For every distinct value of the ``'User ID'`` column (in sorted order)
    the user's ``'Stride Length (m)'`` values are summarised as
    ``[mean, variance, median, max, min, abs_sum_diff]`` and the first
    ``'Injury Risk Score'`` of that user is taken as the label.

    Args:
        df: DataFrame with the columns ``'User ID'``,
            ``'Stride Length (m)'`` and ``'Injury Risk Score'``.

    Returns:
        A tuple ``(features, labels)`` of two lists with one entry per
        user: ``features[k]`` is the six-element summary list and
        ``labels[k]`` the matching risk score.
    """
    user_ids = df['User ID']
    features = []
    labels = []
    # Walk over the IDs that are actually present (np.unique returns them
    # sorted) instead of assuming they are exactly 0..n-1; with any other
    # ID scheme the positional assumption selects empty groups and fails.
    for user_id in np.unique(user_ids):
        user_rows = df[user_ids == user_id]
        strides = user_rows['Stride Length (m)']
        features.append([
            np.mean(strides),
            np.var(strides),
            np.median(strides),
            np.max(strides),
            np.min(strides),
            abs_sum_diff(strides),
        ])
        # The first row's score is taken as the user's label.
        labels.append(user_rows['Injury Risk Score'].iloc[0])
    return features, labels
def train_model_1():
    """Train, compile and export the running-quality regression model.

    Reads ``data/200_Users_Running_Dataset.csv``, derives per-user stride
    statistics with ``statistics``, fits a Concrete ML linear regression
    (8-bit quantization) on the training part of an 80/20 split, compiles
    it to an FHE circuit, generates keys, and writes the deployment files
    to ``./deployment_files_model1/`` (an existing directory at that path
    is deleted first).
    """
    # Running quality
    df = pd.read_csv('data/200_Users_Running_Dataset.csv')
    features, risk = statistics(df)
    features = pd.DataFrame(features)

    # Only the training part of the split is used here; the fixed seed
    # keeps the selected rows reproducible between runs.
    X_train, _, y_train, _ = train_test_split(
        features, risk, test_size=0.2, random_state=42
    )

    concrete_lr = ConcreteLinearRegression(n_bits=8)
    concrete_lr.fit(X_train, y_train)

    # Compile the model, using the training data as the compilation input set
    fhe_circuit = concrete_lr.compile(X_train)
    fhe_circuit.client.keygen(force=False)

    # Start from a clean output directory (presumably FHEModelDev.save
    # expects one -- confirm against the Concrete ML documentation).
    path_to_model = Path("./deployment_files_model1/").resolve()
    if path_to_model.exists():
        shutil.rmtree(path_to_model)
    dev = FHEModelDev(path_to_model, concrete_lr)
    dev.save(via_mlir=True)
def train_model_2():
    """Train, compile and export the frequency-feature model.

    Reads ``data/data_mental.csv`` (column 1 holds the labels, columns 2
    onward the samples of each recording), cleans and resamples the
    recordings, extracts frequency-domain features with
    ``frequency_domain``, fits a Concrete ML linear regression (16-bit
    quantization) on standardized features, compiles it to an FHE circuit,
    generates keys, and writes the deployment files to
    ``./deployment_files_model2/`` (an existing directory at that path is
    deleted first).
    """
    df = pd.read_csv('data/data_mental.csv')

    raw_labels = df.iloc[:,1].T.T
    # Transpose so that sample positions become rows, drop every position
    # where any recording has a missing value, then transpose back.
    recordings = df.iloc[:,2::].T.dropna(how='any', axis=0).T

    # Replace values outside [600, 1000] by the median of all values.
    samples = recordings.values
    out_of_range = (samples > 1000) | (samples < 600)
    data = np.where(out_of_range, np.median(samples), samples)

    # Binary target: 1 for rows labelled 'stress', 0 for everything else.
    labels = np.where((raw_labels.values == 'stress'), 1, 0)

    rr_interpolated = interpolation(data, 4.0)
    results = [frequency_domain(series) for series in rr_interpolated]

    freq_col = [
        'vlf', 'lf', 'hf', 'tot_pow',
        'lf_hf_ratio', 'peak_vlf', 'peak_lf', 'peak_hf',
    ]
    freq_features = pd.DataFrame(results, columns=freq_col)

    X_train, X_test, y_train, y_test = train_test_split(
        freq_features,
        labels,
        test_size=0.3,
        shuffle=True,
        stratify=labels,
    )

    # Standardize with statistics learned on the training part only.
    scaling = StandardScaler()
    X_train = scaling.fit_transform(X_train)
    X_test = scaling.transform(X_test)

    concrete_lr = ConcreteLinearRegression(n_bits=16)
    concrete_lr.fit(X_train, y_train)

    # Threshold the regression output at 0.6 to obtain class predictions;
    # they are not used further in this function.
    y_pred_q = concrete_lr.predict(X_test)
    y_pred_q = np.where((y_pred_q > 0.6), 1, 0)

    # Compile the model
    fhe_circuit = concrete_lr.compile(X_train)
    fhe_circuit.client.keygen(force=False)

    path_to_model = Path("./deployment_files_model2/").resolve()
    if path_to_model.exists():
        shutil.rmtree(path_to_model)
    FHEModelDev(path_to_model, concrete_lr).save(via_mlir=True)
def train_model_3():
    """Train, compile and export the XGBoost classifier.

    Reads ``data/dataset_for_last_model.csv``, uses the ``'label'`` column
    as target and all remaining columns as features, prints both, fits a
    Concrete ML ``XGBClassifier`` (3-bit quantization, single job) on the
    full dataset, compiles it to an FHE circuit, generates keys, and writes
    the deployment files to ``./deployment_files_model3/`` (an existing
    directory at that path is deleted first).
    """
    quantization_bits = 3

    dataset = pd.read_csv('data/dataset_for_last_model.csv')
    # Every column except 'label' is a feature; 'label' is the target.
    X_train = dataset.drop(columns='label')
    y_train = dataset['label']
    print(X_train)
    print(y_train)

    # The whole dataset is used for training; no hold-out split is made.
    model = XGBClassifier(n_jobs=1, n_bits=quantization_bits)
    model.fit(X_train, y_train)

    # Compile the model
    fhe_circuit = model.compile(X_train)
    fhe_circuit.client.keygen(force=False)

    output_dir = Path("./deployment_files_model3/").resolve()
    if output_dir.exists():
        shutil.rmtree(output_dir)
    FHEModelDev(output_dir, model).save(via_mlir=True)
if __name__ == "__main__":
    # Build and export the three deployment bundles, one after another.
    for train in (train_model_1, train_model_2, train_model_3):
        train()