from typing import Any, Dict, List, Optional, Tuple
import logging

from crewai import Crew, Task

from utils.log_manager import LogManager
from agents.conversation_agent import ConversationAgent
from agents.assessment_agent import AssessmentAgent
from agents.mindfulness_agent import MindfulnessAgent
from agents.crisis_agent import CrisisAgent


class WellnessOrchestrator:
    """Orchestrates the coordination between different agents.

    On construction the orchestrator builds four agents (conversation,
    assessment, mindfulness, crisis) and registers them with a ``Crew``.
    Incoming messages are routed to exactly one agent by simple
    case-insensitive substring matching.
    """

    # Substrings that send a message to the crisis agent. Matching is
    # case-insensitive and takes precedence over every other route.
    _CRISIS_INDICATORS: Tuple[str, ...] = (
        "suicide",
        "kill myself",
        "end it all",
        "hurt myself",
        "give up",
        "can't go on",
        "emergency",
        "crisis",
        "urgent help",
    )

    # Keyword routes tried in order once the crisis check has failed:
    # (keywords, agent attribute name, agent_type label, expected_output).
    _KEYWORD_ROUTES: Tuple[Tuple[Tuple[str, ...], str, str, str], ...] = (
        (
            ("assess", "evaluate"),
            "assessment_agent",
            "assessment",
            "Mental health assessment response",
        ),
        (
            ("meditate", "mindful"),
            "mindfulness_agent",
            "mindfulness",
            "Mindfulness guidance response",
        ),
    )

    def __init__(self, model_config: Dict):
        """Store the configuration, set up logging, then build agents and crew.

        Args:
            model_config: Model configuration mapping. Its ``"conversation"``
                entry (if any) is forwarded to every agent as ``llm_config``.
        """
        self.model_config = model_config
        self.log_manager = LogManager()
        self.logger = self.log_manager.get_agent_logger("orchestrator")

        # Agents must exist before the crew that references them.
        self.initialize_agents()
        self.initialize_crew()

    def initialize_agents(self):
        """Initialize all agents with their shared configuration.

        Raises:
            Exception: Any error raised while constructing an agent is logged
                and re-raised unchanged.
        """
        self.logger.info("Initializing agents")
        try:
            # Common agent configuration
            agent_config = {
                "llm_config": self.model_config.get("conversation", {}),
                "temperature": 0.7,
                "max_iterations": 3,
            }

            self.conversation_agent = ConversationAgent(
                model_config=self.model_config,
                **agent_config
            )
            # Temporarily using ConversationAgent for the specialised roles.
            self.assessment_agent = ConversationAgent(
                model_config=self.model_config,
                **agent_config
            )
            self.mindfulness_agent = ConversationAgent(
                model_config=self.model_config,
                **agent_config
            )
            self.crisis_agent = ConversationAgent(
                model_config=self.model_config,
                **agent_config
            )
            self.logger.info("All agents initialized successfully")
        except Exception as e:
            self.logger.error(f"Error initializing agents: {str(e)}")
            raise

    def initialize_crew(self):
        """Create the ``Crew`` containing all four agent instances.

        Raises:
            Exception: Any error raised by ``Crew(...)`` is logged and
                re-raised unchanged.
        """
        self.logger.info("Initializing CrewAI")
        try:
            self.crew = Crew(
                agents=[
                    self.conversation_agent,
                    self.assessment_agent,
                    self.mindfulness_agent,
                    self.crisis_agent,
                ]
            )
            self.logger.info("CrewAI initialized successfully")
        except Exception as e:
            self.logger.error(f"Error initializing CrewAI: {str(e)}")
            raise

    def _route(self, message: str) -> Tuple[Any, str, str]:
        """Pick the agent, agent_type label and expected output for a message."""
        if self._is_crisis(message):
            return self.crisis_agent, "crisis", "Crisis intervention response"

        lowered = message.lower()
        for keywords, agent_attr, agent_type, expected_output in self._KEYWORD_ROUTES:
            if any(keyword in lowered for keyword in keywords):
                return getattr(self, agent_attr), agent_type, expected_output

        # No keyword matched: fall back to the general conversation agent.
        return (
            self.conversation_agent,
            "conversation",
            "Therapeutic conversation response",
        )

    def process_message(self, message: str, context: Optional[Dict] = None) -> Dict:
        """Process a user message through the appropriate agent.

        Args:
            message: The user's message; also used as the task description.
            context: Optional context mapping passed to the task. ``None`` or
                any other falsy value is replaced by an empty dict.

        Returns:
            A dict with keys ``"message"`` (the agent's response),
            ``"agent_type"`` and ``"task_type"``. If anything raises during
            routing, task creation or execution, the error is logged and a
            fixed apology payload with ``agent_type == "error"`` is returned
            instead of propagating the exception.
        """
        self.logger.info("Processing message through agents")
        context = context or {}

        try:
            agent, agent_type, expected_output = self._route(message)
            # NOTE(review): a dict is passed as Task ``context`` here; confirm
            # the installed CrewAI version accepts that shape.
            task = Task(
                description=message,
                agent=agent,
                context=context,
                expected_output=expected_output,
            )
            response = agent.execute_task(task)
            return {
                "message": response,
                "agent_type": agent_type,
                "task_type": task.expected_output,
            }
        except Exception as e:
            self.logger.error(f"Error processing message: {str(e)}")
            return {
                "message": "I apologize, but I encountered an error. Please try again.",
                "agent_type": "error",
                "task_type": "error_handling",
            }

    def _is_crisis(self, message: str) -> bool:
        """Check if message contains any crisis indicator (case-insensitive)."""
        lowered = message.lower()
        return any(indicator in lowered for indicator in self._CRISIS_INDICATORS)

    def get_status(self) -> Dict:
        """Get status of all agents, keyed by agent role."""
        return {
            "conversation": self.conversation_agent.get_status(),
            "assessment": self.assessment_agent.get_status(),
            "mindfulness": self.mindfulness_agent.get_status(),
            "crisis": self.crisis_agent.get_status(),
        }