File size: 10,165 Bytes
3667c7a
 
 
ac13632
3667c7a
 
 
 
44800eb
e12b285
44800eb
3667c7a
 
 
3a5dbe6
 
 
 
 
3667c7a
ac13632
3a5dbe6
 
e12b285
 
 
 
 
 
 
3a5dbe6
 
 
 
 
 
 
 
 
 
 
 
 
 
e12b285
 
 
 
 
 
 
 
 
 
 
3a5dbe6
 
 
 
 
 
 
 
 
 
e12b285
 
 
 
 
 
 
3a5dbe6
 
 
 
 
 
 
 
e12b285
3a5dbe6
 
 
 
 
e12b285
 
 
 
 
 
 
 
3a5dbe6
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
3667c7a
3a5dbe6
 
 
e12b285
 
 
 
 
 
 
 
44800eb
 
e12b285
44800eb
33727a3
44800eb
 
e12b285
44800eb
 
 
 
 
 
e12b285
44800eb
 
 
3667c7a
ac13632
 
e12b285
 
 
 
 
 
ac13632
3a5dbe6
 
 
 
 
e12b285
 
 
 
 
 
3a5dbe6
 
ac13632
 
 
 
3667c7a
 
e12b285
 
941931d
 
e12b285
941931d
e12b285
 
 
 
e98b248
941931d
e98b248
 
 
 
941931d
e12b285
941931d
e12b285
941931d
e12b285
 
941931d
 
 
e98b248
 
 
 
941931d
 
 
 
 
 
e98b248
 
941931d
 
 
 
 
 
 
 
 
 
e98b248
e12b285
e98b248
 
 
 
941931d
e12b285
 
 
 
 
55d992f
8d9f6bc
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
import io
import wave

import numpy as np
import requests

from openai import OpenAI

from utils.errors import APIError, AudioConversionError
from typing import List, Dict, Optional, Generator, Tuple


class STTManager:
    """
    Speech-to-text client offering full-clip transcription and chunked
    ("streaming") transcription with overlapping windows.

    The backend is selected by ``config.stt.type`` ("OPENAI_API" or "HF_API");
    ``config.stt.url``, ``config.stt.key`` and ``config.stt.name`` are read when
    a request is made.
    """

    def __init__(self, config):
        """
        Store the configuration and probe the configured STT backend.

        :param config: Object exposing ``stt.type``, ``stt.url``, ``stt.key`` and ``stt.name``.
        """
        self.SAMPLE_RATE = 48000  # Frame rate written into every generated WAV header.
        self.CHUNK_LENGTH = 5  # Length, in seconds, of each window sent for transcription.
        self.STEP_LENGTH = 3  # Seconds dropped from the buffer after each window is sent.
        # Words starting after this offset (seconds) inside a window are kept only provisionally.
        self.MAX_RELIABILITY_CUTOFF = self.CHUNK_LENGTH - 1

        self.config = config
        # Both probes perform real requests against the configured backend.
        self.status = self.test_stt()
        self.streaming = self.test_streaming()

    def numpy_audio_to_bytes(self, audio_data: np.ndarray) -> bytes:
        """
        Convert a numpy array of audio data to bytes.

        The raw buffer of ``audio_data`` is written unchanged into a mono WAV
        container with a 2-byte sample width at ``self.SAMPLE_RATE``; no dtype
        conversion is performed (presumably callers pass int16 samples - TODO confirm).

        :param audio_data: Numpy array containing audio data.
        :return: Bytes representation of the audio data.
        :raises AudioConversionError: If the WAV container cannot be written.
        """
        num_channels = 1
        sampwidth = 2

        buffer = io.BytesIO()
        try:
            with wave.open(buffer, "wb") as wf:
                wf.setnchannels(num_channels)
                wf.setsampwidth(sampwidth)
                wf.setframerate(self.SAMPLE_RATE)
                wf.writeframes(audio_data.tobytes())
        except Exception as e:
            raise AudioConversionError(f"Error converting numpy array to audio bytes: {e}") from e
        return buffer.getvalue()

    def process_audio_chunk(
        self, audio: Tuple[int, np.ndarray], audio_buffer: np.ndarray, transcript: Dict
    ) -> Tuple[Dict, np.ndarray, str]:
        """
        Process streamed audio data to accumulate and transcribe with overlapping segments.

        :param audio: Tuple containing the sample rate and audio data as numpy array.
        :param audio_buffer: Current audio buffer as numpy array.
        :param transcript: Current transcript dictionary.
        :return: Updated transcript, updated audio buffer, and transcript text.
        """
        audio_buffer = np.concatenate((audio_buffer, audio[1]))

        window_samples = self.SAMPLE_RATE * self.CHUNK_LENGTH
        half_second_samples = self.SAMPLE_RATE // 2

        # Transcribe once a full window is buffered, or when the buffer length is not a
        # whole number of half seconds (presumably the trailing partial chunk of a
        # recording - TODO confirm against the audio source).
        if len(audio_buffer) >= window_samples or len(audio_buffer) % half_second_samples != 0:
            audio_bytes = self.numpy_audio_to_bytes(audio_buffer[:window_samples])
            # Advance by STEP_LENGTH only, so consecutive windows overlap.
            audio_buffer = audio_buffer[self.SAMPLE_RATE * self.STEP_LENGTH :]
            new_transcript = self.speech_to_text_stream(audio_bytes)
            transcript = self.merge_transcript(transcript, new_transcript)

        return transcript, audio_buffer, transcript["text"]

    def speech_to_text_stream(self, audio: bytes) -> List[Dict[str, str]]:
        """
        Convert speech to text from a byte stream using streaming.

        :param audio: Bytes representation of audio data.
        :return: List of dictionaries containing transcribed words and their timestamps.
        :raises APIError: If the STT type is "HF_API" or the request fails.
        """
        if self.config.stt.type == "HF_API":
            raise APIError("STT Error: Streaming not supported for this STT type")
        try:
            data = ("temp.wav", audio, "audio/wav")
            client = OpenAI(base_url=self.config.stt.url, api_key=self.config.stt.key)
            # Word-level timestamps are required by merge_transcript.
            transcription = client.audio.transcriptions.create(
                model=self.config.stt.name, file=data, response_format="verbose_json", timestamp_granularities=["word"]
            )
        except APIError:
            raise
        except Exception as e:
            raise APIError(f"STT Error: Unexpected error: {e}") from e
        return transcription.words

    def merge_transcript(self, transcript: Dict, new_transcript: List[Dict[str, str]]) -> Dict:
        """
        Merge new transcript data with the existing transcript.

        The transcript dictionary is updated in place and also returned. It must
        carry the keys ``"words"``, ``"not_confirmed"`` and ``"last_cutoff"``; the
        key ``"text"`` is (re)written on every call.

        :param transcript: Existing transcript dictionary.
        :param new_transcript: New transcript data to merge; each item provides ``"word"``, ``"start"`` and ``"end"``.
        :return: Updated transcript dictionary.
        """
        cut_off = transcript["last_cutoff"]
        # Default cutoff for the next window; refined below from the last reliable word.
        transcript["last_cutoff"] = self.MAX_RELIABILITY_CUTOFF - self.STEP_LENGTH

        # Drop the provisional tail of the previous window; this window re-transcribes it.
        transcript["words"] = transcript["words"][: len(transcript["words"]) - transcript["not_confirmed"]]
        transcript["not_confirmed"] = 0
        first_word = True

        for word_dict in new_transcript:
            if word_dict["start"] >= cut_off:
                if first_word:
                    # Skip a leading word that repeats the last kept word (window overlap).
                    # NOTE(review): first_word stays True after a skip, so consecutive
                    # leading duplicates are all skipped - confirm this is intended.
                    if len(transcript["words"]) > 0 and transcript["words"][-1] == word_dict["word"]:
                        continue
                first_word = False
                transcript["words"].append(word_dict["word"])
                if word_dict["start"] > self.MAX_RELIABILITY_CUTOFF:
                    # Too close to the window end: keep provisionally, replace on next merge.
                    transcript["not_confirmed"] += 1
                else:
                    # Express the end of this word in the next window's time frame.
                    transcript["last_cutoff"] = max(1.0, word_dict["end"] - self.STEP_LENGTH)

        transcript["text"] = " ".join(transcript["words"])
        return transcript

    def speech_to_text_full(self, audio: Tuple[int, np.ndarray]) -> str:
        """
        Convert speech to text from a full audio segment.

        :param audio: Tuple containing the sample rate and audio data as numpy array.
        :return: Transcribed text.
        :raises AudioConversionError: If the audio cannot be encoded as WAV.
        :raises APIError: If the STT type is unsupported or the request fails.
        """
        audio_bytes = self.numpy_audio_to_bytes(audio[1])
        try:
            if self.config.stt.type == "OPENAI_API":
                data = ("temp.wav", audio_bytes, "audio/wav")
                client = OpenAI(base_url=self.config.stt.url, api_key=self.config.stt.key)
                transcription = client.audio.transcriptions.create(model=self.config.stt.name, file=data, response_format="text")
            elif self.config.stt.type == "HF_API":
                headers = {"Authorization": "Bearer " + self.config.stt.key}
                response = requests.post(self.config.stt.url, headers=headers, data=audio_bytes)
                if response.status_code != 200:
                    error_details = response.json().get("error", "No error message provided")
                    raise APIError("STT Error: HF API error", status_code=response.status_code, details=error_details)
                transcription = response.json().get("text", None)
                if transcription is None:
                    raise APIError("STT Error: No transcription returned by HF API")
            else:
                # Without this branch `transcription` would be unbound at the return below.
                raise APIError(f"STT Error: Unsupported STT type: {self.config.stt.type}")
        except APIError:
            raise
        except Exception as e:
            raise APIError(f"STT Error: Unexpected error: {e}") from e

        return transcription

    def test_stt(self) -> bool:
        """
        Test if the STT service is working correctly.

        Sends a short all-zero clip through :meth:`speech_to_text_full`.

        :return: True if the STT service is working, False otherwise.
        """
        try:
            self.speech_to_text_full((self.SAMPLE_RATE, np.zeros(10000)))
            return True
        except Exception:
            # Any failure means "unavailable"; KeyboardInterrupt/SystemExit still propagate.
            return False

    def test_streaming(self) -> bool:
        """
        Test if the STT streaming service is working correctly.

        Sends a short all-zero clip through :meth:`speech_to_text_stream`.

        :return: True if the STT streaming service is working, False otherwise.
        """
        try:
            self.speech_to_text_stream(self.numpy_audio_to_bytes(np.zeros(10000)))
            return True
        except Exception:
            # Any failure means "unavailable"; KeyboardInterrupt/SystemExit still propagate.
            return False


class TTSManager:
    """
    Text-to-speech client that returns audio bytes, optionally streamed in chunks.

    The backend is selected by ``config.tts.type`` ("OPENAI_API" or "HF_API");
    ``config.tts.url``, ``config.tts.key`` and ``config.tts.name`` are read when
    a request is made.
    """

    def __init__(self, config):
        """
        Store the configuration and probe the configured TTS backend.

        :param config: Object exposing ``tts.type``, ``tts.url``, ``tts.key`` and ``tts.name``.
        """
        self.config = config
        self.status = self.test_tts(stream=False)
        # Streaming is only probed when the plain request already works.
        self.streaming = self.test_tts(stream=True) if self.status else False

    def test_tts(self, stream) -> bool:
        """
        Test if the TTS service is working correctly.
        :param stream: Whether to probe the streaming or the non-streaming path.
        :return: True if the TTS service is working, False otherwise.
        """
        try:
            # read_text is a generator; it must be consumed for the request to be made.
            list(self.read_text("Handshake", stream=stream))
            return True
        except Exception:
            # Any failure means "unavailable"; KeyboardInterrupt/SystemExit still propagate.
            return False

    def read_text(self, text: str, stream: Optional[bool] = None) -> Generator[bytes, None, None]:
        """
        Convert text to speech and return the audio bytes, optionally streaming the response.
        :param text: Text to convert to speech.
        :param stream: Whether to use streaming or not. ``None`` falls back to ``self.streaming``.
        :return: Generator yielding chunks of audio bytes.
        :raises APIError: If the TTS type is unsupported, the backend returns a non-200 status,
            or the request fails unexpectedly.
        """
        if stream is None:
            stream = self.streaming

        headers = {"Authorization": "Bearer " + self.config.tts.key}
        data = {"model": self.config.tts.name, "input": text, "voice": "alloy", "response_format": "opus"}

        try:
            if not stream:
                if self.config.tts.type == "OPENAI_API":
                    response = requests.post(self.config.tts.url + "/audio/speech", headers=headers, json=data)
                elif self.config.tts.type == "HF_API":
                    response = requests.post(self.config.tts.url, headers=headers, json={"inputs": text})
                else:
                    # Without this branch `response` would be unbound below and surface
                    # as a confusing "Unexpected error" message.
                    raise APIError(f"TTS Error: Unsupported TTS type: {self.config.tts.type}")

                if response.status_code != 200:
                    error_details = response.json().get("error", "No error message provided")
                    raise APIError(f"TTS Error: {self.config.tts.type} error", status_code=response.status_code, details=error_details)
                yield response.content
            else:
                if self.config.tts.type != "OPENAI_API":
                    raise APIError("TTS Error: Streaming not supported for this TTS type")

                # The context manager closes the connection once the chunks are consumed.
                with requests.post(self.config.tts.url + "/audio/speech", headers=headers, json=data, stream=True) as response:
                    if response.status_code != 200:
                        error_details = response.json().get("error", "No error message provided")
                        raise APIError("TTS Error: OPENAI API error", status_code=response.status_code, details=error_details)
                    yield from response.iter_content(chunk_size=1024)
        except APIError:
            raise
        except Exception as e:
            raise APIError(f"TTS Error: Unexpected error: {e}") from e

    def read_last_message(self, chat_history: List[List[Optional[str]]]) -> Generator[bytes, None, None]:
        """
        Read the last message in the chat history and convert it to speech.

        Every entry in the trailing run whose second element is non-empty is read,
        oldest first. Nothing is yielded when the history is empty or the last
        entry's second element is empty.

        :param chat_history: List of chat messages.
        :return: Generator yielding chunks of audio bytes.
        """
        if len(chat_history) > 0 and chat_history[-1][1]:
            # Walk backwards to the entry just before the trailing run.
            n = len(chat_history) - 1
            while n >= 0 and chat_history[n][1]:
                n -= 1
            for i in range(n + 1, len(chat_history)):
                yield from self.read_text(chat_history[i][1])