Spaces:
Sleeping
Sleeping
File size: 3,770 Bytes
04ffec9 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 |
from speakers.common.registry import registry
from speakers.server.bootstrap.bootstrap_register import load_bootstrap, get_bootstrap
from omegaconf import OmegaConf
from speakers.common.utils import get_abs_path
from oscrypto import util as crypto_utils
import asyncio
import time
import os
import sys
import traceback
import subprocess
# Absolute directory that contains this file; registered with the shared
# registry under the "server_library_root" key.
root_dir = os.path.dirname(os.path.abspath(__file__))
registry.register_path("server_library_root", root_dir)
# Time to wait for a web client to send a request to /task-state before
# that web client's task gets removed from the queue. A negative value
# disables this removal (the check in dispatch() is guarded by `>= 0`).
WEB_CLIENT_TIMEOUT = -1
# Time before finished tasks get removed from memory. Compared against
# differences of time.time() values, i.e. seconds (1800 s = 30 minutes).
FINISHED_TASK_REMOVE_TIMEOUT = 1800
def generate_nonce():
    """Return a fresh nonce: 16 random bytes, hex-encoded."""
    random_bytes = crypto_utils.rand_bytes(16)
    return random_bytes.hex()
def start_translator_client_proc(speakers_config_file: str, nonce: str = None):
    """Spawn the ``web_runner`` worker as a child Python process.

    The child is started as ``<sys.executable> -m speakers --mode web_runner``
    with its working directory set to the parent of the registered
    ``library_root`` path.

    Args:
        speakers_config_file: Value forwarded as ``--speakers-config-file``.
        nonce: Value forwarded as ``--nonce``. When ``None`` the option is
            left off the command line entirely.
            NOTE(review): confirm the ``web_runner`` CLI accepts a missing
            ``--nonce``; callers that have a nonce should always pass it.

    Returns:
        The ``subprocess.Popen`` handle of the started process.
    """
    cmds = [
        sys.executable,
        '-m', 'speakers',
        '--mode', 'web_runner',
        '--speakers-config-file', speakers_config_file,
    ]
    # Popen only accepts str/bytes/PathLike argv entries; putting the default
    # ``None`` into the list would raise TypeError before the child starts.
    if nonce is not None:
        cmds.extend(['--nonce', nonce])
    proc = subprocess.Popen(cmds, cwd=f"{registry.get_path('library_root')}/../")
    return proc
async def start_async_app(speakers_config_file: str, nonce: str = None):
    """Load the config, initialise bootstraps and run the web runner.

    Args:
        speakers_config_file: Path to the configuration file; it is resolved
            with ``get_abs_path`` and loaded with ``OmegaConf.load``.
        nonce: Value handed to the web runner bootstrap via ``set_nonce``.

    Returns:
        The ``runner_bootstrap_web`` bootstrap object after its ``run()``
        coroutine has been awaited.
    """
    config_path = get_abs_path(speakers_config_file)
    bootstrap_section = OmegaConf.load(config_path).get("bootstrap")
    load_bootstrap(config=bootstrap_section)

    web_runner = get_bootstrap("runner_bootstrap_web")
    web_runner.set_nonce(nonce=nonce)
    await web_runner.run()
    return web_runner
def _fail_first_ongoing_task(runner):
    """Mark the first ongoing task, if any, as finished with an error."""
    if runner.ongoing_tasks:
        task_id = runner.ongoing_tasks.pop(0)
        state = runner.task_states[task_id]
        state['info'] = 'error'
        state['finished'] = True


def _purge_stale_tasks(runner):
    """Drop expired finished tasks and timed-out queued tasks from the runner."""
    now = time.time()
    # Collect ids first: task_states must not change size while iterated.
    to_del_task_ids = set()
    for tid, s in runner.task_states.items():
        payload = runner.task_data[tid]
        # Remove finished tasks after FINISHED_TASK_REMOVE_TIMEOUT seconds
        if s['finished'] and now - payload.created_at > FINISHED_TASK_REMOVE_TIMEOUT:
            to_del_task_ids.add(tid)
        # Remove queued tasks without web client (disabled when negative)
        elif WEB_CLIENT_TIMEOUT >= 0:
            if tid not in runner.ongoing_tasks and not s['finished'] \
                    and now - payload.requested_at > WEB_CLIENT_TIMEOUT:
                print('REMOVING TASK', tid)
                to_del_task_ids.add(tid)
                try:
                    runner.queue.remove(tid)
                except Exception:
                    # Best effort: the task may no longer be in the queue.
                    pass
    for tid in to_del_task_ids:
        del runner.task_states[tid]
        del runner.task_data[tid]


async def dispatch(speakers_config_file: str, nonce: str = None):
    """Run the web runner and supervise its translator worker process.

    Starts the async web app, spawns the translator client process and then
    loops forever, once per second:

    * if the client process has exited, the first ongoing task (if any) is
      marked as finished with ``info == 'error'`` and a new client process
      is started with the same nonce;
    * finished tasks older than ``FINISHED_TASK_REMOVE_TIMEOUT`` and, when
      ``WEB_CLIENT_TIMEOUT`` is non-negative, queued tasks whose last request
      is older than that timeout are removed from the runner.

    Args:
        speakers_config_file: Path of the configuration file, passed to both
            the web app and the client process.
        nonce: Nonce shared between the web app and the client process. When
            ``None`` the ``MT_WEB_NONCE`` environment variable is used, and
            if that is unset a new nonce is generated.

    Raises:
        BaseException: Whatever interrupts the loop (errors, cancellation,
            ``KeyboardInterrupt``) is re-raised after the client process is
            killed, the runner destroyed and the traceback printed. The
            function never returns normally.
    """
    if nonce is None:
        # Same result as os.getenv(name, generate_nonce()), but the random
        # nonce is only produced when the environment variable is unset.
        nonce = os.getenv('MT_WEB_NONCE')
        if nonce is None:
            nonce = generate_nonce()
    runner = await start_async_app(speakers_config_file=speakers_config_file, nonce=nonce)

    # Create client process
    print()
    client_process = start_translator_client_proc(speakers_config_file, nonce=nonce)

    try:
        while True:
            # Maintain the task queue state.
            await asyncio.sleep(1)

            # Restart client if OOM or similar errors occurred
            if client_process.poll() is not None:
                print('Restarting translator process')
                _fail_first_ongoing_task(runner)
                # Forward the nonce: the replacement worker must use the same
                # nonce as the running web app, and passing None here makes
                # the spawn itself fail.
                client_process = start_translator_client_proc(
                    speakers_config_file=speakers_config_file, nonce=nonce)

            # Filter queued and finished tasks
            _purge_stale_tasks(runner)
    except BaseException:
        # Deliberately catches everything (including cancellation and
        # KeyboardInterrupt) so the child process never outlives the
        # supervisor; the exception is always re-raised.
        if client_process.poll() is None:
            client_process.kill()
        await runner.destroy()
        traceback.print_exc()
        raise
|