import asyncio
import chainlit as cl
import json
import os
from typing import TypedDict, List, Annotated, Literal, Dict, Union, Optional
from datetime import datetime
from langchain_core.tools import tool
from langchain_core.messages import AnyMessage, AIMessage, SystemMessage, HumanMessage, ToolMessage
from langchain_core.pydantic_v1 import BaseModel, Field
from langchain_openai import ChatOpenAI
from langgraph.graph import StateGraph, END, add_messages
from tavily import AsyncTavilyClient
from .utils_actions import offer_actions
from .utils_pdf import generate_pdf_from_md
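# Research pipeline: the "research" node calls the model, which may request web
# searches via the "tools" node; once enough information is gathered, the flow
# moves through "curate" -> "write" -> "publish".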
async def get_latest_news(customer):
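    """Research the latest news on a customer and publish a markdown/PDF report.

    When customer research is disabled in the session state, a pre-generated
    report is replayed from the reports folder instead.
    """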
session_state = cl.user_session.get("session_state", None)
await cl.Message(content=f"*Researching Latest News on {customer}*").send()
if session_state.do_customer_research:
print("Searching for news items ...")
workflow = StateGraph(ResearchState)
# Add nodes
workflow.add_node("research", research_model)
workflow.add_node("tools", tool_node)
workflow.add_node("curate", select_and_process)
workflow.add_node("write", write_report)
workflow.add_node("publish", generete_pdf)
        # Set "research" as the entry point
        workflow.set_entry_point("research")
        # After "research", route to "tools" or "curate" depending on whether the model made tool calls (see should_continue below)
        workflow.add_conditional_edges(
            "research",
            should_continue,
        )
# Add a normal edge from `tools` to `research`.
        # This means that after `tools` is called, the `research` node runs next to decide whether to keep researching or move on to the 'curate' step
workflow.add_edge("tools", "research")
workflow.add_edge("curate","write")
workflow.add_edge("write", "publish") # Option in the future, to add another step and filter the documents retrieved using rerhank before writing the report
workflow.add_edge("publish", END) # Option in the future, to add another step and filter the documents retrieved using rerhank before writing the report
app = workflow.compile()
company = "HSBC"
company_keywords = "banking, financial services, investment, wealth management, digital banking"
# (Optional) exclude_keywords: Use this field when you need to differentiate the company from others with the same name in a different industry
# or when you want to exclude specific types of documents or information. Leave it as an empty string ("") if not needed.
exclude_keywords = "insurance"
# You may uncomment your_additional_guidelines and HumanMessage and update the content with some guidelines of your own
# your_additional_guidelines=f"Note that the {company} is ... / focus on ...."
messages = [
SystemMessage(content="You are an expert researcher ready to begin the information gathering process.")
# ,HumanMessage(content=your_additional_guidelines)
]
async for s in app.astream({"company": company, "company_keywords": company_keywords, "exclude_keywords": exclude_keywords, "messages":messages}, stream_mode="values"):
message = s["messages"][-1]
if isinstance(message, tuple):
print(message)
else:
message.pretty_print()
else:
await cl.Message(content="*Searching for news items ...*").send()
await asyncio.sleep(2)
await cl.Message(content="*Curating 8 documents ...*").send()
await asyncio.sleep(3)
await cl.Message(content="*Research complete. Generating report*").send()
await asyncio.sleep(1)
app_folder = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
markdown_file_path = os.path.abspath(os.path.join(app_folder, 'reports', session_state.customer_research_report_md))
await cl.Message(content=read_markdown_file(markdown_file_path)).send()
await offer_actions()
def read_markdown_file(file_path):
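    """Return the contents of a markdown file, or an error message on failure."""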
try:
print(file_path)
with open(file_path, 'r', encoding='utf-8') as file:
content = file.read()
return content
except FileNotFoundError:
return f"Error: The file {file_path} was not found."
except Exception as e:
return f"An error occurred while reading the file: {str(e)}"
# Define the research state
class ResearchState(TypedDict):
company: str
company_keywords: str
exclude_keywords: str
report: str
# Declare a dictionary where:
# - The outer dictionary has string keys.
# - The inner dictionary can have keys of different types (e.g., str, int).
# - The inner dictionary values can be of different types (e.g., str, float).
documents: Dict[str, Dict[Union[str, int], Union[str, float]]]
RAG_docs: Dict[str, Dict[Union[str, int], Union[str, float]]]
messages: Annotated[list[AnyMessage], add_messages]
# Define the structure for the model's response, which includes citations.
class Citation(BaseModel):
source_id: str = Field(
...,
description="The url of a SPECIFIC source which justifies the answer.",
)
quote: str = Field(
...,
description="The VERBATIM quote from the specified source that justifies the answer.",
)
class QuotedAnswer(BaseModel):
"""Answer the user question based only on the given sources, and cite the sources used."""
answer: str = Field(
...,
description="The answer to the user question, which is based only on the given sources. Include any relevant sources in the answer as markdown hyperlinks. For example: 'This is a sample text ([url website](url))'"
)
citations: List[Citation] = Field(
..., description="Citations from the given sources that justify the answer."
)
# Add Tavily's arguments to enhance the web search tool's capabilities
class TavilyQuery(BaseModel):
query: str = Field(description="web search query")
    topic: str = Field(description="type of search, should be 'general' or 'news'. Choose 'news' ONLY when the company you are searching for is publicly traded and likely to be featured in popular news")
days: int = Field(description="number of days back to run 'news' search")
# raw_content: bool = Field(description="include raw content from found sources, use it ONLY if you need more information besides the summary content provided")
domains: Optional[List[str]] = Field(default=None, description="list of domains to include in the research. Useful when trying to gather information from trusted and relevant domains")
# Define the args_schema for the tavily_search tool using a multi-query approach, enabling more precise queries for Tavily.
class TavilySearchInput(BaseModel):
sub_queries: List[TavilyQuery] = Field(description="set of sub-queries that can be answered in isolation")
class TavilyExtractInput(BaseModel):
urls: List[str] = Field(description="list of a single or several URLs for extracting raw content to gather additional information")
@tool("tavily_search", args_schema=TavilySearchInput, return_direct=True)
async def tavily_search(sub_queries: List[TavilyQuery]):
"""Perform searches for each sub-query using the Tavily search tool concurrently."""
# Define a coroutine function to perform a single search with error handling
async def perform_search(itm):
try:
# Add date to the query as we need the most recent results
query_with_date = f"{itm.query} {datetime.now().strftime('%m-%Y')}"
            # Attempt the search; 'days' applies only when topic is 'news'
            response = await tavily_client.search(query=query_with_date, topic=itm.topic, days=itm.days, max_results=1)
return response['results']
except Exception as e:
# Handle any exceptions, log them, and return an empty list
print(f"Error occurred during search for query '{itm.query}': {str(e)}")
return []
# Run all the search tasks in parallel
search_tasks = [perform_search(itm) for itm in sub_queries]
search_responses = await asyncio.gather(*search_tasks)
# Combine the results from all the responses
search_results = []
for response in search_responses:
search_results.extend(response)
await cl.Message(content=f"Searching for news items ...").send()
return search_results
# Code for adding Tavily Extract as a tool (found it more useful to use Tavily Extract in a separate node)
# @tool("tavily_extract", args_schema=TavilyExtractInput, return_direct=True)
# async def tavily_extract(urls: List[str]):
# """Extract raw content from urls to gather additional information."""
# try:
# response = await tavily_client.extract(urls=urls)
# return response['results']
# except Exception as e:
# # Handle any exceptions, log them, and return an empty list
# print(f"Error occurred during extract: {str(e)}")
# return []
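# If re-enabled, the extract tool would be registered alongside the search tool, e.g. tools = [tavily_search, tavily_extract]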
tools = [tavily_search]
tools_by_name = {tool.name: tool for tool in tools}
tavily_client = AsyncTavilyClient()
# Define an async custom research tool node to store Tavily's search results for improved processing and later on filtering
async def tool_node(state: ResearchState):
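    """Execute the model's pending tool calls and merge any new documents into state."""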
docs = state.get('documents',{})
docs_str = ""
msgs = []
for tool_call in state["messages"][-1].tool_calls:
tool = tools_by_name[tool_call["name"]]
new_docs = await tool.ainvoke(tool_call["args"])
for doc in new_docs:
# Make sure that this document was not retrieved before
if not docs or doc['url'] not in docs:
docs[doc['url']] = doc
docs_str += json.dumps(doc)
            # For the Tavily Extract tool: check whether raw_content was retrieved for a document
# if doc.get('raw_content', None) and doc['url'] in docs:
# docs[doc['url']]['raw_content'] = doc['raw_content'] # add raw content retrieved by extract
# docs_str += json.dumps(doc)
msgs.append(ToolMessage(content=f"Found the following new documents/information: {docs_str}", tool_call_id=tool_call["id"]))
return {"messages": msgs, "documents": docs}
# Invoke a model with research tools to gather data about the company
def research_model(state: ResearchState):
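    """Invoke the model with the research tools bound; it may respond with tool calls."""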
prompt = f"""Today's date is {datetime.now().strftime('%d/%m/%Y')}.\n
You are an expert researcher tasked with gathering information for a quarterly report on recent developments in portfolio companies.\n
Your current objective is to gather documents about any significant events that occurred in the past quarter for the following company: {state['company']}.\n
The user has provided the following company keywords: {state['company_keywords']} to help you find documents relevant to the correct company.\n
**Instructions:**\n
- Use the 'tavily_search' tool to search for relevant documents
- Focus on gathering documents by making appropriate tool calls
- If you believe you have gathered enough information, state 'I have gathered enough information and am ready to proceed.'
"""
messages = state['messages'] + [SystemMessage(content=prompt)]
model = ChatOpenAI(model="gpt-4o-mini",temperature=0)
response = model.bind_tools(tools).invoke(messages)
return {"messages": [response]}
# Define the function that decides whether to continue research using tools or proceed to writing the report
def should_continue(state: ResearchState) -> Literal["tools", "curate"]:
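    """Route to 'tools' if the last message contains tool calls, otherwise to 'curate'."""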
messages = state['messages']
last_message = messages[-1]
# If the LLM makes a tool call, then we route to the "tools" node
if last_message.tool_calls:
return "tools"
# Otherwise, we stop (reply to the user with citations)
return "curate"
async def select_and_process(state: ResearchState):
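    """Select the relevant documents with the model, then extract their raw content via Tavily."""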
msg = "Curating Documents ...\n"
prompt = f"""You are an expert researcher specializing in analyzing portfolio companies.\n
Your current task is to review a list of documents and select the most relevant URLs related to recent developments for the following company: {state['company']}.\n
Be aware that some documents may refer to other companies with similar or identical names, potentially leading to conflicting information.\n
Your objective is to choose the documents that pertain to the correct company and provide the most consistent and coherent information, using the following keywords provided by the user as a guide to help identify the correct company: {state['company_keywords']}.\n"""
# Optionally include exclusion keywords if provided by the user
if state['exclude_keywords'] != "":
prompt += f"""Additionally, if any form of the following exclusion words are present in the documents, do not include them and filter out those documents: {state['exclude_keywords']}.\n"""
# Append the list of gathered documents to the prompt
prompt += f"""\nHere is the list of documents gathered for your review:\n{state['documents']}\n\n"""
# Use the model to filter documents and obtain relevant URLs structured as TavilyExtractInput
messages = [SystemMessage(content=prompt)]
model = ChatOpenAI(model="gpt-4o-mini",temperature=0)
relevant_urls = model.with_structured_output(TavilyExtractInput).invoke(messages)
# Create a dictionary of relevant documents based on the URLs returned by the model
RAG_docs = {url: state['documents'][url] for url in relevant_urls.urls if url in state['documents']}
try:
# Extract raw content from the selected URLs using the Tavily client
response = await tavily_client.extract(urls=relevant_urls.urls)
# Save the raw content into the RAG_docs dictionary for each URL
msg += "Extracted raw content for:\n"
for itm in response['results']:
url = itm['url']
msg += f"{url}\n"
raw_content = itm['raw_content']
RAG_docs[url]['raw_content'] = raw_content
except Exception as e:
print(f"Error occurred during Tavily Extract request")
msg += f"ֿֿ\n\nState of RAG documents that will be used for the report:\n\n{RAG_docs}"
return {"messages": [AIMessage(content=msg)],"RAG_docs": RAG_docs}
# Define the function to write the report based on the retrieved documents.
async def write_report(state: ResearchState):
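    """Write the report from the curated documents and append a citations section."""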
# Create the prompt
prompt = f"""Today's date is {datetime.now().strftime('%d/%m/%Y')}\n.
You are an expert researcher, writing a quarterly report about recent events in portfolio companies.\n
Your task is to write an in-depth, well-written, and detailed report on the following company: {state['company']}. in markdown syntax\n
Here are all the documents you should base your answer on:\n{state['RAG_docs']}\n"""
# messages = [state['messages'][-1]] + [SystemMessage(content=prompt)]
# Create a system message with the constructed prompt (no need to include entire chat history)
messages = [SystemMessage(content=prompt)]
model = ChatOpenAI(model="gpt-4o-mini",temperature=0)
response = model.with_structured_output(QuotedAnswer).invoke(messages)
full_report = response.answer
msg = "Curating Documents ...\n"
await cl.Message(content=f"*Curating {len(response.citations)} documents ...*").send()
# Add Citations Section to the report
full_report += "\n\n### Citations\n"
for citation in response.citations:
        doc = state['RAG_docs'].get(citation.source_id, {})
        full_report += f"- [{doc.get('title', citation.source_id)}]({citation.source_id}): \"{citation.quote}\"\n"
# We return a list, because this will get added to the existing list
return {"messages": [AIMessage(content=f"Generated Report:\n{full_report}")], "report": full_report}
async def generate_pdf(state: ResearchState):
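    """Save the report as markdown and generate a PDF in the reports directory."""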
await cl.Message(content=f"*Research complete. Generating report*").send()
directory = "reports"
file_name = f"{state['company']} Quarterly Report {datetime.now().strftime('%Y-%m-%d')}"
# Check if the directory exists
if not os.path.exists(directory):
# Create the directory
os.makedirs(directory)
markdown_file_path = f'{directory}/{file_name}.md'
pdf_file_path = f'{directory}/{file_name}.pdf'
session_state = cl.user_session.get("session_state", None)
session_state.customer_research_report_md = f"{file_name}.md"
session_state.customer_research_report_pdf = f"{file_name}.pdf"
for file_path in [markdown_file_path, pdf_file_path]:
if os.path.exists(file_path):
os.remove(file_path)
print(f"Existing file deleted: {file_path}")
with open(markdown_file_path, 'w', encoding='utf-8') as md_file:
md_file.write(state['report'])
await cl.Message(content=state['report']).send()
msg = generate_pdf_from_md(state['report'], filename=pdf_file_path)
return {"messages": [AIMessage(content=msg)]}