import json
import logging
from termcolor import colored
from datetime import datetime
from typing import Any, Dict, Union, List
from typing import TypedDict, Annotated
from langgraph.graph.message import add_messages
from agents.base_agent import BaseAgent
from utils.read_markdown import read_markdown_file
from tools.advanced_scraper import scraper
from tools.google_serper import serper_search
from utils.logging import log_function, setup_logging
from utils.message_handling import get_ai_message_contents
from prompt_engineering.guided_json_lib import guided_json_search_query, guided_json_best_url, guided_json_router_decision
setup_logging(level=logging.DEBUG)
logger = logging.getLogger(__name__)
class MessageDict(TypedDict):
role: str
content: str
class State(TypedDict):
meta_prompt: Annotated[List[MessageDict], add_messages]
conversation_history: Annotated[List[dict], add_messages]
user_input: Annotated[List[str], add_messages]
router_decision: bool
chat_limit: int
chat_finished: bool
recursion_limit: int
state: State = {
"meta_prompt": [],
"conversation_history": [],
"user_input": [],
"router_decision": None,
"chat_limit": None,
"chat_finished": False,
"recursion_limit": None
}
# class State(TypedDict):
# meta_prompt: Annotated[List[MessageDict], add_messages]
# conversation_history: Annotated[List[dict], add_messages]
# user_input: Annotated[List[str], add_messages]
# router_decision: bool
# chat_limit: int
# chat_finished: bool
# state: State = {
# "meta_prompt": [],
# "conversation_history": [],
# "user_input": [],
# "router_decision": None,
# "chat_limit": None,
# "chat_finished": False
# }
# def chat_counter(state: State) -> State:
# chat_limit = state.get("chat_limit")
# if chat_limit is None:
# chat_limit = 0
# chat_limit += 1
# state["chat_limit"] = chat_limit
# return state
# def chat_counter(state: State) -> State:
# chat_limit = state.get("chat_limit")
# if chat_limit is None:
# chat_limit = 0
# chat_limit += 1
# state["chat_limit"] = chat_limit
# return chat_limit
def routing_function(state: State) -> str:
if state["router_decision"]:
return "no_tool_expert"
else:
return "tool_expert"
def set_chat_finished(state: State) -> bool:
state["chat_finished"] = True
final_response = state["meta_prompt"][-1].content
print(colored(f"\n\n Meta Agent 🧙♂️: {final_response}", 'cyan'))
return state
class MetaExpert(BaseAgent[State]):
def __init__(self, model: str = None, server: str = None, temperature: float = 0,
model_endpoint: str = None, stop: str = None):
super().__init__(model, server, temperature, model_endpoint, stop)
self.llm = self.get_llm(json_model=False)
def get_prompt(self, state:None) -> str:
system_prompt = read_markdown_file('prompt_engineering/meta_prompt.md')
return system_prompt
def process_response(self, response: Any, user_input: str = None, state: State = None) -> Dict[str, List[MessageDict]]:
user_input = None
updates_conversation_history = {
"meta_prompt": [
{"role": "user", "content": f"{user_input}"},
{"role": "assistant", "content": str(response)}
]
}
return updates_conversation_history
@log_function(logger)
def get_conv_history(self, state: State) -> str:
conversation_history = state.get("conversation_history", [])
expert_message_history = get_ai_message_contents(conversation_history)
print(f"Expert Data Collected: {expert_message_history}")
expert_message_history = f"Expert Data Collected: {expert_message_history}"
return expert_message_history
def get_user_input(self) -> str:
user_input = input("Enter your query: ")
return user_input
def get_guided_json(self, state: State) -> Dict[str, Any]:
pass
def use_tool(self) -> Any:
pass
@log_function(logger)
def run(self, state: State) -> State:
# counter = chat_counter(state)
user_input = state.get("user_input")
state = self.invoke(state=state, user_input=user_input)
return state
class NoToolExpert(BaseAgent[State]):
def __init__(self, model: str = None, server: str = None, temperature: float = 0,
model_endpoint: str = None, stop: str = None):
super().__init__(model, server, temperature, model_endpoint, stop)
self.llm = self.get_llm(json_model=False)
def get_prompt(self, state) -> str:
# print(f"\nn{state}\n")
system_prompt = state["meta_prompt"][-1].content
return system_prompt
def process_response(self, response: Any, user_input: str = None, state: State = None) -> Dict[str, Union[str, dict]]:
updates_conversation_history = {
"conversation_history": [
{"role": "user", "content": user_input},
{"role": "assistant", "content": f"{str(response)}"}
]
}
return updates_conversation_history
def get_conv_history(self, state: State) -> str:
pass
def get_user_input(self) -> str:
pass
def get_guided_json(self, state: State) -> Dict[str, Any]:
pass
def use_tool(self) -> Any:
pass
# @log_function(logger)
def run(self, state: State) -> State:
# chat_counter(state)
user_input = state["meta_prompt"][1].content
state = self.invoke(state=state, user_input=user_input)
return state
class ToolExpert(BaseAgent[State]):
def __init__(self, model: str = None, server: str = None, temperature: float = 0,
model_endpoint: str = None, stop: str = None):
super().__init__(model, server, temperature, model_endpoint, stop)
self.llm = self.get_llm(json_model=False)
def get_prompt(self, state) -> str:
system_prompt = state["meta_prompt"][-1].content
return system_prompt
def process_response(self, response: Any, user_input: str = None, state: State = None) -> Dict[str, Union[str, dict]]:
updates_conversation_history = {
"conversation_history": [
{"role": "user", "content": user_input},
{"role": "assistant", "content": f"{str(response)}"}
]
}
return updates_conversation_history
def get_conv_history(self, state: State) -> str:
pass
def get_user_input(self) -> str:
pass
def get_guided_json(self, state: State) -> Dict[str, Any]:
pass
# Use Serper to search
def use_tool(self, mode: str, tool_input: str, doc_type: str = None) -> Any:
if mode == "serper":
results = serper_search(tool_input, self.location)
return results
elif mode == "scraper":
results = scraper(tool_input, doc_type)
return results
# @log_function(logger)
def run(self, state: State) -> State:
# counter = chat_counter(state)
refine_query_template = """
Given the response from your manager.
# Response from Manager
{manager_response}
**Return the following JSON:**
{{"search_query": The refined google search engine query that aligns with the response from your managers.}}
"""
best_url_template = """
Given the serper results, and the instructions from your manager. Select the best URL
# Manger Instructions
{manager_response}
# Serper Results
{serper_results}
**Return the following JSON:**
{{"best_url": The URL of the serper results that aligns most with the instructions from your manager.,
"pdf": A boolean value indicating whether the URL is a PDF or not. This should be True if the URL is a PDF, and False otherwise.}}
"""
user_input = state["meta_prompt"][-1].content
state = self.invoke(state=state, user_input=user_input)
full_query = state["conversation_history"][-1].get("content")
refine_query = self.get_llm(json_model=True)
refine_prompt = refine_query_template.format(manager_response=full_query)
input = [
{"role": "user", "content": full_query},
{"role": "assistant", "content": f"system_prompt:{refine_prompt}"}
]
if self.server == 'vllm':
guided_json = guided_json_search_query
refined_query = refine_query.invoke(input, guided_json)
else:
refined_query = refine_query.invoke(input)
refined_query_json = json.loads(refined_query)
refined_query = refined_query_json.get("search_query")
serper_response = self.use_tool("serper", refined_query)
best_url = self.get_llm(json_model=True)
best_url_prompt = best_url_template.format(manager_response=full_query, serper_results=serper_response)
input = [
{"role": "user", "content": serper_response},
{"role": "assistant", "content": f"system_prompt:{best_url_prompt}"}
]
if self.server == 'vllm':
guided_json = guided_json_best_url
best_url = best_url.invoke(input, guided_json)
else:
best_url = best_url.invoke(input)
best_url_json = json.loads(best_url)
best_url = best_url_json.get("best_url")
doc_type = best_url_json.get("pdf")
if doc_type == "True" or doc_type == True:
doc_type = "pdf"
else:
doc_type = "html"
scraper_response = self.use_tool("scraper", best_url, doc_type)
updates = self.process_response(scraper_response, user_input)
for key, value in updates.items():
state = self.update_state(key, value, state)
return state
class Router(BaseAgent[State]):
def __init__(self, model: str = None, server: str = None, temperature: float = 0,
model_endpoint: str = None, stop: str = None):
super().__init__(model, server, temperature, model_endpoint, stop)
self.llm = self.get_llm(json_model=True)
def get_prompt(self, state) -> str:
system_prompt = state["meta_prompt"][-1].content
return system_prompt
def process_response(self, response: Any, user_input: str = None, state: State = None) -> Dict[str, Union[str, dict]]:
updates_conversation_history = {
"router_decision": [
{"role": "user", "content": user_input},
{"role": "assistant", "content": f"{str(response)} Todays date is {datetime.now()}"}
]
}
return updates_conversation_history
def get_conv_history(self, state: State) -> str:
pass
def get_user_input(self) -> str:
pass
def get_guided_json(self, state: State) -> Dict[str, Any]:
pass
def use_tool(self, tool_input: str, mode: str) -> Any:
pass
# @log_function(logger)
def run(self, state: State) -> State:
# router_template = """
# Given these instructions from your manager.
# # Response from Manager
# {manager_response}
# **Return the following JSON:**
# {{""router_decision: Return the next agent to pass control to.}}
# **strictly** adhere to these **guidelines** for routing.
# If your manager's response suggests a tool might be required to answer the query, return "tool_expert".
# If your manager's response suggests no tool is required to answer the query, return "no_tool_expert".
# If your manager's response suggest they have provided a final answer, return "end_chat".
# """
# chat_counter(state)
router_template = """
Given these instructions from your manager.
# Response from Manager
{manager_response}
**Return the following JSON:**
{{""router_decision: Return the next agent to pass control to.}}
**strictly** adhere to these **guidelines** for routing.
If your manager's response suggests the Expert Internet Researcher or the suggests the internet might be required, return "tool_expert".
If your manager's response suggests that the internet is not required, return "no_tool_expert".
If your manager's response suggest they have provided a final answer, return "end_chat".
"""
system_prompt = router_template.format(manager_response=state["meta_prompt"][-1].content)
input = [
{"role": "user", "content": ""},
{"role": "assistant", "content": f"system_prompt:{system_prompt}"}
]
router = self.get_llm(json_model=True)
if self.server == 'vllm':
guided_json = guided_json_router_decision
router_response = router.invoke(input, guided_json)
else:
router_response = router.invoke(input)
router_response = json.loads(router_response)
router_response = router_response.get("router_decision")
state = self.update_state("router_decision", router_response, state)
return state
# Example usage
if __name__ == "__main__":
from langgraph.graph import StateGraph
# For Claude
agent_kwargs = {
"model": "claude-3-haiku-20240307",
"server": "claude",
"temperature": 0.5
}
agent_kwargs = {
"model": "gpt-4o-mini",
"server": "openai",
"temperature": 0.1
}
# Ollama
# agent_kwargs = {
# "model": "phi3:instruct",
# "server": "ollama",
# "temperature": 0.5
# }
# Groq
# agent_kwargs = {
# "model": "mixtral-8x7b-32768",
# "server": "groq",
# "temperature": 0.5
# }
# # Gemnin - Not currently working, I will be debugging this soon.
# agent_kwargs = {
# "model": "gemini-1.5-pro",
# "server": "gemini",
# "temperature": 0.5
# }
# # Vllm
# agent_kwargs = {
# "model": "meta-llama/Meta-Llama-3-70B-Instruct",
# "server": "vllm",
# "temperature": 0.5,
# "model_endpoint": "https://vpzatdgopr2pmx-8000.proxy.runpod.net/",
# }
tools_router_agent_kwargs = agent_kwargs.copy()
tools_router_agent_kwargs["temperature"] = 0
def routing_function(state: State) -> str:
decision = state["router_decision"]
print(colored(f"\n\n Routing function called. Decision: {decision}", 'red'))
return decision
graph = StateGraph(State)
graph.add_node("meta_expert", lambda state: MetaExpert(**agent_kwargs).run(state=state))
graph.add_node("router", lambda state: Router(**tools_router_agent_kwargs).run(state=state))
graph.add_node("no_tool_expert", lambda state: NoToolExpert(**agent_kwargs).run(state=state))
graph.add_node("tool_expert", lambda state: ToolExpert(**tools_router_agent_kwargs).run(state=state))
graph.add_node("end_chat", lambda state: set_chat_finished(state))
graph.set_entry_point("meta_expert")
graph.set_finish_point("end_chat")
graph.add_edge("meta_expert", "router")
graph.add_edge("tool_expert", "meta_expert")
graph.add_edge("no_tool_expert", "meta_expert")
graph.add_conditional_edges(
"router",
lambda state: routing_function(state),
)
workflow = graph.compile()
while True:
query = input("Ask me anything: ")
if query.lower() == "exit":
break
# current_time = datetime.now()
recursion_limit = 30
state["recursion_limit"] = recursion_limit
state["user_input"] = query
limit = {"recursion_limit": recursion_limit}
for event in workflow.stream(state, limit):
pass