Spaces:
Runtime error
Runtime error
import math | |
import numpy as np | |
import pandas as pd | |
import matplotlib.pyplot as plt | |
import seaborn as sns | |
import gradio as gr | |
from datasets import load_dataset | |
from sklearn.datasets import fetch_openml | |
from sklearn.model_selection import train_test_split | |
from sklearn.metrics import classification_report | |
LOGS_DATASET_URI = 'pgurazada1/machine-failure-mlops-demo-logs' | |
# Load and cache training data | |
dataset = fetch_openml(data_id=42890, as_frame=True, parser="auto") | |
data_df = dataset.data | |
target = 'Machine failure' | |
numeric_features = [ | |
'Air temperature [K]', | |
'Process temperature [K]', | |
'Rotational speed [rpm]', | |
'Torque [Nm]', | |
'Tool wear [min]' | |
] | |
categorical_features = ['Type'] | |
X = data_df[numeric_features + categorical_features] | |
y = data_df[target] | |
Xtrain, Xtest, ytrain, ytest = train_test_split( | |
X, y, | |
test_size=0.2, | |
random_state=42 | |
) | |
def get_data(): | |
""" | |
Connect to the HuggingFace dataset where the logs are stored. | |
Pull the data into a dataframe | |
""" | |
data = load_dataset(LOGS_DATASET_URI) | |
sample_df = data['train'].to_pandas().sample(100) | |
return sample_df | |
def check_model_drift(): | |
""" | |
Check proportion of machine failure as compared to | |
its proportion in training data. If the deviation is more than | |
2 standard deviations, flag a model drift. | |
""" | |
sample_df = get_data() | |
p_pos_label_training_data = 0.03475 | |
training_data_size = 8000 | |
n_0 = sample_df.prediction.value_counts()[0] | |
try: | |
n_1 = sample_df.prediction.value_counts()[1] | |
except Exception as e: | |
n_1 = 0 | |
p_pos_label_sample_logs = n_1/(n_0+n_1) | |
variance = (p_pos_label_training_data * (1-p_pos_label_training_data))/training_data_size | |
p_diff = abs(p_pos_label_training_data - p_pos_label_sample_logs) | |
if p_diff > 2 * math.sqrt(variance): | |
return "Model Drift Detected! Check Logs!" | |
else: | |
return "No Model Drift!" | |
def plot_target_distributions(): | |
sample_df = get_data() | |
figure, axes = plt.subplots(2, 1, figsize=(9, 7)) | |
sns.countplot(x=ytrain, stat='proportion', ax=axes[0]) | |
axes[0].set_title("Distribution of targets in training data") | |
axes[0].set_xlabel('') | |
sns.countplot(x=sample_df.prediction, stat='proportion', ax=axes[1]) | |
axes[1].set_title("Distribution of predicted targets from the deployed model") | |
axes[1].set_xlabel('') | |
plt.close() | |
return figure | |
def psi(actual_proportions, expected_proportions): | |
psi_values = (actual_proportions - expected_proportions) * \ | |
np.log(actual_proportions / expected_proportions) | |
return sum(psi_values) | |
def check_data_drift(): | |
""" | |
Compare training data features and live features. If the deviation is | |
more than 2 standard deviations, flag data drift. | |
Numeric features and catagorical features are dealt with separately. | |
""" | |
sample_df = get_data() | |
data_drift_status = {} | |
numeric_features = [ | |
'Air temperature [K]', | |
'Process temperature [K]', | |
'Rotational speed [rpm]', | |
'Torque [Nm]', | |
'Tool wear [min]' | |
] | |
categorical_features = ['Type'] | |
# Numeric features | |
for feature in numeric_features: | |
mean_feature_training_data = Xtrain[feature].mean() | |
std_feature_training_data = Xtrain[feature].std() | |
mean_feature_sample_logs = sample_df[feature].mean() | |
mean_diff = abs(mean_feature_training_data - mean_feature_sample_logs) | |
if mean_diff > 2 * std_feature_training_data: | |
data_drift_status[feature] = ["Data Drift Detected! Check Logs!"] | |
else: | |
data_drift_status[feature] = ["No Data Drift!"] | |
# Categorical feature Type | |
live_proportions = sample_df['Type'].value_counts(normalize=True).values | |
training_proportions = Xtrain['Type'].value_counts(normalize=True).values | |
psi_value = psi(live_proportions, training_proportions) | |
if psi_value > 0.1: | |
data_drift_status['Type'] = ["Data Drift Detected! Check Logs!"] | |
else: | |
data_drift_status['Type'] = ["No Data Drift!"] | |
return pd.DataFrame.from_dict(data_drift_status) | |
with gr.Blocks(theme=gr.themes.Base()) as demo: | |
gr.Markdown("# Real-time Monitoring Dashboard") | |
gr.Markdown("## Model drift detection (every 5 seconds)") | |
with gr.Row(): | |
with gr.Column(): | |
gr.Textbox(check_model_drift, every=5, label="Model Drift Status") | |
gr.Markdown("## Distribution of Training Targets") | |
with gr.Row(): | |
with gr.Column(): | |
gr.Plot(plot_target_distributions, every=86400, label="Target Data Distributions") | |
gr.Markdown("## Data drift detection (every 5 seconds)") | |
with gr.Row(): | |
with gr.Column(): | |
gr.DataFrame(check_data_drift, every=5, min_width=240, label="Data Drift Status") | |
demo.queue().launch() |