import asyncio
import json
import logging
import os
from functools import lru_cache

import edge_tts
import gradio as gr

from tts_service.utils import cache_path
from tts_service.voices import voice_manager

log = logging.getLogger(__name__)


@lru_cache(maxsize=None)
def import_voice_converter():
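    """Lazily import and cache a single VoiceConverter instance."""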
    from rvc.infer.infer import VoiceConverter

    return VoiceConverter()


# TTS
async def run_tts_script(
    text: str,
    voice_name: str = "male-1",
    rate: int = 0,
    progress=gr.Progress(),  # noqa: B008
) -> tuple[str, str]:
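    """Synthesize ``text`` with edge-tts, then convert it with the configured RVC voice.

    Returns a status message and the path to the converted audio file.
    """
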
    async def update_progress(pct, msg) -> None:
        log.debug("Progress: %.1f%%: %s", pct * 100, msg)
        progress(pct, msg)
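        # Yield to the event loop so other tasks (e.g. UI updates) can run between steps.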
        await asyncio.sleep(0)

    await update_progress(0, "Starting...")
    voice = voice_manager.voices[voice_name]
    format = "wav"

    text = text.strip()
    output_tts_path = cache_path(voice.tts, "", rate, text, extension=format)
    text_ptr = 0
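    # Skip edge-tts synthesis if this text/voice/rate combination is already cached on disk.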
    if not os.path.exists(output_tts_path):
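        # edge-tts expects a signed percentage string, e.g. "+0%" or "-10%".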
        rates = f"+{rate}%" if rate >= 0 else f"{rate}%"
        communicate = edge_tts.Communicate(text, voice.tts, rate=rates)
        with open(output_tts_path, "wb") as f:
            async for chunk in communicate.stream():
                chunk_type = chunk["type"]
                if chunk_type == "audio":
                    f.write(chunk["data"])
                elif chunk_type == "WordBoundary":
                    # WordBoundary events carry the word just synthesized; locate it in
                    # the input text to estimate how far synthesis has progressed.
                    chunk_text = chunk["text"]
                    # str.find returns -1 when the word is not found; str.index would
                    # raise ValueError and never reach the check below.
                    text_index = text.find(chunk_text, text_ptr)
                    if text_index == -1:
                        log.warning("Extraneous text received from edge tts: %s", chunk_text)
                        continue
                    text_ptr = text_index + len(chunk_text)
                    pct_complete = text_ptr / len(text)
                    log.debug("%.1f%%: %s", pct_complete * 100, chunk)
                    await update_progress(pct_complete / 2, "Synthesizing...")
                else:
                    log.warning("Unknown chunk type: %s: %s", chunk_type, json.dumps(chunk))

    output_rvc_path = cache_path(voice.tts, voice.name, rate, text, extension=format)
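    # Skip RVC conversion if the converted audio for this request is already cached.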
    if not os.path.exists(output_rvc_path):
        infer_pipeline = import_voice_converter()
        await infer_pipeline.convert_audio(
            pitch=voice.pitch,
            filter_radius=voice.filter_radius,
            index_rate=voice.index_rate,
            volume_envelope=voice.rms_mix_rate,
            protect=voice.protect,
            hop_length=voice.hop_length,
            f0_method=voice.f0_method,
            audio_input_path=str(output_tts_path),
            audio_output_path=str(output_rvc_path),
            model_path=voice.model,
            index_path=voice.index,
            split_audio=True,
            f0_autotune=voice.autotune is not None,
            f0_autotune_strength=voice.autotune,
            clean_audio=voice.clean is not None,
            clean_strength=voice.clean,
            export_format=format.upper(),
            upscale_audio=voice.upscale,
            f0_file=None,
            embedder_model=voice.embedder_model,
            embedder_model_custom=None,
            sid=0,
            formant_shifting=None,
            formant_qfrency=None,
            formant_timbre=None,
            post_process=None,
            reverb=None,
            pitch_shift=None,
            limiter=None,
            gain=None,
            distortion=None,
            chorus=None,
            bitcrush=None,
            clipping=None,
            compressor=None,
            delay=None,
            sliders=None,
            callback=lambda pct: update_progress(0.5 + pct / 2, "Converting..."),
        )

    return "Text synthesized successfully.", str(output_rvc_path)


# Prerequisites