Spaces:
Sleeping
Sleeping
File size: 4,140 Bytes
e679d69 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 |
import asyncio
import importlib
import inspect
import logging
import os
import os.path as osp
import sys
import time
from functools import partial
from logging.handlers import RotatingFileHandler
from typing import Any, Dict, Generator, Iterable, List, Optional, Union
def load_class_from_string(class_path: str, path=None):
path_in_sys = False
if path:
if path not in sys.path:
path_in_sys = True
sys.path.insert(0, path)
try:
module_name, class_name = class_path.rsplit('.', 1)
module = importlib.import_module(module_name)
cls = getattr(module, class_name)
return cls
finally:
if path and path_in_sys:
sys.path.remove(path)
def create_object(config: Union[Dict, Any] = None):
"""Create an instance based on the configuration where 'type' is a
preserved key to indicate the class (path). When accepting non-dictionary
input, the function degenerates to an identity.
"""
if config is None or not isinstance(config, dict):
return config
assert isinstance(config, dict) and 'type' in config
config = config.copy()
obj_type = config.pop('type')
if isinstance(obj_type, str):
obj_type = load_class_from_string(obj_type)
if inspect.isclass(obj_type):
obj = obj_type(**config)
else:
assert callable(obj_type)
obj = partial(obj_type, **config)
return obj
async def async_as_completed(futures: Iterable[asyncio.Future]):
"""A asynchronous wrapper for `asyncio.as_completed`"""
loop = asyncio.get_event_loop()
wrappers = []
for fut in futures:
assert isinstance(fut, asyncio.Future)
wrapper = loop.create_future()
fut.add_done_callback(wrapper.set_result)
wrappers.append(wrapper)
for next_completed in asyncio.as_completed(wrappers):
yield await next_completed
def filter_suffix(response: Union[str, List[str]],
suffixes: Optional[List[str]] = None) -> str:
"""Filter response with suffixes.
Args:
response (Union[str, List[str]]): generated responses by LLMs.
suffixes (str): a list of suffixes to be deleted.
Return:
str: a clean response.
"""
if suffixes is None:
return response
batched = True
if isinstance(response, str):
response = [response]
batched = False
processed = []
for resp in response:
for item in suffixes:
# if response.endswith(item):
# response = response[:len(response) - len(item)]
if item in resp:
resp = resp.split(item)[0]
processed.append(resp)
if not batched:
return processed[0]
return processed
def get_logger(
name: str = 'lagent',
level: str = 'debug',
fmt:
str = '%(asctime)s %(levelname)8s %(filename)20s %(lineno)4s - %(message)s',
add_file_handler: bool = False,
log_dir: str = 'log',
log_file: str = time.strftime('%Y-%m-%d.log', time.localtime()),
max_bytes: int = 5 * 1024 * 1024,
backup_count: int = 3,
):
logger = logging.getLogger(name)
logger.propagate = False
logger.setLevel(getattr(logging, level.upper(), logging.DEBUG))
formatter = logging.Formatter(fmt)
console_handler = logging.StreamHandler()
console_handler.setFormatter(formatter)
logger.addHandler(console_handler)
if add_file_handler:
if not osp.exists(log_dir):
os.makedirs(log_dir)
log_file_path = osp.join(log_dir, log_file)
file_handler = RotatingFileHandler(
log_file_path,
maxBytes=max_bytes,
backupCount=backup_count,
encoding='utf-8')
file_handler.setFormatter(formatter)
logger.addHandler(file_handler)
return logger
class GeneratorWithReturn:
"""Generator wrapper to capture the return value."""
def __init__(self, generator: Generator):
self.generator = generator
self.ret = None
def __iter__(self):
self.ret = yield from self.generator
return self.ret
|