import logging
from logging.handlers import RotatingFileHandler
import os
from pathlib import Path
from typing import Optional


class LogManager:
    """Manages logging configuration and rotation for all agents.

    On construction the manager creates the log directory tree
    (``<log_dir>``, ``agents``, ``system``, ``analytics``) and attaches a
    rotating ``system/system.log`` handler to the root logger.
    """

    # Single format shared by the system, agent and analytics handlers.
    _LOG_FORMAT = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'

    def __init__(self, log_dir: str = "logs", max_bytes: int = 5_000_000,
                 backup_count: int = 5):
        """Initialize log manager.

        Args:
            log_dir: Directory to store log files
            max_bytes: Maximum size of each log file before rotation
            backup_count: Number of backup files to keep
        """
        self.log_dir = Path(log_dir)
        self.max_bytes = max_bytes
        self.backup_count = backup_count

        # Directories must exist before any file handler is opened.
        self._create_log_dirs()
        self._configure_root_logger()

    def _create_log_dirs(self) -> None:
        """Create the log directory and its per-category subdirectories."""
        for sub_dir in (
            self.log_dir,
            self.log_dir / "agents",
            self.log_dir / "system",
            self.log_dir / "analytics",
        ):
            sub_dir.mkdir(parents=True, exist_ok=True)

    def _new_rotating_handler(self, log_path, log_level: int) -> RotatingFileHandler:
        """Build a rotating file handler using this manager's rotation settings."""
        handler = RotatingFileHandler(
            log_path,
            maxBytes=self.max_bytes,
            backupCount=self.backup_count,
        )
        handler.setLevel(log_level)
        handler.setFormatter(logging.Formatter(self._LOG_FORMAT))
        return handler

    def _configure_root_logger(self) -> None:
        """Configure the root logger with the rotating system log handler.

        The handler is attached only once per system log file: if the root
        logger already has a rotating handler writing to the same path
        (e.g. a second ``LogManager`` was created for the same directory),
        it is reused, so records are not written twice and no extra file
        handle is opened. In that case the first handler's rotation
        settings stay in effect.
        """
        root_logger = logging.getLogger()
        root_logger.setLevel(logging.INFO)

        system_log = self.log_dir / "system" / "system.log"
        # FileHandler stores its target as an absolute path in baseFilename.
        resolved = os.path.abspath(os.fspath(system_log))
        for existing in root_logger.handlers:
            if (isinstance(existing, RotatingFileHandler)
                    and getattr(existing, "baseFilename", None) == resolved):
                return

        root_logger.addHandler(
            self._new_rotating_handler(system_log, logging.INFO)
        )

    def _build_file_logger(self, logger_name: str, log_path,
                           log_level: int) -> logging.Logger:
        """Return ``logger_name`` wired to a single rotating handler at ``log_path``.

        Handlers already attached to the logger are detached AND closed
        before the new one is added, so repeated calls do not leak open
        file descriptors.
        """
        logger = logging.getLogger(logger_name)
        logger.setLevel(log_level)

        # Iterate over a copy: removeHandler mutates logger.handlers.
        for stale in list(logger.handlers):
            logger.removeHandler(stale)
            stale.close()

        logger.addHandler(self._new_rotating_handler(log_path, log_level))
        return logger

    def get_agent_logger(self, agent_name: str,
                         log_level: int = logging.INFO) -> logging.Logger:
        """Get a configured logger for an agent.

        The logger is named ``agent.<agent_name>`` and writes to
        ``<log_dir>/agents/<agent_name>.log``. Any handlers previously
        attached to that logger are removed and closed.

        Args:
            agent_name: Name of the agent (used in log file name)
            log_level: Logging level for this logger

        Returns:
            Configured logger instance
        """
        return self._build_file_logger(
            f"agent.{agent_name}",
            self.log_dir / "agents" / f"{agent_name}.log",
            log_level,
        )

    def get_analytics_logger(self, name: str,
                             log_level: int = logging.INFO) -> logging.Logger:
        """Get a configured logger for analytics.

        The logger is named ``analytics.<name>`` and writes to
        ``<log_dir>/analytics/<name>.log``. Any handlers previously
        attached to that logger are removed and closed.

        Args:
            name: Analytics category name
            log_level: Logging level for this logger

        Returns:
            Configured logger instance
        """
        return self._build_file_logger(
            f"analytics.{name}",
            self.log_dir / "analytics" / f"{name}.log",
            log_level,
        )

    def cleanup_old_logs(self, max_age_days: int = 30):
        """Clean up log files older than specified days.

        Every file under the log directory whose modification time is
        older than ``max_age_days`` is deleted. A file that cannot be
        inspected or removed (already gone, dangling symlink, permission
        problem) is reported through ``logging.error`` and skipped; it
        does not abort the rest of the cleanup.

        Args:
            max_age_days: Maximum age of log files in days
        """
        import time

        cutoff = time.time() - max_age_days * 24 * 60 * 60

        for root, _, files in os.walk(self.log_dir):
            for file_name in files:
                file_path = os.path.join(root, file_name)
                try:
                    # The stat call is inside the try: the file may vanish
                    # between the directory listing and this point.
                    if os.path.getmtime(file_path) >= cutoff:
                        continue
                    os.remove(file_path)
                except OSError as e:
                    logging.error(
                        "Error removing old log file %s: %s", file_path, e
                    )
                else:
                    logging.info("Removed old log file: %s", file_path)

    def get_log_stats(self) -> dict:
        """Get statistics about log files.

        Categories are keyed by the base name of each directory visited
        under the log directory (including the log directory itself).
        Files whose size cannot be read (e.g. removed during the walk, or
        dangling symlinks) are left out of the counts.

        Returns:
            Dictionary with ``total_size``, ``file_count`` and a
            ``categories`` mapping of ``{"size", "file_count"}`` entries
        """
        stats = {
            "total_size": 0,
            "file_count": 0,
            "categories": {}
        }

        for root, _, files in os.walk(self.log_dir):
            bucket = stats["categories"].setdefault(
                os.path.basename(root),
                {"size": 0, "file_count": 0},
            )
            for file_name in files:
                try:
                    size = os.path.getsize(os.path.join(root, file_name))
                except OSError:
                    # Unreadable or vanished entry: skip, don't fail.
                    continue
                stats["total_size"] += size
                stats["file_count"] += 1
                bucket["size"] += size
                bucket["file_count"] += 1

        return stats