Spaces:
Runtime error
Runtime error
import asyncio | |
import json | |
import logging | |
import math | |
import os | |
import time | |
from contextlib import suppress | |
import edge_tts | |
import gradio as gr | |
import httpx | |
import soundfile as sf | |
from ..utils import cache_path, env_str, seconds_to_ms | |
from ..voices import voice_manager | |
# Module-level logger, named after this module via __name__.
log = logging.getLogger(__name__) | |
# TTS | |
async def generate_speech_from_text(
    text: str,
    voice_name: str,
    rate: int = 0,
    progress=gr.Progress(),  # noqa: B008
) -> tuple[str, str]:
    """Synthesize ``text`` with edge-tts, then convert it through the RVC endpoint.

    Both stages are cached on disk under paths produced by ``cache_path``; a
    stage whose output file already exists is skipped.

    Args:
        text: Text to speak. Leading/trailing whitespace is stripped first.
        voice_name: Key into ``voice_manager.voices``.
        rate: Speaking-rate adjustment in percent, passed to edge-tts as
            ``"+N%"`` for non-negative values and ``"N%"`` for negative ones.
        progress: Callable invoked as ``progress(fraction, message)``.

    Returns:
        A ``(markdown_status, output_path)`` tuple: a Markdown report with
        counts and timings, and the path of the converted audio as a string.

    Raises:
        httpx.HTTPStatusError: If submitting to the RVC endpoint fails, or if
            polling the result URL returns an error status other than 404.
        Any exception raised by the voice lookup, edge-tts, or ``sf.info``
        propagates unchanged.
    """

    def update_progress(pct, msg) -> None:
        log.debug("Progress: %.1f%%: %s", pct * 100, msg)
        progress(pct, msg)

    def format_duration(duration: float) -> str:
        return "Cached" if duration < 1 else seconds_to_ms(duration)

    def format_wpm(duration: float) -> str:
        return "Cached" if duration < 1 else f"{word_count * 60 / duration:,.0f}"

    def format_rate(duration: float) -> str:
        return "Cached" if duration < 1 else f"{audio_duration / duration:.1f}x"

    def format_latency(latency: float) -> str:
        # NOTE(review): sub-second latencies are also reported as "N/A";
        # confirm whether that threshold is intended.
        return "N/A" if latency < 1 else f"{latency:.2f}s"

    # Negative start times mean "this stage never ran" (served from cache).
    tts_start = -1.0
    rvc_start = -1.0
    ts0 = time.time()
    update_progress(0, "Starting...")
    voice = voice_manager.voices[voice_name]
    text = text.strip()

    # ---- Stage 1: text -> speech (edge-tts) ----
    output_tts_path = cache_path(voice.tts, "", rate, text, extension="mp3")
    text_ptr = 0
    tts_cached = os.path.exists(output_tts_path)
    if not tts_cached:
        log.info("Synthesizing %s chars into %s", len(text), output_tts_path)
        tts_rate = f"+{rate}%" if rate >= 0 else f"{rate}%"
        communicate = edge_tts.Communicate(text, voice.tts, rate=tts_rate)
        # Stream into a sibling temp file and rename it into place only after
        # the stream finished. Otherwise a failed or cancelled synthesis would
        # leave a truncated file that later calls mistake for a cache hit.
        partial_tts_path = f"{output_tts_path}.{os.getpid()}.{time.time_ns()}.part"
        try:
            with open(partial_tts_path, "wb") as f:
                async for chunk in communicate.stream():
                    if tts_start < 0:
                        tts_start = time.time()
                    chunk_type = chunk["type"]
                    if chunk_type == "audio":
                        f.write(chunk["data"])
                    elif chunk_type == "WordBoundary":
                        # Locate the spoken word in the input, searching
                        # forward from the last match, to estimate progress.
                        chunk_text = chunk["text"]
                        text_index = -1
                        with suppress(ValueError):
                            text_index = text.index(chunk_text, text_ptr)
                        if text_index == -1:
                            log.warning("Extraneous text received from edge tts: %s", chunk_text)
                            continue
                        text_ptr = text_index + len(chunk_text)
                        pct_complete = text_ptr / len(text)
                        log.debug("%.1f%%: %s", pct_complete * 100, chunk)
                        # TTS occupies the first half of the progress bar.
                        update_progress(pct_complete / 2, "Synthesizing...")
                    else:
                        log.warning("Unknown chunk type: %s: %s", chunk_type, json.dumps(chunk))
            os.replace(partial_tts_path, output_tts_path)
        finally:
            # After a successful replace the temp file is already gone;
            # suppress so cleanup never masks the original exception.
            with suppress(OSError):
                os.remove(partial_tts_path)
    else:
        log.info("TTS cached at %s", output_tts_path)

    audio_duration = sf.info(output_tts_path).duration
    expected_processing_time = audio_duration / 10 + 20  # 10x real-time on nvidia t4
    ts1 = time.time()

    # ---- Stage 2: voice conversion (RVC endpoint) ----
    output_rvc_path = cache_path(voice.tts, voice.name, rate, text, extension="mp3")
    rvc_cached = os.path.exists(output_rvc_path)
    if not rvc_cached:
        log.info(
            "Converting %s of audio into %s. Expected duration: %s",
            seconds_to_ms(audio_duration),
            output_rvc_path,
            seconds_to_ms(expected_processing_time),
        )
        last_check = 0.0
        timeout = httpx.Timeout(5, read=60.0)
        endpoint_url = env_str("RVC_ENDPOINT")
        async with httpx.AsyncClient(timeout=timeout) as client:
            response = await client.post(f"{endpoint_url}/v1/rvc", content=output_tts_path.read_bytes())
            rvc_start = time.time()
            response.raise_for_status()
            data = response.json()
            log.info("Submitted for conversion: %s", data)
            result_url = data["urls"]["result"]
            # NOTE(review): this loop has no upper bound; if the result URL
            # keeps answering 404 it polls forever. Consider a deadline.
            while True:
                elapsed = time.time() - ts1
                rvc_elapsed = time.time() - rvc_start
                proportion = elapsed / expected_processing_time
                # RVC occupies the second half of the bar; tanh approaches
                # 100% asymptotically so the bar never overshoots.
                pct_complete = 0.5 + math.tanh(proportion) / 2
                update_progress(pct_complete, "Processing...")
                # Start polling once 80% of the expected time has passed,
                # then at most once every 10 seconds.
                if rvc_elapsed > 0.8 * expected_processing_time and elapsed - last_check > 10:
                    last_check = elapsed
                    response = await client.get(result_url)
                    content_type = response.headers.get("Content-Type")
                    processed_bytes = await response.aread()
                    log.info(
                        "Checking status: %s (%s) %s bytes",
                        response.status_code,
                        content_type,
                        f"{len(processed_bytes):,}",
                    )
                    if response.status_code == 200 and content_type == "audio/mpeg":
                        output_rvc_path.write_bytes(processed_bytes)
                        break
                    elif response.status_code != 404:
                        # 404 means "not ready yet"; other error statuses raise.
                        response.raise_for_status()
                await asyncio.sleep(0.1)
        log.info("Successfully converted text (%s chars) -> %s", len(text), output_rvc_path)
    else:
        log.info("Already converted: %s", output_rvc_path)

    # ---- Status report ----
    ts2 = time.time()
    total_time = ts2 - ts0
    rvc_time = ts2 - rvc_start if rvc_start > 0 else 0
    tts_time = ts1 - tts_start if tts_start > 0 else 0
    word_count = len(text.split())
    durations = (audio_duration, total_time, tts_time, rvc_time)
    time_cells = " | ".join(format_duration(t) for t in durations)
    wpm_cells = " | ".join(format_wpm(t) for t in durations)
    rate_cells = " | ".join(format_rate(t) for t in durations)
    # For a cached stage the start time is still -1, so the difference is
    # hugely negative and renders as "N/A".
    latency_cells = " | ".join(
        format_latency(latency) for latency in (0, 0, tts_start - ts0, rvc_start - ts1)
    )
    # Cost estimate in cents: RVC seconds multiplied by 0.0164.
    rvc_cost = "N/A" if rvc_cached else f"{rvc_time * 0.0164:.1f}¢"
    # Blank lines separate the paragraph and the two tables so Markdown
    # renders them as distinct blocks instead of one merged table.
    markdown_status = f"""
Audio successfully synthesized.

| | Words | Chars | Cost
|-----|------:|------:|-----:|
|Count|{word_count:,}|{len(text):,}|{rvc_cost}|

| |Actual|Processing|TTS|RVC|
|-----|-----:|---------:|--:|--:|
|Time|{time_cells}|
|WPM|{wpm_cells}|
|Rate|{rate_cells}|
|Latency|{latency_cells}|
""".strip()
    log.info(markdown_status)
    return markdown_status, str(output_rvc_path)