# Page residue from the hosting site (not part of the program):
# Space status: Sleeping | File size: 6,359 Bytes | Revision: 718b812
"""Generating deployment files."""
import shutil
import numpy as np
from pathlib import Path
from scipy import signal
from scipy.ndimage import label
from scipy.stats import zscore
from scipy.interpolate import interp1d
from scipy.integrate import trapz
from scipy.interpolate import interp1d
import pandas as pd
from sklearn.decomposition import PCA
from sklearn.linear_model import LinearRegression as SklearnLinearRegression
from concrete.ml.sklearn import LinearRegression as ConcreteLinearRegression
from sklearn.model_selection import GridSearchCV, train_test_split
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from concrete.ml.sklearn.xgb import XGBClassifier
from concrete.ml.sklearn import LogisticRegression as ConcreteLogisticRegression
from concrete.ml.deployment import FHEModelDev
def interpolation(data, fs=4.0):
    """Resample each interval series onto a uniform time grid.

    For every series in *data*, the cumulative sum of its values divided by
    1000 is used as the time axis. A cubic interpolant is fitted through the
    (time, value) points and evaluated every ``1 / fs`` time units, starting
    at 1 and stopping before the last time stamp of that series.

    Args:
        data: Sequence of 1-D numeric sequences, one per recording (for
            example a 2-D array with one recording per row). Presumably RR
            intervals in milliseconds, hence the division by 1000 --
            TODO confirm against the dataset.
        fs: Sampling frequency of the output grid.

    Returns:
        A list holding one 1-D ``numpy.ndarray`` per input series. The arrays
        can differ in length because each grid ends at the total duration of
        its own series.
    """
    step = 1 / fs  # grid spacing is the same for every series
    rr_interpolated = []
    for rr_manual in data:
        # Cumulative sum of the intervals gives the time stamp of each sample.
        x = np.cumsum(rr_manual) / 1000.0
        # "extrapolate" allows evaluation at grid points that fall outside
        # [x[0], x[-1]], e.g. when the first time stamp is later than 1.
        f = interp1d(x, rr_manual, kind='cubic', fill_value="extrapolate")
        xx = np.arange(1, np.max(x), step)
        rr_interpolated.append(f(xx))
    return rr_interpolated
def frequency_domain(rri, fs=4):
    """Compute band-power features from the spectrum of an evenly sampled signal.

    The power spectral density is estimated with Welch's method (SciPy
    defaults) and split into three bands, lower edge inclusive and upper
    edge exclusive:

    - Very Low Frequency (VLF): 0-0.04 Hz
    - Low Frequency (LF): 0.04-0.15 Hz
    - High Frequency (HF): 0.15-0.4 Hz

    Args:
        rri: 1-D signal sampled at a constant rate of *fs*.
        fs: Sampling frequency of *rri*.

    Returns:
        ``numpy.ndarray`` with eight values in this order:
        ``[vlf, lf, hf, total_power, lf / hf, peak_vlf, peak_lf, peak_hf]``,
        where the first three are the integrated band powers,
        ``total_power`` is their sum, and each ``peak_*`` is the frequency
        with the highest spectral density inside that band.

    Raises:
        ValueError: If a band contains no frequency bin (``np.argmax`` on an
            empty selection), which can happen for very short signals.
    """
    # Imported locally under its current name: the ``trapz`` alias has been
    # removed from recent SciPy releases, ``trapezoid`` is the supported one.
    from scipy.integrate import trapezoid

    # Estimate the spectral density using Welch's method
    fxx, pxx = signal.welch(x=rri, fs=fs)

    cond_vlf = (fxx >= 0) & (fxx < 0.04)
    cond_lf = (fxx >= 0.04) & (fxx < 0.15)
    cond_hf = (fxx >= 0.15) & (fxx < 0.4)

    # Power in each band is the integral of the spectral density over it
    vlf = trapezoid(pxx[cond_vlf], fxx[cond_vlf])
    lf = trapezoid(pxx[cond_lf], fxx[cond_lf])
    hf = trapezoid(pxx[cond_hf], fxx[cond_hf])

    total_power = vlf + lf + hf

    # Frequency carrying the most power within each band
    peak_vlf = fxx[cond_vlf][np.argmax(pxx[cond_vlf])]
    peak_lf = fxx[cond_lf][np.argmax(pxx[cond_lf])]
    peak_hf = fxx[cond_hf][np.argmax(pxx[cond_hf])]

    # NOTE: lf / hf is inf or nan when the HF power is exactly zero.
    result = [vlf, lf, hf, total_power, lf / hf, peak_vlf, peak_lf, peak_hf]
    return np.array(result)
def abs_sum_diff(x):
    """Return the sum of absolute differences between consecutive samples.

    Args:
        x: 1-D numeric sequence (list, ``numpy.ndarray`` or pandas Series).

    Returns:
        A NumPy scalar equal to ``sum(|x[i + 1] - x[i]|)``; zero when *x*
        has fewer than two elements.
    """
    # np.sum keeps the reduction in native code instead of iterating the
    # array element by element with the builtin sum().
    return np.sum(np.abs(np.diff(x)))
def statistics(df):
    """Build per-user stride-length features and the matching risk labels.

    Rows are grouped by ``'User ID'`` and users are visited in ascending ID
    order. Grouping (rather than looping over ``0..n-1``) means IDs do not
    have to be zero-based or contiguous.

    Args:
        df: ``pandas.DataFrame`` with the columns ``'User ID'``,
            ``'Stride Length (m)'`` and ``'Injury Risk Score'``.

    Returns:
        A ``(features, labels)`` tuple of lists with one entry per user:

        - ``features[k]`` is ``[mean, variance, median, max, min,
          abs_sum_diff]`` of that user's ``'Stride Length (m)'`` values.
        - ``labels[k]`` is the ``'Injury Risk Score'`` of the user's first
          row.
    """
    features = []
    labels = []
    for _, user_rows in df.groupby('User ID', sort=True):
        stride = user_rows['Stride Length (m)']
        features.append([
            np.mean(stride),
            np.var(stride),
            np.median(stride),
            np.max(stride),
            np.min(stride),
            abs_sum_diff(stride),
        ])
        # Only the first row's score is used as the label for the user.
        labels.append(user_rows['Injury Risk Score'].iloc[0])
    return features, labels
def train_model_1():
    """Train the running-quality regressor and export its deployment files.

    Reads ``data/200_Users_Running_Dataset.csv``, derives per-user features
    with ``statistics``, fits a Concrete ML ``LinearRegression`` with
    ``n_bits=8`` on the 80 % training split, compiles it and saves the
    result to ``./deployment_files_model1/``. An existing directory at that
    path is deleted first.
    """
    # Running quality
    df = pd.read_csv('data/200_Users_Running_Dataset.csv')
    data, risk = statistics(df)
    data = pd.DataFrame(data)

    # Only the training part of the split is used; the seed keeps it fixed.
    X_train, _, y_train, _ = train_test_split(
        data, risk, test_size=0.2, random_state=42
    )

    concrete_lr = ConcreteLinearRegression(n_bits=8)
    concrete_lr.fit(X_train, y_train)

    # Compile the model, passing the training features to compile()
    fhe_circuit = concrete_lr.compile(X_train)
    fhe_circuit.client.keygen(force=False)

    # Remove output from a previous run so save() writes into a fresh path.
    path_to_model = Path("./deployment_files_model1/").resolve()
    if path_to_model.exists():
        shutil.rmtree(path_to_model)

    dev = FHEModelDev(path_to_model, concrete_lr)
    dev.save(via_mlir=True)
def train_model_2():
    """Train the stress model on frequency-domain features and export it.

    Pipeline:

    1. Read ``data/data_mental.csv``; column 1 holds the label, columns 2
       onward hold the samples of each recording (one recording per row).
    2. Drop every sample column that has a missing value in any recording.
    3. Replace samples above 1000 or below 600 with the median of all
       samples.
    4. Encode the label as 1 for ``'stress'`` and 0 otherwise.
    5. Resample each recording with ``interpolation`` at 4.0 and extract
       features with ``frequency_domain``.
    6. Standardise the features (scaler fitted on the training split), fit
       a Concrete ML ``LinearRegression`` with ``n_bits=16`` on the 0/1
       labels, compile it and save it to ``./deployment_files_model2/``.
       An existing directory at that path is deleted first.

    NOTE(review): the fitted ``StandardScaler`` is not saved here, so inputs
    presumably need the same scaling at inference time -- TODO confirm.
    """
    df = pd.read_csv('data/data_mental.csv')
    labels = df.iloc[:, 1]

    # Keep only the sample columns that are complete for every recording.
    samples = df.iloc[:, 2:].dropna(how='any', axis=1).values

    # Out-of-range samples are replaced by the median over all samples.
    data = np.where(
        (samples > 1000) | (samples < 600), np.median(samples), samples
    )
    labels = np.where(labels.values == 'stress', 1, 0)

    rr_interpolated = interpolation(data, 4.0)
    results = [frequency_domain(rr) for rr in rr_interpolated]

    freq_col = ['vlf', 'lf', 'hf', 'tot_pow', 'lf_hf_ratio',
                'peak_vlf', 'peak_lf', 'peak_hf']
    freq_features = pd.DataFrame(results, columns=freq_col)

    # A fixed seed makes the exported model reproducible between runs.
    X_train, _, y_train, _ = train_test_split(
        freq_features, labels, test_size=0.3, shuffle=True,
        stratify=labels, random_state=42,
    )

    scaling = StandardScaler()
    X_train = scaling.fit_transform(X_train)

    concrete_lr = ConcreteLinearRegression(n_bits=16)
    concrete_lr.fit(X_train, y_train)

    # Compile the model, passing the scaled training features to compile()
    fhe_circuit = concrete_lr.compile(X_train)
    fhe_circuit.client.keygen(force=False)

    # Remove output from a previous run so save() writes into a fresh path.
    path_to_model = Path("./deployment_files_model2/").resolve()
    if path_to_model.exists():
        shutil.rmtree(path_to_model)

    dev = FHEModelDev(path_to_model, concrete_lr)
    dev.save(via_mlir=True)
def train_model_3():
    """Fit the XGBoost classifier on the full dataset and export it.

    Loads ``data/dataset_for_last_model.csv``, uses the ``'label'`` column as
    the target and every other column as a feature, prints both, trains a
    Concrete ML ``XGBClassifier`` (``n_bits=3``, ``n_jobs=1``) on all rows,
    compiles it and writes the result to ``./deployment_files_model3/``.
    An existing directory at that path is deleted first.
    """
    n_bits = 3

    frame = pd.read_csv('data/dataset_for_last_model.csv')
    target = frame['label']
    features = frame.drop(columns='label')
    print(features)
    print(target)

    # No hold-out split: the classifier is trained on every row.
    classifier = XGBClassifier(n_jobs=1, n_bits=n_bits)
    classifier.fit(features, target)

    # Compile with the training features, then create the client keys.
    circuit = classifier.compile(features)
    circuit.client.keygen(force=False)

    output_dir = Path("./deployment_files_model3/").resolve()
    if output_dir.exists():
        shutil.rmtree(output_dir)

    FHEModelDev(output_dir, classifier).save(via_mlir=True)
if __name__ == "__main__":
    # Build and export the three models one after another.
    train_model_1()
    train_model_2()
    train_model_3()