Lagent / lagent /utils /util.py
Superkingjcj's picture
Upload 111 files
e679d69 verified
import asyncio
import importlib
import inspect
import logging
import os
import os.path as osp
import sys
import time
from functools import partial
from logging.handlers import RotatingFileHandler
from typing import Any, Dict, Generator, Iterable, List, Optional, Union
def load_class_from_string(class_path: str, path=None):
    """Import and return the object named by a dotted path.

    Args:
        class_path (str): dotted path such as ``'package.module.Name'``.
            Everything before the last dot is imported as a module and the
            last component is looked up as an attribute of that module.
        path (optional): extra entry placed at the front of ``sys.path``
            for the duration of the import. It is removed again afterwards,
            but only if this call was the one that added it.

    Returns:
        The attribute found on the imported module.
    """
    # Remember whether *we* extended sys.path, so a pre-existing entry is
    # never removed on the way out.
    temporarily_added = bool(path) and path not in sys.path
    if temporarily_added:
        sys.path.insert(0, path)

    try:
        module_name, class_name = class_path.rsplit('.', 1)
        return getattr(importlib.import_module(module_name), class_name)
    finally:
        if temporarily_added:
            sys.path.remove(path)
def create_object(config: Union[Dict, Any] = None):
    """Build an object from a configuration dictionary.

    The reserved key ``'type'`` selects what to build: either a dotted
    import path (resolved with ``load_class_from_string``), a class, or any
    other callable. The remaining items are used as keyword arguments.

    * For a class, the class is instantiated with those keyword arguments.
    * For any other callable, a ``functools.partial`` binding those keyword
      arguments is returned (the callable is not invoked).

    Anything that is not a dictionary (including ``None``) is returned
    unchanged. The input dictionary itself is never modified.
    """
    if not isinstance(config, dict):
        # Identity for non-dict input; this also covers ``None``.
        return config
    assert 'type' in config

    # Work on a shallow copy so that popping 'type' leaves the caller's
    # dictionary intact.
    params = config.copy()
    target = params.pop('type')
    if isinstance(target, str):
        target = load_class_from_string(target)

    if not inspect.isclass(target):
        assert callable(target)
        return partial(target, **params)
    return target(**params)
async def async_as_completed(futures: Iterable[asyncio.Future]):
    """An asynchronous-generator wrapper around `asyncio.as_completed`.

    Every input future gets a proxy future created on the current event
    loop. When the input future is done, its done-callback resolves the
    proxy with the input future itself. The proxies are passed to
    `asyncio.as_completed`, so this generator yields the original future
    objects (not their results) in the order in which they finish.

    Args:
        futures (Iterable[asyncio.Future]): futures to watch; each item
            must be an ``asyncio.Future`` instance.
    """
    loop = asyncio.get_event_loop()

    def _proxy_for(fut):
        # Done-callbacks receive the finished future as their only
        # argument, which makes it the proxy's result.
        assert isinstance(fut, asyncio.Future)
        proxy = loop.create_future()
        fut.add_done_callback(proxy.set_result)
        return proxy

    proxies = [_proxy_for(fut) for fut in futures]
    for pending in asyncio.as_completed(proxies):
        finished = await pending
        yield finished
def filter_suffix(
        response: Union[str, List[str]],
        suffixes: Optional[List[str]] = None) -> Union[str, List[str]]:
    """Cut each response at the first occurrence of any given marker.

    Despite the name, a marker does not have to sit at the very end of the
    text: the response is truncated at the first place the marker appears
    and everything from there on is dropped. Markers are applied one after
    another, in the order given.

    Args:
        response (Union[str, List[str]]): generated responses by LLMs,
            either a single string or a batch of strings.
        suffixes (Optional[List[str]]): markers to cut at. ``None`` returns
            ``response`` untouched. Empty strings are ignored.

    Returns:
        Union[str, List[str]]: the cleaned response; a string when a string
        was passed in, otherwise a list with one entry per input response.
    """
    if suffixes is None:
        return response

    batched = not isinstance(response, str)
    responses = response if batched else [response]

    # An empty marker is "contained" in every string and makes
    # str.split / str.partition raise ValueError, so drop such entries.
    markers = [item for item in suffixes if item]

    processed = []
    for resp in responses:
        for item in markers:
            # partition() returns the whole string as the head when the
            # marker is absent, so no separate membership test is needed.
            resp = resp.partition(item)[0]
        processed.append(resp)

    return processed if batched else processed[0]
def get_logger(
    name: str = 'lagent',
    level: str = 'debug',
    fmt:
    str = '%(asctime)s %(levelname)8s %(filename)20s %(lineno)4s - %(message)s',
    add_file_handler: bool = False,
    log_dir: str = 'log',
    log_file: Optional[str] = None,
    max_bytes: int = 5 * 1024 * 1024,
    backup_count: int = 3,
):
    """Return a named logger with a console handler and optional log file.

    The function is safe to call repeatedly with the same ``name``:
    handlers are attached only when an equivalent one is not already
    present, so log lines are not duplicated. Handlers that already exist
    on the logger are left as they are (including their formatter).

    Args:
        name (str): name passed to ``logging.getLogger``.
        level (str): level name, case-insensitive. Names that are not
            attributes of the ``logging`` module fall back to ``DEBUG``.
        fmt (str): format string for handlers created by this call.
        add_file_handler (bool): also log to a rotating file.
        log_dir (str): directory for the log file; created if missing.
        log_file (Optional[str]): file name inside ``log_dir``. Defaults to
            ``<YYYY-MM-DD>.log`` using the local date at call time.
        max_bytes (int): size at which the log file is rotated.
        backup_count (int): number of rotated files to keep.

    Returns:
        logging.Logger: the configured logger (propagation disabled).
    """
    logger = logging.getLogger(name)
    logger.propagate = False
    logger.setLevel(getattr(logging, level.upper(), logging.DEBUG))
    formatter = logging.Formatter(fmt)

    # FileHandler derives from StreamHandler, so it has to be excluded
    # explicitly when looking for an existing console handler.
    has_console = any(
        isinstance(handler, logging.StreamHandler)
        and not isinstance(handler, logging.FileHandler)
        for handler in logger.handlers)
    if not has_console:
        console_handler = logging.StreamHandler()
        console_handler.setFormatter(formatter)
        logger.addHandler(console_handler)

    if add_file_handler:
        if log_file is None:
            # Resolved per call rather than once at import time, so a
            # long-running process does not keep using a stale date.
            log_file = time.strftime('%Y-%m-%d.log', time.localtime())
        # exist_ok avoids the race between an existence check and creation.
        os.makedirs(log_dir, exist_ok=True)
        # File handlers store an absolute path in ``baseFilename``; compare
        # against the same form.
        log_file_path = osp.abspath(osp.join(log_dir, log_file))
        has_file = any(
            isinstance(handler, logging.FileHandler)
            and handler.baseFilename == log_file_path
            for handler in logger.handlers)
        if not has_file:
            file_handler = RotatingFileHandler(
                log_file_path,
                maxBytes=max_bytes,
                backupCount=backup_count,
                encoding='utf-8')
            file_handler.setFormatter(formatter)
            logger.addHandler(file_handler)

    return logger
class GeneratorWithReturn:
    """Iterable wrapper that remembers what the wrapped generator returned.

    Iterating over an instance delegates to the wrapped generator. Once the
    generator is exhausted, its return value is stored in ``ret``; until
    then ``ret`` is ``None``.
    """

    def __init__(self, generator: Generator):
        self.ret = None
        self.generator = generator

    def __iter__(self):
        # ``yield from`` evaluates to the value carried by the wrapped
        # generator's StopIteration, i.e. its ``return`` value.
        outcome = yield from self.generator
        self.ret = outcome
        return outcome