File size: 4,679 Bytes
01523b5
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
from __future__ import annotations

from agentverse.logging import get_logger
from colorama import Fore
import bdb
from string import Template
from typing import TYPE_CHECKING, List, Any

from agentverse.message import ExecutorMessage, Message, SolverMessage
from agentverse.utils import AgentFinish, AgentAction

from agentverse.agents import agent_registry
from agentverse.agents.base import BaseAgent
import requests

logger = get_logger()


@agent_registry.register("executor")
class ExecutorAgent(BaseAgent):
    max_history: int = 5

    def step(
        self, task_description: str, solution: str, tools: List[dict] = [], **kwargs
    ) -> ExecutorMessage:
        logger.debug("", self.name, Fore.MAGENTA)
        prepend_prompt, append_prompt = self.get_all_prompts(
            task_description=task_description,
            solution=solution,
            agent_name=self.name,
            **kwargs,
        )

        history = self.memory.to_messages(self.name, start_index=-self.max_history)
        parsed_response = None
        for i in range(self.max_retry):
            try:
                response = self.llm.generate_response(
                    prepend_prompt, history, append_prompt, tools
                )
                parsed_response = self.output_parser.parse(response)
                break
            except (KeyboardInterrupt, bdb.BdbQuit):
                raise
            except Exception as e:
                logger.error(e)
                logger.warn("Retrying...")
                continue

        if parsed_response is None:
            logger.error(f"{self.name} failed to generate valid response.")
        if isinstance(parsed_response, AgentFinish):
            message = ExecutorMessage(
                content=parsed_response.return_values["output"],
                sender=self.name,
                sender_agent=self,
            )
        elif isinstance(parsed_response, AgentAction):
            message = ExecutorMessage(
                content=parsed_response.log,
                sender=self.name,
                sender_agent=self,
                tool_name=parsed_response.tool,
                tool_input=parsed_response.tool_input,
            )
        else:
            raise ValueError(
                f"Error response type: {type(parsed_response)}. Only support \
                    AgentFinish and AgentAction. Modify your output parser."
            )
        return message

    async def astep(
        self, task_description: str, solution: str, tools: List[dict] = [], **kwargs
    ) -> ExecutorMessage:
        logger.debug("", self.name, Fore.MAGENTA)
        prepend_prompt, append_prompt = self.get_all_prompts(
            task_description=task_description,
            solution=solution,
            agent_name=self.name,
            **kwargs,
        )

        history = self.memory.to_messages(self.name, start_index=-self.max_history)
        parsed_response = None
        for i in range(self.max_retry):
            try:
                response = await self.llm.agenerate_response(
                    prepend_prompt, history, append_prompt, tools
                )
                parsed_response = self.output_parser.parse(response)
                break
            except (KeyboardInterrupt, bdb.BdbQuit):
                raise
            except Exception as e:
                logger.error(e)
                logger.warn("Retrying...")
                continue

        if parsed_response is None:
            logger.error(f"{self.name} failed to generate valid response.")
            parsed_response = AgentAction(tool="", tool_input="", log="")
        if isinstance(parsed_response, AgentFinish):
            message = ExecutorMessage(
                content=parsed_response.return_values["output"],
                sender=self.name,
                sender_agent=self,
            )
        elif isinstance(parsed_response, AgentAction):
            message = ExecutorMessage(
                content=parsed_response.log,
                sender=self.name,
                sender_agent=self,
                tool_name=parsed_response.tool,
                tool_input=parsed_response.tool_input,
            )
        else:
            raise ValueError(
                f"Error response type: {type(parsed_response)}. Only support \
                    AgentFinish and AgentAction. Modify your output parser."
            )
        return message

    def add_message_to_memory(self, messages: List[Message]) -> None:
        self.memory.add_message(messages)

    def reset(self) -> None:
        """Reset the agent"""
        self.memory.reset()
        # TODO: reset receiver