Spaces:
Sleeping
Sleeping
import io | |
import numpy as np | |
import streamlit as st | |
from collections import Counter | |
from sklearn import metrics | |
from sklearn.preprocessing import StandardScaler | |
from sklearn.cluster import KMeans | |
from imblearn.over_sampling import RandomOverSampler, SMOTE, ADASYN | |
from joblib import dump | |
from sklearn.metrics import roc_curve, silhouette_score, calinski_harabasz_score, davies_bouldin_score, f1_score, r2_score, mean_squared_error, mean_absolute_error | |
from sklearn.model_selection import train_test_split | |
def split_data(X, Y, test_size = 0.2, random_state = 42, perform_pca = False): | |
""" | |
Splits the dataset into training and testing sets, optionally standardizing the data if PCA is not performed. | |
:param X: Feature matrix. | |
:param Y: Target vector. | |
:param test_size: Proportion of the dataset to include in the test split. | |
:param random_state: Controls the shuffling applied to the data before applying the split. | |
:param perform_pca: Has PCA been performed or not. If not, standardizes the data. | |
:return: A tuple containing split and optionally transformed datasets: X_train, X_test, Y_train, Y_test. | |
""" | |
X_train, X_test, Y_train, Y_test = train_test_split(X, Y, test_size=test_size, random_state=random_state) | |
if not perform_pca: | |
scaler = StandardScaler() | |
X_train = scaler.fit_transform(X_train) | |
X_test = scaler.transform(X_test) | |
return X_train, X_test, Y_train, Y_test | |
def check_and_balance(X, Y, balance_threshold=0.5, method=1): | |
""" | |
Check if the dataset is imbalanced and perform oversampling if necessary using RandomOverSampler, SMOTE, or ADASYN. | |
Args: | |
X (DataFrame): Feature set. | |
Y (Series): Target variable. | |
balance_threshold (float): Threshold for class balance. | |
method (int): Method for oversampling. Options are 'random', 'smote', or 'adasyn'. | |
Returns: | |
X_resampled, Y_resampled (DataFrame/Series): Resampled data if imbalance is detected, else original data. | |
""" | |
try: | |
# Check the distribution of the target variable | |
class_distribution = Counter(Y) | |
# Determine if the dataset is imbalanced | |
min_class_samples = min(class_distribution.values()) | |
max_class_samples = max(class_distribution.values()) | |
is_imbalanced = min_class_samples / max_class_samples < balance_threshold | |
if is_imbalanced and method != 4: | |
if method == 1: | |
oversampler = RandomOverSampler(random_state=0) | |
elif method == 2: | |
oversampler = SMOTE(random_state=0) | |
elif method == 3: | |
oversampler = ADASYN(random_state=0) | |
X_resampled, Y_resampled = oversampler.fit_resample(X, Y) | |
return X_resampled, Y_resampled | |
else: | |
return X, Y | |
except Exception as e: | |
st.error("The target attribute may be continuous. Please check the data type.") | |
st.stop() | |
def estimate_optimal_clusters(df): | |
""" | |
Estimates the optimal number of clusters for KMeans clustering using the elbow method and silhouette scores. | |
:param df: DataFrame containing the dataset to cluster. | |
:return: The estimated optimal number of clusters. | |
""" | |
sse = {} | |
for k in range(2, 11): | |
kmeans = KMeans(n_clusters=k, random_state=42).fit(df) | |
sse[k] = kmeans.inertia_ | |
# Find the elbow point: compute the first and second differences of the SSE | |
sse_values = list(sse.values()) | |
first_diff = np.diff(sse_values) # first difference | |
second_diff = np.diff(first_diff) # second difference | |
knee_point = np.argmax(second_diff) + 2 | |
# find the optimal number of clusters around the knee point | |
silhouette_avg_scores = {} | |
for k in range(knee_point - 1, knee_point + 2): | |
if k >= 2: # make sure k is at least 2 | |
kmeans = KMeans(n_clusters=k, random_state=42).fit(df) | |
silhouette_avg_scores[k] = silhouette_score(df, kmeans.labels_) | |
# Find the optimal number of clusters based on the highest average silhouette score | |
optimal_clusters = max(silhouette_avg_scores, key=silhouette_avg_scores.get) | |
return optimal_clusters | |
def calculate_f1_score(model, X_test, Y_test, binary_classification=True): | |
""" | |
Calculates the F1 score for the predictions made by a model on a test set. | |
The function supports both binary and multi-class settings by adjusting the 'average' parameter in the f1_score calculation. | |
:param model: The trained machine learning model used for predictions. | |
:param X_test: The feature matrix for the test set. | |
:param Y_test: The true labels for the test set. | |
:param binary_classification: If True, calculates the F1 score for binary classification. Otherwise, calculates for multi-class classification using the 'macro' average. | |
:return: The F1 score of the model predictions. | |
""" | |
y_pred = model.predict(X_test) | |
if binary_classification: | |
f1 = f1_score(Y_test, y_pred, average='binary') | |
else: | |
f1 = f1_score(Y_test, y_pred, average='macro') | |
return f1 | |
def model_score(model, X_test, Y_test): | |
""" | |
Calculate the model score for classification models. | |
""" | |
score = model.score(X_test, Y_test) | |
return score | |
def fpr_and_tpr(model, X_test, Y_test): | |
""" | |
Calculate the false positive rate and true positive rate for classification models. | |
""" | |
Y_pred = model.predict_proba(X_test)[:, 1] | |
fpr, tpr, _ = roc_curve(Y_test, Y_pred) | |
return fpr, tpr | |
def auc(fpr, tpr): | |
""" | |
Calculate the area under the ROC curve for classification models. | |
""" | |
auc = metrics.auc(fpr, tpr) | |
return auc | |
def calculate_silhouette_score(X, labels): | |
""" | |
Calculate the silhouette score for clustering models. | |
""" | |
return silhouette_score(X, labels) | |
def calculate_calinski_harabasz_score(X, labels): | |
""" | |
Calculate the calinski harabasz score for clustering models. | |
""" | |
return calinski_harabasz_score(X, labels) | |
def calculate_davies_bouldin_score(X, labels): | |
""" | |
Calculate the davies bouldin score for clustering models. | |
""" | |
return davies_bouldin_score(X, labels) | |
def gmm_predict(X, model): | |
""" | |
Get the predicted labels for a GMM model. | |
""" | |
labels = model.predict(X) | |
return labels | |
def calculate_r2_score(y_pred, Y_test): | |
""" | |
Calculate the r2 score for regression models. | |
""" | |
r2 = r2_score(Y_test, y_pred) | |
return r2 | |
def calculate_mse_and_rmse(y_pred, Y_test): | |
""" | |
Calculate the mean squared error and root mean squared error for regression models. | |
""" | |
mse = mean_squared_error(Y_test, y_pred) | |
rmse = np.sqrt(mse) | |
return mse, rmse | |
def calculate_mae(y_pred, Y_test): | |
""" | |
Calculate the mean absolute error for regression models. | |
""" | |
mae = mean_absolute_error(Y_test, y_pred) | |
return mae | |
def save_model(model): | |
""" | |
Serializes a machine learning model into a binary format using joblib's dump function and stores it in a BytesIO buffer. | |
""" | |
buffer = io.BytesIO() | |
dump(model, buffer) | |
buffer.seek(0) | |
return buffer.getvalue() |