Spaces:
Build error
Build error
import io | |
import logging | |
from typing import Optional | |
from werkzeug.datastructures import FileStorage | |
from core.model_manager import ModelManager | |
from core.model_runtime.entities.model_entities import ModelType | |
from models.model import App, AppMode, AppModelConfig, Message | |
from services.errors.audio import ( | |
AudioTooLargeServiceError, | |
NoAudioUploadedServiceError, | |
ProviderNotSupportSpeechToTextServiceError, | |
ProviderNotSupportTextToSpeechServiceError, | |
UnsupportedAudioTypeServiceError, | |
) | |
# Maximum accepted audio upload size in megabytes; also interpolated into the
# "Audio size larger than ..." error message.
FILE_SIZE = 30
# The same limit expressed in bytes, compared against the uploaded content length.
FILE_SIZE_LIMIT = FILE_SIZE * 1024 * 1024
# Audio subtypes accepted for transcription; an upload is accepted when its
# MIME type equals "audio/<entry>".
ALLOWED_EXTENSIONS = ["mp3", "mp4", "mpeg", "mpga", "m4a", "wav", "webm", "amr"]

logger = logging.getLogger(__name__)
class AudioService:
    """Speech-to-text and text-to-speech operations backed by a tenant's default models.

    All methods are classmethods and are meant to be called on the class
    itself, e.g. ``AudioService.transcript_asr(app_model=..., file=...)``.
    """

    @classmethod
    def transcript_asr(cls, app_model: App, file: FileStorage, end_user: Optional[str] = None):
        """Transcribe an uploaded audio file with the tenant's default speech-to-text model.

        Args:
            app_model: App whose mode, feature configuration and tenant are used.
            file: Uploaded audio. Its MIME type must be ``audio/<ext>`` for some
                entry of ``ALLOWED_EXTENSIONS`` and its content must not exceed
                ``FILE_SIZE_LIMIT`` bytes.
            end_user: Optional user identifier forwarded to the model invocation.

        Returns:
            A dict of the form ``{"text": <model output>}``.

        Raises:
            ValueError: Speech-to-text is not enabled for the app.
            NoAudioUploadedServiceError: ``file`` is ``None``.
            UnsupportedAudioTypeServiceError: The MIME type is not allowed.
            AudioTooLargeServiceError: The content exceeds ``FILE_SIZE_LIMIT``.
            ProviderNotSupportSpeechToTextServiceError: No default
                speech-to-text model instance is available for the tenant.
        """
        if app_model.mode in {AppMode.ADVANCED_CHAT.value, AppMode.WORKFLOW.value}:
            # Workflow-based apps keep their feature switches on the workflow.
            workflow = app_model.workflow
            if workflow is None:
                raise ValueError("Speech to text is not enabled")

            features_dict = workflow.features_dict
            if "speech_to_text" not in features_dict or not features_dict["speech_to_text"].get("enabled"):
                raise ValueError("Speech to text is not enabled")
        else:
            app_model_config: AppModelConfig = app_model.app_model_config
            # A missing config or a missing "enabled" key both mean "disabled",
            # instead of surfacing as AttributeError / KeyError.
            if app_model_config is None or not app_model_config.speech_to_text_dict.get("enabled"):
                raise ValueError("Speech to text is not enabled")

        if file is None:
            raise NoAudioUploadedServiceError()

        mimetype = file.mimetype
        allowed_mimetypes = {f"audio/{ext}" for ext in ALLOWED_EXTENSIONS}
        if mimetype not in allowed_mimetypes:
            raise UnsupportedAudioTypeServiceError()

        file_content = file.read()
        if len(file_content) > FILE_SIZE_LIMIT:
            message = f"Audio size larger than {FILE_SIZE} mb"
            raise AudioTooLargeServiceError(message)

        model_instance = ModelManager().get_default_model_instance(
            tenant_id=app_model.tenant_id, model_type=ModelType.SPEECH2TEXT
        )
        if model_instance is None:
            raise ProviderNotSupportSpeechToTextServiceError()

        buffer = io.BytesIO(file_content)
        # NOTE(review): the buffer is always named "temp.mp3" regardless of the
        # uploaded type; presumably the model runtime only needs a named
        # file-like object -- confirm before relying on the extension.
        buffer.name = "temp.mp3"
        return {"text": model_instance.invoke_speech2text(file=buffer, user=end_user)}

    @classmethod
    def transcript_tts(
        cls,
        app_model: App,
        text: Optional[str] = None,
        voice: Optional[str] = None,
        end_user: Optional[str] = None,
        message_id: Optional[str] = None,
    ):
        """Synthesize speech for a stored message's answer or for the given text.

        When ``message_id`` is truthy the answer of that message is
        synthesized; otherwise ``text`` is used.

        Args:
            app_model: App whose mode, feature configuration and tenant are used.
            text: Text to synthesize when no ``message_id`` is given.
            voice: Voice to use. Falls back to the voice configured for the
                app and then to the first voice reported by the model.
            end_user: Optional user identifier forwarded to the model invocation.
            message_id: Id of a ``Message`` whose ``answer`` should be synthesized.

        Returns:
            ``None`` when the message does not exist or has an empty answer
            with status ``"normal"``. Otherwise the model's result, wrapped in
            a streaming ``audio/mpeg`` Flask ``Response`` when that result is
            a generator.

        Raises:
            ValueError: TTS is not enabled, no voice is available, or neither
                a message nor text was supplied.
            ProviderNotSupportTextToSpeechServiceError: No default TTS model
                instance is available for the tenant.
        """
        # Imported lazily, as in the original code (presumably to avoid
        # import cycles with the Flask app module).
        from collections.abc import Generator

        from flask import Response, stream_with_context

        from app import app
        from extensions.ext_database import db

        def invoke_tts(text_content: str, app_model, voice: Optional[str] = None):
            """Resolve the voice and invoke the tenant's default TTS model."""
            with app.app_context():
                if app_model.mode in {AppMode.ADVANCED_CHAT.value, AppMode.WORKFLOW.value}:
                    workflow = app_model.workflow
                    if workflow is None:
                        raise ValueError("TTS is not enabled")

                    features_dict = workflow.features_dict
                    if "text_to_speech" not in features_dict or not features_dict["text_to_speech"].get("enabled"):
                        raise ValueError("TTS is not enabled")

                    voice = features_dict["text_to_speech"].get("voice") if voice is None else voice
                else:
                    app_model_config = app_model.app_model_config
                    if app_model_config is None:
                        raise ValueError("TTS is not enabled")

                    text_to_speech_dict = app_model_config.text_to_speech_dict
                    if not text_to_speech_dict.get("enabled"):
                        raise ValueError("TTS is not enabled")

                    voice = text_to_speech_dict.get("voice") if voice is None else voice

                model_instance = ModelManager().get_default_model_instance(
                    tenant_id=app_model.tenant_id, model_type=ModelType.TTS
                )
                # Same guard as the other methods of this class; without it a
                # missing provider surfaces as AttributeError on None.
                if model_instance is None:
                    raise ProviderNotSupportTextToSpeechServiceError()

                if not voice:
                    # No explicit or configured voice: use the model's first one.
                    voices = model_instance.get_tts_voices()
                    if not voices:
                        raise ValueError("Sorry, no voice available.")
                    voice = voices[0].get("value")

                return model_instance.invoke_tts(
                    content_text=text_content.strip(), user=end_user, tenant_id=app_model.tenant_id, voice=voice
                )

        def as_response(result):
            """Wrap generator results in a streaming audio response; pass others through."""
            if isinstance(result, Generator):
                return Response(stream_with_context(result), content_type="audio/mpeg")
            return result

        if message_id:
            message = db.session.query(Message).filter(Message.id == message_id).first()
            # Unknown message id: nothing to synthesize.
            if message is None:
                return None
            if message.answer == "" and message.status == "normal":
                return None
            return as_response(invoke_tts(message.answer, app_model=app_model, voice=voice))

        if text is None:
            raise ValueError("Text is required")
        return as_response(invoke_tts(text, app_model, voice))

    @classmethod
    def transcript_tts_voices(cls, tenant_id: str, language: str):
        """Return the voices the tenant's default TTS model reports for ``language``.

        Args:
            tenant_id: Tenant whose default TTS model is queried.
            language: Language value passed through to ``get_tts_voices``.

        Returns:
            Whatever ``model_instance.get_tts_voices(language)`` returns.

        Raises:
            ProviderNotSupportTextToSpeechServiceError: No default TTS model
                instance is available for the tenant.
        """
        model_instance = ModelManager().get_default_model_instance(tenant_id=tenant_id, model_type=ModelType.TTS)
        if model_instance is None:
            raise ProviderNotSupportTextToSpeechServiceError()

        return model_instance.get_tts_voices(language)