import gradio as gr import getpass import os def _set_if_undefined(var: str): if not os.environ.get(var): os.environ[var] = getpass.getpass(f"Please provide your {var}") _set_if_undefined("OPENAI_API_KEY") _set_if_undefined("LANGCHAIN_API_KEY") _set_if_undefined("TAVILY_API_KEY") # Optional, add tracing in LangSmith os.environ["LANGCHAIN_TRACING_V2"] = "true" os.environ["LANGCHAIN_PROJECT"] = "Multi-agent Collaboration" from typing import Annotated, List, Tuple, Union from langchain_community.tools.tavily_search import TavilySearchResults from langchain_core.tools import tool from langchain_experimental.tools import PythonREPLTool tavily_tool = TavilySearchResults(max_results=5) # This executes code locally, which can be unsafe python_repl_tool = PythonREPLTool() from langchain.agents import AgentExecutor, create_openai_tools_agent from langchain_core.messages import BaseMessage, HumanMessage from langchain_openai import ChatOpenAI def create_agent(llm: ChatOpenAI, tools: list, system_prompt: str): # Each worker node will be given a name and some tools. prompt = ChatPromptTemplate.from_messages( [ ( "system", system_prompt, ), MessagesPlaceholder(variable_name="messages"), MessagesPlaceholder(variable_name="agent_scratchpad"), ] ) agent = create_openai_tools_agent(llm, tools, prompt) executor = AgentExecutor(agent=agent, tools=tools) return executor def agent_node(state, agent, name): result = agent.invoke(state) return {"messages": [HumanMessage(content=result["output"], name=name)]} from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder from langchain_core.output_parsers.openai_functions import JsonOutputFunctionsParser members = ["Researcher", "Coder"] system_prompt = ( "You are a supervisor tasked with managing a conversation between the" " following workers: {members}. Given the following user request," " respond with the worker to act next. Each worker will perform a" " task and respond with their results and status. When finished," " respond with FINISH." ) # Our team supervisor is an LLM node. It just picks the next agent to process # and decides when the work is completed options = ["FINISH"] + members # Using openai function calling can make output parsing easier for us function_def = { "name": "route", "description": "Select the next role.", "parameters": { "title": "routeSchema", "type": "object", "properties": { "next": { "title": "Next", "anyOf": [ {"enum": options}, ], } }, "required": ["next"], }, } prompt = ChatPromptTemplate.from_messages( [ ("system", system_prompt), MessagesPlaceholder(variable_name="messages"), ( "system", "Given the conversation above, who should act next?" " Or should we FINISH? Select one of: {options}", ), ] ).partial(options=str(options), members=", ".join(members)) llm = ChatOpenAI(model="gpt-4-1106-preview") supervisor_chain = ( prompt | llm.bind_functions(functions=[function_def], function_call="route") | JsonOutputFunctionsParser() ) import operator from typing import Annotated, Any, Dict, List, Optional, Sequence, TypedDict import functools from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder from langgraph.graph import StateGraph, END # The agent state is the input to each node in the graph class AgentState(TypedDict): # The annotation tells the graph that new messages will always # be added to the current states messages: Annotated[Sequence[BaseMessage], operator.add] # The 'next' field indicates where to route to next next: str research_agent = create_agent(llm, [tavily_tool], "You are a web researcher.") research_node = functools.partial(agent_node, agent=research_agent, name="Researcher") # NOTE: THIS PERFORMS ARBITRARY CODE EXECUTION. PROCEED WITH CAUTION code_agent = create_agent( llm, [python_repl_tool], "You may generate safe python code to analyze data and generate charts using matplotlib.", ) code_node = functools.partial(agent_node, agent=code_agent, name="Coder") workflow = StateGraph(AgentState) workflow.add_node("Researcher", research_node) workflow.add_node("Coder", code_node) workflow.add_node("supervisor", supervisor_chain) for member in members: # We want our workers to ALWAYS "report back" to the supervisor when done workflow.add_edge(member, "supervisor") # The supervisor populates the "next" field in the graph state # which routes to a node or finishes conditional_map = {k: k for k in members} conditional_map["FINISH"] = END workflow.add_conditional_edges("supervisor", lambda x: x["next"], conditional_map) # Finally, add entrypoint workflow.set_entry_point("supervisor") graph = workflow.compile() ### def invoke(openai_api_key, topic, word_count=500): if (openai_api_key == ""): raise gr.Error("OpenAI API Key is required.") if (topic == ""): raise gr.Error("Topic is required.") #agentops.init(os.environ["AGENTOPS_API_KEY"]) os.environ["OPENAI_API_KEY"] = openai_api_key for s in graph.stream( { "messages": [ HumanMessage(content="Code hello world and print it to the terminal") ] } ): if "__end__" not in s: print(s) print("----") return result gr.close_all() demo = gr.Interface(fn = invoke, inputs = [gr.Textbox(label = "OpenAI API Key", type = "password", lines = 1), gr.Textbox(label = "Topic", value="TODO", lines = 1), gr.Number(label = "Word Count", value=1000, minimum=500, maximum=5000)], outputs = [gr.Markdown(label = "Generated Blog Post", value="TODO")], title = "Multi-Agent RAG: Blog Post Generation", description = "TODO") demo.launch()