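"""
LangGraph agent for the climate question-answering assistant.

Defines the shared GraphState, the routing functions, and make_graph_agent(),
which wires intent categorization, translation, query transformation, graph
and document retrieval, and RAG answering into a compiled LangGraph workflow.
"""
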
import sys
import os
from contextlib import contextmanager
from langchain.schema import Document
from langgraph.graph import END, StateGraph
from langchain_core.runnables.graph import CurveStyle, MermaidDrawMethod
from typing_extensions import TypedDict
from typing import List, Dict, Optional
from IPython.display import display, HTML, Image
from .chains.answer_chitchat import make_chitchat_node
from .chains.answer_ai_impact import make_ai_impact_node
from .chains.query_transformation import make_query_transform_node
from .chains.translation import make_translation_node
from .chains.intent_categorization import make_intent_categorization_node
from .chains.retrieve_documents import make_retriever_node
from .chains.answer_rag import make_rag_node
from .chains.graph_retriever import make_graph_retriever_node
from .chains.chitchat_categorization import make_chitchat_intent_categorization_node
from .chains.set_defaults import set_defaults


class GraphState(TypedDict):
    """
    Represents the state of our graph.

    The values assigned below document the expected defaults only: TypedDict does
    not apply defaults at runtime, so they are expected to be filled in by the
    `set_defaults` entry node of the graph.
    """
    user_input: str                       # raw user question
    language: str                         # detected language of the question
    intent: str                           # output of the intent categorization node
    search_graphs_chitchat: bool          # whether to retrieve graphs after a chitchat answer
    query: str                            # transformed (and translated) query
    remaining_questions: List[dict]       # sub-questions still waiting for retrieval
    n_questions: int
    answer: str
    audience: str = "experts"
    sources_input: List[str] = ["IPCC", "IPBES"]
    sources_auto: bool = True
    min_year: int = 1960
    max_year: Optional[int] = None
    documents: List[Document]             # retrieved and reranked documents
    recommended_content: List[Document]   # retrieved graphs / figures
    # graphs_returned: Dict[str, str]


def search(state):  # TODO: placeholder node, passes the state through unchanged
    return state


def answer_search(state):  # TODO: placeholder node, passes the state through unchanged
    return state


def route_intent(state):
    """Route after intent categorization: chitchat-like questions are answered
    directly, everything else goes through the search branch."""
    intent = state["intent"]
    if intent in ["chitchat", "esg"]:
        return "answer_chitchat"
    # elif intent == "ai_impact":
    #     return "answer_ai_impact"
    else:
        # Search route
        return "search"


def chitchat_route_intent(state):
    """After the chitchat answer, either retrieve related graphs or end the run."""
    if state["search_graphs_chitchat"]:
        return "retrieve_graphs_chitchat"
    return END


def route_translation(state):
    """Translate the user input first when it is not already in English."""
    if state["language"].lower() == "english":
        return "transform_query"
    else:
        return "translate_query"


def route_based_on_relevant_docs(state, threshold_docs=0.2):
    """Use the full RAG chain only if at least one document passes the reranking threshold."""
    docs = [x for x in state["documents"] if x.metadata["reranking_score"] > threshold_docs]
    if len(docs) > 0:
        return "answer_rag"
    else:
        return "answer_rag_no_docs"


def make_id_dict(values):
    """Map each node name to itself, as expected by add_conditional_edges."""
    return {k: k for k in values}


def make_graph_agent(llm, vectorstore_ipcc, vectorstore_graphs, reranker, threshold_docs=0.2):
    """Build and compile the agent graph; `threshold_docs` is the minimum reranking
    score a retrieved document needs to be used by the RAG answer chain."""
    workflow = StateGraph(GraphState)
# Define the node functions
categorize_intent = make_intent_categorization_node(llm)
transform_query = make_query_transform_node(llm)
translate_query = make_translation_node(llm)
answer_chitchat = make_chitchat_node(llm)
# answer_ai_impact = make_ai_impact_node(llm)
retrieve_documents = make_retriever_node(vectorstore_ipcc, reranker, llm)
retrieve_graphs = make_graph_retriever_node(vectorstore_graphs, reranker)
# answer_rag_graph = make_rag_graph_node(llm)
answer_rag = make_rag_node(llm, with_docs=True)
answer_rag_no_docs = make_rag_node(llm, with_docs=False)
chitchat_categorize_intent = make_chitchat_intent_categorization_node(llm)
# Define the nodes
workflow.add_node("set_defaults", set_defaults)
workflow.add_node("categorize_intent", categorize_intent)
workflow.add_node("search", search)
workflow.add_node("answer_search", answer_search)
workflow.add_node("transform_query", transform_query)
workflow.add_node("translate_query", translate_query)
# workflow.add_node("transform_query_ai", transform_query)
# workflow.add_node("translate_query_ai", translate_query)
workflow.add_node("answer_chitchat", answer_chitchat)
workflow.add_node("chitchat_categorize_intent", chitchat_categorize_intent)
# workflow.add_node("answer_ai_impact", answer_ai_impact)
workflow.add_node("retrieve_graphs", retrieve_graphs)
workflow.add_node("retrieve_graphs_chitchat", retrieve_graphs)
# workflow.add_node("retrieve_graphs_ai", retrieve_graphs)
# workflow.add_node("answer_rag_graph", answer_rag_graph)
# workflow.add_node("answer_rag_graph_ai", answer_rag_graph)
workflow.add_node("retrieve_documents", retrieve_documents)
workflow.add_node("answer_rag", answer_rag)
workflow.add_node("answer_rag_no_docs", answer_rag_no_docs)
# Entry point
workflow.set_entry_point("set_defaults")
# CONDITIONAL EDGES
workflow.add_conditional_edges(
"categorize_intent",
route_intent,
make_id_dict(["answer_chitchat","search"])
)
workflow.add_conditional_edges(
"chitchat_categorize_intent",
chitchat_route_intent,
make_id_dict(["retrieve_graphs_chitchat", END])
)
workflow.add_conditional_edges(
"search",
route_translation,
make_id_dict(["translate_query","transform_query"])
)
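    # Retrieval loop: keep calling retrieve_documents until every remaining sub-question is handled.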
workflow.add_conditional_edges(
"retrieve_documents",
lambda state : "retrieve_documents" if len(state["remaining_questions"]) > 0 else "answer_search",
make_id_dict(["retrieve_documents","answer_search"])
)
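    # Use the full RAG answer only when at least one document passes the reranking threshold.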
workflow.add_conditional_edges(
"answer_search",
lambda x : route_based_on_relevant_docs(x,threshold_docs=threshold_docs),
make_id_dict(["answer_rag","answer_rag_no_docs"])
)
# Define the edges
workflow.add_edge("set_defaults", "categorize_intent")
workflow.add_edge("translate_query", "transform_query")
workflow.add_edge("transform_query", "retrieve_graphs")
# workflow.add_edge("retrieve_graphs", "answer_rag_graph")
workflow.add_edge("retrieve_graphs", "retrieve_documents")
# workflow.add_edge("answer_rag_graph", "retrieve_documents")
workflow.add_edge("answer_rag", END)
workflow.add_edge("answer_rag_no_docs", END)
workflow.add_edge("answer_chitchat", "chitchat_categorize_intent")
# workflow.add_edge("answer_chitchat", END)
# workflow.add_edge("answer_ai_impact", END)
workflow.add_edge("retrieve_graphs_chitchat", END)
# workflow.add_edge("answer_ai_impact", "translate_query_ai")
# workflow.add_edge("translate_query_ai", "transform_query_ai")
# workflow.add_edge("transform_query_ai", "retrieve_graphs_ai")
# workflow.add_edge("retrieve_graphs_ai", "answer_rag_graph_ai")
# workflow.add_edge("answer_rag_graph_ai", END)
# workflow.add_edge("retrieve_graphs_ai", END)
# Compile
app = workflow.compile()
return app


def display_graph(app):
    """Render the compiled graph as a Mermaid PNG (drawn through the remote Mermaid API)."""
    display(
        Image(
            app.get_graph(xray=True).draw_mermaid_png(
                draw_method=MermaidDrawMethod.API,
            )
        )
    )
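

# Example usage (sketch): the llm, vectorstores, and reranker below are assumed to
# be built elsewhere in the project, and the question is only illustrative.
#
#   app = make_graph_agent(llm, vectorstore_ipcc, vectorstore_graphs, reranker, threshold_docs=0.2)
#   state = app.invoke({"user_input": "What is the impact of climate change on biodiversity?"})
#   print(state["answer"])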