import io
from collections import Counter

import numpy as np
import streamlit as st
from imblearn.over_sampling import RandomOverSampler, SMOTE, ADASYN
from joblib import dump
from sklearn import metrics
from sklearn.cluster import KMeans
from sklearn.metrics import (
    roc_curve,
    silhouette_score,
    calinski_harabasz_score,
    davies_bouldin_score,
    f1_score,
    r2_score,
    mean_squared_error,
    mean_absolute_error,
)
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler


def split_data(X, Y, test_size=0.2, random_state=42, perform_pca=False):
    """
    Split the dataset into training and testing sets, standardizing the
    features when PCA has not been performed.

    :param X: Feature matrix.
    :param Y: Target vector.
    :param test_size: Proportion of the dataset to include in the test split.
    :param random_state: Seed controlling the shuffling applied before the split.
    :param perform_pca: Whether PCA has already been performed. If not, the
        features are standardized with ``StandardScaler``.
    :return: ``X_train, X_test, Y_train, Y_test``.
    """
    X_train, X_test, Y_train, Y_test = train_test_split(
        X, Y, test_size=test_size, random_state=random_state
    )
    if not perform_pca:
        # Fit the scaler on the training split only, then apply the same
        # transform to the test split so no test statistics leak into training.
        scaler = StandardScaler()
        X_train = scaler.fit_transform(X_train)
        X_test = scaler.transform(X_test)
    return X_train, X_test, Y_train, Y_test


def check_and_balance(X, Y, balance_threshold=0.5, method=1):
    """
    Check whether the dataset is imbalanced and oversample it if necessary.

    The dataset counts as imbalanced when the ratio of the smallest class
    count to the largest class count is below ``balance_threshold``.

    :param X: Feature set.
    :param Y: Target variable (iterated to count the classes).
    :param balance_threshold: Minimum min/max class-count ratio for the data
        to be considered balanced.
    :param method: Oversampling method: ``1`` = RandomOverSampler,
        ``2`` = SMOTE, ``3`` = ADASYN, ``4`` = no oversampling.
    :return: ``(X_resampled, Y_resampled)`` if an imbalance was detected and
        oversampling was requested, otherwise the original ``(X, Y)``.

    On failure an error is shown through ``st.error`` and the Streamlit
    script is halted with ``st.stop``.
    """
    oversampler_classes = {1: RandomOverSampler, 2: SMOTE, 3: ADASYN}
    try:
        # Class distribution of the target variable.
        class_counts = Counter(Y).values()
        is_imbalanced = min(class_counts) / max(class_counts) < balance_threshold

        if not is_imbalanced or method == 4:
            return X, Y

        if method in oversampler_classes:
            oversampler = oversampler_classes[method](random_state=0)
            X_resampled, Y_resampled = oversampler.fit_resample(X, Y)
            return X_resampled, Y_resampled

        # An unsupported method used to surface as an UnboundLocalError that
        # was reported as a "continuous target" problem; report it accurately.
        error_message = (
            f"Unknown oversampling method {method!r}. Expected 1, 2, 3 or 4."
        )
    except Exception:
        # Broad on purpose: counting or resampling can fail in several ways
        # and all of them are reported to the user the same way.
        error_message = (
            "The target attribute may be continuous. Please check the data type."
        )

    # st.stop() is called outside the try block so that its control-flow
    # exception can never be intercepted by the handler above.
    st.error(error_message)
    st.stop()


def estimate_optimal_clusters(df):
    """
    Estimate the optimal number of clusters for KMeans using the elbow
    method followed by a silhouette-score comparison around the elbow.

    :param df: Dataset to cluster (passed directly to ``KMeans.fit``).
    :return: The estimated optimal number of clusters.
    """
    sse_values = []
    labels_by_k = {}
    for k in range(2, 11):
        kmeans = KMeans(n_clusters=k, random_state=42).fit(df)
        sse_values.append(kmeans.inertia_)
        # Keep the labels so the candidates below need no second fit; the
        # fixed random_state makes a refit produce the same clustering.
        labels_by_k[k] = kmeans.labels_

    # Elbow point: largest second difference of the SSE curve.
    second_diff = np.diff(sse_values, n=2)
    # NOTE(review): second_diff[i] is centred on k = i + 3 (the SSE list
    # starts at k = 2), so "+ 2" points one below the centre. The window
    # below still contains k = i + 3; offset left unchanged to preserve
    # existing results - confirm whether "+ 3" was intended.
    knee_point = int(np.argmax(second_diff)) + 2

    # Compare silhouette scores for the candidates around the knee point.
    silhouette_avg_scores = {
        k: silhouette_score(df, labels_by_k[k])
        for k in range(knee_point - 1, knee_point + 2)
        if k >= 2  # make sure k is at least 2
    }

    # The candidate with the highest average silhouette score wins.
    return max(silhouette_avg_scores, key=silhouette_avg_scores.get)


def calculate_f1_score(model, X_test, Y_test, binary_classification=True):
    """
    Calculate the F1 score of a model's predictions on a test set.

    :param model: The trained model used for predictions.
    :param X_test: The feature matrix for the test set.
    :param Y_test: The true labels for the test set.
    :param binary_classification: If True, use ``average='binary'``;
        otherwise use the ``'macro'`` average for multi-class problems.
    :return: The F1 score of the model predictions.
    """
    y_pred = model.predict(X_test)
    average = 'binary' if binary_classification else 'macro'
    return f1_score(Y_test, y_pred, average=average)


def model_score(model, X_test, Y_test):
    """
    Return ``model.score(X_test, Y_test)`` for a classification model.
    """
    return model.score(X_test, Y_test)


def fpr_and_tpr(model, X_test, Y_test):
    """
    Calculate the false positive rate and true positive rate of a
    classification model.

    The scores passed to ``roc_curve`` are the second column of
    ``model.predict_proba(X_test)``.

    :return: ``(fpr, tpr)`` as returned by ``roc_curve``.
    """
    positive_scores = model.predict_proba(X_test)[:, 1]
    fpr, tpr, _ = roc_curve(Y_test, positive_scores)
    return fpr, tpr


def auc(fpr, tpr):
    """
    Calculate the area under the ROC curve from its fpr/tpr points.
    """
    area = metrics.auc(fpr, tpr)
    return area


def calculate_silhouette_score(X, labels):
    """
    Calculate the silhouette score for clustering models.
    """
    return silhouette_score(X, labels)


def calculate_calinski_harabasz_score(X, labels):
    """
    Calculate the Calinski-Harabasz score for clustering models.
    """
    return calinski_harabasz_score(X, labels)


def calculate_davies_bouldin_score(X, labels):
    """
    Calculate the Davies-Bouldin score for clustering models.
    """
    return davies_bouldin_score(X, labels)


def gmm_predict(X, model):
    """
    Get the predicted labels for a GMM model.
    """
    return model.predict(X)


def calculate_r2_score(y_pred, Y_test):
    """
    Calculate the R2 score for regression models.

    Note the argument order: predictions first, true values second.
    """
    return r2_score(Y_test, y_pred)


def calculate_mse_and_rmse(y_pred, Y_test):
    """
    Calculate the mean squared error and root mean squared error for
    regression models.

    Note the argument order: predictions first, true values second.

    :return: ``(mse, rmse)``.
    """
    mse = mean_squared_error(Y_test, y_pred)
    rmse = np.sqrt(mse)
    return mse, rmse


def calculate_mae(y_pred, Y_test):
    """
    Calculate the mean absolute error for regression models.

    Note the argument order: predictions first, true values second.
    """
    return mean_absolute_error(Y_test, y_pred)


def save_model(model):
    """
    Serialize a model with joblib's ``dump`` into an in-memory buffer and
    return the serialized bytes.
    """
    buffer = io.BytesIO()
    dump(model, buffer)
    # getvalue() returns the whole buffer regardless of the stream position.
    return buffer.getvalue()