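# Jar3d multi-agent orchestration module.
# Defines the shared LangGraph State plus the agent nodes (Jar3d, MetaExpert,
# NoToolExpert, ToolExpert, Router) and the routing helpers that move a
# conversation from requirements gathering to a final answer.
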
import json
from multiprocessing import Pool, cpu_count
# import requests
# from tenacity import RetryError
import re
import logging
import chainlit as cl
from termcolor import colored
from typing import Any, Dict, Union, List, Tuple
from typing import TypedDict, Annotated
from langgraph.graph.message import add_messages
from agents.base_agent import BaseAgent
from utils.read_markdown import read_markdown_file
from tools.google_serper import serper_search, serper_shopping_search
from utils.logging import log_function, setup_logging
from tools.offline_graph_rag_tool import run_rag
from prompt_engineering.guided_json_lib import (
    guided_json_search_query,
    guided_json_best_url_two,
    guided_json_router_decision,
    guided_json_parse_expert,
    guided_json_search_query_two
)

setup_logging(level=logging.DEBUG)
logger = logging.getLogger(__name__)

class MessageDict(TypedDict):
    role: str
    content: str


class State(TypedDict):
    meta_prompt: Annotated[List[dict], add_messages]
    conversation_history: Annotated[List[dict], add_messages]
    requirements_gathering: Annotated[List[str], add_messages]
    expert_plan: str
    expert_research: Annotated[List[str], add_messages]
    expert_research_shopping: Annotated[List[str], add_messages]
    expert_writing: str
    user_input: Annotated[List[str], add_messages]
    previous_search_queries: Annotated[List[dict], add_messages]
    router_decision: str
    chat_limit: int
    chat_finished: bool
    recursion_limit: int
    final_answer: str

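# Default initial state shared across the agent graph. The fields are filled in
# as Jar3d gathers requirements and the expert agents run.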
state: State = {
    "meta_prompt": [],
    "conversation_history": [],
    "requirements_gathering": [],
    "expert_plan": "",
    "expert_research": [],
    "expert_research_shopping": [],
    "expert_writing": "",
    "user_input": [],
    "previous_search_queries": [],
    "router_decision": None,
    "chat_limit": None,
    "chat_finished": False,
    "recursion_limit": None,
    "final_answer": None
}

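# Tracks how many times the Meta-Agent has been invoked; the count is stored
# back on the state and later used to decide when to force a final [Type 2] answer.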
def chat_counter(state: State) -> int:
    chat_limit = state.get("chat_limit")
    if chat_limit is None:
        chat_limit = 0
    chat_limit += 1
    state["chat_limit"] = chat_limit
    return chat_limit

def routing_function(state: State) -> str:
    decision = state["router_decision"]
    print(colored(f"\n\n Routing function called. Decision: {decision}\n\n", 'green'))
    return decision

def set_chat_finished(state: State) -> State:
    state["chat_finished"] = True
    final_response = state["meta_prompt"][-1].content
    print(colored(f"\n\n DEBUG FINAL RESPONSE: {final_response}\n\n", 'green'))

    # Split the response at ">> FINAL ANSWER:"
    parts = final_response.split(">> FINAL ANSWER:")
    if len(parts) > 1:
        answer_part = parts[1].strip()

        # Remove any triple quotes
        final_response_formatted = answer_part.strip('"""')

        # Remove leading whitespace
        final_response_formatted = final_response_formatted.lstrip()

        # Remove the CoR dictionary at the end
        cor_pattern = r'\nCoR\s*=\s*\{[\s\S]*\}\s*$'
        final_response_formatted = re.sub(cor_pattern, '', final_response_formatted)

        # Remove any trailing whitespace
        final_response_formatted = final_response_formatted.rstrip()

        # print(colored(f"\n\n DEBUG: {final_response_formatted}\n\n", 'green'))
        print(colored(f"\n\n Jarvis 👩‍💻: {final_response_formatted}", 'cyan'))
        state["final_answer"] = f'''{final_response_formatted}'''
    else:
        print(colored("Error: Could not find '>> FINAL ANSWER:' in the response", 'red'))
        state["final_answer"] = "Error: No final answer found"

    return state

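# Jar3d: the user-facing requirements-gathering agent, surfaced through the
# Chainlit UI via run_chainlit().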
class Jar3d(BaseAgent[State]):
    def __init__(self, model: str = None, server: str = None, temperature: float = 0,
                 model_endpoint: str = None, stop: str = None):
        super().__init__(model, server, temperature, model_endpoint, stop)
        self.llm = self.get_llm(json_model=False)

    def get_prompt(self, state: State = None) -> str:
        system_prompt_md = read_markdown_file('prompt_engineering/jar3d_requirements_prompt.md')
        system_prompt = f"{system_prompt_md}\n <Type2> {state.get('final_answer', '')} </Type2>"
        return system_prompt

    def process_response(self, response: Any, user_input: str, state: State = None) -> Dict[str, List[Dict[str, str]]]:
        updates_conversation_history = {
            "requirements_gathering": [
                {"role": "user", "content": f"{user_input}"},
                {"role": "assistant", "content": str(response)}
            ]
        }
        return updates_conversation_history

    def get_conv_history(self, state: State) -> str:
        conversation_history = state.get('requirements_gathering', [])
        return "\n".join([f"{msg['role']}: {msg['content']}" for msg in conversation_history])

    def get_user_input(self) -> str:
        pass

    def get_guided_json(self, state: State) -> Dict[str, Any]:
        pass

    def use_tool(self) -> Any:
        pass

    def run_chainlit(self, state: State, message: cl.Message) -> Tuple[State, str]:
        user_message = message.content
        # system_prompt = self.get_prompt()
        user_input = f"cogor:{user_message}"
        # user_input = f"{system_prompt}\n cogor {user_message}"

        state = self.invoke(state=state, user_input=user_input)
        response = state['requirements_gathering'][-1]["content"]
        response = re.sub(r'^```python[\s\S]*?```\s*', '', response, flags=re.MULTILINE)
        response = response.lstrip()

        return state, response

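# MetaExpert: the orchestrating "manager" agent. It reads the requirements and
# the accumulated expert outputs, then emits a Chain-of-Reasoning (CoR) prompt
# that directs the downstream expert agents.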
class MetaExpert(BaseAgent[State]):
    def __init__(self, model: str = None, server: str = None, temperature: float = 0,
                 model_endpoint: str = None, stop: str = None):
        super().__init__(model, server, temperature, model_endpoint, stop)
        self.llm = self.get_llm(json_model=False)

    def get_prompt(self, state: State = None) -> str:
        system_prompt = read_markdown_file('prompt_engineering/jar3d_meta_prompt.md')
        return system_prompt

    def process_response(self, response: Any, user_input: str, state: State = None) -> Dict[str, List[MessageDict]]:
        user_input = None
        updates_conversation_history = {
            "meta_prompt": [
                {"role": "user", "content": f"{user_input}"},
                {"role": "assistant", "content": str(response)}
            ]
        }
        return updates_conversation_history

    # @log_function(logger)
    def get_conv_history(self, state: State) -> str:
        all_expert_research = []

        if state["expert_research"]:
            expert_research = state["expert_research"]
            all_expert_research.extend(expert_research)
        else:
            all_expert_research = []

        expert_message_history = f"""
        <expert_plan>
        ## Your Expert Plan:\n{state.get("expert_plan", [])}\n
        </expert_plan>
        <expert_writing>
        ## Your Expert Writing:\n{state.get("expert_writing", [])}\n
        </expert_writing>
        <internet_research_shopping_list>
        ## Your Expert Shopping List:\n{state.get("expert_research_shopping", [])}\n
        </internet_research_shopping_list>
        <internet_research>
        ## Your Expert Research:{all_expert_research}\n
        </internet_research>
        """

        return expert_message_history

    def get_user_input(self) -> str:
        user_input = input("Enter your query: ")
        return user_input

    def get_guided_json(self, state: State) -> Dict[str, Any]:
        pass

    def use_tool(self) -> Any:
        pass

    def run(self, state: State) -> State:
        counter = chat_counter(state)  # Counts every time we invoke the Meta-Agent
        recursion_limit = state.get("recursion_limit")
        recursions = 3 * counter - 2
        print(colored(f"\n\n * We have invoked the Meta-Agent {counter} times.\n * We have run {recursions} iterations. Max total iterations: {recursion_limit}\n\n", "green"))

        upper_limit_recursions = recursion_limit
        lower_limit_recursions = recursion_limit - 2

        if lower_limit_recursions <= recursions <= upper_limit_recursions:
            final_answer = "**You are being explicitly told to produce your [Type 2] work now!**"
        elif recursions > upper_limit_recursions:
            extra_recursions = recursions - upper_limit_recursions
            base_message = "**You are being explicitly told to produce your [Type 2] work now!**"
            final_answer = (base_message + "\n") * (extra_recursions + 1)
        else:
            final_answer = None

        try:
            requirements = state['requirements_gathering'][-1]["content"]
        except Exception:
            requirements = state['requirements_gathering'][-1].content

        formatted_requirements = '\n\n'.join(re.findall(r'```python\s*([\s\S]*?)\s*```', requirements, re.MULTILINE))

        print(colored(f"\n\n User Requirements: {formatted_requirements}\n\n", 'green'))

        if state.get("meta_prompt"):
            try:
                meta_prompt = state['meta_prompt'][-1]["content"]
            except Exception:
                meta_prompt = state['meta_prompt'][-1].content

            # print(colored(f"\n\n DEBUG Meta-Prompt: {meta_prompt}\n\n", 'yellow'))
            cor_match = '\n\n'.join(re.findall(r'```python\s*([\s\S]*?)\s*```', meta_prompt, re.MULTILINE))
            # print(colored(f"\n\n DEBUG CoR Match: {cor_match}\n\n", 'yellow'))
            user_input = f"<requirements>{formatted_requirements}</requirements> \n\n Here is your last CoR {cor_match}. Update your next CoR from here."
        else:
            user_input = formatted_requirements

        state = self.invoke(state=state, user_input=user_input, final_answer=final_answer)

        meta_prompt_cor = state['meta_prompt'][-1]["content"]

        print(colored(f"\n\n Meta-Prompt Chain of Reasoning: {meta_prompt_cor}\n\n", 'green'))

        return state

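# NoToolExpert: runs the Expert Planner and Expert Writer roles. It answers the
# meta-prompt directly from the model, optionally conditioned on any research
# already collected, without calling external tools.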
class NoToolExpert(BaseAgent[State]):
    def __init__(self, model: str = None, server: str = None, temperature: float = 0,
                 model_endpoint: str = None, stop: str = None):
        super().__init__(model, server, temperature, model_endpoint, stop)
        self.llm = self.get_llm(json_model=False)

    def get_prompt(self, state) -> str:
        # print(f"\nn{state}\n")
        # The system prompt comes not from a markdown file but from the meta-expert's generated output.
        system_prompt = state["meta_prompt"][-1].content
        return system_prompt

    def process_response(self, response: Any, user_input: str = None, state: State = None) -> Dict[str, Union[str, dict]]:
        # meta_prompts = state.get("meta_prompt", [])
        associated_meta_prompt = state["meta_prompt"][-1].content

        parse_expert = self.get_llm(json_model=True)
        parse_expert_prompt = """
        You must parse the expert from the text. The expert will be one of the following.
        1. Expert Planner
        2. Expert Writer
        Return your response as the following JSON
        {"expert": "Expert Planner" or "Expert Writer"}
        """

        input = [
            {"role": "user", "content": associated_meta_prompt},
            {"role": "assistant", "content": f"system_prompt:{parse_expert_prompt}"}
        ]

        retries = 0
        associated_expert = None
        while retries < 4 and associated_expert is None:
            retries += 1
            if self.server == 'vllm':
                guided_json = guided_json_parse_expert
                parse_expert_response = parse_expert.invoke(input, guided_json)
            else:
                parse_expert_response = parse_expert.invoke(input)

            associated_expert_json = json.loads(parse_expert_response)
            associated_expert = associated_expert_json.get("expert")

        # associated_expert = parse_expert_text(associated_meta_prompt)
        print(colored(f"\n\n Expert: {associated_expert}\n\n", 'green'))

        if associated_expert == "Expert Planner":
            expert_update_key = "expert_plan"
        elif associated_expert == "Expert Writer":
            expert_update_key = "expert_writing"
        else:
            raise ValueError(f"Could not parse a known expert from the meta-prompt: {associated_expert}")

        updates_conversation_history = {
            "conversation_history": [
                {"role": "user", "content": user_input},
                {"role": "assistant", "content": f"{str(response)}"}
            ],
            expert_update_key: {"role": "assistant", "content": f"{str(response)}"}
        }

        return updates_conversation_history

    def get_conv_history(self, state: State) -> str:
        pass

    def get_user_input(self) -> str:
        pass

    def get_guided_json(self, state: State) -> Dict[str, Any]:
        pass

    def use_tool(self) -> Any:
        pass

    # @log_function(logger)
    def run(self, state: State) -> State:
        # chat_counter(state)
        all_expert_research = []
        meta_prompt = state["meta_prompt"][1].content

        if state.get("expert_research"):
            expert_research = state["expert_research"]
            all_expert_research.extend(expert_research)
            research_prompt = f"\n Your response must be delivered considering the following research.\n ## Research\n {all_expert_research} "
            user_input = f"{meta_prompt}\n{research_prompt}"
        else:
            user_input = meta_prompt

        state = self.invoke(state=state, user_input=user_input)
        return state

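# ToolExpert: the Expert Internet Researcher. It expands the meta-prompt into
# multiple Serper search/shopping queries, picks the best URL per query, and
# feeds the unique URLs into the offline graph-RAG tool.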
class ToolExpert(BaseAgent[State]):
    def __init__(self, model: str = None, server: str = None, temperature: float = 0,
                 model_endpoint: str = None, stop: str = None, location: str = None):
        super().__init__(model, server, temperature, model_endpoint, stop, location)
        print(f"\n\n DEBUG LOCATION: {self.location}")
        self.llm = self.get_llm(json_model=False)

    def get_prompt(self, state) -> str:
        system_prompt = state["meta_prompt"][-1].content
        return system_prompt

    def process_response(self, response: Any, user_input: str = None, state: State = None) -> Dict[str, Union[str, dict]]:
        updates_conversation_history = {
            "conversation_history": [
                {"role": "user", "content": user_input},
                {"role": "assistant", "content": f"{str(response)}"}
            ],
            "expert_research": {"role": "assistant", "content": f"{str(response)}"}
        }
        return updates_conversation_history

    def get_conv_history(self, state: State) -> str:
        pass

    def get_user_input(self) -> str:
        pass

    def get_guided_json(self, state: State) -> Dict[str, Any]:
        pass

    def use_tool(self, mode: str, engine: str, tool_input: str, query: str = None) -> Any:
        if mode == "serper":
            if engine == "search":
                results = serper_search(tool_input, self.location)
                return {"results": results, "is_shopping": False}
            elif engine == "shopping":
                results = serper_shopping_search(tool_input, self.location)
                return {"results": results, "is_shopping": True}
        elif mode == "rag":
            results = run_rag(urls=tool_input, query=query)
            return {"results": results, "is_shopping": False}
    def generate_search_queries(self, meta_prompt: str, num_queries: int = 5) -> List[Dict[str, str]]:
        refine_query_template = """
        # Objective
        Your mission is to systematically address your manager's instructions by determining
        the most appropriate search queries to use **AND** the best engine to use for each query.
        Your engine choice is either search or shopping. You must return either the search or shopping engine for each query.
        You will generate {num_queries} different search queries.

        # Manager's Instructions
        {manager_instructions}

        # Flexible Search Algorithm for Simple and Complex Questions
        1. Initial search:
        - For a simple question: "[Question keywords]"
        - For a complex topic: "[Main topic] overview"
        2. For each subsequent search:
        - Choose one of these strategies:
        a. Specify: Add a more specific term or aspect related to the topic.
        b. Broaden: Remove a specific term or add "general" or "overview" to the query.
        c. Pivot: Choose a different but related term from the topic.
        d. Compare: Add "vs" or "compared to" along with a related term.
        e. Question: Rephrase the query as a question by adding "what", "how", "why", etc.

        # Response Format
        **Return the following JSON:**
        {{
            "search_queries": [
                {{"engine": "search", "query": "Query 1"}},
                {{"engine": "shopping", "query": "Query 2"}},
                ...
                {{"engine": "search", "query": "Query {num_queries}"}}
            ]
        }}

        Remember:
        - Generate {num_queries} unique and diverse search queries.
        - Each query should explore a different aspect or approach to the topic.
        - Ensure the queries cover various aspects of the manager's instructions.
        - The "engine" field should be either "search" or "shopping" for each query.
        """

        refine_query = self.get_llm(json_model=True)
        refine_prompt = refine_query_template.format(manager_instructions=meta_prompt, num_queries=num_queries)
        input = [
            {"role": "user", "content": "Generate search queries"},
            {"role": "assistant", "content": f"system_prompt:{refine_prompt}"}
        ]

        guided_json = guided_json_search_query_two

        if self.server == 'vllm':
            refined_queries = refine_query.invoke(input, guided_json)
        else:
            refined_queries = refine_query.invoke(input)

        refined_queries_json = json.loads(refined_queries)
        return refined_queries_json.get("search_queries", [])

    def process_serper_result(self, query, serper_response):
        best_url_template = """
        Given the serper results, and the search query, select the best URL

        # Search Query
        {search_query}

        # Serper Results
        {serper_results}

        **Return the following JSON:**
        {{"best_url": The URL of the serper results that aligns most with the search query.}}
        """

        best_url_llm = self.get_llm(json_model=True)
        best_url_prompt = best_url_template.format(search_query=query["query"], serper_results=serper_response)
        input = [
            {"role": "user", "content": serper_response},
            {"role": "assistant", "content": f"system_prompt:{best_url_prompt}"}
        ]

        guided_json = guided_json_best_url_two

        if self.server == 'vllm':
            best_url = best_url_llm.invoke(input, guided_json)
        else:
            best_url = best_url_llm.invoke(input)

        best_url_json = json.loads(best_url)

        return {"query": query, "url": best_url_json.get("best_url")}
        # return best_url_json.get("best_url")

    def run(self, state: State) -> State:
        meta_prompt = state["meta_prompt"][-1].content
        print(colored(f"\n\n Meta-Prompt: {meta_prompt}\n\n", 'green'))

        # Generate multiple search queries
        search_queries = self.generate_search_queries(meta_prompt, num_queries=5)
        print(colored(f"\n\n Generated Search Queries: {search_queries}\n\n", 'green'))

        try:
            # Use multiprocessing to call the Serper tool for each query in parallel
            with Pool(processes=min(cpu_count(), len(search_queries))) as pool:
                serper_results = pool.starmap(
                    self.use_tool,
                    [("serper", query["engine"], query["query"], None) for query in search_queries]
                )

            # Collect shopping results separately
            shopping_results = [result["results"] for result in serper_results if result["is_shopping"]]
            if shopping_results:
                state["expert_research_shopping"] = shopping_results

            # Process Serper results to get the best URLs
            with Pool(processes=min(cpu_count(), len(serper_results))) as pool:
                best_urls = pool.starmap(
                    self.process_serper_result,
                    [(query, result["results"]) for query, result in zip(search_queries, serper_results)]
                )
        except Exception as e:
            print(colored(f"Error in multiprocessing: {str(e)}. Falling back to sequential processing.", "yellow"))
            # Fallback to sequential processing
            serper_results = [self.use_tool("serper", query["engine"], query["query"], None) for query in search_queries]
            shopping_results = [result["results"] for result in serper_results if result["is_shopping"]]
            if shopping_results:
                state["expert_research_shopping"] = shopping_results
            best_urls = [self.process_serper_result(query, result["results"]) for query, result in zip(search_queries, serper_results)]

        # Remove duplicate URLs, keeping only those produced by the "search" engine
        unique_urls = list(dict.fromkeys(result["url"] for result in best_urls if result["url"] and result["query"]["engine"] == "search"))
        # unique_urls = list(dict.fromkeys(url for url in best_urls if url))

        print(colored("\n\n Sourced data from {} sources:".format(len(unique_urls)), 'green'))
        for i, url in enumerate(unique_urls, 1):
            print(colored(" {}. {}".format(i, url), 'green'))
        print()

        try:
            scraper_response = self.use_tool("rag", engine=None, tool_input=unique_urls, query=meta_prompt)
        except Exception as e:
            scraper_response = {"results": f"Error {e}: Failed to scrape results", "is_shopping": False}

        updates = self.process_response(scraper_response, user_input="Research")

        for key, value in updates.items():
            state = self.update_state(key, value, state)

        return state

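# Router: inspects the Meta-Agent's latest response and decides which node runs
# next: "tool_expert", "no_tool_expert", or "end_chat".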
class Router(BaseAgent[State]):
    def __init__(self, model: str = None, server: str = None, temperature: float = 0,
                 model_endpoint: str = None, stop: str = None):
        super().__init__(model, server, temperature, model_endpoint, stop)
        self.llm = self.get_llm(json_model=True)

    def get_prompt(self, state) -> str:
        system_prompt = state["meta_prompt"][-1].content
        return system_prompt

    def process_response(self, response: Any, user_input: str = None, state: State = None) -> Dict[str, Union[str, dict]]:
        updates_conversation_history = {
            "router_decision": [
                {"role": "user", "content": user_input},
                {"role": "assistant", "content": f"{str(response)}"}
            ]
        }
        return updates_conversation_history

    def get_conv_history(self, state: State) -> str:
        pass

    def get_user_input(self) -> str:
        pass

    def get_guided_json(self, state: State) -> Dict[str, Any]:
        pass

    def use_tool(self, tool_input: str, mode: str) -> Any:
        pass

def run(self, state: State) -> State: | |
router_template = """ | |
Given these instructions from your manager. | |
# Response from Manager | |
{manager_response} | |
**Return the following JSON:** | |
{{""router_decision: Return the next agent to pass control to.}} | |
**strictly** adhere to these **guidelines** for routing. | |
If your maneger's response contains "Expert Internet Researcher", return "tool_expert". | |
If your manager's response contains "Expert Planner" or "Expert Writer", return "no_tool_expert". | |
If your manager's response contains '>> FINAL ANSWER:', return "end_chat". | |
""" | |
system_prompt = router_template.format(manager_response=state["meta_prompt"][-1].content) | |
input = [ | |
{"role": "user", "content": ""}, | |
{"role": "assistant", "content": f"system_prompt:{system_prompt}"} | |
] | |
router = self.get_llm(json_model=True) | |
if self.server == 'vllm': | |
guided_json = guided_json_router_decision | |
router_response = router.invoke(input, guided_json) | |
else: | |
router_response = router.invoke(input) | |
router_response = json.loads(router_response) | |
router_response = router_response.get("router_decision") | |
state = self.update_state("router_decision", router_response, state) | |
return state |
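

# ---------------------------------------------------------------------------
# Minimal sketch of how these agents could be wired into a LangGraph workflow.
# This is illustrative only: the actual graph construction lives elsewhere in
# the project, and the model/server/location values below are placeholders,
# not the project's real configuration.
#
# from langgraph.graph import StateGraph, END
#
# def build_workflow():
#     meta_expert = MetaExpert(model="gpt-4o", server="openai")                   # placeholder config
#     no_tool_expert = NoToolExpert(model="gpt-4o", server="openai")              # placeholder config
#     tool_expert = ToolExpert(model="gpt-4o", server="openai", location="us")    # placeholder config
#     router = Router(model="gpt-4o", server="openai")                            # placeholder config
#
#     graph = StateGraph(State)
#     graph.add_node("meta_expert", meta_expert.run)
#     graph.add_node("router", router.run)
#     graph.add_node("no_tool_expert", no_tool_expert.run)
#     graph.add_node("tool_expert", tool_expert.run)
#     graph.add_node("end_chat", set_chat_finished)
#
#     graph.set_entry_point("meta_expert")
#     graph.add_edge("meta_expert", "router")
#     graph.add_conditional_edges(
#         "router",
#         routing_function,  # returns "tool_expert", "no_tool_expert", or "end_chat"
#         {"tool_expert": "tool_expert", "no_tool_expert": "no_tool_expert", "end_chat": "end_chat"},
#     )
#     graph.add_edge("tool_expert", "meta_expert")
#     graph.add_edge("no_tool_expert", "meta_expert")
#     graph.add_edge("end_chat", END)
#     return graph.compile()
# ---------------------------------------------------------------------------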