Spaces:
Sleeping
Sleeping
File size: 10,003 Bytes
e679d69 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 |
import re
import signal
from contextlib import contextmanager, redirect_stdout
from dataclasses import dataclass
from enum import Enum
from io import StringIO
from typing import Optional, Type
from ..schema import ActionReturn, ActionStatusCode
from .base_action import AsyncActionMixin, BaseAction, tool_api
from .parser import BaseParser, JsonParser
class Status(str, Enum):
"""Execution status."""
SUCCESS = 'success'
FAILURE = 'failure'
@dataclass
class ExecutionResult:
"""Execution result."""
status: Status
value: Optional[str] = None
msg: Optional[str] = None
@contextmanager
def _raise_timeout(timeout):
def _handler(signum, frame):
raise TimeoutError()
signal.signal(signal.SIGALRM, _handler)
signal.alarm(timeout)
try:
yield
finally:
signal.alarm(0)
class IPythonInteractive(BaseAction):
"""An interactive IPython shell for code execution.
Args:
timeout (int): Upper bound of waiting time for Python script execution.
Defaults to ``20``.
max_out_len (int): maximum output length. No truncation occurs if negative.
Defaults to ``2048``.
use_signals (bool): whether signals should be used for timing function out
or the multiprocessing. Set to ``False`` when not running in the main
thread, e.g. web applications. Defaults to ``True``
description (dict): The description of the action. Defaults to ``None``.
parser (Type[BaseParser]): The parser class to process the
action's inputs and outputs. Defaults to :class:`JsonParser`.
"""
def __init__(
self,
timeout: int = 30,
max_out_len: int = 8192,
use_signals: bool = True,
description: Optional[dict] = None,
parser: Type[BaseParser] = JsonParser,
):
super().__init__(description, parser)
self.timeout = timeout
self._executor = self.create_shell()
self._highlighting = re.compile(
r'(?:\x1B[@-_]|[\x80-\x9F])[0-?]*[ -/]*[@-~]')
self._max_out_len = max_out_len if max_out_len >= 0 else None
self._use_signals = use_signals
def reset(self):
"""Clear the context."""
self._executor.reset()
@tool_api
def run(self, command: str, timeout: Optional[int] = None) -> ActionReturn:
"""Launch an IPython Interactive Shell to execute code.
Args:
command (:class:`str`): Python code snippet
timeout (:class:`Optional[int]`): timeout for execution.
This argument only works in the main thread. Defaults to ``None``.
"""
from timeout_decorator import timeout as timer
tool_return = ActionReturn(args={'text': command}, type=self.name)
ret = (
timer(timeout or self.timeout)(self.exec)(command)
if self._use_signals else self.exec(command))
if ret.status is Status.SUCCESS:
tool_return.result = [{'type': 'text', 'content': ret.value}]
tool_return.state = ActionStatusCode.SUCCESS
else:
tool_return.errmsg = ret.msg
tool_return.state = ActionStatusCode.API_ERROR
return tool_return
def exec(self, code: str) -> ExecutionResult:
"""Run Python scripts in IPython shell.
Args:
code (:class:`str`): code block
Returns:
:py:class:`ExecutionResult`: execution result
"""
with StringIO() as io:
with redirect_stdout(io):
ret = self._executor.run_cell(self.extract_code(code))
result = ret.result
if result is not None:
return ExecutionResult(Status.SUCCESS,
str(result)[:self._max_out_len])
outs = io.getvalue().strip().split('\n')
if not outs:
return ExecutionResult(Status.SUCCESS, '')
for i, out in enumerate(outs):
if re.search('Error|Traceback', out, re.S):
if 'TimeoutError' in out:
return ExecutionResult(
Status.FAILURE,
msg=('The code interpreter encountered '
'a timeout error.'))
err_idx = i
break
else:
return ExecutionResult(Status.SUCCESS,
outs[-1].strip()[:self._max_out_len])
return ExecutionResult(
Status.FAILURE,
msg=self._highlighting.sub(
'', '\n'.join(outs[err_idx:])[:self._max_out_len]),
)
@staticmethod
def create_shell():
from IPython import InteractiveShell
from traitlets.config import Config
c = Config()
c.HistoryManager.enabled = False
c.HistoryManager.hist_file = ':memory:'
return InteractiveShell(
user_ns={'_raise_timeout': _raise_timeout}, config=c)
@staticmethod
def extract_code(text: str) -> str:
"""Extract Python code from markup languages.
Args:
text (:class:`str`): Markdown-formatted text
Returns:
:class:`str`: Python code
"""
import json5
# Match triple backtick blocks first
triple_match = re.search(r'```[^\n]*\n(.+?)```', text, re.DOTALL)
# Match single backtick blocks second
single_match = re.search(r'`([^`]*)`', text, re.DOTALL)
if triple_match:
text = triple_match.group(1)
elif single_match:
text = single_match.group(1)
else:
try:
text = json5.loads(text)['code']
except Exception:
pass
# If no code blocks found, return original text
return text
@staticmethod
def wrap_code_with_timeout(code: str, timeout: int) -> str:
if not code.strip():
return code
code = code.strip('\n').rstrip()
indent = len(code) - len(code.lstrip())
handle = ' ' * indent + f'with _raise_timeout({timeout}):\n'
block = '\n'.join([' ' + line for line in code.split('\n')])
wrapped_code = handle + block
last_line = code.split('\n')[-1]
is_expression = True
try:
compile(last_line.lstrip(), '<stdin>', 'eval')
except SyntaxError:
is_expression = False
if is_expression:
wrapped_code += '\n' * 5 + last_line
return wrapped_code
class AsyncIPythonInteractive(AsyncActionMixin, IPythonInteractive):
"""An interactive IPython shell for code execution.
Args:
timeout (int): Upper bound of waiting time for Python script execution.
Defaults to ``20``.
max_out_len (int): maximum output length. No truncation occurs if negative.
Defaults to ``2048``.
use_signals (bool): whether signals should be used for timing function out
or the multiprocessing. Set to ``False`` when not running in the main
thread, e.g. web applications. Defaults to ``True``
description (dict): The description of the action. Defaults to ``None``.
parser (Type[BaseParser]): The parser class to process the
action's inputs and outputs. Defaults to :class:`JsonParser`.
"""
@tool_api
async def run(self,
command: str,
timeout: Optional[int] = None) -> ActionReturn:
"""Launch an IPython Interactive Shell to execute code.
Args:
command (:class:`str`): Python code snippet
timeout (:class:`Optional[int]`): timeout for execution.
This argument only works in the main thread. Defaults to ``None``.
"""
tool_return = ActionReturn(args={'text': command}, type=self.name)
ret = await self.exec(command, timeout)
if ret.status is Status.SUCCESS:
tool_return.result = [{'type': 'text', 'content': ret.value}]
tool_return.state = ActionStatusCode.SUCCESS
else:
tool_return.errmsg = ret.msg
tool_return.state = ActionStatusCode.API_ERROR
return tool_return
async def exec(self, code: str, timeout: int = None) -> ExecutionResult:
"""Asynchronously run Python scripts in IPython shell.
Args:
code (:class:`str`): code block
timeout (:class:`int`): max waiting time for code execution
Returns:
:py:class:`ExecutionResult`: execution result
"""
with StringIO() as io:
with redirect_stdout(io):
ret = await self._executor.run_cell_async(
# ret = await self.create_shell().run_cell_async(
self.wrap_code_with_timeout(
self.extract_code(code), timeout or self.timeout))
result = ret.result
if result is not None:
return ExecutionResult(Status.SUCCESS,
str(result)[:self._max_out_len])
outs = io.getvalue().strip().split('\n')
if not outs:
return ExecutionResult(Status.SUCCESS, '')
for i, out in enumerate(outs):
if re.search('Error|Traceback', out, re.S):
if 'TimeoutError' in out:
return ExecutionResult(
Status.FAILURE,
msg=('The code interpreter encountered a '
'timeout error.'))
err_idx = i
break
else:
return ExecutionResult(Status.SUCCESS,
outs[-1].strip()[:self._max_out_len])
return ExecutionResult(
Status.FAILURE,
msg=self._highlighting.sub(
'', '\n'.join(outs[err_idx:])[:self._max_out_len]),
)
|