Streamline-Analyst / app /src /model_service.py
Wilson-ZheLin
Initial commit
9183c57
import io
import numpy as np
import streamlit as st
from collections import Counter
from sklearn import metrics
from sklearn.preprocessing import StandardScaler
from sklearn.cluster import KMeans
from imblearn.over_sampling import RandomOverSampler, SMOTE, ADASYN
from joblib import dump
from sklearn.metrics import roc_curve, silhouette_score, calinski_harabasz_score, davies_bouldin_score, f1_score, r2_score, mean_squared_error, mean_absolute_error
from sklearn.model_selection import train_test_split
def split_data(X, Y, test_size = 0.2, random_state = 42, perform_pca = False):
    """
    Partition the data into train and test subsets and, unless PCA was already applied,
    standardize the features.

    The scaler is fitted on the training features only and then applied to both
    subsets, so no statistics from the test split are used for fitting.

    :param X: Feature matrix.
    :param Y: Target vector.
    :param test_size: Fraction of the samples placed in the test split.
    :param random_state: Seed forwarded to ``train_test_split`` for the shuffle.
    :param perform_pca: True when PCA has already been performed; scaling is then skipped.
    :return: The tuple ``(X_train, X_test, Y_train, Y_test)``.
    """
    train_features, test_features, Y_train, Y_test = train_test_split(
        X, Y, test_size=test_size, random_state=random_state
    )

    # PCA output is passed through untouched.
    if perform_pca:
        return train_features, test_features, Y_train, Y_test

    standardizer = StandardScaler().fit(train_features)
    return (
        standardizer.transform(train_features),
        standardizer.transform(test_features),
        Y_train,
        Y_test,
    )
def check_and_balance(X, Y, balance_threshold=0.5, method=1):
    """
    Check if the dataset is imbalanced and oversample it if necessary.

    The dataset is considered imbalanced when the ratio between the smallest and the
    largest class count is below ``balance_threshold``.

    Args:
        X (DataFrame): Feature set.
        Y (Series): Target variable.
        balance_threshold (float): Minimum acceptable min/max class-count ratio.
        method (int): Oversampling strategy. 1 = RandomOverSampler, 2 = SMOTE,
            3 = ADASYN, 4 = no oversampling.

    Returns:
        X_resampled, Y_resampled (DataFrame/Series): Resampled data if imbalance is
        detected and an oversampler is selected, else the original X and Y.

    Raises:
        ValueError: If the data is imbalanced and ``method`` is not one of 1, 2, 3 or 4.
    """
    try:
        # Class distribution of the target variable.
        class_counts = Counter(Y).values()
        is_imbalanced = min(class_counts) / max(class_counts) < balance_threshold

        if not is_imbalanced or method == 4:
            return X, Y

        # Lambdas defer construction so only the selected oversampler is instantiated.
        oversampler_factories = {
            1: lambda: RandomOverSampler(random_state=0),
            2: lambda: SMOTE(random_state=0),
            3: lambda: ADASYN(random_state=0),
        }
        make_oversampler = oversampler_factories.get(method)
        if make_oversampler is not None:
            X_resampled, Y_resampled = make_oversampler().fit_resample(X, Y)
            return X_resampled, Y_resampled
    except Exception:
        # Failures while counting classes or resampling are reported to the user
        # in the Streamlit UI and the script run is halted.
        st.error("The target attribute may be continuous. Please check the data type.")
        st.stop()
    else:
        # Only reached when the data is imbalanced and `method` is unrecognised.
        # Previously this surfaced as the misleading "continuous target" message.
        raise ValueError(
            f"Unknown oversampling method {method!r}; "
            "expected 1 (random), 2 (SMOTE), 3 (ADASYN) or 4 (none)."
        )
def estimate_optimal_clusters(df):
    """
    Estimates the optimal number of clusters for KMeans clustering using the elbow method and silhouette scores.

    KMeans is fitted once for every candidate k (2..10, capped at ``n_samples - 1``).
    The elbow is located from the second difference of the SSE (inertia) curve, and the
    candidates directly around it are compared by silhouette score. Labels from the
    first pass are reused for the silhouette step, so no model is fitted twice.

    :param df: DataFrame containing the dataset to cluster.
    :return: The estimated optimal number of clusters.
    :raises ValueError: If the dataset has fewer than 3 samples.
    """
    n_samples = df.shape[0] if hasattr(df, "shape") else len(df)
    # silhouette_score is only defined for 2 <= n_clusters <= n_samples - 1.
    max_k = min(10, n_samples - 1)
    if max_k < 2:
        raise ValueError("At least 3 samples are required to estimate the number of clusters.")

    sse_values = []
    labels_by_k = {}
    for k in range(2, max_k + 1):
        kmeans = KMeans(n_clusters=k, random_state=42).fit(df)
        sse_values.append(kmeans.inertia_)
        labels_by_k[k] = kmeans.labels_

    if len(sse_values) >= 3:
        # Find the elbow point from the second difference of the SSE curve.
        second_diff = np.diff(sse_values, n=2)
        # NOTE(review): second_diff[i] is centred on k = i + 3; the original offset of
        # +2 is kept so results are unchanged, and the +/-1 window below still covers
        # k = i + 3.
        knee_point = int(np.argmax(second_diff)) + 2
        candidates = [k for k in range(knee_point - 1, knee_point + 2) if k in labels_by_k]
    else:
        # Too few points for a second difference: compare every fitted k.
        candidates = list(labels_by_k)

    # Pick the candidate with the highest average silhouette score
    # (ties resolve to the smallest k, as max() keeps the first maximum).
    silhouette_avg_scores = {
        k: silhouette_score(df, labels_by_k[k]) for k in candidates
    }
    return max(silhouette_avg_scores, key=silhouette_avg_scores.get)
def calculate_f1_score(model, X_test, Y_test, binary_classification=True):
    """
    Compute the F1 score of a model's predictions on a test set.

    :param model: Trained model exposing ``predict``.
    :param X_test: Feature matrix of the test set.
    :param Y_test: True labels of the test set.
    :param binary_classification: When truthy the 'binary' average is used; otherwise the 'macro' average is used.
    :return: The F1 score returned by ``f1_score``.
    """
    averaging = 'binary' if binary_classification else 'macro'
    predictions = model.predict(X_test)
    return f1_score(Y_test, predictions, average=averaging)
def model_score(model, X_test, Y_test):
    """
    Return whatever ``model.score`` reports for the given test features and labels.
    """
    return model.score(X_test, Y_test)
def fpr_and_tpr(model, X_test, Y_test):
    """
    Compute the ROC false positive rates and true positive rates of a classifier.

    The scores passed to ``roc_curve`` are the second column of ``predict_proba``
    (presumably the positive class of a binary problem).
    """
    class_one_scores = model.predict_proba(X_test)[:, 1]
    false_positive_rate, true_positive_rate, _thresholds = roc_curve(Y_test, class_one_scores)
    return false_positive_rate, true_positive_rate
def auc(fpr, tpr):
    """
    Return the area under the curve described by the given false positive and
    true positive rates, as computed by ``metrics.auc``.
    """
    return metrics.auc(fpr, tpr)
def calculate_silhouette_score(X, labels):
    """
    Return the silhouette score of the clustering ``labels`` over the data ``X``.
    """
    score = silhouette_score(X, labels)
    return score
def calculate_calinski_harabasz_score(X, labels):
    """
    Return the Calinski-Harabasz score of the clustering ``labels`` over the data ``X``.
    """
    score = calinski_harabasz_score(X, labels)
    return score
def calculate_davies_bouldin_score(X, labels):
    """
    Return the Davies-Bouldin score of the clustering ``labels`` over the data ``X``.
    """
    score = davies_bouldin_score(X, labels)
    return score
def gmm_predict(X, model):
    """
    Return the labels that ``model.predict`` assigns to ``X`` (intended for GMM models).
    """
    return model.predict(X)
def calculate_r2_score(y_pred, Y_test):
    """
    Return the R^2 score of regression predictions.

    Note the argument order: predictions first, true values second.
    """
    return r2_score(Y_test, y_pred)
def calculate_mse_and_rmse(y_pred, Y_test):
    """
    Return the mean squared error of regression predictions together with its square root.

    Note the argument order: predictions first, true values second.
    The result is the tuple ``(mse, rmse)``.
    """
    squared_error = mean_squared_error(Y_test, y_pred)
    return squared_error, np.sqrt(squared_error)
def calculate_mae(y_pred, Y_test):
    """
    Return the mean absolute error of regression predictions.

    Note the argument order: predictions first, true values second.
    """
    return mean_absolute_error(Y_test, y_pred)
def save_model(model):
    """
    Serialize a model with joblib's ``dump`` into an in-memory buffer and return the raw bytes.
    """
    with io.BytesIO() as stream:
        dump(model, stream)
        # getvalue() returns the whole buffer regardless of the stream position.
        return stream.getvalue()