Spaces:
Running
Running
import re | |
import signal | |
from contextlib import contextmanager, redirect_stdout | |
from dataclasses import dataclass | |
from enum import Enum | |
from io import StringIO | |
from typing import Optional, Type | |
from ..schema import ActionReturn, ActionStatusCode | |
from .base_action import AsyncActionMixin, BaseAction, tool_api | |
from .parser import BaseParser, JsonParser | |
class Status(str, Enum):
    """Outcome of one code execution.

    The ``str`` mixin makes every member compare equal to, and behave as,
    its plain string value.
    """

    # The snippet ran without a detected error.
    SUCCESS = 'success'
    # The snippet raised, timed out, or otherwise failed.
    FAILURE = 'failure'
@dataclass
class ExecutionResult:
    """Execution result.

    Declared as a dataclass so that it can be built positionally or by
    keyword, e.g. ``ExecutionResult(Status.SUCCESS, 'text')`` or
    ``ExecutionResult(Status.FAILURE, msg='...')``, which is how the rest of
    this module constructs it. Without the decorator those calls fail with
    ``TypeError`` because a plain class has no generated ``__init__``.
    """

    # Whether the executed snippet succeeded or failed.
    status: Status
    # Output text of a successful run; left as ``None`` on failure.
    value: Optional[str] = None
    # Error description of a failed run; left as ``None`` on success.
    msg: Optional[str] = None
@contextmanager
def _raise_timeout(timeout):
    """Raise :class:`TimeoutError` if the ``with`` body runs too long.

    A ``SIGALRM`` handler that raises ``TimeoutError`` is installed and an
    alarm is armed for ``timeout`` seconds. On exit the alarm is cancelled and
    the handler that was active before entry is put back, so the guard leaves
    no process-wide signal state behind.

    Args:
        timeout (int): number of seconds before the alarm fires. It is passed
            straight to :func:`signal.alarm`.

    Yields:
        None: control is handed to the body of the ``with`` statement.

    Raises:
        TimeoutError: raised from the signal handler, inside the guarded
            body, when the alarm goes off.
    """

    def _handler(signum, frame):
        raise TimeoutError()

    # ``signal.signal`` returns the handler it replaces; keep it for cleanup.
    previous_handler = signal.signal(signal.SIGALRM, _handler)
    signal.alarm(timeout)
    try:
        yield
    finally:
        # Disarm first so a late alarm can never reach the restored handler.
        signal.alarm(0)
        # ``None`` means the old handler was not installed from Python and
        # cannot be passed back to ``signal.signal``.
        if previous_handler is not None:
            signal.signal(signal.SIGALRM, previous_handler)
class IPythonInteractive(BaseAction):
    """An interactive IPython shell for code execution.

    Args:
        timeout (int): Upper bound of waiting time for Python script execution.
            Defaults to ``30``.
        max_out_len (int): maximum output length. No truncation occurs if negative.
            Defaults to ``8192``.
        use_signals (bool): whether signals should be used for timing function out
            or the multiprocessing. Set to ``False`` when not running in the main
            thread, e.g. web applications. Defaults to ``True``
        description (dict): The description of the action. Defaults to ``None``.
        parser (Type[BaseParser]): The parser class to process the
            action's inputs and outputs. Defaults to :class:`JsonParser`.
    """

    def __init__(
        self,
        timeout: int = 30,
        max_out_len: int = 8192,
        use_signals: bool = True,
        description: Optional[dict] = None,
        parser: Type[BaseParser] = JsonParser,
    ):
        super().__init__(description, parser)
        self.timeout = timeout
        self._executor = self.create_shell()
        # Matches terminal escape sequences so they can be removed from
        # error reports.
        self._highlighting = re.compile(
            r'(?:\x1B[@-_]|[\x80-\x9F])[0-?]*[ -/]*[@-~]')
        # ``None`` as a slice bound means "no truncation".
        self._max_out_len = max_out_len if max_out_len >= 0 else None
        self._use_signals = use_signals

    def reset(self):
        """Clear the context."""
        self._executor.reset()

    # NOTE(review): ``tool_api`` is defined outside this file and presumably
    # derives the tool schema from the docstring below - keep its wording
    # stable and confirm against ``base_action``.
    @tool_api
    def run(self, command: str, timeout: Optional[int] = None) -> ActionReturn:
        """Launch an IPython Interactive Shell to execute code.

        Args:
            command (:class:`str`): Python code snippet
            timeout (:class:`Optional[int]`): timeout for execution.
                This argument only works in the main thread. Defaults to ``None``.
        """
        tool_return = ActionReturn(args={'text': command}, type=self.name)
        if self._use_signals:
            # Imported lazily so the dependency is only needed in signal mode.
            from timeout_decorator import timeout as timer

            ret = timer(timeout or self.timeout)(self.exec)(command)
        else:
            # Without signals no time limit is enforced here.
            ret = self.exec(command)
        if ret.status is Status.SUCCESS:
            tool_return.result = [{'type': 'text', 'content': ret.value}]
            tool_return.state = ActionStatusCode.SUCCESS
        else:
            tool_return.errmsg = ret.msg
            tool_return.state = ActionStatusCode.API_ERROR
        return tool_return

    def exec(self, code: str) -> ExecutionResult:
        """Run Python scripts in IPython shell.

        The cell's result value wins when it is not ``None``. Otherwise the
        captured stdout is inspected: the first line mentioning ``Error`` or
        ``Traceback`` turns the run into a failure, and when there is no such
        line the last line of output is returned as the value.

        Args:
            code (:class:`str`): code block

        Returns:
            :py:class:`ExecutionResult`: execution result
        """
        with StringIO() as buffer, redirect_stdout(buffer):
            cell = self._executor.run_cell(self.extract_code(code))
            if cell.result is not None:
                return ExecutionResult(Status.SUCCESS,
                                       str(cell.result)[:self._max_out_len])
            captured = buffer.getvalue()

        # ``split`` always yields at least one element, so ``lines[-1]`` below
        # is safe even when nothing was printed.
        lines = captured.strip().split('\n')
        for idx, line in enumerate(lines):
            if not re.search('Error|Traceback', line):
                continue
            if 'TimeoutError' in line:
                return ExecutionResult(
                    Status.FAILURE,
                    msg=('The code interpreter encountered '
                         'a timeout error.'))
            # Strip escape sequences before truncating so the length budget
            # is spent on readable text and no sequence is cut in half.
            report = self._highlighting.sub('', '\n'.join(lines[idx:]))
            return ExecutionResult(Status.FAILURE,
                                   msg=report[:self._max_out_len])
        return ExecutionResult(Status.SUCCESS,
                               lines[-1].strip()[:self._max_out_len])

    @staticmethod
    def create_shell():
        """Build a fresh IPython shell with history persistence disabled.

        The shell's user namespace is pre-seeded with ``_raise_timeout`` so
        that code produced by :meth:`wrap_code_with_timeout` can call it.
        """
        from IPython import InteractiveShell
        from traitlets.config import Config

        c = Config()
        c.HistoryManager.enabled = False
        c.HistoryManager.hist_file = ':memory:'
        return InteractiveShell(
            user_ns={'_raise_timeout': _raise_timeout}, config=c)

    @staticmethod
    def extract_code(text: str) -> str:
        """Extract Python code from markup languages.

        Tried in order: the first triple-backtick fence, the first
        single-backtick span, then a JSON/JSON5 object with a ``code`` key.
        If none applies the input is returned unchanged.

        Args:
            text (:class:`str`): Markdown-formatted text

        Returns:
            :class:`str`: Python code
        """
        # Match triple backtick blocks first
        triple_match = re.search(r'```[^\n]*\n(.+?)```', text, re.DOTALL)
        if triple_match:
            return triple_match.group(1)
        # Match single backtick blocks second
        single_match = re.search(r'`([^`]*)`', text, re.DOTALL)
        if single_match:
            return single_match.group(1)

        import json5

        try:
            payload = json5.loads(text)['code']
        except Exception:
            # Deliberate best effort: text that is not a JSON object with a
            # ``code`` entry is treated as raw code.
            return text
        # Only a string can be executed; otherwise fall back to the raw text.
        return payload if isinstance(payload, str) else text

    @staticmethod
    def wrap_code_with_timeout(code: str, timeout: int) -> str:
        """Wrap a snippet in ``with _raise_timeout(timeout):``.

        When the last line is a standalone expression at the snippet's base
        indentation it is repeated after the ``with`` block, so that it is the
        final top-level statement of the cell.

        Args:
            code (:class:`str`): source code to guard
            timeout (:class:`int`): time limit in seconds

        Returns:
            :class:`str`: the wrapped source, or ``code`` unchanged when it
            is blank.
        """
        if not code.strip():
            return code
        code = code.strip('\n').rstrip()
        lines = code.split('\n')
        # Base indentation is the leading whitespace of the first non-blank
        # line; one exists because the snippet is not blank.
        first = next(line for line in lines if line.strip())
        base_indent = first[:len(first) - len(first.lstrip())]
        handle = base_indent + f'with _raise_timeout({timeout}):\n'
        block = '\n'.join(' ' + line for line in lines)
        wrapped_code = handle + block

        last_line = lines[-1]
        last_indent = last_line[:len(last_line) - len(last_line.lstrip())]
        if last_indent != base_indent:
            # The last line belongs to a nested block (``if``/``for`` body,
            # continuation line, ...). Repeating it outside that block would
            # be mis-indented or would run it unconditionally.
            return wrapped_code
        try:
            compile(last_line.lstrip(), '<stdin>', 'eval')
        except (SyntaxError, ValueError):
            # Not a standalone expression: nothing to repeat.
            return wrapped_code
        # NOTE: the expression is evaluated twice (inside the guard and again
        # here), so a trailing call such as ``print(x)`` runs twice.
        return wrapped_code + '\n' * 5 + last_line
class AsyncIPythonInteractive(AsyncActionMixin, IPythonInteractive):
    """An interactive IPython shell for code execution.

    Args:
        timeout (int): Upper bound of waiting time for Python script execution.
            Defaults to ``30``.
        max_out_len (int): maximum output length. No truncation occurs if negative.
            Defaults to ``8192``.
        use_signals (bool): whether signals should be used for timing function out
            or the multiprocessing. Set to ``False`` when not running in the main
            thread, e.g. web applications. Defaults to ``True``
        description (dict): The description of the action. Defaults to ``None``.
        parser (Type[BaseParser]): The parser class to process the
            action's inputs and outputs. Defaults to :class:`JsonParser`.
    """

    # NOTE(review): nothing in this class reads ``use_signals``; the time
    # limit is always applied through ``wrap_code_with_timeout``.

    # NOTE(review): ``tool_api`` is defined outside this file and presumably
    # derives the tool schema from the docstring below - keep its wording
    # stable and confirm against ``base_action``.
    @tool_api
    async def run(self,
                  command: str,
                  timeout: Optional[int] = None) -> ActionReturn:
        """Launch an IPython Interactive Shell to execute code.

        Args:
            command (:class:`str`): Python code snippet
            timeout (:class:`Optional[int]`): timeout for execution.
                This argument only works in the main thread. Defaults to ``None``.
        """
        tool_return = ActionReturn(args={'text': command}, type=self.name)
        ret = await self.exec(command, timeout)
        if ret.status is Status.SUCCESS:
            tool_return.result = [{'type': 'text', 'content': ret.value}]
            tool_return.state = ActionStatusCode.SUCCESS
        else:
            tool_return.errmsg = ret.msg
            tool_return.state = ActionStatusCode.API_ERROR
        return tool_return

    async def exec(self,
                   code: str,
                   timeout: Optional[int] = None) -> ExecutionResult:
        """Asynchronously run Python scripts in IPython shell.

        The extracted code is wrapped with a timeout guard before it is run.
        The cell's result value wins when it is not ``None``. Otherwise the
        captured stdout is inspected: the first line mentioning ``Error`` or
        ``Traceback`` turns the run into a failure, and when there is no such
        line the last line of output is returned as the value.

        Args:
            code (:class:`str`): code block
            timeout (:class:`Optional[int]`): max waiting time for code
                execution. Falls back to ``self.timeout`` when ``None`` or ``0``.

        Returns:
            :py:class:`ExecutionResult`: execution result
        """
        guarded = self.wrap_code_with_timeout(
            self.extract_code(code), timeout or self.timeout)
        with StringIO() as buffer, redirect_stdout(buffer):
            cell = await self._executor.run_cell_async(guarded)
            if cell.result is not None:
                return ExecutionResult(Status.SUCCESS,
                                       str(cell.result)[:self._max_out_len])
            captured = buffer.getvalue()

        # ``split`` always yields at least one element, so ``lines[-1]`` below
        # is safe even when nothing was printed.
        lines = captured.strip().split('\n')
        for idx, line in enumerate(lines):
            if not re.search('Error|Traceback', line):
                continue
            if 'TimeoutError' in line:
                return ExecutionResult(
                    Status.FAILURE,
                    msg=('The code interpreter encountered a '
                         'timeout error.'))
            # Strip escape sequences before truncating so the length budget
            # is spent on readable text and no sequence is cut in half.
            report = self._highlighting.sub('', '\n'.join(lines[idx:]))
            return ExecutionResult(Status.FAILURE,
                                   msg=report[:self._max_out_len])
        return ExecutionResult(Status.SUCCESS,
                               lines[-1].strip()[:self._max_out_len])