|
import asyncio |
|
import chainlit as cl |
|
import json |
|
import operator |
|
import os |
|
|
|
from typing import TypedDict, List, Annotated, Literal, Dict, Union, Optional |
|
from datetime import datetime |
|
|
|
|
|
|
|
from langchain_core.tools import tool |
|
from langchain_core.messages import AnyMessage, AIMessage, SystemMessage, HumanMessage, ToolMessage |
|
from langchain_core.pydantic_v1 import BaseModel, Field, conlist |
|
from langchain_openai import ChatOpenAI |
|
from langgraph.graph import StateGraph, START, END, add_messages |
|
|
|
from tavily import AsyncTavilyClient, TavilyClient |
|
|
|
from .utils_actions import offer_actions |
|
from .utils_pdf import generate_pdf_from_md |
|
|
|
|
|
async def get_latest_news(customer):
    """Research recent news for *customer* and post a markdown report to the chat.

    When ``session_state.do_customer_research`` is truthy, builds and streams a
    LangGraph pipeline (research -> tools -> curate -> write -> publish).
    Otherwise replays a canned demo sequence and re-sends the previously
    generated report from disk.
    """
    session_state = cl.user_session.get("session_state", None)

    await cl.Message(content=f"*Researching Latest News on {customer}*").send()

    if session_state.do_customer_research:
        print("Searching for news items ...")
        workflow = StateGraph(ResearchState)

        workflow.add_node("research", research_model)
        workflow.add_node("tools", tool_node)
        workflow.add_node("curate", select_and_process)
        workflow.add_node("write", write_report)
        workflow.add_node("publish", generete_pdf)

        workflow.set_entry_point("research")

        # `should_continue` routes "research" to either "tools" or "curate".
        workflow.add_conditional_edges(
            "research",
            should_continue,
        )

        workflow.add_edge("tools", "research")
        workflow.add_edge("curate", "write")
        workflow.add_edge("write", "publish")
        workflow.add_edge("publish", END)

        app = workflow.compile()

        # BUG FIX: the company was hard-coded to "HSBC", silently ignoring the
        # `customer` argument this function was called with.
        company = customer
        # NOTE(review): these keywords/exclusions are banking-oriented defaults
        # carried over from the original demo — confirm whether they should be
        # derived from the customer instead of fixed here.
        company_keywords = "banking, financial services, investment, wealth management, digital banking"
        exclude_keywords = "insurance"

        messages = [
            SystemMessage(content="You are an expert researcher ready to begin the information gathering process.")
        ]
        async for s in app.astream({"company": company, "company_keywords": company_keywords, "exclude_keywords": exclude_keywords, "messages": messages}, stream_mode="values"):
            message = s["messages"][-1]
            if isinstance(message, tuple):
                print(message)
            else:
                message.pretty_print()
    else:
        # Demo path: simulate progress with short pauses.
        await cl.Message(content="*Searching for news items ...*").send()
        await asyncio.sleep(2)

        await cl.Message(content="*Curating 8 documents ...*").send()
        await asyncio.sleep(3)

        await cl.Message(content="*Research complete. Generating report*").send()
        await asyncio.sleep(1)

    # NOTE(review): reads the report name saved in session state (set by
    # `generete_pdf` on the research path, or pre-seeded for the demo path);
    # confirm the `reports` folder here matches where `generete_pdf` writes.
    app_folder = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
    markdown_file_path = os.path.abspath(os.path.join(app_folder, 'reports', session_state.customer_research_report_md))

    await cl.Message(content=read_markdown_file(markdown_file_path)).send()

    await offer_actions()
|
|
|
|
|
def read_markdown_file(file_path):
    """Return the UTF-8 contents of *file_path*, or a human-readable error string.

    Never raises: missing files and any other read failure are reported as a
    message string so the caller can display it directly in the chat.
    """
    try:
        print(file_path)
        with open(file_path, 'r', encoding='utf-8') as handle:
            text = handle.read()
        return text
    except FileNotFoundError:
        return f"Error: The file {file_path} was not found."
    except Exception as e:
        return f"An error occurred while reading the file: {str(e)}"
|
|
|
|
|
|
|
class ResearchState(TypedDict):
    """Shared state flowing through the LangGraph research pipeline."""
    # Target company name and user-supplied disambiguation keywords.
    company: str
    company_keywords: str
    # Words whose presence disqualifies a document ("" means no exclusions).
    exclude_keywords: str
    # Final markdown report produced by `write_report`.
    report: str
    # All gathered documents, keyed by URL.
    documents: Dict[str, Dict[Union[str, int], Union[str, float]]]
    # Curated subset of `documents` used as grounding for the report.
    RAG_docs: Dict[str, Dict[Union[str, int], Union[str, float]]]
    # Conversation history; `add_messages` merges updates instead of replacing.
    messages: Annotated[list[AnyMessage], add_messages]
|
|
|
|
|
class Citation(BaseModel):
    """A single source citation backing part of a generated answer."""
    # Field descriptions are sent to the model as part of the output schema.
    source_id: str = Field(
        ...,
        description="The url of a SPECIFIC source which justifies the answer.",
    )
    quote: str = Field(
        ...,
        description="The VERBATIM quote from the specified source that justifies the answer.",
    )
|
|
|
|
|
class QuotedAnswer(BaseModel):
    """Answer the user question based only on the given sources, and cite the sources used."""
    # NOTE: this docstring and the field descriptions are part of the
    # structured-output schema the model sees — do not change them casually.
    answer: str = Field(
        ...,
        description="The answer to the user question, which is based only on the given sources. Include any relevant sources in the answer as markdown hyperlinks. For example: 'This is a sample text ([url website](url))'"
    )
    citations: List[Citation] = Field(
        ..., description="Citations from the given sources that justify the answer."
    )
|
|
|
|
|
class TavilyQuery(BaseModel):
    """One Tavily sub-query together with its search parameters."""
    query: str = Field(description="web search query")
    topic: str = Field(description="type of search, should be 'general' or 'news'. Choose 'news' ONLY when the company you searching is publicly traded and is likely to be featured on popular news")
    days: int = Field(description="number of days back to run 'news' search")
    # Optional allow-list of domains to restrict the search to.
    domains: Optional[List[str]] = Field(default=None, description="list of domains to include in the research. Useful when trying to gather information from trusted and relevant domains")
|
|
|
|
|
|
|
class TavilySearchInput(BaseModel):
    """Argument schema for the `tavily_search` tool."""
    sub_queries: List[TavilyQuery] = Field(description="set of sub-queries that can be answered in isolation")
|
|
|
|
|
class TavilyExtractInput(BaseModel):
    """URL-list schema; also reused as the structured output of the curation step."""
    urls: List[str] = Field(description="list of a single or several URLs for extracting raw content to gather additional information")
|
|
|
|
|
@tool("tavily_search", args_schema=TavilySearchInput, return_direct=True)
async def tavily_search(sub_queries: List[TavilyQuery]):
    """Perform searches for each sub-query using the Tavily search tool concurrently."""

    async def perform_search(itm):
        # Run one Tavily query; the current month/year is appended to bias
        # results toward fresh content.
        try:
            query_with_date = f"{itm.query} {datetime.now().strftime('%m-%Y')}"
            # NOTE(review): max_results=1 keeps only the top hit per
            # sub-query — confirm this is intentional.
            response = await tavily_client.search(query=query_with_date, topic=itm.topic, days=itm.days, max_results=1)
            return response['results']
        except Exception as e:
            # Best-effort: a failed sub-query simply contributes no results.
            print(f"Error occurred during search for query '{itm.query}': {str(e)}")
            return []

    # Fan out all sub-queries concurrently.
    search_tasks = [perform_search(itm) for itm in sub_queries]
    search_responses = await asyncio.gather(*search_tasks)

    # Flatten the per-query result lists into a single list of documents.
    search_results = []
    for response in search_responses:
        search_results.extend(response)
    # Progress ping to the chat UI.
    await cl.Message(content=f"Searching for news items ...").send()
    return search_results
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Shared async Tavily client (used by both the search tool and the extract
# step), the registry of LangGraph tools, and an index of tools by name for
# dispatch in `tool_node`.
tavily_client = AsyncTavilyClient()
tools = [tavily_search]
tools_by_name = {t.name: t for t in tools}
|
|
|
|
|
|
|
async def tool_node(state: ResearchState):
    """Execute the pending tool calls from the last message and merge new docs.

    Returns a state update with one ToolMessage per tool call (required so the
    model sees a response for every tool_call_id) and the deduplicated
    `documents` mapping keyed by URL.
    """
    docs = state.get('documents', {})
    docs_str = ""
    msgs = []
    for tool_call in state["messages"][-1].tool_calls:
        # BUG FIX: the local was named `tool`, shadowing the imported @tool
        # decorator from langchain_core.tools.
        selected_tool = tools_by_name[tool_call["name"]]
        new_docs = await selected_tool.ainvoke(tool_call["args"])
        for doc in new_docs:
            # Deduplicate by URL (`not docs or` in the original was redundant
            # with this membership test).
            if doc['url'] not in docs:
                docs[doc['url']] = doc
                docs_str += json.dumps(doc)
        # NOTE(review): docs_str accumulates across tool calls, so later
        # ToolMessages repeat earlier docs — confirm this is intentional.
        msgs.append(ToolMessage(content=f"Found the following new documents/information: {docs_str}", tool_call_id=tool_call["id"]))
    return {"messages": msgs, "documents": docs}
|
|
|
|
|
def research_model(state: ResearchState):
    """LLM research step: ask the model (with tools bound) to gather documents.

    The model either emits `tavily_search` tool calls or states it has enough
    information, which `should_continue` uses to route the graph.
    """
    prompt = f"""Today's date is {datetime.now().strftime('%d/%m/%Y')}.\n
You are an expert researcher tasked with gathering information for a quarterly report on recent developments in portfolio companies.\n
Your current objective is to gather documents about any significant events that occurred in the past quarter for the following company: {state['company']}.\n
The user has provided the following company keywords: {state['company_keywords']} to help you find documents relevant to the correct company.\n
**Instructions:**\n
- Use the 'tavily_search' tool to search for relevant documents
- Focus on gathering documents by making appropriate tool calls
- If you believe you have gathered enough information, state 'I have gathered enough information and am ready to proceed.'
"""
    messages = state['messages'] + [SystemMessage(content=prompt)]
    model = ChatOpenAI(model="gpt-4o-mini",temperature=0)
    # Binding the tools lets the model emit structured tool calls.
    response = model.bind_tools(tools).invoke(messages)
    return {"messages": [response]}
|
|
|
|
|
|
|
def should_continue(state: ResearchState) -> Literal["tools", "curate"]:
    """Route after the research node: run tools while the model still asks for them."""
    last_message = state['messages'][-1]
    # Any pending tool calls mean the researcher wants more searches;
    # otherwise move on to curating the gathered documents.
    return "tools" if last_message.tool_calls else "curate"
|
|
|
async def select_and_process(state: ResearchState):
    """Curate the gathered documents and fetch raw content for the chosen URLs.

    Asks the model to pick the URLs relevant to the correct company, then
    enriches those documents with Tavily-extracted raw content. Returns the
    curated `RAG_docs` mapping plus a progress message.
    """
    msg = "Curating Documents ...\n"
    prompt = f"""You are an expert researcher specializing in analyzing portfolio companies.\n
Your current task is to review a list of documents and select the most relevant URLs related to recent developments for the following company: {state['company']}.\n
Be aware that some documents may refer to other companies with similar or identical names, potentially leading to conflicting information.\n
Your objective is to choose the documents that pertain to the correct company and provide the most consistent and synchronized information, using the following keywords provided by the user to help identify the correct company as a guide:{state['company_keywords']}.\n"""

    if state['exclude_keywords'] != "":
        prompt += f"""Additionally, if any form of the following exclusion words are present in the documents, do not include them and filter out those documents: {state['exclude_keywords']}.\n"""

    prompt += f"""\nHere is the list of documents gathered for your review:\n{state['documents']}\n\n"""

    messages = [SystemMessage(content=prompt)]
    model = ChatOpenAI(model="gpt-4o-mini", temperature=0)
    # Structured output constrains the model to return a bare list of URLs.
    relevant_urls = model.with_structured_output(TavilyExtractInput).invoke(messages)

    # Keep only curated URLs that were actually gathered earlier.
    RAG_docs = {url: state['documents'][url] for url in relevant_urls.urls if url in state['documents']}

    try:
        response = await tavily_client.extract(urls=relevant_urls.urls)

        msg += "Extracted raw content for:\n"
        for itm in response['results']:
            url = itm['url']
            msg += f"{url}\n"
            # BUG FIX: extract can return a URL that was filtered out of
            # RAG_docs above (model hallucinated it); setdefault avoids the
            # KeyError the original indexing would raise.
            RAG_docs.setdefault(url, {})['raw_content'] = itm['raw_content']
    except Exception as e:
        # BUG FIX: the original swallowed the exception detail entirely.
        print(f"Error occurred during Tavily Extract request: {e}")

    # BUG FIX: removed stray non-printing (mis-encoded) characters that had
    # leaked into this message.
    msg += f"\n\nState of RAG documents that will be used for the report:\n\n{RAG_docs}"

    return {"messages": [AIMessage(content=msg)], "RAG_docs": RAG_docs}
|
|
|
|
|
async def write_report(state: ResearchState):
    """Generate the markdown quarterly report, appending a citations section.

    Uses structured output (QuotedAnswer) so the model returns both the report
    body and the citations that justify it.
    """
    prompt = f"""Today's date is {datetime.now().strftime('%d/%m/%Y')}\n.
You are an expert researcher, writing a quarterly report about recent events in portfolio companies.\n
Your task is to write an in-depth, well-written, and detailed report on the following company: {state['company']}. in markdown syntax\n
Here are all the documents you should base your answer on:\n{state['RAG_docs']}\n"""

    messages = [SystemMessage(content=prompt)]
    model = ChatOpenAI(model="gpt-4o-mini", temperature=0)
    response = model.with_structured_output(QuotedAnswer).invoke(messages)
    full_report = response.answer
    await cl.Message(content=f"*Curating {len(response.citations)} documents ...*").send()

    full_report += "\n\n### Citations\n"
    for citation in response.citations:
        # BUG FIX: the model may cite a URL absent from RAG_docs; .get()
        # then returned None and the original crashed on doc.get(...).
        doc = state['RAG_docs'].get(citation.source_id) or {}
        full_report += f"- [{doc.get('title',citation.source_id)}]({citation.source_id}): \"{citation.quote}\"\n"

    return {"messages": [AIMessage(content=f"Generated Report:\n{full_report}")], "report": full_report}
|
|
|
async def generete_pdf(state: ResearchState):
    """Persist the report as markdown and PDF, recording filenames in session state.

    NOTE(review): the name keeps the original 'generete' spelling because the
    graph wiring elsewhere in this file registers the function by this name.
    """
    await cl.Message(content="*Research complete. Generating report*").send()
    directory = "reports"
    file_name = f"{state['company']} Quarterly Report {datetime.now().strftime('%Y-%m-%d')}"

    # BUG FIX: exist_ok avoids the check-then-create race of the original
    # `if not os.path.exists(...)` guard.
    os.makedirs(directory, exist_ok=True)

    markdown_file_path = f'{directory}/{file_name}.md'
    pdf_file_path = f'{directory}/{file_name}.pdf'

    # Record the output filenames so later steps can re-read the report.
    session_state = cl.user_session.get("session_state", None)
    session_state.customer_research_report_md = f"{file_name}.md"
    session_state.customer_research_report_pdf = f"{file_name}.pdf"

    # Remove stale outputs from an earlier run on the same day.
    for file_path in [markdown_file_path, pdf_file_path]:
        if os.path.exists(file_path):
            os.remove(file_path)
            print(f"Existing file deleted: {file_path}")

    with open(markdown_file_path, 'w', encoding='utf-8') as md_file:
        md_file.write(state['report'])

    await cl.Message(content=state['report']).send()

    msg = generate_pdf_from_md(state['report'], filename=pdf_file_path)

    return {"messages": [AIMessage(content=msg)]}
|
|
|
|