import asyncio
import contextlib
import json
import logging
import math
import os
import time
from functools import lru_cache

import edge_tts
import gradio as gr
import httpx
import soundfile as sf

from tts_service.utils import cache_path, env_str
from tts_service.voices import voice_manager

log = logging.getLogger(__name__)


@lru_cache(maxsize=None)
def import_voice_converter():
    """Import ``VoiceConverter`` on first use and return a memoized instance.

    The import happens inside the function so that ``rvc`` is only loaded when
    a converter is actually requested; ``lru_cache`` makes every later call
    return the same object.
    """
    from rvc.infer.infer import VoiceConverter

    return VoiceConverter()


# TTS
async def run_tts_script(
    text: str,
    voice_name: str,
    rate: int = 0,
    progress=gr.Progress(),  # noqa: B008
) -> tuple[str, str]:
    """Synthesize ``text`` with edge-tts, then convert it through the RVC endpoint.

    Both stages are cached on disk via ``cache_path``: if the TTS file or the
    converted file already exists, the corresponding stage is skipped.

    Args:
        text: Text to synthesize; surrounding whitespace is stripped first.
        voice_name: Key into ``voice_manager.voices`` selecting the voice.
        rate: Speaking-rate adjustment in percent, sent to edge-tts as
            ``"+N%"`` or ``"-N%"``.
        progress: Progress callback invoked as ``progress(fraction, message)``.
            The first half of the range covers synthesis, the second half
            covers conversion.

    Returns:
        A ``(status_message, output_path)`` tuple, where ``output_path`` is the
        converted audio file path as a string.

    Raises:
        httpx.HTTPStatusError: If submitting the job fails, or polling returns
            an error status other than 404.
    """

    def update_progress(pct, msg) -> None:
        log.debug("Progress: %.1f%%: %s", pct * 100, msg)
        progress(pct, msg)

    log.info("Synthesizing text (%s chars)", len(text))
    update_progress(0, "Starting...")

    voice = voice_manager.voices[voice_name]

    text = text.strip()
    output_tts_path = cache_path(voice.tts, "", rate, text, extension="mp3")
    text_ptr = 0
    if not os.path.exists(output_tts_path):
        rates = f"+{rate}%" if rate >= 0 else f"{rate}%"
        communicate = edge_tts.Communicate(text, voice.tts, rate=rates)
        # Stream into a sibling ".part" file and promote it only once the stream
        # has finished, so an interrupted download is never mistaken for a
        # complete cached result by a later call.
        partial_tts_path = f"{output_tts_path}.part"
        try:
            with open(partial_tts_path, "wb") as f:
                async for chunk in communicate.stream():
                    chunk_type = chunk["type"]
                    if chunk_type == "audio":
                        f.write(chunk["data"])
                    elif chunk_type == "WordBoundary":
                        chunk_text = chunk["text"]
                        # str.find returns -1 on a miss; str.index would raise
                        # ValueError and make the check below unreachable.
                        text_index = text.find(chunk_text, text_ptr)
                        if text_index == -1:
                            log.warning("Extraneous text received from edge tts: %s", chunk_text)
                            continue
                        text_ptr = text_index + len(chunk_text)
                        pct_complete = text_ptr / len(text)
                        log.debug("%.1f%%: %s", pct_complete * 100, chunk)
                        update_progress(pct_complete / 2, "Synthesizing...")
                    else:
                        log.warning("Unknown chunk type: %s: %s", chunk_type, json.dumps(chunk))
        except BaseException:
            # BaseException so that task cancellation also cleans up the partial file.
            with contextlib.suppress(OSError):
                os.remove(partial_tts_path)
            raise
        os.replace(partial_tts_path, output_tts_path)

    audio_duration = sf.info(output_tts_path).duration
    # NOTE(review): the divisor is 8 although the comment says 10x -- confirm which is intended.
    expected_processing_time = audio_duration / 8 + 10  # 10x real-time on nvidia t4
    log.info(f"Synthesized {audio_duration:,.0f}s, expected processing time: {expected_processing_time:,.0f}s")

    output_rvc_path = cache_path(voice.tts, voice.name, rate, text, extension="mp3")
    if not os.path.exists(output_rvc_path):
        # Monotonic clock: elapsed-time arithmetic must not be affected by
        # wall-clock adjustments.
        ts0 = time.monotonic()
        last_check = 0.0
        timeout = httpx.Timeout(5, read=15.0)
        endpoint_url = env_str("RVC_ENDPOINT")
        async with httpx.AsyncClient(timeout=timeout) as client:
            response = await client.post(f"{endpoint_url}/v1/rvc", content=output_tts_path.read_bytes())
            response.raise_for_status()
            data = response.json()
            log.info("Submitted for conversion: %s", data)
            result_url = data["urls"]["result"]
            # NOTE(review): this loop has no upper bound; if the result never
            # becomes available it polls forever.
            while True:
                elapsed = time.monotonic() - ts0
                proportion = elapsed / expected_processing_time
                # tanh keeps the displayed progress in [0.5, 1) and approaching 1
                # asymptotically, however long the conversion takes.
                pct_complete = 0.5 + math.tanh(proportion) / 2
                update_progress(pct_complete, "Processing...")
                # Start polling only after 80% of the expected time has passed,
                # and then at most once every 10 seconds.
                if elapsed > 0.8 * expected_processing_time and elapsed - last_check > 10:
                    last_check = elapsed
                    response = await client.get(result_url)
                    content_type = response.headers.get("Content-Type")
                    processed_bytes = await response.aread()
                    log.info(
                        "Checking status: %s (%s) %s bytes",
                        response.status_code,
                        content_type,
                        f"{len(processed_bytes):,}",
                    )
                    if response.status_code == 200 and content_type == "audio/mpeg":
                        output_rvc_path.write_bytes(processed_bytes)
                        break
                    elif response.status_code != 404:
                        # 404 means "not ready yet"; any other error status is fatal.
                        response.raise_for_status()
                await asyncio.sleep(0.1)
        log.info("Successfully converted text (%s chars) -> %s", len(text), output_rvc_path)
    else:
        log.info("Already converted: %s", output_rvc_path)

    return f"{audio_duration:,.0f}s of audio successfully synthesized.", str(output_rvc_path)


# Prerequisites