File size: 9,999 Bytes
1ce5e18 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 |
#!/usr/bin/env python
# coding=utf-8
# Copyright 2023 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import ast
import difflib
from collections.abc import Mapping
from typing import Any, Callable, Dict
class InterpretorError(ValueError):
"""
An error raised when the interpretor cannot evaluate a Python expression, due to syntax error or unsupported
operations.
"""
pass
def evaluate(code: str, tools: Dict[str, Callable], state=None, chat_mode=False):
"""
Evaluate a python expression using the content of the variables stored in a state and only evaluating a given set
of functions.
This function will recurse through the nodes of the tree provided.
Args:
code (`str`):
The code to evaluate.
tools (`Dict[str, Callable]`):
The functions that may be called during the evaluation. Any call to another function will fail with an
`InterpretorError`.
state (`Dict[str, Any]`):
A dictionary mapping variable names to values. The `state` should contain the initial inputs but will be
updated by this function to contain all variables as they are evaluated.
chat_mode (`bool`, *optional*, defaults to `False`):
Whether or not the function is called from `Agent.chat`.
"""
try:
expression = ast.parse(code)
except SyntaxError as e:
print("The code generated by the agent is not valid.\n", e)
return
if state is None:
state = {}
result = None
for idx, node in enumerate(expression.body):
try:
line_result = evaluate_ast(node, state, tools)
except InterpretorError as e:
msg = f"Evaluation of the code stopped at line {idx} before the end because of the following error"
if chat_mode:
msg += (
f". Copy paste the following error message and send it back to the agent:\nI get an error: '{e}'"
)
else:
msg += f":\n{e}"
print(msg)
break
if line_result is not None:
result = line_result
return result
def evaluate_ast(expression: ast.AST, state: Dict[str, Any], tools: Dict[str, Callable]):
"""
Evaluate an absract syntax tree using the content of the variables stored in a state and only evaluating a given
set of functions.
This function will recurse trough the nodes of the tree provided.
Args:
expression (`ast.AST`):
The code to evaluate, as an abastract syntax tree.
state (`Dict[str, Any]`):
A dictionary mapping variable names to values. The `state` is updated if need be when the evaluation
encounters assignements.
tools (`Dict[str, Callable]`):
The functions that may be called during the evaluation. Any call to another function will fail with an
`InterpretorError`.
"""
if isinstance(expression, ast.Assign):
# Assignement -> we evaluate the assignement which should update the state
# We return the variable assigned as it may be used to determine the final result.
return evaluate_assign(expression, state, tools)
elif isinstance(expression, ast.Call):
# Function call -> we return the value of the function call
return evaluate_call(expression, state, tools)
elif isinstance(expression, ast.Constant):
# Constant -> just return the value
return expression.value
elif isinstance(expression, ast.Dict):
# Dict -> evaluate all keys and values
keys = [evaluate_ast(k, state, tools) for k in expression.keys]
values = [evaluate_ast(v, state, tools) for v in expression.values]
return dict(zip(keys, values))
elif isinstance(expression, ast.Expr):
# Expression -> evaluate the content
return evaluate_ast(expression.value, state, tools)
elif isinstance(expression, ast.For):
# For loop -> execute the loop
return evaluate_for(expression, state, tools)
elif isinstance(expression, ast.FormattedValue):
# Formatted value (part of f-string) -> evaluate the content and return
return evaluate_ast(expression.value, state, tools)
elif isinstance(expression, ast.If):
# If -> execute the right branch
return evaluate_if(expression, state, tools)
elif hasattr(ast, "Index") and isinstance(expression, ast.Index):
return evaluate_ast(expression.value, state, tools)
elif isinstance(expression, ast.JoinedStr):
return "".join([str(evaluate_ast(v, state, tools)) for v in expression.values])
elif isinstance(expression, ast.List):
# List -> evaluate all elements
return [evaluate_ast(elt, state, tools) for elt in expression.elts]
elif isinstance(expression, ast.Name):
# Name -> pick up the value in the state
return evaluate_name(expression, state, tools)
elif isinstance(expression, ast.Subscript):
# Subscript -> return the value of the indexing
return evaluate_subscript(expression, state, tools)
else:
# For now we refuse anything else. Let's add things as we need them.
raise InterpretorError(f"{expression.__class__.__name__} is not supported.")
def evaluate_assign(assign, state, tools):
var_names = assign.targets
result = evaluate_ast(assign.value, state, tools)
if len(var_names) == 1:
state[var_names[0].id] = result
else:
if len(result) != len(var_names):
raise InterpretorError(f"Expected {len(var_names)} values but got {len(result)}.")
for var_name, r in zip(var_names, result):
state[var_name.id] = r
return result
def evaluate_call(call, state, tools):
if not isinstance(call.func, ast.Name):
raise InterpretorError(
f"It is not permitted to evaluate other functions than the provided tools (tried to execute {call.func} of "
f"type {type(call.func)}."
)
func_name = call.func.id
if func_name not in tools:
raise InterpretorError(
f"It is not permitted to evaluate other functions than the provided tools (tried to execute {call.func.id})."
)
func = tools[func_name]
# Todo deal with args
args = [evaluate_ast(arg, state, tools) for arg in call.args]
kwargs = {keyword.arg: evaluate_ast(keyword.value, state, tools) for keyword in call.keywords}
return func(*args, **kwargs)
def evaluate_subscript(subscript, state, tools):
index = evaluate_ast(subscript.slice, state, tools)
value = evaluate_ast(subscript.value, state, tools)
if isinstance(value, (list, tuple)):
return value[int(index)]
if index in value:
return value[index]
if isinstance(index, str) and isinstance(value, Mapping):
close_matches = difflib.get_close_matches(index, list(value.keys()))
if len(close_matches) > 0:
return value[close_matches[0]]
raise InterpretorError(f"Could not index {value} with '{index}'.")
def evaluate_name(name, state, tools):
if name.id in state:
return state[name.id]
close_matches = difflib.get_close_matches(name.id, list(state.keys()))
if len(close_matches) > 0:
return state[close_matches[0]]
raise InterpretorError(f"The variable `{name.id}` is not defined.")
def evaluate_condition(condition, state, tools):
if len(condition.ops) > 1:
raise InterpretorError("Cannot evaluate conditions with multiple operators")
left = evaluate_ast(condition.left, state, tools)
comparator = condition.ops[0]
right = evaluate_ast(condition.comparators[0], state, tools)
if isinstance(comparator, ast.Eq):
return left == right
elif isinstance(comparator, ast.NotEq):
return left != right
elif isinstance(comparator, ast.Lt):
return left < right
elif isinstance(comparator, ast.LtE):
return left <= right
elif isinstance(comparator, ast.Gt):
return left > right
elif isinstance(comparator, ast.GtE):
return left >= right
elif isinstance(comparator, ast.Is):
return left is right
elif isinstance(comparator, ast.IsNot):
return left is not right
elif isinstance(comparator, ast.In):
return left in right
elif isinstance(comparator, ast.NotIn):
return left not in right
else:
raise InterpretorError(f"Operator not supported: {comparator}")
def evaluate_if(if_statement, state, tools):
result = None
if evaluate_condition(if_statement.test, state, tools):
for line in if_statement.body:
line_result = evaluate_ast(line, state, tools)
if line_result is not None:
result = line_result
else:
for line in if_statement.orelse:
line_result = evaluate_ast(line, state, tools)
if line_result is not None:
result = line_result
return result
def evaluate_for(for_loop, state, tools):
result = None
iterator = evaluate_ast(for_loop.iter, state, tools)
for counter in iterator:
state[for_loop.target.id] = counter
for expression in for_loop.body:
line_result = evaluate_ast(expression, state, tools)
if line_result is not None:
result = line_result
return result
|