File size: 17,227 Bytes
7781557
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
import asyncio
import chainlit as cl
import json
import operator
import os

from typing import TypedDict, List, Annotated, Literal, Dict, Union, Optional 
from datetime import datetime



from langchain_core.tools import tool
from langchain_core.messages import AnyMessage, AIMessage, SystemMessage, HumanMessage, ToolMessage
from langchain_core.pydantic_v1 import BaseModel, Field, conlist
from langchain_openai import ChatOpenAI
from langgraph.graph import StateGraph, START, END, add_messages

from tavily import AsyncTavilyClient, TavilyClient

from .utils_actions import offer_actions
from .utils_pdf import generate_pdf_from_md


async def get_latest_news(customer):

    session_state = cl.user_session.get("session_state", None)

    await cl.Message(content=f"*Researching Latest News on {customer}*").send()

    if session_state.do_customer_research:
        print("Searching for news items ...")
        workflow = StateGraph(ResearchState)

        # Add nodes
        workflow.add_node("research", research_model)
        workflow.add_node("tools", tool_node)
        workflow.add_node("curate", select_and_process)
        workflow.add_node("write", write_report)
        workflow.add_node("publish", generete_pdf)
        # Set the entrypoint as route_query
        workflow.set_entry_point("research")

        # Determine which node is called next
        workflow.add_conditional_edges(
            "research",
            # Next, we pass in the function that will determine which node is called next.
            should_continue,
        )

        # Add a normal edge from `tools` to `research`.
        # This means that after `tools` is called, `research` node is called next in  order to determine if we should keep  or move to the 'curate' step
        workflow.add_edge("tools", "research")
        workflow.add_edge("curate","write")
        workflow.add_edge("write", "publish")  # Option in the future, to add another step and filter the documents retrieved using rerhank before writing the report
        workflow.add_edge("publish", END)  # Option in the future, to add another step and filter the documents retrieved using rerhank before writing the report

        app = workflow.compile()

        company = "HSBC"
        company_keywords = "banking, financial services, investment, wealth management, digital banking"
        # (Optional) exclude_keywords: Use this field when you need to differentiate the company from others with the same name in a different industry
        # or when you want to exclude specific types of documents or information. Leave it as an empty string ("") if not needed.
        exclude_keywords = "insurance"
        # You may uncomment your_additional_guidelines and HumanMessage and update the content with some guidelines of your own
        # your_additional_guidelines=f"Note that the {company} is ... / focus on ...."
        messages = [
            SystemMessage(content="You are an expert researcher ready to begin the information gathering process.")
            # ,HumanMessage(content=your_additional_guidelines)
        ]
        async for s in app.astream({"company": company, "company_keywords": company_keywords, "exclude_keywords": exclude_keywords, "messages":messages}, stream_mode="values"):
            message = s["messages"][-1]
            if isinstance(message, tuple):
                print(message)
            else:
                message.pretty_print()
    else:
        await cl.Message(content="*Searching for news items ...*").send()
        await asyncio.sleep(2)

        await cl.Message(content="*Curating 8 documents ...*").send()  
        await asyncio.sleep(3)

        await cl.Message(content="*Research complete. Generating report*").send()
        await asyncio.sleep(1)
        
        app_folder = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
        markdown_file_path = os.path.abspath(os.path.join(app_folder, 'reports', session_state.customer_research_report_md))

        await cl.Message(content=read_markdown_file(markdown_file_path)).send() 

    await offer_actions()


def read_markdown_file(file_path):
    try:
        print(file_path)
        with open(file_path, 'r', encoding='utf-8') as file:
            content = file.read()
        return content
    except FileNotFoundError:
        return f"Error: The file {file_path} was not found."
    except Exception as e:
        return f"An error occurred while reading the file: {str(e)}"


# Define the research state
class ResearchState(TypedDict):
    company: str
    company_keywords: str
    exclude_keywords: str
    report: str
    # Declare a dictionary where:
    # - The outer dictionary has string keys.
    # - The inner dictionary can have keys of different types (e.g., str, int).
    # - The inner dictionary values can be of different types (e.g., str, float).
    documents: Dict[str, Dict[Union[str, int], Union[str, float]]]
    RAG_docs: Dict[str, Dict[Union[str, int], Union[str, float]]]
    messages: Annotated[list[AnyMessage], add_messages]

# Define the structure for the model's response, which includes citations.
class Citation(BaseModel):
    source_id: str = Field(
        ...,
        description="The url of a SPECIFIC source which justifies the answer.",
    )
    quote: str = Field(
        ...,
        description="The VERBATIM quote from the specified source that justifies the answer.",
    )


class QuotedAnswer(BaseModel):
    """Answer the user question based only on the given sources, and cite the sources used."""
    answer: str = Field(
        ...,
        description="The answer to the user question, which is based only on the given sources. Include any relevant sources in the answer as markdown hyperlinks. For example: 'This is a sample text ([url website](url))'"
    )
    citations: List[Citation] = Field(
        ..., description="Citations from the given sources that justify the answer."
    )
    
# Add Tavily's arguments to enhance the web search tool's capabilities
class TavilyQuery(BaseModel):
    query: str = Field(description="web search query")
    topic: str = Field(description="type of search, should be 'general' or 'news'. Choose 'news' ONLY when the company you searching is publicly traded and is likely to be featured on popular news")
    days: int = Field(description="number of days back to run 'news' search")
    # raw_content: bool = Field(description="include raw content from found sources, use it ONLY if you need more information besides the summary content provided")
    domains: Optional[List[str]] = Field(default=None, description="list of domains to include in the research. Useful when trying to gather information from trusted and relevant domains")
 

# Define the args_schema for the tavily_search tool using a multi-query approach, enabling more precise queries for Tavily.
class TavilySearchInput(BaseModel):
    sub_queries: List[TavilyQuery] = Field(description="set of sub-queries that can be answered in isolation")


class TavilyExtractInput(BaseModel):
    urls: List[str] = Field(description="list of a single or several URLs for extracting raw content to gather additional information")


@tool("tavily_search", args_schema=TavilySearchInput, return_direct=True)
async def tavily_search(sub_queries: List[TavilyQuery]):
    """Perform searches for each sub-query using the Tavily search tool concurrently."""  
    # Define a coroutine function to perform a single search with error handling
    async def perform_search(itm):
        try:
            # Add date to the query as we need the most recent results
            query_with_date = f"{itm.query} {datetime.now().strftime('%m-%Y')}"
            # Attempt to perform the search, hardcoding days to 90 (days will be used only when topic is news)
            response = await tavily_client.search(query=query_with_date, topic=itm.topic, days=itm.days, max_results=1)
            return response['results']
        except Exception as e:
            # Handle any exceptions, log them, and return an empty list
            print(f"Error occurred during search for query '{itm.query}': {str(e)}")
            return []
    
    # Run all the search tasks in parallel
    search_tasks = [perform_search(itm) for itm in sub_queries]
    search_responses = await asyncio.gather(*search_tasks)
    
    # Combine the results from all the responses
    search_results = []
    for response in search_responses:
        search_results.extend(response)
    await cl.Message(content=f"Searching for news items ...").send()    
    return search_results

# Code for adding Tavily Extract as a tool (found it more useful to use Tavily Extract in a separate node)
# @tool("tavily_extract", args_schema=TavilyExtractInput, return_direct=True)
# async def tavily_extract(urls: TavilyExtractInput):
#     """Extract raw content from urls to gather additional information."""
#     try:
#         response = await tavily_client.extract(urls=urls)
#         return response['results']
#     except Exception as e:
#         # Handle any exceptions, log them, and return an empty list
#         print(f"Error occurred during extract: {str(e)}")
#         return []


tools = [tavily_search]
tools_by_name = {tool.name: tool for tool in tools}
tavily_client = AsyncTavilyClient()


# Define an async custom research tool node to store Tavily's search results for improved processing and later on filtering
async def tool_node(state: ResearchState):
    docs = state.get('documents',{})
    docs_str = ""
    msgs = []
    for tool_call in state["messages"][-1].tool_calls:
        tool = tools_by_name[tool_call["name"]]
        new_docs = await tool.ainvoke(tool_call["args"])
        for doc in new_docs:
            # Make sure that this document was not retrieved before
            if not docs or doc['url'] not in docs:
                docs[doc['url']] = doc
                docs_str += json.dumps(doc)
            # For Tavily Extract tool, checking if raw_content was retrieved a document
            # if doc.get('raw_content', None) and doc['url'] in docs:
            #     docs[doc['url']]['raw_content'] = doc['raw_content'] # add raw content retrieved by extract
            #     docs_str += json.dumps(doc)
        msgs.append(ToolMessage(content=f"Found the following new documents/information: {docs_str}", tool_call_id=tool_call["id"]))
    return {"messages": msgs, "documents": docs}
    
# Invoke a model with research tools to gather data about the company  
def research_model(state: ResearchState):
    prompt = f"""Today's date is {datetime.now().strftime('%d/%m/%Y')}.\n
You are an expert researcher tasked with gathering information for a quarterly report on recent developments in portfolio companies.\n
Your current objective is to gather documents about any significant events that occurred in the past quarter for the following company: {state['company']}.\n
The user has provided the following company keywords: {state['company_keywords']} to help you find documents relevant to the correct company.\n     
**Instructions:**\n
- Use the 'tavily_search' tool to search for relevant documents
- Focus on gathering documents by making appropriate tool calls
- If you believe you have gathered enough information, state 'I have gathered enough information and am ready to proceed.'
"""
    messages = state['messages'] + [SystemMessage(content=prompt)]
    model = ChatOpenAI(model="gpt-4o-mini",temperature=0)
    response = model.bind_tools(tools).invoke(messages)
    return {"messages": [response]}
    

# Define the function that decides whether to continue research using tools or proceed to writing the report
def should_continue(state: ResearchState) -> Literal["tools", "curate"]:
    messages = state['messages']
    last_message = messages[-1]
    # If the LLM makes a tool call, then we route to the "tools" node
    if last_message.tool_calls:
        return "tools"
    # Otherwise, we stop (reply to the user with citations)
    return "curate"

async def select_and_process(state: ResearchState):
    msg = "Curating Documents ...\n" 
    prompt = f"""You are an expert researcher specializing in analyzing portfolio companies.\n
Your current task is to review a list of documents and select the most relevant URLs related to recent developments for the following company: {state['company']}.\n
Be aware that some documents may refer to other companies with similar or identical names, potentially leading to conflicting information.\n
Your objective is to choose the documents that pertain to the correct company and provide the most consistent and synchronized information, using the following keywords provided by the user to help identify the correct company as a guide:{state['company_keywords']}.\n"""
    # Optionally include exclusion keywords if provided by the user 
    if state['exclude_keywords'] != "":
        prompt += f"""Additionally, if any form of the following exclusion words are present in the documents, do not include them and filter out those documents: {state['exclude_keywords']}.\n"""
    # Append the list of gathered documents to the prompt
    prompt += f"""\nHere is the list of documents gathered for your review:\n{state['documents']}\n\n"""

    # Use the model to filter documents and obtain relevant URLs structured as TavilyExtractInput
    messages = [SystemMessage(content=prompt)]  
    model = ChatOpenAI(model="gpt-4o-mini",temperature=0)
    relevant_urls = model.with_structured_output(TavilyExtractInput).invoke(messages)
    
    # Create a dictionary of relevant documents based on the URLs returned by the model
    RAG_docs = {url: state['documents'][url] for url in relevant_urls.urls if url in state['documents']}

    try:
        # Extract raw content from the selected URLs using the Tavily client
        response = await tavily_client.extract(urls=relevant_urls.urls)
        
        # Save the raw content into the RAG_docs dictionary for each URL
        msg += "Extracted raw content for:\n"
        for itm in response['results']:
            url = itm['url']
            msg += f"{url}\n" 
            raw_content = itm['raw_content']
            RAG_docs[url]['raw_content'] = raw_content
    except Exception as e:
        print(f"Error occurred during Tavily Extract request")
        
    msg += f"ֿֿ\n\nState of RAG documents that will be used for the report:\n\n{RAG_docs}"
        
    return {"messages": [AIMessage(content=msg)],"RAG_docs": RAG_docs}
            
# Define the function to write the report based on the retrieved documents.
async def write_report(state: ResearchState):
    # Create the prompt
    prompt = f"""Today's date is {datetime.now().strftime('%d/%m/%Y')}\n.
You are an expert researcher, writing a quarterly report about recent events in portfolio companies.\n
Your task is to write an in-depth, well-written, and detailed report on the following company: {state['company']}. in markdown syntax\n
Here are all the documents you should base your answer on:\n{state['RAG_docs']}\n""" 
    # messages = [state['messages'][-1]] + [SystemMessage(content=prompt)] 
    # Create a system message with the constructed prompt (no need to include entire chat history)
    messages = [SystemMessage(content=prompt)] 
    model = ChatOpenAI(model="gpt-4o-mini",temperature=0)
    response = model.with_structured_output(QuotedAnswer).invoke(messages)
    full_report = response.answer
    msg = "Curating Documents ...\n"
    await cl.Message(content=f"*Curating {len(response.citations)} documents ...*").send()   
    # Add Citations Section to the report
    full_report += "\n\n### Citations\n"
    for citation in response.citations:
        doc = state['RAG_docs'].get(citation.source_id)
        full_report += f"- [{doc.get('title',citation.source_id)}]({citation.source_id}): \"{citation.quote}\"\n"
    # We return a list, because this will get added to the existing list
    return {"messages": [AIMessage(content=f"Generated Report:\n{full_report}")], "report": full_report}

async def generete_pdf(state: ResearchState):
    await cl.Message(content=f"*Research complete. Generating report*").send()    
    directory = "reports"
    file_name = f"{state['company']} Quarterly Report {datetime.now().strftime('%Y-%m-%d')}"
    # Check if the directory exists
    if not os.path.exists(directory):
        # Create the directory
        os.makedirs(directory)

    markdown_file_path = f'{directory}/{file_name}.md'
    pdf_file_path = f'{directory}/{file_name}.pdf'

    session_state = cl.user_session.get("session_state", None)
    session_state.customer_research_report_md = f"{file_name}.md"
    session_state.customer_research_report_pdf = f"{file_name}.pdf"

    for file_path in [markdown_file_path, pdf_file_path]:
        if os.path.exists(file_path):
            os.remove(file_path)
            print(f"Existing file deleted: {file_path}")
    with open(markdown_file_path, 'w', encoding='utf-8') as md_file:
        md_file.write(state['report'])

    await cl.Message(content=state['report']).send() 

    msg = generate_pdf_from_md(state['report'], filename=pdf_file_path)

    return {"messages": [AIMessage(content=msg)]}