# Spaces: Runtime error
from typing import Dict, List | |
from crewai import Crew, Task | |
import logging | |
from utils.log_manager import LogManager | |
from agents.conversation_agent import ConversationAgent | |
from agents.assessment_agent import AssessmentAgent | |
from agents.mindfulness_agent import MindfulnessAgent | |
from agents.crisis_agent import CrisisAgent | |
class WellnessOrchestrator:
    """Coordinate the wellness agents and route each user message to one.

    The orchestrator owns four agent instances (conversation, assessment,
    mindfulness, crisis) and a CrewAI ``Crew`` that holds them.  Routing is
    keyword based: crisis phrases are checked first, then the assessment
    and mindfulness keywords; every other message goes to the conversation
    agent.
    """

    # Phrases whose presence (case-insensitive substring match) marks a
    # message as a crisis.
    _CRISIS_INDICATORS = (
        "suicide", "kill myself", "end it all",
        "hurt myself", "give up", "can't go on",
        "emergency", "crisis", "urgent help",
    )

    # Keyword routes, checked in order after the crisis check. Each entry is
    # (keywords, agent attribute name, agent_type label, expected_output).
    _KEYWORD_ROUTES = (
        (("assess", "evaluate"), "assessment_agent", "assessment",
         "Mental health assessment response"),
        (("meditate", "mindful"), "mindfulness_agent", "mindfulness",
         "Mindfulness guidance response"),
    )

    def __init__(self, model_config: Dict):
        """Store the configuration, set up logging, then build agents and crew.

        Args:
            model_config: Model configuration mapping. Its ``"conversation"``
                entry (if any) is used as the agents' ``llm_config``, and the
                whole mapping is passed to every agent.

        Raises:
            Exception: Whatever agent or crew construction raises; it is
                logged and re-raised unchanged.
        """
        self.model_config = model_config
        self.log_manager = LogManager()
        self.logger = self.log_manager.get_agent_logger("orchestrator")

        # Agents must exist before the crew, which is built from them.
        self.initialize_agents()
        self.initialize_crew()

    def initialize_agents(self):
        """Create the four agent instances that share one configuration.

        Raises:
            Exception: Any construction failure is logged and re-raised.
        """
        self.logger.info("Initializing agents")

        try:
            # Configuration common to every agent.
            agent_config = {
                "llm_config": self.model_config.get("conversation", {}),
                "temperature": 0.7,
                "max_iterations": 3,
            }

            def build_agent():
                # Single construction path so all agents stay identical.
                return ConversationAgent(
                    model_config=self.model_config,
                    **agent_config
                )

            self.conversation_agent = build_agent()
            # The specialised roles temporarily use ConversationAgent too.
            self.assessment_agent = build_agent()
            self.mindfulness_agent = build_agent()
            self.crisis_agent = build_agent()

            self.logger.info("All agents initialized successfully")

        except Exception as e:
            self.logger.error(f"Error initializing agents: {str(e)}")
            raise

    def initialize_crew(self):
        """Create the CrewAI ``Crew`` from the four agent instances.

        Raises:
            Exception: Any construction failure is logged and re-raised.
        """
        self.logger.info("Initializing CrewAI")

        try:
            self.crew = Crew(
                agents=[
                    self.conversation_agent,
                    self.assessment_agent,
                    self.mindfulness_agent,
                    self.crisis_agent,
                ]
            )
            self.logger.info("CrewAI initialized successfully")

        except Exception as e:
            self.logger.error(f"Error initializing CrewAI: {str(e)}")
            raise

    def _select_route(self, message: str):
        """Return ``(agent, agent_type, expected_output)`` for *message*.

        Crisis detection takes priority over keyword routes; the
        conversation agent is the fallback.
        """
        lowered = message.lower()  # lowercase once for all keyword checks

        if self._is_crisis(message):
            return self.crisis_agent, "crisis", "Crisis intervention response"

        for keywords, agent_attr, agent_type, expected in self._KEYWORD_ROUTES:
            if any(keyword in lowered for keyword in keywords):
                return getattr(self, agent_attr), agent_type, expected

        return (
            self.conversation_agent,
            "conversation",
            "Therapeutic conversation response",
        )

    def process_message(self, message: str, context: Dict = None) -> Dict:
        """Route *message* to one agent and return its response.

        Args:
            message: The user's message; used as the task description.
            context: Optional context mapping; ``None`` becomes ``{}``.

        Returns:
            A dict with keys ``"message"`` (the agent's response),
            ``"agent_type"`` and ``"task_type"``.  If anything raises while
            routing, building or executing the task, the error is logged and
            a generic apology dict with ``agent_type == "error"`` is returned
            instead of propagating the exception.
        """
        self.logger.info("Processing message through agents")
        context = context or {}

        try:
            agent, agent_type, expected_output = self._select_route(message)

            # NOTE(review): ``context`` is forwarded as a dict; CrewAI's docs
            # describe ``Task.context`` as a list of Tasks. Confirm this
            # validates with the installed version.
            task = Task(
                description=message,
                agent=agent,
                context=context,
                expected_output=expected_output
            )
            response = agent.execute_task(task)

            return {
                "message": response,
                "agent_type": agent_type,
                "task_type": task.expected_output
            }

        except Exception as e:
            self.logger.error(f"Error processing message: {str(e)}")
            return {
                "message": "I apologize, but I encountered an error. Please try again.",
                "agent_type": "error",
                "task_type": "error_handling"
            }

    def _is_crisis(self, message: str) -> bool:
        """Return True if *message* contains any crisis indicator phrase.

        Matching is a case-insensitive substring test.  Empty, ``None`` or
        non-string input is treated as "not a crisis" rather than raising.
        """
        if not isinstance(message, str) or not message:
            return False
        lowered = message.lower()
        return any(phrase in lowered for phrase in self._CRISIS_INDICATORS)

    def get_status(self) -> Dict:
        """Return each agent's ``get_status()`` result keyed by role name."""
        agents_by_role = (
            ("conversation", self.conversation_agent),
            ("assessment", self.assessment_agent),
            ("mindfulness", self.mindfulness_agent),
            ("crisis", self.crisis_agent),
        )
        return {role: agent.get_status() for role, agent in agents_by_role}