import torch
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score, ndcg_score
from typing import Dict, List, Tuple
import json
import os
from train_model import HybridMusicRecommender, MusicRecommenderDataset
from torch.utils.data import DataLoader
import logging
from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import ParameterGrid, train_test_split

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('model_evaluation.log'),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)


class ModelEvaluator:
    def __init__(self, model_path: str, test_data: pd.DataFrame, batch_size: int = 32):
        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        self.model_path = model_path
        self.test_data = test_data
        self.batch_size = batch_size

        # Load model and config
        torch.serialization.add_safe_globals([LabelEncoder])
        self.checkpoint = torch.load(model_path, map_location=self.device, weights_only=False)
        self.config = self.checkpoint['config']
        self.encoders = self.checkpoint['encoders']

        # Initialize model
        self.model = self._initialize_model()
        self.test_loader = self._prepare_data()

        # Create metrics directory with absolute path
        self.metrics_dir = os.path.join(os.path.dirname(model_path), 'metrics')
        os.makedirs(self.metrics_dir, exist_ok=True)

    def _initialize_model(self, custom_config: Dict = None) -> HybridMusicRecommender:
        """Initialize and load the model from checkpoint."""
        # Use custom config if provided, otherwise use default
        config = custom_config if custom_config else self.config

        model = HybridMusicRecommender(
            num_users=len(self.encoders['user_encoder'].classes_),
            num_music=len(self.encoders['music_encoder'].classes_),
            num_artists=len(self.encoders['artist_encoder'].classes_),
            num_genres=len(self.encoders['genre_encoder'].classes_),
            num_numerical=12,
            embedding_dim=config['embedding_dim'],
            layers=config['hidden_layers'],
            dropout=config['dropout']
        )

        # Only load state dict if using default config
        if not custom_config:
            model.load_state_dict(self.checkpoint['model_state_dict'])

        model = model.to(self.device)
        model.eval()
        return model

    def _prepare_data(self) -> DataLoader:
        """Prepare test data loader using saved encoders."""
        # Create a custom dataset for test data with the saved encoders
        test_dataset = MusicRecommenderDataset(
            self.test_data,
            mode='test',
            encoders=self.encoders
        )
        logger.info(f"Prepared test dataset with {len(self.test_data)} samples")
        return DataLoader(test_dataset, batch_size=self.batch_size, shuffle=False)

    def calculate_metrics(self) -> Dict[str, float]:
        """Calculate various performance metrics."""
        true_values = []
        predictions = []

        with torch.no_grad():
            for batch in self.test_loader:
                batch = {k: v.to(self.device) for k, v in batch.items()}
                pred = self.model(batch)
                true_values.extend(batch['playcount'].cpu().numpy())
                predictions.extend(pred.cpu().numpy())

        true_values = np.array(true_values)
        predictions = np.array(predictions)

        metrics = {
            'mse': float(mean_squared_error(true_values, predictions)),
            'rmse': float(np.sqrt(mean_squared_error(true_values, predictions))),
            'mae': float(mean_absolute_error(true_values, predictions)),
            'r2': float(r2_score(true_values, predictions))
        }

        # Calculate prediction distribution statistics
        metrics.update({
            'pred_mean': float(np.mean(predictions)),
            'pred_std': float(np.std(predictions)),
            'true_mean': float(np.mean(true_values)),
            'true_std': float(np.std(true_values))
        })

        return metrics
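    # Example (illustrative, values are placeholders): calculate_metrics() returns a
    # flat dict such as
    #   {'mse': ..., 'rmse': ..., 'mae': ..., 'r2': ...,
    #    'pred_mean': ..., 'pred_std': ..., 'true_mean': ..., 'true_std': ...}
    # which is what save_evaluation_results() later serializes under 'basic_metrics'.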
    def analyze_prediction_bias(self) -> Dict[str, float]:
        """Analyze prediction bias across different value ranges."""
        true_values = []
        predictions = []

        with torch.no_grad():
            for batch in self.test_loader:
                batch = {k: v.to(self.device) for k, v in batch.items()}
                pred = self.model(batch)
                true_values.extend(batch['playcount'].cpu().numpy())
                predictions.extend(pred.cpu().numpy())

        true_values = np.array(true_values)
        predictions = np.array(predictions)

        # Calculate bias for different value ranges
        percentiles = np.percentile(true_values, [25, 50, 75])
        ranges = [
            (float('-inf'), percentiles[0]),
            (percentiles[0], percentiles[1]),
            (percentiles[1], percentiles[2]),
            (percentiles[2], float('inf'))
        ]

        bias_analysis = {}
        for i, (low, high) in enumerate(ranges):
            mask = (true_values >= low) & (true_values < high)
            if np.any(mask):
                bias = np.mean(predictions[mask] - true_values[mask])
                bias_analysis[f'bias_range_{i+1}'] = float(bias)

        return bias_analysis

    def plot_prediction_distribution(self, save_dir: str = None):
        """Plot the distribution of predictions vs true values."""
        if save_dir is None:
            save_dir = self.metrics_dir

        true_values = []
        predictions = []

        with torch.no_grad():
            for batch in self.test_loader:
                batch = {k: v.to(self.device) for k, v in batch.items()}
                pred = self.model(batch)
                true_values.extend(batch['playcount'].cpu().numpy())
                predictions.extend(pred.cpu().numpy())

        true_values = np.array(true_values)
        predictions = np.array(predictions)

        # Create scatter plot
        plt.figure(figsize=(10, 6))
        plt.scatter(true_values, predictions, alpha=0.5)
        plt.plot([true_values.min(), true_values.max()],
                 [true_values.min(), true_values.max()], 'r--', lw=2)
        plt.xlabel('True Values')
        plt.ylabel('Predictions')
        plt.title('Prediction vs True Values')

        try:
            # Save plot with absolute path
            plot_path = os.path.join(save_dir, 'prediction_distribution.png')
            plt.savefig(plot_path)
            plt.close()
            logger.info(f"Saved prediction distribution plot to: {plot_path}")
        except Exception as e:
            logger.error(f"Error saving prediction distribution plot: {str(e)}")

    def plot_error_distribution(self, save_dir: str = None):
        """Plot the distribution of prediction errors."""
        if save_dir is None:
            save_dir = self.metrics_dir

        true_values = []
        predictions = []

        with torch.no_grad():
            for batch in self.test_loader:
                batch = {k: v.to(self.device) for k, v in batch.items()}
                pred = self.model(batch)
                true_values.extend(batch['playcount'].cpu().numpy())
                predictions.extend(pred.cpu().numpy())

        errors = np.array(predictions) - np.array(true_values)

        plt.figure(figsize=(10, 6))
        sns.histplot(errors, kde=True)
        plt.xlabel('Prediction Error')
        plt.ylabel('Count')
        plt.title('Distribution of Prediction Errors')

        try:
            plot_path = os.path.join(save_dir, 'error_distribution.png')
            plt.savefig(plot_path)
            plt.close()
            logger.info(f"Saved error distribution plot to: {plot_path}")
        except Exception as e:
            logger.error(f"Error saving error distribution plot: {str(e)}")

    def evaluate_top_k_recommendations(self, k: int = 10) -> Dict[str, float]:
        """Evaluate top-K recommendation metrics."""
        user_metrics = []

        # Group by user to evaluate per-user recommendations
        for user_id in self.test_data['user_id'].unique():
            user_mask = self.test_data['user_id'] == user_id
            user_data = self.test_data[user_mask]

            # Skip users with too few interactions
            if len(user_data) < k:
                continue

            user_dataset = MusicRecommenderDataset(
                user_data,
                mode='test',
                encoders=self.encoders
            )
            user_loader = DataLoader(user_dataset, batch_size=len(user_data), shuffle=False)

            with torch.no_grad():
                batch = next(iter(user_loader))
                batch = {key: v.to(self.device) for key, v in batch.items()}
                predictions = self.model(batch).cpu().numpy()
                true_values = batch['playcount'].cpu().numpy()

            # Normalize predictions and true values to [0, 1] range
            true_values = (true_values - true_values.min()) / (true_values.max() - true_values.min() + 1e-8)
            predictions = (predictions - predictions.min()) / (predictions.max() - predictions.min() + 1e-8)

            # Calculate metrics for this user
            top_k_pred_idx = np.argsort(predictions)[-k:][::-1]
            top_k_true_idx = np.argsort(true_values)[-k:][::-1]

            # Calculate NDCG
            dcg = self._calculate_dcg(true_values, top_k_pred_idx, k)
            idcg = self._calculate_dcg(true_values, top_k_true_idx, k)
            # Handle edge case where idcg is 0
            ndcg = dcg / idcg if idcg > 0 else 0.0

            # Calculate precision and recall
            relevant_items = set(top_k_true_idx)
            recommended_items = set(top_k_pred_idx)
            precision = len(relevant_items & recommended_items) / k
            recall = len(relevant_items & recommended_items) / len(relevant_items)

            user_metrics.append({
                'ndcg': ndcg,
                'precision': precision,
                'recall': recall
            })

        # Guard against the case where no user has at least k interactions
        if not user_metrics:
            logger.warning(f"No users with at least {k} interactions; top-K metrics are undefined")
            return {}

        # Average metrics across users
        avg_metrics = {
            f'ndcg@{k}': float(np.mean([m['ndcg'] for m in user_metrics])),
            f'precision@{k}': float(np.mean([m['precision'] for m in user_metrics])),
            f'recall@{k}': float(np.mean([m['recall'] for m in user_metrics]))
        }

        return avg_metrics

    def _calculate_dcg(self, true_values: np.ndarray, indices: np.ndarray, k: int) -> float:
        """Helper method to calculate DCG with numerical stability."""
        relevance = true_values[indices[:k]]
        # Cap the relevance values to prevent overflow
        max_relevance = 10  # Set a reasonable maximum value
        relevance = np.clip(relevance, 0, max_relevance)
        # Use log2(rank + 1) directly instead of creating array
        gains = (2 ** relevance - 1) / np.log2(np.arange(2, len(relevance) + 2))
        return float(np.sum(gains))
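    # Worked example for _calculate_dcg (hypothetical numbers): with
    # true_values = np.array([3.0, 2.0, 3.0, 0.0]), indices = [0, 2, 1] and k = 3,
    # the clipped relevances are [3, 3, 2], so
    #   DCG = (2**3 - 1)/log2(2) + (2**3 - 1)/log2(3) + (2**2 - 1)/log2(4)
    #       = 7.0 + 4.4165 + 1.5 ~= 12.92
    # evaluate_top_k_recommendations() divides this by the DCG of the ideal
    # ordering (IDCG) to obtain an NDCG value in [0, 1].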
    def evaluate_cold_start(self, min_interactions: int = 5) -> Dict[str, Dict[str, float]]:
        """
        Evaluate model performance on cold-start scenarios.

        Args:
            min_interactions: Minimum number of interactions to consider a user/item as non-cold

        Returns:
            Dictionary containing metrics for different cold-start scenarios
        """
        # Get all unique users and items
        all_users = self.test_data['user_id'].unique()
        all_items = self.test_data['music_id'].unique()

        # Count interactions per user and item
        user_counts = self.test_data['user_id'].value_counts()
        item_counts = self.test_data['music_id'].value_counts()

        # Identify cold users and items
        cold_users = set(user_counts[user_counts < min_interactions].index)
        cold_items = set(item_counts[item_counts < min_interactions].index)

        # Create masks for different scenarios
        cold_user_mask = self.test_data['user_id'].isin(cold_users)
        cold_item_mask = self.test_data['music_id'].isin(cold_items)

        cold_user_warm_item = cold_user_mask & ~cold_item_mask
        warm_user_cold_item = ~cold_user_mask & cold_item_mask
        cold_both = cold_user_mask & cold_item_mask
        warm_both = ~cold_user_mask & ~cold_item_mask

        scenarios = {
            'cold_user_warm_item': cold_user_warm_item,
            'warm_user_cold_item': warm_user_cold_item,
            'cold_both': cold_both,
            'warm_both': warm_both
        }

        results = {}
        for scenario_name, mask in scenarios.items():
            if not any(mask):
                logger.warning(f"No samples found for scenario: {scenario_name}")
                continue

            scenario_data = self.test_data[mask].copy()

            # Create a temporary dataset and dataloader for this scenario
            scenario_dataset = MusicRecommenderDataset(
                scenario_data,
                mode='test',
                encoders=self.encoders
            )
            scenario_loader = DataLoader(
                scenario_dataset,
                batch_size=self.batch_size,
                shuffle=False
            )

            # Collect predictions and true values
            true_values = []
            predictions = []

            with torch.no_grad():
                for batch in scenario_loader:
                    batch = {k: v.to(self.device) for k, v in batch.items()}
                    pred = self.model(batch)
                    true_values.extend(batch['playcount'].cpu().numpy())
                    predictions.extend(pred.cpu().numpy())

            true_values = np.array(true_values)
            predictions = np.array(predictions)

            # Calculate metrics
            metrics = {
                'count': len(true_values),
                'mse': float(mean_squared_error(true_values, predictions)),
                'rmse': float(np.sqrt(mean_squared_error(true_values, predictions))),
                'mae': float(mean_absolute_error(true_values, predictions)),
                'r2': float(r2_score(true_values, predictions)),
                'pred_mean': float(np.mean(predictions)),
                'pred_std': float(np.std(predictions)),
                'true_mean': float(np.mean(true_values)),
                'true_std': float(np.std(true_values))
            }

            results[scenario_name] = metrics

            # Log results for this scenario
            logger.info(f"\n{scenario_name} Metrics (n={metrics['count']}):")
            for metric, value in metrics.items():
                if metric != 'count':
                    logger.info(f"{metric}: {value:.4f}")

        return results
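    # Example (illustrative): evaluate_cold_start(min_interactions=5) returns a dict
    # keyed by scenario ('cold_user_warm_item', 'warm_user_cold_item', 'cold_both',
    # 'warm_both'); each value holds the sample count plus the same error and
    # distribution statistics as calculate_metrics(). Scenarios with no samples are
    # skipped and logged as warnings.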
    def save_evaluation_results(self, save_dir: str = None):
        """Run all evaluations and save results."""
        # Default to the checkpoint's metrics directory so JSON results and plots
        # end up in the same place
        if save_dir is None:
            save_dir = self.metrics_dir
        os.makedirs(save_dir, exist_ok=True)

        # Calculate all metrics
        results = {
            'basic_metrics': self.calculate_metrics(),
            'bias_analysis': self.analyze_prediction_bias(),
            'top_k_metrics': self.evaluate_top_k_recommendations(),
            'cold_start_metrics': self.evaluate_cold_start(min_interactions=5)
        }

        # Save results to JSON
        results_file = os.path.join(save_dir, 'evaluation_results.json')
        with open(results_file, 'w') as f:
            json.dump(results, f, indent=4)

        logger.info(f"Evaluation completed. Results saved to: {save_dir}")
        return results

    def tune_hyperparameters(self, param_grid: Dict[str, List], val_data: pd.DataFrame) -> Dict:
        """
        Tune hyperparameters using validation set.

        Args:
            param_grid: Dictionary of parameters to try
            val_data: Validation data

        Returns:
            Best parameters found
        """
        best_score = float('inf')
        best_params = None

        # Create validation dataset
        val_dataset = MusicRecommenderDataset(val_data, mode='test', encoders=self.encoders)
        val_loader = DataLoader(val_dataset, batch_size=self.batch_size, shuffle=False)

        # calculate_metrics() reads self.test_loader, so point it at the validation
        # loader for the duration of the search to avoid scoring candidates on the
        # test set, and restore it afterwards.
        original_loader = self.test_loader
        self.test_loader = val_loader
        try:
            # Try all parameter combinations
            for params in ParameterGrid(param_grid):
                # Create a new config with updated parameters
                current_config = self.config.copy()
                current_config.update(params)

                # Initialize model with current parameters (candidates with a custom
                # config start from freshly initialized weights, not retrained ones)
                self.model = self._initialize_model(custom_config=current_config)

                # Evaluate on validation set
                metrics = self.calculate_metrics()
                score = metrics['rmse']  # Use RMSE as scoring metric

                if score < best_score:
                    best_score = score
                    best_params = params
                    logger.info(f"New best parameters found: {params} (RMSE: {score:.4f})")
        finally:
            self.test_loader = original_loader
            # Restore the trained checkpoint model so later evaluations do not use
            # the last (untrained) candidate from the grid
            self.model = self._initialize_model()

        return best_params
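
# Minimal usage sketch (paths and file names are assumptions; adjust to your
# project layout):
#
#   test_df = pd.read_csv("data/test_data.csv")
#   evaluator = ModelEvaluator("checkpoints/best_model.pth", test_df, batch_size=32)
#   evaluator.calculate_metrics()
#   evaluator.save_evaluation_results()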

def main():
    # Load test data and check for data compatibility
    ROOT_DIR = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
    test_path = os.path.join(ROOT_DIR, 'data', 'test_data.csv')
    model_path = os.path.join(ROOT_DIR, 'data_engineered_v3', 'rs_main_v2_refactored',
                              'checkpoints', 'best_model.pth')

    test_data = pd.read_csv(test_path)
    logger.info(f"Loaded test data with {len(test_data)} samples")

    # Split test data into validation and test
    val_data, test_data = train_test_split(test_data, test_size=0.5, random_state=42)

    try:
        # Initialize evaluator
        evaluator = ModelEvaluator(
            model_path=model_path,
            test_data=test_data,
            batch_size=32
        )

        # Tune hyperparameters
        param_grid = {
            'embedding_dim': [32, 64, 128],
            'dropout': [0.1, 0.2, 0.3],
            'hidden_layers': [[128, 64], [256, 128, 64], [512, 256, 128]]
        }
        best_params = evaluator.tune_hyperparameters(param_grid, val_data)
        logger.info(f"Best parameters: {best_params}")

        # Run evaluation
        results = evaluator.save_evaluation_results()

        # Print summary
        logger.info("\nEvaluation Summary:")
        logger.info("Basic Metrics:")
        for metric, value in results['basic_metrics'].items():
            logger.info(f"{metric}: {value:.4f}")

        logger.info("\nTop-K Metrics:")
        for metric, value in results['top_k_metrics'].items():
            logger.info(f"{metric}: {value:.4f}")

        logger.info("\nBias Analysis:")
        for range_name, bias in results['bias_analysis'].items():
            logger.info(f"{range_name}: {bias:.4f}")

    except Exception as e:
        logger.error(f"Error during evaluation: {str(e)}")
        raise


if __name__ == "__main__":
    main()