from speakers.common.registry import registry from speakers.server.bootstrap.bootstrap_register import load_bootstrap, get_bootstrap from omegaconf import OmegaConf from speakers.common.utils import get_abs_path from oscrypto import util as crypto_utils import asyncio import time import os import sys import traceback import subprocess root_dir = os.path.dirname(os.path.abspath(__file__)) registry.register_path("server_library_root", root_dir) # Time to wait for web client to send a request to /task-state request # before that web clients task gets removed from the queue WEB_CLIENT_TIMEOUT = -1 # Time before finished tasks get removed from memory FINISHED_TASK_REMOVE_TIMEOUT = 1800 def generate_nonce(): return crypto_utils.rand_bytes(16).hex() def start_translator_client_proc(speakers_config_file: str, nonce: str = None): cmds = [ sys.executable, '-m', 'speakers', '--mode', 'web_runner', '--speakers-config-file', speakers_config_file, '--nonce', nonce, ] proc = subprocess.Popen(cmds, cwd=f"{registry.get_path('library_root')}/../") return proc async def start_async_app(speakers_config_file: str, nonce: str = None): config = OmegaConf.load(get_abs_path(speakers_config_file)) load_bootstrap(config=config.get("bootstrap")) runner_bootstrap_web = get_bootstrap("runner_bootstrap_web") runner_bootstrap_web.set_nonce(nonce=nonce) await runner_bootstrap_web.run() return runner_bootstrap_web async def dispatch(speakers_config_file: str, nonce: str = None): if nonce is None: nonce = os.getenv('MT_WEB_NONCE', generate_nonce()) runner = await start_async_app(speakers_config_file=speakers_config_file, nonce=nonce) # Create client process print() client_process = start_translator_client_proc(speakers_config_file, nonce=nonce) try: while True: """任务队列状态维护""" await asyncio.sleep(1) # Restart client if OOM or similar errors occured if client_process.poll() is not None: print('Restarting translator process') if len(runner.ongoing_tasks) > 0: task_id = runner.ongoing_tasks.pop(0) state = runner.task_states[task_id] state['info'] = 'error' state['finished'] = True client_process = start_translator_client_proc(speakers_config_file=speakers_config_file) # Filter queued and finished tasks now = time.time() to_del_task_ids = set() for tid, s in runner.task_states.items(): payload = runner.task_data[tid] # Remove finished tasks after 30 minutes if s['finished'] and now - payload.created_at > FINISHED_TASK_REMOVE_TIMEOUT: to_del_task_ids.add(tid) # Remove queued tasks without web client elif WEB_CLIENT_TIMEOUT >= 0: if tid not in runner.ongoing_tasks and not s['finished'] \ and now - payload.requested_at > WEB_CLIENT_TIMEOUT: print('REMOVING TASK', tid) to_del_task_ids.add(tid) try: runner.queue.remove(tid) except Exception: pass for tid in to_del_task_ids: del runner.task_states[tid] del runner.task_data[tid] except: if client_process.poll() is None: # client_process.terminate() client_process.kill() await runner.destroy() traceback.print_exc() raise