Spaces:
Runtime error
Runtime error
File size: 6,108 Bytes
c6fd5b2 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 |
import asyncio
import json
import logging
import math
import os
import time
from contextlib import suppress
import edge_tts
import gradio as gr
import httpx
import soundfile as sf
from ..utils import cache_path, env_str, seconds_to_ms
from ..voices import voice_manager
log = logging.getLogger(__name__)
# TTS
async def generate_speech_from_text(
    text: str,
    voice_name: str,
    rate: int = 0,
    progress=gr.Progress(),  # noqa: B008
) -> tuple[str, str]:
    """Synthesize ``text`` with edge-tts, then convert it through the RVC endpoint.

    Both stages are cached on disk under the paths returned by ``cache_path``;
    a stage whose output file already exists is skipped.

    Args:
        text: Text to speak. Leading/trailing whitespace is stripped.
        voice_name: Key into ``voice_manager.voices``.
        rate: Speaking-rate adjustment in percent, passed to edge-tts as
            ``"+N%"`` / ``"-N%"``.
        progress: Gradio progress callback, called with ``(fraction, message)``.
            The TTS stage reports 0-50 %, the conversion stage 50-100 %.

    Returns:
        A ``(markdown_status, output_path)`` tuple: a markdown summary of the
        run and the path of the converted audio file as a string.

    Raises:
        httpx.HTTPStatusError: If the conversion submit request fails, or a
            result poll returns an error status other than 404.
    """
    # Local import: only needed for the atomic cache write below.
    import tempfile

    def update_progress(pct, msg) -> None:
        log.debug("Progress: %.1f%%: %s", pct * 100, msg)
        progress(pct, msg)

    # -1.0 is the "stage did not run (cached)" sentinel for both timestamps.
    tts_start = -1.0
    rvc_start = -1.0
    ts0 = time.time()
    update_progress(0, "Starting...")

    voice = voice_manager.voices[voice_name]
    text = text.strip()

    # ---- Stage 1: text -> speech -------------------------------------
    output_tts_path = cache_path(voice.tts, "", rate, text, extension="mp3")
    text_ptr = 0  # position in ``text`` just past the last word reported by edge-tts
    tts_cached = os.path.exists(output_tts_path)
    if not tts_cached:
        log.info("Synthesizing %s chars into %s", len(text), output_tts_path)
        rate_arg = f"+{rate}%" if rate >= 0 else f"{rate}%"
        communicate = edge_tts.Communicate(text, voice.tts, rate=rate_arg)

        # Stream into a unique temporary file next to the final path and
        # rename it into place only on success. Writing directly to the cache
        # path would leave a truncated file behind if the stream failed or the
        # task was cancelled, and every later call would treat it as cached.
        fd, partial_tts_path = tempfile.mkstemp(
            dir=os.path.dirname(output_tts_path), suffix=".part"
        )
        try:
            with os.fdopen(fd, "wb") as f:
                async for chunk in communicate.stream():
                    if tts_start < 0:
                        # First chunk received: used for the latency figure.
                        tts_start = time.time()
                    chunk_type = chunk["type"]
                    if chunk_type == "audio":
                        f.write(chunk["data"])
                    elif chunk_type == "WordBoundary":
                        chunk_text = chunk["text"]
                        text_index = -1
                        with suppress(ValueError):
                            text_index = text.index(chunk_text, text_ptr)
                        if text_index == -1:
                            log.warning("Extraneous text received from edge tts: %s", chunk_text)
                            continue
                        text_ptr = text_index + len(chunk_text)
                        pct_complete = text_ptr / len(text)
                        log.debug("%.1f%%: %s", pct_complete * 100, chunk)
                        # TTS owns the first half of the progress bar.
                        update_progress(pct_complete / 2, "Synthesizing...")
                    else:
                        log.warning("Unknown chunk type: %s: %s", chunk_type, json.dumps(chunk))
            os.replace(partial_tts_path, output_tts_path)
        except BaseException:
            # BaseException so that task cancellation also cleans up.
            with suppress(OSError):
                os.remove(partial_tts_path)
            raise
    else:
        log.info("TTS cached at %s", output_tts_path)

    audio_duration = sf.info(output_tts_path).duration
    expected_processing_time = audio_duration / 10 + 20  # 10x real-time on nvidia t4
    ts1 = time.time()

    # ---- Stage 2: voice conversion -----------------------------------
    output_rvc_path = cache_path(voice.tts, voice.name, rate, text, extension="mp3")
    rvc_cached = os.path.exists(output_rvc_path)
    if not rvc_cached:
        log.info(
            "Converting %s of audio into %s. Expected duration: %s",
            seconds_to_ms(audio_duration),
            output_rvc_path,
            seconds_to_ms(expected_processing_time),
        )
        last_check = 0.0
        timeout = httpx.Timeout(5, read=60.0)
        endpoint_url = env_str("RVC_ENDPOINT")
        async with httpx.AsyncClient(timeout=timeout) as client:
            response = await client.post(f"{endpoint_url}/v1/rvc", content=output_tts_path.read_bytes())
            rvc_start = time.time()
            response.raise_for_status()
            data = response.json()
            log.info("Submitted for conversion: %s", data)
            result_url = data["urls"]["result"]

            # NOTE(review): this loop has no overall deadline; it only ends on
            # a 200 audio/mpeg response or an HTTP error other than 404.
            while True:
                elapsed = time.time() - ts1
                rvc_elapsed = time.time() - rvc_start
                proportion = elapsed / expected_processing_time
                # tanh approaches 1 without reaching it, so the bar keeps
                # moving through the second half however long the job takes.
                pct_complete = 0.5 + math.tanh(proportion) / 2
                update_progress(pct_complete, "Processing...")

                # Start polling once 80 % of the estimate has passed, then
                # poll at most once every 10 seconds.
                if rvc_elapsed > 0.8 * expected_processing_time and elapsed - last_check > 10:
                    last_check = elapsed
                    response = await client.get(result_url)
                    content_type = response.headers.get("Content-Type")
                    processed_bytes = await response.aread()
                    log.info(
                        "Checking status: %s (%s) %s bytes",
                        response.status_code,
                        content_type,
                        f"{len(processed_bytes):,}",
                    )
                    if response.status_code == 200 and content_type == "audio/mpeg":
                        output_rvc_path.write_bytes(processed_bytes)
                        break
                    elif response.status_code != 404:
                        # 404 is treated as "not ready yet"; other error
                        # statuses abort the conversion.
                        response.raise_for_status()
                await asyncio.sleep(0.1)
        log.info("Successfully converted text (%s chars) -> %s", len(text), output_rvc_path)
    else:
        log.info("Already converted: %s", output_rvc_path)

    # ---- Summary -------------------------------------------------------
    # NOTE(review): the helpers below treat any value under one second as
    # "Cached" / "N/A", which also hides genuine sub-second measurements.
    def format_duration(duration: float) -> str:
        return "Cached" if duration < 1 else seconds_to_ms(duration)

    def format_wpm(duration: float) -> str:
        return "Cached" if duration < 1 else f"{word_count * 60 / duration:,.0f}"

    def format_rate(duration: float) -> str:
        return "Cached" if duration < 1 else f"{audio_duration / duration:.1f}x"

    def format_latency(latency: float) -> str:
        return "N/A" if latency < 1 else f"{latency:.2f}s"

    ts2 = time.time()
    total_time = ts2 - ts0
    rvc_time = ts2 - rvc_start if rvc_start > 0 else 0
    tts_time = ts1 - tts_start if tts_start > 0 else 0
    word_count = len(text.split())

    # Column order matches the second table header: Actual, Processing, TTS, RVC.
    durations = (audio_duration, total_time, tts_time, rvc_time)
    times = " | ".join(format_duration(t) for t in durations)
    wpms = " | ".join(format_wpm(t) for t in durations)
    rates = " | ".join(format_rate(t) for t in durations)
    latencies = " | ".join(format_latency(latency) for latency in (0, 0, tts_start - ts0, rvc_start - ts1))
    # 0.0164 is presumably the endpoint cost in cents per second - TODO confirm.
    rvc_cost = "N/A" if rvc_cached else f"{rvc_time * 0.0164:.1f}¢"
    markdown_status = f"""
Audio successfully synthesized.
| | Words | Chars | Cost |
|-----|------:|------:|-----:|
|Count|{word_count:,}|{len(text):,}|{rvc_cost}|
| |Actual|Processing|TTS|RVC|
|-----|-----:|---------:|--:|--:|
|Time|{times}|
|WPM|{wpms}|
|Rate|{rates}|
|Latency|{latencies}|
""".strip()
    log.info(markdown_status)
    return markdown_status, str(output_rvc_path)
|