Streamline-Analyst / app /src /
Initial commit
import io
import numpy as np
import streamlit as st
from collections import Counter
from sklearn import metrics
from sklearn.preprocessing import StandardScaler
from sklearn.cluster import KMeans
from imblearn.over_sampling import RandomOverSampler, SMOTE, ADASYN
from joblib import dump
from sklearn.metrics import roc_curve, silhouette_score, calinski_harabasz_score, davies_bouldin_score, f1_score, r2_score, mean_squared_error, mean_absolute_error
from sklearn.model_selection import train_test_split
def split_data(X, Y, test_size = 0.2, random_state = 42, perform_pca = False):
    """
    Partition the dataset into train and test subsets and, unless PCA has already been performed, standardize the features.

    The scaler is fitted on the training features only and then applied to both the training and the test features.

    :param X: Feature matrix.
    :param Y: Target vector.
    :param test_size: Proportion of the dataset held out for the test split.
    :param random_state: Seed forwarded to train_test_split to control the shuffling.
    :param perform_pca: Whether PCA has already been performed. When True, standardization is skipped.
    :return: A tuple X_train, X_test, Y_train, Y_test.
    """
    features_train, features_test, target_train, target_test = train_test_split(
        X, Y, test_size=test_size, random_state=random_state
    )

    # Data that already went through PCA is returned as split, without scaling.
    if perform_pca:
        return features_train, features_test, target_train, target_test

    standardizer = StandardScaler().fit(features_train)
    return (
        standardizer.transform(features_train),
        standardizer.transform(features_test),
        target_train,
        target_test,
    )
def check_and_balance(X, Y, balance_threshold=0.5, method=1):
    """
    Check if the dataset is imbalanced and perform oversampling if necessary using RandomOverSampler, SMOTE, or ADASYN.

    The dataset is treated as imbalanced when the ratio between the smallest and the largest
    class count is strictly below ``balance_threshold``.

    Args:
        X (DataFrame): Feature set.
        Y (Series): Target variable.
        balance_threshold (float): Threshold for class balance.
        method (int): Method for oversampling: 1 = RandomOverSampler, 2 = SMOTE, 3 = ADASYN,
            4 = do not oversample.

    Returns:
        X_resampled, Y_resampled (DataFrame/Series): Resampled data if imbalance is detected and
        oversampling succeeds, else the original data. A two-element tuple is always returned,
        including after an error has been reported through ``st.error``.
    """
    try:
        # Check the distribution of the target variable
        class_distribution = Counter(Y)
        if not class_distribution:
            # An empty target has no classes, so there is nothing to balance.
            return X, Y

        # Determine if the dataset is imbalanced
        class_counts = class_distribution.values()
        is_imbalanced = min(class_counts) / max(class_counts) < balance_threshold
        if not is_imbalanced or method == 4:
            return X, Y

        if method == 1:
            oversampler = RandomOverSampler(random_state=0)
        elif method == 2:
            oversampler = SMOTE(random_state=0)
        elif method == 3:
            oversampler = ADASYN(random_state=0)
        else:
            # Previously this fell through with `oversampler` unbound and was reported as a
            # continuous-target problem; report the real cause instead.
            st.error(f"Unknown oversampling method: {method}. The data was left unchanged.")
            return X, Y

        X_resampled, Y_resampled = oversampler.fit_resample(X, Y)
        return X_resampled, Y_resampled
    except Exception:
        st.error("The target attribute may be continuous. Please check the data type.")
        # Keep the documented return shape so callers that unpack the result do not fail
        # on an implicit None.
        return X, Y
def estimate_optimal_clusters(df):
    """
    Estimates the optimal number of clusters for KMeans clustering using the elbow method and silhouette scores.

    KMeans is fitted once for every k from 2 to 10. The SSE (inertia) curve is used to locate a
    knee, and the k values adjacent to the knee are then ranked by average silhouette score.

    :param df: DataFrame containing the dataset to cluster.
    :return: The estimated optimal number of clusters.
    """
    # Fit each candidate k exactly once and keep the labels, so the silhouette pass below can
    # reuse them instead of refitting an identical model (same data, same random_state).
    sse = {}
    labels_by_k = {}
    for k in range(2, 11):
        kmeans = KMeans(n_clusters=k, random_state=42).fit(df)
        sse[k] = kmeans.inertia_
        labels_by_k[k] = kmeans.labels_

    # Find the elbow point: compute the first and second differences of the SSE
    sse_values = list(sse.values())
    first_diff = np.diff(sse_values)  # first difference
    second_diff = np.diff(first_diff)  # second difference
    # Index i of second_diff is mapped to k = i + 2; the neighbourhood search below covers
    # k - 1 .. k + 1 around that value.
    knee_point = int(np.argmax(second_diff)) + 2

    # Find the optimal number of clusters around the knee point
    silhouette_avg_scores = {}
    for k in range(knee_point - 1, knee_point + 2):
        if k >= 2:  # make sure k is at least 2
            silhouette_avg_scores[k] = silhouette_score(df, labels_by_k[k])

    # Pick the candidate with the highest average silhouette score
    optimal_clusters = max(silhouette_avg_scores, key=silhouette_avg_scores.get)
    return optimal_clusters
def calculate_f1_score(model, X_test, Y_test, binary_classification=True):
    """
    Compute the F1 score of a model's predictions on a test set.

    The 'average' argument of f1_score is chosen from the task type: 'binary' for binary classification, 'macro' otherwise.

    :param model: The trained machine learning model used for predictions.
    :param X_test: The feature matrix for the test set.
    :param Y_test: The true labels for the test set.
    :param binary_classification: If True, use the 'binary' average; otherwise use the 'macro' average.
    :return: The F1 score of the model predictions.
    """
    averaging = 'binary' if binary_classification else 'macro'
    predictions = model.predict(X_test)
    return f1_score(Y_test, predictions, average=averaging)
def model_score(model, X_test, Y_test):
    """
    Return the value of the model's own ``score`` method on the test set.

    :param model: A fitted model exposing ``score(X, y)``.
    :param X_test: The feature matrix for the test set.
    :param Y_test: The true labels for the test set.
    :return: Whatever ``model.score`` returns for the given test data.
    """
    return model.score(X_test, Y_test)
def fpr_and_tpr(model, X_test, Y_test):
    """
    Compute the false positive rates and true positive rates of a classifier's ROC curve.

    The scores passed to roc_curve are taken from column 1 of ``model.predict_proba``.

    :param model: A fitted classifier exposing ``predict_proba``.
    :param X_test: The feature matrix for the test set.
    :param Y_test: The true labels for the test set.
    :return: A tuple (fpr, tpr) as returned by roc_curve; the thresholds are discarded.
    """
    class_probabilities = model.predict_proba(X_test)
    positive_scores = class_probabilities[:, 1]
    false_positive_rate, true_positive_rate, _thresholds = roc_curve(Y_test, positive_scores)
    return false_positive_rate, true_positive_rate
def auc(fpr, tpr):
    """
    Compute the area under the ROC curve from its false and true positive rates.

    :param fpr: False positive rates.
    :param tpr: True positive rates.
    :return: The area computed by ``sklearn.metrics.auc``.
    """
    return metrics.auc(fpr, tpr)
def calculate_silhouette_score(X, labels):
    """
    Compute the silhouette score of a clustering.

    :param X: The data that was clustered.
    :param labels: The cluster label assigned to each sample.
    :return: The value returned by silhouette_score.
    """
    silhouette = silhouette_score(X, labels)
    return silhouette
def calculate_calinski_harabasz_score(X, labels):
    """
    Compute the Calinski-Harabasz score of a clustering.

    :param X: The data that was clustered.
    :param labels: The cluster label assigned to each sample.
    :return: The value returned by calinski_harabasz_score.
    """
    calinski_harabasz = calinski_harabasz_score(X, labels)
    return calinski_harabasz
def calculate_davies_bouldin_score(X, labels):
    """
    Compute the Davies-Bouldin score of a clustering.

    :param X: The data that was clustered.
    :param labels: The cluster label assigned to each sample.
    :return: The value returned by davies_bouldin_score.
    """
    davies_bouldin = davies_bouldin_score(X, labels)
    return davies_bouldin
def gmm_predict(X, model):
    """
    Return the labels a GMM model predicts for the given data.

    :param X: The data to label.
    :param model: A fitted model exposing ``predict``.
    :return: Whatever ``model.predict(X)`` returns.
    """
    return model.predict(X)
def calculate_r2_score(y_pred, Y_test):
    """
    Compute the R2 score of regression predictions.

    Note the argument order: predictions come first here, but are passed to r2_score as the second argument.

    :param y_pred: Predicted target values.
    :param Y_test: True target values.
    :return: The value returned by r2_score.
    """
    return r2_score(Y_test, y_pred)
def calculate_mse_and_rmse(y_pred, Y_test):
    """
    Compute the mean squared error of regression predictions together with its square root.

    :param y_pred: Predicted target values.
    :param Y_test: True target values.
    :return: A tuple (mse, rmse), where rmse is the square root of mse.
    """
    squared_error = mean_squared_error(Y_test, y_pred)
    return squared_error, np.sqrt(squared_error)
def calculate_mae(y_pred, Y_test):
    """
    Compute the mean absolute error of regression predictions.

    :param y_pred: Predicted target values.
    :param Y_test: True target values.
    :return: The value returned by mean_absolute_error.
    """
    return mean_absolute_error(Y_test, y_pred)
def save_model(model):
    """
    Serialize a machine learning model to bytes.

    The model is written with joblib's dump function into an in-memory BytesIO buffer, and the buffer's contents are returned.

    :param model: The model object to serialize.
    :return: The serialized model as a bytes object.
    """
    with io.BytesIO() as stream:
        dump(model, stream)
        return stream.getvalue()