import gradio as gr from huggingface_hub import InferenceClient """ For more information on `huggingface_hub` Inference API support, please check the docs: https://huggingface.co/docs/huggingface_hub/v0.22.2/en/guides/inference """ client = InferenceClient("microsoft/phi-2") #client = InferenceClient("HuggingFaceH4/zephyr-7b-beta") def respond( message, history: list[tuple[str, str]], system_message, max_tokens, temperature, top_p, ): messages = [{"role": "system", "content": system_message}] for val in history: if val[0]: messages.append({"role": "user", "content": val[0]}) if val[1]: messages.append({"role": "assistant", "content": val[1]}) messages.append({"role": "user", "content": message}) response = "" for message in messages: print(message) for message in client.chat_completion( messages, max_tokens=max_tokens, stream=True, temperature=temperature, top_p=top_p, ): token = message.choices[0].delta.content response += token yield response """ For information on how to customize the ChatInterface, peruse the gradio docs: https://www.gradio.app/docs/chatinterface """ demo = gr.ChatInterface( respond, additional_inputs=[ gr.Textbox(value="You are a friendly Chatbot.", label="System message"), gr.Slider(minimum=1, maximum=2048, value=512, step=1, label="Max new tokens"), gr.Slider(minimum=0.1, maximum=4.0, value=0.7, step=0.1, label="Temperature"), gr.Slider( minimum=0.1, maximum=1.0, value=0.95, step=0.05, label="Top-p (nucleus sampling)", ), ], ) from typing import Annotated, Sequence, TypedDict import operator import functools from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder from langchain_core.messages import BaseMessage, HumanMessage, SystemMessage from langchain_community.tools.tavily_search import TavilySearchResults from langchain_experimental.tools import PythonREPLTool from langchain.agents import create_openai_tools_agent from langchain_huggingface import HuggingFacePipeline from langgraph.graph import StateGraph, END from transformers import AutoTokenizer, AutoModelForCausalLM, pipeline # SETUP: HuggingFace Model and Pipeline #name = "meta-llama/Llama-3.2-1B" #name="deepseek-ai/DeepSeek-R1-Distill-Qwen-32B" #name="deepseek-ai/deepseek-llm-7b-chat" #name="openai-community/gpt2" #name="deepseek-ai/DeepSeek-R1-Distill-Qwen-1.5B" #name="microsoft/Phi-3.5-mini-instruct" name="Qwen/Qwen2.5-7B-Instruct-1M" tokenizer = AutoTokenizer.from_pretrained(name,truncation=True) tokenizer.pad_token = tokenizer.eos_token model = AutoModelForCausalLM.from_pretrained(name) pipe = pipeline( "text-generation", model=model, tokenizer=tokenizer, device_map="auto", max_new_tokens=500, # text to generate for outputs ) print ("pipeline is created") # Wrap in LangChain's HuggingFacePipeline llm = HuggingFacePipeline(pipeline=pipe) # Members and Final Options members = ["Researcher", "Coder"] options = ["FINISH"] + members # Supervisor prompt system_prompt = ( "You are a supervisor tasked with managing a conversation between the following workers: {members}." " Given the following user request, respond with the workers to act next. Each worker will perform a task" " and respond with their results and status. When all workers are finished, respond with FINISH." ) # Prompt template required for the workflow prompt = ChatPromptTemplate.from_messages( [ ("system", system_prompt), MessagesPlaceholder(variable_name="messages"), ("system", "Given the conversation above, who should act next? Or Should we FINISH? Select one of: {options}"), ] ).partial(options=str(options), members=", ".join(members)) print ("Prompt Template created") # Supervisor routing logic def route_tool_response(llm_response: str) -> str: """ Parse the LLM response to determine the next step based on routing logic. Handles unexpected or poorly structured responses gracefully. """ # Normalize the LLM response #llm_response = llm_response.strip().lower() # Strip whitespace and make lowercase # Remove any prefixes like "Assistant:" or "System:" # if ":" in llm_response: # llm_response = llm_response.split(":")[-1].strip() # Check for "finish" or worker names in the response for member in members: #if member.lower() in llm_response: if member in llm_response: return member if "finish" in llm_response: return "FINISH" # If no valid response is found, return a fallback error return "Invalid" def supervisor_chain(state): """ Supervisor logic to interact with HuggingFacePipeline and decide the next worker. """ messages = state.get("messages", []) try: # Construct prompt for the supervisor user_prompt = prompt.format(messages=messages) # Generate the LLM's response llm_response = pipe(user_prompt, max_new_tokens=100)[0]["generated_text"] print(f"[DEBUG] LLM Response: {llm_response.strip()}") # Log LLM raw output # Route the response to determine the next action next_action = route_tool_response(llm_response) # Validate the next action if next_action not in options: raise ValueError(f"Invalid next action: '{next_action}'. Expected one of {options}.") # # Initialize intermediate_steps if not already present # if "intermediate_steps" not in state: # state["intermediate_steps"] = [] # # Append the supervisor decision to intermediate_steps # state["intermediate_steps"].append( # {"supervisor": "decision", "next_action": next_action} # ) print(f"[DEBUG] Next action decided: {next_action}") # Log decision return {"next": next_action, "messages": messages} # return {"next": next_action, "messages": messages, "intermediate_steps": state["intermediate_steps"]} except Exception as e: print(f"[ERROR] Supervisor chain failed: {e}") raise RuntimeError(f"Supervisor logic error: {str(e)}") # AgentState definition class AgentState(TypedDict): messages: Annotated[Sequence[BaseMessage], operator.add] next: str # Create tools tavily_tool = TavilySearchResults(max_results=5) python_repl_tool = PythonREPLTool() # Create agents with their respective prompts research_agent = create_openai_tools_agent( llm=llm, tools=[tavily_tool], prompt=ChatPromptTemplate.from_messages( [ SystemMessage(content="You are a web researcher."), MessagesPlaceholder(variable_name="messages"), MessagesPlaceholder(variable_name="agent_scratchpad"), # Add required placeholder ] ), ) print ("Created agents with their respective prompts") code_agent = create_openai_tools_agent( llm=llm, tools=[python_repl_tool], prompt=ChatPromptTemplate.from_messages( [ SystemMessage(content="You may generate safe Python code for analysis."), MessagesPlaceholder(variable_name="messages"), MessagesPlaceholder(variable_name="agent_scratchpad"), # Add required placeholder ] ), ) print ("create_openai_tools_agent") # Create the workflow workflow = StateGraph(AgentState) # Nodes workflow.add_node("Researcher", research_agent) # Pass the agent directly (no .run required) workflow.add_node("Coder", code_agent) # Pass the agent directly workflow.add_node("supervisor", supervisor_chain) # Add edges for workflow transitions for member in members: workflow.add_edge(member, "supervisor") #workflow.add_conditional_edges( # "supervisor", # lambda x: x["next"], # {k: k for k in members} | {"FINISH": END} # Dynamically map workers to their actions #) workflow.add_conditional_edges( "supervisor", lambda x: x["next"], {"Researcher":"Researcher","Coder":"Coder","FINISH": END} ) print("[DEBUG] Workflow edges added: supervisor -> members/FINISH based on 'next'") # Define entry point workflow.set_entry_point("supervisor") print(workflow) # Compile the workflow graph = workflow.compile() from IPython.display import display, Image display(Image(graph.get_graph().draw_mermaid_png())) # Properly formatted initial state initial_state = { "messages": [ #HumanMessage(content="Code hello world and print it to the terminal.") # Correct format for user input HumanMessage(content="Write Code for printing \"hello world\" in Python. Keep it precise.") # Correct format for user input ] # , # "intermediate_steps": [] # Add this to track progress if needed } # Properly formatted second test state second_test = { "messages": [ HumanMessage(content="How is the weather in Sanfrancisco and Bangalore? Give research results") # Correct format for user input ] # , # "intermediate_steps": [] # Add this to track progress if needed } if __name__ == "__main__": #demo.launch() # Execute the workflow try: #print(f"[TRACE] Initial workflow state: {initial_state}") #result = graph.invoke(initial_state) #print("[INFO] Workflow Execution Complete.") #print(f"[TRACE] Workflow Result: {result}") # Final workflow result print(f"[TRACE] Initial workflow state: {second_test}") result2 = graph.invoke(second_test) print("[INFO] Workflow Execution Complete.") print(f"[TRACE] Workflow Result: {result2}") # Final workflow result except Exception as e: print(f"[ERROR] Workflow execution failed: {e}")