File size: 3,869 Bytes
d2d6865
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
import asyncio
import io
import logging
import os
import time
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass
from typing import Dict, List, Union, Tuple

import sounddevice as sd
import soundfile as sf
from elevenlabslib import ElevenLabsUser, ElevenLabsVoice

from .utils import timeit

# Configure the root logger at import time so INFO-level records are emitted.
logging.basicConfig(level=logging.INFO)
# Module-level logger used by the helpers below.
log = logging.getLogger(__name__)

# Shared ElevenLabs client, created at import time. Indexing os.environ
# directly means a missing ELEVENLABS_API_KEY raises KeyError on import.
USER = ElevenLabsUser(os.environ["ELEVENLABS_API_KEY"])


@dataclass
class Speaker:
    """A named speaker together with the ElevenLabs voice used for its lines."""

    # Name of the speaker.
    name: str
    # Voice object passed to the speech-generation helpers in this module.
    voice: ElevenLabsVoice
    # Colour associated with the speaker; not read anywhere in this module
    # (presumably used for display by callers -- confirm).
    color: str
    # Optional free-form description; None when not supplied.
    description: Union[str, None] = None


async def text_to_speechbytes_async(text, speaker, loop=None):
    """Generate speech audio for ``text`` without blocking the event loop.

    The blocking ``text_to_speechbytes`` call is handed to the event loop's
    default thread pool executor. Sharing that pool avoids creating and
    tearing down a ``ThreadPoolExecutor`` per call, and avoids the
    ``shutdown(wait=True)`` a ``with ThreadPoolExecutor()`` block performs on
    exit, which would stall the event loop thread if the await is cancelled
    or raises while the worker is still busy.

    Args:
        text: Text to synthesise.
        speaker: Object exposing a ``voice`` attribute; that voice is passed
            to ``text_to_speechbytes``.
        loop: Event loop whose executor runs the call. Defaults to the
            currently running loop when omitted or ``None``.

    Returns:
        Whatever ``text_to_speechbytes`` returns for ``text`` and
        ``speaker.voice``.
    """
    if loop is None:
        loop = asyncio.get_running_loop()
    # Passing None selects the loop's default (shared) executor.
    return await loop.run_in_executor(None, text_to_speechbytes, text, speaker.voice)


async def play_history(history: List[Tuple[Speaker, str]]):
    """Synthesise every line in ``history`` and play the clips in order.

    All synthesis requests are started concurrently; playback begins only
    once every request has finished, and clips are played one after another
    in the order of ``history``.

    Args:
        history: Sequence of ``(speaker, text)`` pairs to speak.
    """
    loop = asyncio.get_running_loop()

    # One synthesis coroutine per (speaker, text) entry.
    tasks = [
        text_to_speechbytes_async(text, speaker, loop)
        for speaker, text in history
    ]

    # gather() waits for ALL tasks and yields results in the order of `tasks`,
    # so playback order matches the history order.
    for speech_bytes in await asyncio.gather(*tasks):
        # Close the decoder as soon as the samples are read so the handle is
        # not left open for the rest of the loop.
        with sf.SoundFile(io.BytesIO(speech_bytes)) as sound_file:
            samples = sound_file.read()
            samplerate = sound_file.samplerate
        # blocking=True keeps clips from overlapping: the next clip is not
        # started until this call returns.
        sd.play(samples, samplerate=samplerate, blocking=True)


async def save_history(history: List[Tuple[Speaker, str]], audio_savepath: str):
    """Synthesise every line in ``history`` and save them as one audio file.

    All synthesis requests are started concurrently. Each returned clip is
    then decoded on its own and its frames are appended to the output file,
    in the order of ``history``. Decoding clip by clip matters because each
    clip is a complete encoded file: joining the raw bytes and decoding the
    result as a single file can drop or corrupt everything after the first
    clip's header.

    Args:
        history: Sequence of ``(speaker, text)`` pairs to speak.
        audio_savepath: Destination path. No explicit format is passed, so
            the output format is left to ``soundfile`` to determine from
            this path.

    Raises:
        ValueError: If ``history`` is empty, since there is no audio to save.
    """
    loop = asyncio.get_running_loop()

    # One synthesis coroutine per (speaker, text) entry.
    tasks = [
        text_to_speechbytes_async(text, speaker, loop)
        for speaker, text in history
    ]

    # gather() waits for ALL tasks and returns results in the order of `tasks`.
    all_speech_bytes = await asyncio.gather(*tasks)
    if not all_speech_bytes:
        raise ValueError("history is empty; there is no audio to save")

    output_file = None
    try:
        for speech_bytes in all_speech_bytes:
            with sf.SoundFile(io.BytesIO(speech_bytes), mode='r') as clip:
                if output_file is None:
                    # The first clip fixes the output's sample rate and
                    # channel count; later clips are assumed to match
                    # (same service, same settings) -- TODO confirm.
                    output_file = sf.SoundFile(
                        audio_savepath, mode='w',
                        samplerate=clip.samplerate,
                        channels=clip.channels,
                    )
                output_file.write(clip.read())
    finally:
        # Always close the output so buffered frames are flushed to disk.
        if output_file is not None:
            output_file.close()


def check_voice_exists(voice: Union[ElevenLabsVoice, str]) -> Union[ElevenLabsVoice, None]:
    """Look up ``voice`` by name on the account.

    Args:
        voice: Value passed to ``USER.get_voices_by_name``.

    Returns:
        The first matching voice, or ``None`` when the lookup returns
        nothing.
    """
    log.info(f"Getting voice {voice}...")
    matches = USER.get_voices_by_name(voice)
    if not matches:
        return None
    log.info(f"Voice {voice} already exists, found {matches}.")
    # Several voices may share a name; the first one returned wins.
    return matches[0]


@timeit
def get_make_voice(voice: Union[ElevenLabsVoice, str], audio_path: List[str] = None) -> ElevenLabsVoice:
    """Return the voice named ``voice``, cloning it from samples if needed.

    If a voice with that name already exists it is returned as-is. Otherwise,
    when the account reports that voice cloning is available, a new voice is
    cloned from the audio files in ``audio_path``.

    Args:
        voice: Name of the voice to fetch or create.
        audio_path: List of sample audio file paths (``str`` or path-like).
            Only required when the voice has to be cloned.

    Returns:
        The existing or newly cloned voice.

    Raises:
        ValueError: If the voice does not exist and cloning is unavailable.
        AssertionError: If cloning is needed but ``audio_path`` is missing
            or is not a list.
    """
    existing = check_voice_exists(voice)
    if existing is not None:
        return existing

    if not USER.get_voice_clone_available():
        raise ValueError(
            f"Voice {voice} does not exist and cloning is not available.")

    assert audio_path is not None, "audio_path must be provided"
    assert isinstance(audio_path, list), "audio_path must be a list"
    log.info(f"Cloning voice {voice}...")

    # Map each sample's file name to its raw bytes. Files are opened with a
    # context manager so handles are closed promptly, and os.fspath lets both
    # plain strings and path objects be used.
    samples = {}
    for sample_path in audio_path:
        with open(sample_path, "rb") as sample_file:
            samples[os.path.basename(os.fspath(sample_path))] = sample_file.read()
    return USER.clone_voice_bytes(voice, samples)


@timeit
def text_to_speech(text: str, voice: ElevenLabsVoice):
    """Generate and play audio for ``text``, returning the elapsed time.

    Args:
        text: Text to speak.
        voice: Voice whose ``generate_and_play_audio`` method is called with
            ``playInBackground=False``.

    Returns:
        Elapsed time of the call in seconds, as a float.
    """
    log.info(f"Generating audio using voice {voice}...")
    # perf_counter is monotonic, so the measurement cannot be skewed by
    # system clock adjustments the way time.time() differences can.
    started_at = time.perf_counter()
    voice.generate_and_play_audio(text, playInBackground=False)
    return time.perf_counter() - started_at


@timeit
def text_to_speechbytes(text: str, voice: ElevenLabsVoice):
    """Generate audio for ``text`` with ``voice`` and return it.

    Args:
        text: Text to synthesise.
        voice: Voice whose ``generate_audio_bytes`` method is called.

    Returns:
        The value returned by ``voice.generate_audio_bytes(text)``.
    """
    log.info(f"Generating audio for voice {voice} text {text}...")
    return voice.generate_audio_bytes(text)