from __future__ import annotations

import re
from abc import abstractmethod
import json
from typing import List, NamedTuple, Optional, Tuple, Union, TYPE_CHECKING

from . import output_parser_registry

from agentverse.utils import AgentAction, AgentFinish, AgentCriticism
from agentverse.llms import LLMResult
from agentverse.logging import logger

from pydantic import BaseModel

if TYPE_CHECKING:
    from agentverse.agents.base import BaseAgent
    from agentverse.environments.base import BaseEnvironment

class OutputParserError(Exception):
    """Exception raised when parsing the output of the model fails."""

    def __init__(self, message):
        super().__init__(message)
        self.message = message

    def __str__(self):
        return "Failed to parse output of the model: %s\n" % self.message

class OutputParser(BaseModel):
    """Base class for output parsers."""

    @abstractmethod
    def parse(self, output: LLMResult) -> NamedTuple:
        pass

class AliceHomeParser(OutputParser):
    def parse(self, output: LLMResult) -> Union[AgentAction, AgentFinish]:
        text = output.content
        cleaned_output = text.strip()
        cleaned_output = re.sub(r"\n+", "\n", cleaned_output)
        cleaned_output = cleaned_output.split("\n")
        if not (
            len(cleaned_output) == 2
            and cleaned_output[0].startswith("Thought:")
            and cleaned_output[1].startswith("Action:")
        ):
            raise OutputParserError(text)

        action = cleaned_output[1][len("Action:") :].strip()
        return AgentFinish({"output": action}, text)
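
# Illustrative example (not from the original source): given a reply such as
#   "Thought: I want to relax a bit\nAction: turn on the TV"
# AliceHomeParser returns AgentFinish({"output": "turn on the TV"}, text);
# any other layout raises OutputParserError.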

class CommonParser1(OutputParser):
    def parse(self, output: LLMResult) -> Union[AgentAction, AgentFinish]:
        text = output.content
        cleaned_output = text.strip()
        cleaned_output = re.sub(r"\n+", "\n", cleaned_output)
        cleaned_output = cleaned_output.split("\n")
        if not (
            len(cleaned_output) == 3
            and cleaned_output[0].startswith("Thought:")
            and cleaned_output[1].startswith("Action:")
            and cleaned_output[2].startswith("Action Input:")
        ):
            raise OutputParserError(text)
        action = cleaned_output[1][len("Action:") :].strip()
        action_input = cleaned_output[2][len("Action Input:") :].strip()
        if action == "Speak":
            return AgentFinish({"output": action_input}, text)
        elif action == "CallOn":
            return AgentFinish({"output": "[CallOn] " + action_input}, text)
        elif action == "RaiseHand":
            return AgentFinish({"output": "[RaiseHand] " + action_input}, text)
        elif action == "Listen":
            return AgentFinish({"output": ""}, text)
        else:
            return AgentAction(action.lower(), action_input, text)
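
# Illustrative example (not from the original source): a reply of
#   "Thought: I should ask the teacher\nAction: RaiseHand\nAction Input: May I ask a question?"
# yields AgentFinish({"output": "[RaiseHand] May I ask a question?"}, text), while an
# unrecognized action name falls through to AgentAction(action.lower(), action_input, text).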

class MathProblem2PlayersToolsParser(OutputParser):
    def parse(self, output: LLMResult) -> Union[AgentAction, AgentFinish]:
        text = output.content
        cleaned_output = text.strip()
        cleaned_output = re.sub(r"\n+", "\n", cleaned_output)
        cleaned_output = cleaned_output.split("\n")
        if not (
            len(cleaned_output) == 2
            and cleaned_output[0].startswith("Action:")
            and cleaned_output[1].startswith("Action Input:")
        ):
            raise OutputParserError(text)
        action = cleaned_output[0][len("Action:") :].strip()
        action_input = cleaned_output[1][len("Action Input:") :].strip()
        if action == "Speak":
            return AgentFinish({"output": action_input}, text)
        else:
            return AgentAction(action, action_input, text)

class NlpClassroom3PlayersParser(OutputParser):
    def parse(self, output: LLMResult) -> Union[AgentAction, AgentFinish]:
        text = output.content
        cleaned_output = text.strip()
        cleaned_output = re.sub(r"\n+", "\n", cleaned_output)
        cleaned_output = cleaned_output.split("\n")
        if not (
            len(cleaned_output) == 2
            and cleaned_output[0].startswith("Action:")
            and cleaned_output[1].startswith("Action Input:")
        ):
            raise OutputParserError(text)
        action = cleaned_output[0][len("Action:") :].strip()
        action_input = cleaned_output[1][len("Action Input:") :].strip()
        if action == "Speak":
            return AgentFinish({"output": action_input}, text)
        else:
            raise OutputParserError(text)

class NlpClassroom9PlayersParser(OutputParser):
    def parse(self, output: LLMResult) -> Union[AgentAction, AgentFinish]:
        text = output.content
        cleaned_output = text.strip()
        cleaned_output = re.sub(r"\n+", "\n", cleaned_output)
        cleaned_output = cleaned_output.split("\n")
        if not (
            len(cleaned_output) >= 2
            and cleaned_output[0].startswith("Action:")
            and cleaned_output[1].startswith("Action Input:")
        ):
            raise OutputParserError(text)
        action = cleaned_output[0][len("Action:") :].strip()
        # action_input = cleaned_output[1][len("Action Input:") :].strip()
        action_input = (
            "\n".join(cleaned_output[1:]).strip()[len("Action Input:") :].strip()
        )
        if action == "Speak":
            return AgentFinish({"output": action_input}, text)
        elif action == "CallOn":
            return AgentFinish({"output": "[CallOn] " + action_input}, text)
        elif action == "RaiseHand":
            return AgentFinish({"output": "[RaiseHand] " + action_input}, text)
        elif action == "Listen":
            return AgentFinish({"output": ""}, text)
        else:
            return AgentAction(action, action_input, text)

class NlpClassroom9PlayersGroupParser(OutputParser):
    def parse(self, output: LLMResult) -> Union[AgentAction, AgentFinish]:
        text = output.content
        cleaned_output = text.strip()
        cleaned_output = re.sub(r"\n+", "\n", cleaned_output)
        cleaned_output = cleaned_output.split("\n")
        if not (
            len(cleaned_output) == 2
            and cleaned_output[0].startswith("Action:")
            and cleaned_output[1].startswith("Action Input:")
        ):
            raise OutputParserError(text)
        action = cleaned_output[0][len("Action:") :].strip()
        action_input = cleaned_output[1][len("Action Input:") :].strip()
        if action == "Speak":
            return AgentFinish({"output": action_input}, text)
        elif action in ["CallOn", "RaiseHand", "GroupDiscuss"]:
            return AgentFinish({"output": f"[{action}] {action_input}"}, text)
        elif action == "Listen":
            return AgentFinish({"output": ""}, text)
        else:
            return AgentAction(action, action_input, text)

class PokemonParser(OutputParser):
    def parse(self, output: LLMResult) -> Union[AgentAction, AgentFinish]:
        text = output.content
        cleaned_output = text.strip()
        cleaned_output = re.sub(r"\n+", "\n", cleaned_output)
        cleaned_output = cleaned_output.split("\n")
        if not (
            len(cleaned_output) == 3
            and cleaned_output[0].startswith("Thought:")
            and cleaned_output[1].startswith("Action:")
            and cleaned_output[2].startswith("Action Input:")
        ):
            raise OutputParserError(text)
        action = cleaned_output[1][len("Action:") :].strip()
        action_input = cleaned_output[2][len("Action Input:") :].strip()
        try:
            action_input = json.loads(action_input)
        except json.JSONDecodeError:
            raise OutputParserError(text)
        action_input["action"] = action
        return AgentFinish({"output": json.dumps(action_input)}, text)
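
# Illustrative example (not from the original source): a reply of
#   'Thought: I should greet May\nAction: Speak\nAction Input: {"to": "May", "text": "Hi!"}'
# is re-serialized as AgentFinish({"output": '{"to": "May", "text": "Hi!", "action": "Speak"}'}, text);
# an Action Input that is not valid JSON raises OutputParserError.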

class PrisonerDilemmaParser(OutputParser):
    # make sure the round order is 1 1 2 2 3 3 ...
    cur_round: int = 1
    encounter_cur_round: bool = False

    def parse(
        self, agent: "BaseAgent", environment: "BaseEnvironment", output: LLMResult
    ) -> Union[AgentAction, AgentFinish]:
        text = output.content
        cleaned_output = text.strip()
        cleaned_output = re.sub(r"\n+", "\n", cleaned_output)
        cleaned_output = cleaned_output.split("\n")
        if not (
            len(cleaned_output) >= 2
            and cleaned_output[0].startswith("Action:")
            and cleaned_output[1].startswith("Action Input:")
        ):
            raise OutputParserError(text)
        action = cleaned_output[0][len("Action:") :].strip()
        action_input = (
            "\n".join(cleaned_output[1:]).strip()[len("Action Input:") :].strip()
        )
        if action == "Speak":
            # Make sure the police count the rounds correctly:
            # each time the police speak marks a new round.
            # if agent.name == "Police":
            #     action_input = re.sub(r'Round (\d+)', f'Round {self.cur_round}', action_input)
            #     self.cur_round += 1
            #     if self.encounter_cur_round:
            #         self.encounter_cur_round = False
            #         self.cur_round += 1
            #     else:
            #         self.encounter_cur_round = True
            if agent.name == "Police":
                if environment.cnt_turn == (environment.max_turns - 4):
                    action_input = (
                        "Attention! You are now required to make your final decision, "
                        "and I will make the final judgement for both of you based on it. "
                        "Please answer now!"
                    )
                elif environment.cnt_turn == (environment.max_turns - 2):
                    action_input = (
                        "Attention! Suspect2, it is now your turn to make your final "
                        "decision. Please answer now!"
                    )
                # elif self.cur_round == 1:
                #     action_input = (
                #         "Hey, listen! You are both arrested, and I am going to give you both a chance to walk out of here, "
                #         "but you should comply with the following rules: "
                #         "- If one of you is willing to testify against the other and the other one remains silent, then the one who testifies will be released IMMEDIATELY, while the silent one will be sentenced to TEN years in prison. "
                #         "- If both of you remain silent, you will each receive a sentence of ONE year in prison. "
                #         "- It seems that always testifying is a good strategy, so! If you both choose to testify against each other, you will each receive a sentence of FIVE years in prison. "
                #         "Now it is time to consider testifying or remaining silent. Remember, this is the best chance you might ever have to walk out of here without being found guilty. "
                #         "I will notify both of you WHEN you have to make your final decision! Before that, try to do your best!"
                #     )
                self.cur_round += 1
            return AgentFinish({"output": action_input}, text)
        else:
            raise OutputParserError(text)

class CommonParser2(OutputParser):
    # def parse(self, agent, env, output: LLMResult) -> Union[AgentAction, AgentFinish]:
    def parse(self, output: LLMResult) -> Union[AgentAction, AgentFinish]:
        return AgentFinish({"output": output.content}, output.content)

class RoleAssignerParser(OutputParser):
    cnt_critic_agents: int = 0

    def parse(self, output: LLMResult) -> List[str]:
        text = output.content
        pattern = re.compile(r"\d\.\s*(.+)")
        roles = pattern.findall(text)
        if len(roles) < self.cnt_critic_agents:
            logger.error(
                f"Role assigner failed to assign roles to {self.cnt_critic_agents} critics!"
            )
            raise OutputParserError(text)
        return roles
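
# Illustrative example (not from the original source): with cnt_critic_agents=2, a reply such as
#   "1. A professor of computer science\n2. A competitive programmer"
# parses to ["A professor of computer science", "A competitive programmer"]; fewer matches
# than cnt_critic_agents logs an error and raises OutputParserError.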

class EvaluatorParser(OutputParser):
    dimensions: Optional[List[str]] = None

    def parse(self, output: LLMResult) -> Tuple[List[int], str]:
        text = output.content
        cleaned_output = re.sub(r"\n+", "\n", text.strip())
        checks = cleaned_output.split("\n")
        patterns = [
            re.compile(r"(?:\d\.\s*)?" + dimension + r":\s*(\d)")
            for dimension in self.dimensions
        ]
        try:
            # find the score for each dimension, then the advice
            score = [
                int(pattern.findall(checks[i])[0]) for i, pattern in enumerate(patterns)
            ]
            advice_text = "".join(checks[len(self.dimensions) :])
            advice = re.findall(r"(?:\d\.\s*)?Advice:\s*(.+)", advice_text)[0]
            # logger.info("Evaluator give the following advice:\n" + advice)
        except (IndexError, ValueError):
            # logger.error("Bad response from evaluator!")
            raise OutputParserError(text)
        return score, advice
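
# Illustrative example (not from the original source): with dimensions=["Correctness", "Clarity"],
# a reply of "1. Correctness: 1\n2. Clarity: 0\n3. Advice: Explain the last step."
# parses to ([1, 0], "Explain the last step.").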

class HumanevalSolverParser(OutputParser):
    def parse(self, output: LLMResult) -> Union[AgentAction, AgentFinish]:
        text = output.content
        # start_pos = text.find("```")
        # end_pos = text.rfind("```")
        # if end_pos == -1:
        #     raise OutputParserError(text)
        # text = text[start_pos:end_pos]
        # cleaned_output = text.strip().strip("```").strip()
        # if cleaned_output.startswith("python"):
        #     cleaned_output = cleaned_output[6:].strip()
        # elif cleaned_output.startswith("python3"):
        #     cleaned_output = cleaned_output[7:].strip()

        # Take the last fenced code block in the reply.
        try:
            code = re.findall(r"```.*?\n(.+?)```", text, re.DOTALL)[-1]
        except IndexError:
            raise OutputParserError(text)
        return AgentFinish({"output": code}, text)
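
# Illustrative example (not from the original source): a reply containing
#   "Here is my solution:\n```python\ndef add(a, b):\n    return a + b\n```"
# yields AgentFinish({"output": "def add(a, b):\n    return a + b\n"}, text); a reply
# without a fenced code block raises OutputParserError.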

class HumanevalSolverParser(OutputParser):
    # NOTE: redefines HumanevalSolverParser; this later definition is the one
    # visible at import time.
    def parse(self, output: LLMResult) -> Union[AgentAction, AgentFinish]:
        text = output.content
        try:
            parsed_result = re.findall(
                r"Thought:(.+?)Reasoning:(.+?)Criticism:(.+?)File Path:(.+?)Code:(.+?)Command:(.+)",
                text,
                re.DOTALL,
            )[0]
            cleaned_output = {
                "thought": parsed_result[0].strip(),
                "reasoning": parsed_result[1].strip(),
                "criticism": parsed_result[2].strip(),
                "file_path": parsed_result[3].strip().strip("`"),
                # str.strip removes *characters*, not prefixes: the chained calls below
                # rely on the code being wrapped in ``` fences and newlines to peel off
                # the fence and a leading "python"/"python3" language tag.
                "code": parsed_result[4]
                .strip()
                .strip("```")
                .strip("python")
                .strip("python3"),
                "command": parsed_result[5].strip().strip("`"),
            }
        except Exception:
            raise OutputParserError(text)
        return AgentFinish({"output": cleaned_output}, text)
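
# Illustrative example (not from the original source): this variant expects a structured reply like
#   "Thought: ...\nReasoning: ...\nCriticism: ...\nFile Path: `main.py`\nCode:\n```python\n...\n```\nCommand: `python main.py`"
# and returns the extracted fields as a dict in AgentFinish({"output": {...}}, text).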

class HumanevalEvaluatorParser(OutputParser):
    dimensions: Optional[List[str]] = None

    def parse(self, output: LLMResult) -> Tuple[bool, str]:
        text = output.content
        cleaned_output = re.sub(r"\n+", "\n", text.strip())
        checks = cleaned_output.split("\n")
        patterns = [
            re.compile(r"(?:\d\.\s*)?" + dimension + r":\s*(\d)")
            for dimension in self.dimensions
        ]

        # The advice may span several lines, so merge everything from the last
        # "Advice:" line onwards back into a single entry.
        advice = ""
        for check in reversed(checks):
            advice = check + advice
            if check.startswith("Advice:"):
                break
        checks[-1] = advice
        try:
            # find the score for each dimension, then the advice
            score = []
            for pattern in patterns:
                for check in checks[:-1]:
                    if pattern.findall(check):
                        score.append(bool(int(pattern.findall(check)[0])))
                        break
            advice = re.findall(r"(?:\d\.\s*)?Advice:\s*(.+)", checks[-1])[0]
            # logger.info("Evaluator give the following advice:\n" + advice)
        except (IndexError, ValueError):
            # logger.error("Bad response from evaluator!")
            raise OutputParserError(text)
        return score[0], advice

class HumanevalyCriticParser(OutputParser):
    def parse(self, output: LLMResult) -> AgentCriticism:
        text = output.content
        if "[Agree]" in text:
            return AgentCriticism(True, "")
        else:
            return AgentCriticism(False, text)

class MGSMEvaluatorParser(OutputParser):
    dimensions: Optional[List[str]] = None

    def parse(self, output: LLMResult) -> Tuple[bool, str]:
        text = output.content
        cleaned_output = re.sub(r"\n+", "\n", text.strip())
        # checks = cleaned_output.split("\n")
        patterns = [
            re.compile(r"(?:\d\.\s*)?" + dimension + r":\s*(\d)")
            for dimension in self.dimensions
        ]
        try:
            # find the score, then the advice
            score_num = [
                int(pattern.findall(cleaned_output)[0])
                for i, pattern in enumerate(patterns)
            ][0]
            if score_num == 0:
                score = False
            elif score_num == 1:
                score = True
            else:
                raise ValueError("Bad score!")
            pat = re.compile(r"(?:\d\.\s*)?Response:\s*(.+)", re.DOTALL)
            advice = pat.findall(cleaned_output)[0]
            # logger.info("Evaluator give the following advice:\n" + advice)
        except (IndexError, ValueError):
            # logger.error("Bad response from evaluator!")
            raise OutputParserError(text)
        return score, advice

class MGSMCriticAgreeParser(OutputParser):
    def parse(self, output: LLMResult) -> AgentCriticism:
        text = output.content
        text = re.sub(r"\n+", "\n", text.strip())
        # checks = text.split("\n")
        # if not text.startswith("Thought:"):
        #     raise OutputParserError(text)
        # if not (checks[0].startswith("Action:")):
        #     raise OutputParserError(text)
        # if checks[0].strip(". ") == "Action: Agree":
        #     return AgentCriticism(True, "")
        if "[Agree]" in text:
            return AgentCriticism(True, "")
        else:
            # pattern = re.compile(r"Action Input: ([\S\n ]+)")
            # try:
            #     criticism = pattern.findall(text)[0].strip()
            #     criticism = (
            #         re.findall(r"Output:\S?(.+)", text)[0].replace("[Wrong]", "")
            #     ).strip()
            criticism = text.replace("[Disagree]", "").strip()
            # except IndexError:
            #     logger.error("Bad response from critic!")
            #     raise OutputParserError(text)
            #     criticism = "I think the solution is not correct. Please think carefully and correct it."
            return AgentCriticism(False, criticism)
        # else:
        #     raise OutputParserError(text)

class ResponseGenEvaluatorParser(OutputParser):
    dimensions: Optional[List[str]] = None

    def parse(self, output: LLMResult) -> Tuple[List[int], str]:
        text = output.content
        cleaned_output = re.sub(r"\n+", "\n", text.strip())
        checks = cleaned_output.split("\n")
        patterns = [
            re.compile(r"(?:\d\.\s*)?" + dimension + r":\s*(\d+)")
            for dimension in self.dimensions
        ]

        # Merge everything from the last "Advice:" line onwards into a single entry.
        advice = ""
        for check in reversed(checks):
            advice = check + advice
            if check.startswith("Advice:"):
                break
        checks[-1] = advice
        try:
            # find the score for each dimension, then the advice
            score = [
                int(pattern.findall(checks[i])[0]) for i, pattern in enumerate(patterns)
            ]
            advice = re.findall(r"(?:\d\.\s*)?Advice:\s*(.+)", checks[-1])[0]
            # logger.info("Evaluator give the following advice:\n" + advice)
        except (IndexError, ValueError):
            # logger.error("Bad response from evaluator!")
            raise OutputParserError(text)
        return score, advice

class CommonParser3(OutputParser):
    def parse(self, output: LLMResult) -> AgentCriticism:
        text = output.content
        text = re.sub(r"\n+", "\n", text.strip())
        checks = text.split("\n")
        if not checks[0].startswith("Action:"):
            raise OutputParserError(text)
        if checks[0].strip(". ") == "Action: Agree":
            return AgentCriticism(True, "")
        elif checks[0].strip(". ") == "Action: Disagree":
            pattern = re.compile(r"Action Input: ([\S\n ]+)")
            try:
                criticism = pattern.findall(text)[0].strip()
            except IndexError:
                criticism = (
                    "I think it is not correct. Please think carefully and improve it."
                )
                # raise OutputParserError(text)
            return AgentCriticism(False, criticism)
        else:
            raise OutputParserError(text)

class ResponseGenCriticParser(OutputParser):
    def parse(self, output: LLMResult) -> AgentCriticism:
        text = output.content
        # text = re.sub(r"\n+", "\n", text.strip())
        # checks = text.split("\n")
        # if not (checks[0].startswith("Action:")):
        #     raise OutputParserError(text)
        # if checks[0].strip(". ") == "Action: Agree":
        #     return AgentCriticism(True, "")
        # elif checks[0].strip(". ") == "Action: Disagree":
        #     pattern = re.compile(r"Action Input: ([\S\n ]+)")
        #     try:
        #         criticism = pattern.findall(text)[0].strip()
        #     except IndexError:
        #         # criticism = "I think the solution is not correct. Please think carefully and correct it."
        #         raise OutputParserError(text)
        #     return AgentCriticism(False, criticism)
        # else:
        #     raise OutputParserError(text)
        result = re.findall(r"Decision:(.+?)Response:(.+)", text, re.DOTALL)
        if len(result) == 0:
            result = ["Disagree", "I think the response can be further improved."]
        else:
            result = result[0]
        if "Agree" in result[0]:
            return AgentCriticism(True, "")
        else:
            return AgentCriticism(False, result[1].strip())
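
# Illustrative example (not from the original source): a reply of
#   "Decision: Disagree\nResponse: The answer ignores the user's budget constraint."
# yields AgentCriticism(False, "The answer ignores the user's budget constraint."),
# while any reply whose Decision contains "Agree" yields AgentCriticism(True, "").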

class RoleAssignerParser(OutputParser):
    # NOTE: redefines RoleAssignerParser; this later definition is the one
    # visible at import time.
    cnt_critic_agents: int = 0

    def parse(self, output: LLMResult) -> List[dict]:
        text = output.content
        pattern = re.compile(r"\d+?\.\s*(.+?) - (.+)")
        roles = pattern.findall(text)
        if len(roles) < self.cnt_critic_agents:
            logger.error(
                f"Role assigner failed to assign roles to {self.cnt_critic_agents} critics!"
            )
            raise OutputParserError(text)
        res = []
        for role in roles:
            res.append({"name": role[0], "description": role[1]})
        return res

class SolverParser(OutputParser):
    def parse(self, output: LLMResult) -> Union[AgentAction, AgentFinish]:
        text = output.content
        pattern = re.compile(r"\d+?\.\s*(.+?) - (.+)")
        tasks = pattern.findall(text)
        if len(tasks) == 0:
            raise OutputParserError(text)
        return AgentFinish({"output": tasks}, text)

class ToolUsingSolverParser(OutputParser):
    def parse(self, output: LLMResult) -> Union[AgentAction, AgentFinish]:
        # A non-empty function_name means the model issued a tool (function) call.
        if output.function_name != "":
            return AgentAction(
                tool=output.function_name,
                tool_input=output.function_arguments,
                log=output.content,
            )
        else:
            return AgentFinish({"output": output.content}, output.content)
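
# Illustrative example (not from the original source): an LLMResult whose function_name is
# "write_file" (a hypothetical tool name) becomes
# AgentAction(tool="write_file", tool_input=<function_arguments>, log=<content>);
# a plain text reply becomes AgentFinish({"output": content}, content).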

class HumanevalEvaluatorParser(OutputParser):
    # NOTE: redefines HumanevalEvaluatorParser; this later definition is the one
    # visible at import time.
    def parse(self, output: LLMResult) -> Tuple[bool, str]:
        text = output.content
        try:
            result = re.findall(r"Status:(.+?)Speak:(.+)", text, re.DOTALL)[0]
            score = bool(int(result[0]))
            words = result[1].strip()
        except (IndexError, ValueError):
            # logger.error("Bad response from evaluator!")
            raise OutputParserError(text)
        return score, words
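
# A minimal smoke test, not part of the original module. It uses SimpleNamespace as a
# stand-in for LLMResult (the parser exercised here only reads `.content`), so it makes
# no assumption about LLMResult's constructor signature.
if __name__ == "__main__":
    from types import SimpleNamespace

    fake_output = SimpleNamespace(
        content="Action: Speak\nAction Input: Hello, everyone!"
    )
    finish = NlpClassroom9PlayersParser().parse(fake_output)  # type: ignore[arg-type]
    # Expected to wrap {"output": "Hello, everyone!"} in an AgentFinish.
    print(finish)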