Spaces:
Running
Running
from whisperx.alignment import ( | |
DEFAULT_ALIGN_MODELS_TORCH as DAMT, | |
DEFAULT_ALIGN_MODELS_HF as DAMHF, | |
) | |
from whisperx.utils import TO_LANGUAGE_CODE | |
import whisperx | |
import torch | |
import gc | |
import os | |
import soundfile as sf | |
from IPython.utils import capture # noqa | |
from .language_configuration import EXTRA_ALIGN, INVERTED_LANGUAGES | |
from .logging_setup import logger | |
from .postprocessor import sanitize_file_name | |
from .utils import remove_directory_contents, run_command | |
# ZERO GPU CONFIG | |
import spaces | |
import copy | |
import random | |
import time | |
def random_sleep(): | |
if os.environ.get("ZERO_GPU") == "TRUE": | |
print("Random sleep") | |
sleep_time = round(random.uniform(7.2, 9.9), 1) | |
time.sleep(sleep_time) | |
def load_and_transcribe_audio(asr_model, audio, compute_type, language, asr_options, batch_size, segment_duration_limit): | |
# Load model | |
model = whisperx.load_model( | |
asr_model, | |
os.environ.get("SONITR_DEVICE") if os.environ.get("ZERO_GPU") != "TRUE" else "cuda", | |
compute_type=compute_type, | |
language=language, | |
asr_options=asr_options, | |
) | |
# Transcribe audio | |
result = model.transcribe( | |
audio, | |
batch_size=batch_size, | |
chunk_size=segment_duration_limit, | |
print_progress=True, | |
) | |
del model | |
gc.collect() | |
torch.cuda.empty_cache() # noqa | |
return result | |
def load_align_and_align_segments(result, audio, DAMHF): | |
# Load alignment model | |
model_a, metadata = whisperx.load_align_model( | |
language_code=result["language"], | |
device=os.environ.get("SONITR_DEVICE") if os.environ.get("ZERO_GPU") != "TRUE" else "cuda", | |
model_name=None | |
if result["language"] in DAMHF.keys() | |
else EXTRA_ALIGN[result["language"]], | |
) | |
# Align segments | |
alignment_result = whisperx.align( | |
result["segments"], | |
model_a, | |
metadata, | |
audio, | |
os.environ.get("SONITR_DEVICE") if os.environ.get("ZERO_GPU") != "TRUE" else "cuda", | |
return_char_alignments=True, | |
print_progress=False, | |
) | |
# Clean up | |
del model_a | |
gc.collect() | |
torch.cuda.empty_cache() # noqa | |
return alignment_result | |
def diarize_audio(diarize_model, audio_wav, min_speakers, max_speakers): | |
if os.environ.get("ZERO_GPU") == "TRUE": | |
diarize_model.model.to(torch.device("cuda")) | |
diarize_segments = diarize_model( | |
audio_wav, | |
min_speakers=min_speakers, | |
max_speakers=max_speakers | |
) | |
return diarize_segments | |
# ZERO GPU CONFIG | |
ASR_MODEL_OPTIONS = [ | |
"tiny", | |
"base", | |
"small", | |
"medium", | |
"large", | |
"large-v1", | |
"large-v2", | |
"large-v3", | |
"distil-large-v2", | |
"Systran/faster-distil-whisper-large-v3", | |
"tiny.en", | |
"base.en", | |
"small.en", | |
"medium.en", | |
"distil-small.en", | |
"distil-medium.en", | |
"OpenAI_API_Whisper", | |
] | |
COMPUTE_TYPE_GPU = [ | |
"default", | |
"auto", | |
"int8", | |
"int8_float32", | |
"int8_float16", | |
"int8_bfloat16", | |
"float16", | |
"bfloat16", | |
"float32" | |
] | |
COMPUTE_TYPE_CPU = [ | |
"default", | |
"auto", | |
"int8", | |
"int8_float32", | |
"int16", | |
"float32", | |
] | |
WHISPER_MODELS_PATH = './WHISPER_MODELS' | |
def openai_api_whisper( | |
input_audio_file, | |
source_lang=None, | |
chunk_duration=1800 | |
): | |
info = sf.info(input_audio_file) | |
duration = info.duration | |
output_directory = "./whisper_api_audio_parts" | |
os.makedirs(output_directory, exist_ok=True) | |
remove_directory_contents(output_directory) | |
if duration > chunk_duration: | |
# Split the audio file into smaller chunks with 30-minute duration | |
cm = f'ffmpeg -i "{input_audio_file}" -f segment -segment_time {chunk_duration} -c:a libvorbis "{output_directory}/output%03d.ogg"' | |
run_command(cm) | |
# Get list of generated chunk files | |
chunk_files = sorted( | |
[f"{output_directory}/{f}" for f in os.listdir(output_directory) if f.endswith('.ogg')] | |
) | |
else: | |
one_file = f"{output_directory}/output000.ogg" | |
cm = f'ffmpeg -i "{input_audio_file}" -c:a libvorbis {one_file}' | |
run_command(cm) | |
chunk_files = [one_file] | |
# Transcript | |
segments = [] | |
language = source_lang if source_lang else None | |
for i, chunk in enumerate(chunk_files): | |
from openai import OpenAI | |
client = OpenAI() | |
audio_file = open(chunk, "rb") | |
transcription = client.audio.transcriptions.create( | |
model="whisper-1", | |
file=audio_file, | |
language=language, | |
response_format="verbose_json", | |
timestamp_granularities=["segment"], | |
) | |
try: | |
transcript_dict = transcription.model_dump() | |
except: # noqa | |
transcript_dict = transcription.to_dict() | |
if language is None: | |
logger.info(f'Language detected: {transcript_dict["language"]}') | |
language = TO_LANGUAGE_CODE[transcript_dict["language"]] | |
chunk_time = chunk_duration * (i) | |
for seg in transcript_dict["segments"]: | |
if "start" in seg.keys(): | |
segments.append( | |
{ | |
"text": seg["text"], | |
"start": seg["start"] + chunk_time, | |
"end": seg["end"] + chunk_time, | |
} | |
) | |
audio = whisperx.load_audio(input_audio_file) | |
result = {"segments": segments, "language": language} | |
return audio, result | |
def find_whisper_models(): | |
path = WHISPER_MODELS_PATH | |
folders = [] | |
if os.path.exists(path): | |
for folder in os.listdir(path): | |
folder_path = os.path.join(path, folder) | |
if ( | |
os.path.isdir(folder_path) | |
and 'model.bin' in os.listdir(folder_path) | |
): | |
folders.append(folder) | |
return folders | |
def transcribe_speech( | |
audio_wav, | |
asr_model, | |
compute_type, | |
batch_size, | |
SOURCE_LANGUAGE, | |
literalize_numbers=True, | |
segment_duration_limit=15, | |
): | |
""" | |
Transcribe speech using a whisper model. | |
Parameters: | |
- audio_wav (str): Path to the audio file in WAV format. | |
- asr_model (str): The whisper model to be loaded. | |
- compute_type (str): Type of compute to be used (e.g., 'int8', 'float16'). | |
- batch_size (int): Batch size for transcription. | |
- SOURCE_LANGUAGE (str): Source language for transcription. | |
Returns: | |
- Tuple containing: | |
- audio: Loaded audio file. | |
- result: Transcription result as a dictionary. | |
""" | |
if asr_model == "OpenAI_API_Whisper": | |
if literalize_numbers: | |
logger.info( | |
"OpenAI's API Whisper does not support " | |
"the literalization of numbers." | |
) | |
return openai_api_whisper(audio_wav, SOURCE_LANGUAGE) | |
# https://github.com/openai/whisper/discussions/277 | |
prompt = "以下是普通话的句子。" if SOURCE_LANGUAGE == "zh" else None | |
SOURCE_LANGUAGE = ( | |
SOURCE_LANGUAGE if SOURCE_LANGUAGE != "zh-TW" else "zh" | |
) | |
asr_options = { | |
"initial_prompt": prompt, | |
"suppress_numerals": literalize_numbers | |
} | |
if asr_model not in ASR_MODEL_OPTIONS: | |
base_dir = WHISPER_MODELS_PATH | |
if not os.path.exists(base_dir): | |
os.makedirs(base_dir) | |
model_dir = os.path.join(base_dir, sanitize_file_name(asr_model)) | |
if not os.path.exists(model_dir): | |
from ctranslate2.converters import TransformersConverter | |
quantization = "float32" | |
# Download new model | |
try: | |
converter = TransformersConverter( | |
asr_model, | |
low_cpu_mem_usage=True, | |
copy_files=[ | |
"tokenizer_config.json", "preprocessor_config.json" | |
] | |
) | |
converter.convert( | |
model_dir, | |
quantization=quantization, | |
force=False | |
) | |
except Exception as error: | |
if "File tokenizer_config.json does not exist" in str(error): | |
converter._copy_files = [ | |
"tokenizer.json", "preprocessor_config.json" | |
] | |
converter.convert( | |
model_dir, | |
quantization=quantization, | |
force=True | |
) | |
else: | |
raise error | |
asr_model = model_dir | |
logger.info(f"ASR Model: {str(model_dir)}") | |
audio = whisperx.load_audio(audio_wav) | |
result = load_and_transcribe_audio( | |
asr_model, audio, compute_type, SOURCE_LANGUAGE, asr_options, batch_size, segment_duration_limit | |
) | |
if result["language"] == "zh" and not prompt: | |
result["language"] = "zh-TW" | |
logger.info("Chinese - Traditional (zh-TW)") | |
return audio, result | |
def align_speech(audio, result): | |
""" | |
Aligns speech segments based on the provided audio and result metadata. | |
Parameters: | |
- audio (array): The audio data in a suitable format for alignment. | |
- result (dict): Metadata containing information about the segments | |
and language. | |
Returns: | |
- result (dict): Updated metadata after aligning the segments with | |
the audio. This includes character-level alignments if | |
'return_char_alignments' is set to True. | |
Notes: | |
- This function uses language-specific models to align speech segments. | |
- It performs language compatibility checks and selects the | |
appropriate alignment model. | |
- Cleans up memory by releasing resources after alignment. | |
""" | |
DAMHF.update(DAMT) # lang align | |
if ( | |
not result["language"] in DAMHF.keys() | |
and not result["language"] in EXTRA_ALIGN.keys() | |
): | |
logger.warning( | |
"Automatic detection: Source language not compatible with align" | |
) | |
raise ValueError( | |
f"Detected language {result['language']} incompatible, " | |
"you can select the source language to avoid this error." | |
) | |
if ( | |
result["language"] in EXTRA_ALIGN.keys() | |
and EXTRA_ALIGN[result["language"]] == "" | |
): | |
lang_name = ( | |
INVERTED_LANGUAGES[result["language"]] | |
if result["language"] in INVERTED_LANGUAGES.keys() | |
else result["language"] | |
) | |
logger.warning( | |
"No compatible wav2vec2 model found " | |
f"for the language '{lang_name}', skipping alignment." | |
) | |
return result | |
# random_sleep() | |
result = load_align_and_align_segments(result, audio, DAMHF) | |
return result | |
diarization_models = { | |
"pyannote_3.1": "pyannote/speaker-diarization-3.1", | |
"pyannote_2.1": "pyannote/speaker-diarization@2.1", | |
"disable": "", | |
} | |
def reencode_speakers(result): | |
if result["segments"][0]["speaker"] == "SPEAKER_00": | |
return result | |
speaker_mapping = {} | |
counter = 0 | |
logger.debug("Reencode speakers") | |
for segment in result["segments"]: | |
old_speaker = segment["speaker"] | |
if old_speaker not in speaker_mapping: | |
speaker_mapping[old_speaker] = f"SPEAKER_{counter:02d}" | |
counter += 1 | |
segment["speaker"] = speaker_mapping[old_speaker] | |
return result | |
def diarize_speech( | |
audio_wav, | |
result, | |
min_speakers, | |
max_speakers, | |
YOUR_HF_TOKEN, | |
model_name="pyannote/speaker-diarization@2.1", | |
): | |
""" | |
Performs speaker diarization on speech segments. | |
Parameters: | |
- audio_wav (array): Audio data in WAV format to perform speaker | |
diarization. | |
- result (dict): Metadata containing information about speech segments | |
and alignments. | |
- min_speakers (int): Minimum number of speakers expected in the audio. | |
- max_speakers (int): Maximum number of speakers expected in the audio. | |
- YOUR_HF_TOKEN (str): Your Hugging Face API token for model | |
authentication. | |
- model_name (str): Name of the speaker diarization model to be used | |
(default: "pyannote/speaker-diarization@2.1"). | |
Returns: | |
- result_diarize (dict): Updated metadata after assigning speaker | |
labels to segments. | |
Notes: | |
- This function utilizes a speaker diarization model to label speaker | |
segments in the audio. | |
- It assigns speakers to word-level segments based on diarization results. | |
- Cleans up memory by releasing resources after diarization. | |
- If only one speaker is specified, each segment is automatically assigned | |
as the first speaker, eliminating the need for diarization inference. | |
""" | |
if max(min_speakers, max_speakers) > 1 and model_name: | |
try: | |
diarize_model = whisperx.DiarizationPipeline( | |
model_name=model_name, | |
use_auth_token=YOUR_HF_TOKEN, | |
device=os.environ.get("SONITR_DEVICE"), | |
) | |
except Exception as error: | |
error_str = str(error) | |
gc.collect() | |
torch.cuda.empty_cache() # noqa | |
if "'NoneType' object has no attribute 'to'" in error_str: | |
if model_name == diarization_models["pyannote_2.1"]: | |
raise ValueError( | |
"Accept the license agreement for using Pyannote 2.1." | |
" You need to have an account on Hugging Face and " | |
"accept the license to use the models: " | |
"https://huggingface.co/pyannote/speaker-diarization " | |
"and https://huggingface.co/pyannote/segmentation " | |
"Get your KEY TOKEN here: " | |
"https://hf.co/settings/tokens " | |
) | |
elif model_name == diarization_models["pyannote_3.1"]: | |
raise ValueError( | |
"New Licence Pyannote 3.1: You need to have an account" | |
" on Hugging Face and accept the license to use the " | |
"models: https://huggingface.co/pyannote/speaker-diarization-3.1 " # noqa | |
"and https://huggingface.co/pyannote/segmentation-3.0 " | |
) | |
else: | |
raise error | |
random_sleep() | |
diarize_segments = diarize_audio(diarize_model, audio_wav, min_speakers, max_speakers) | |
result_diarize = whisperx.assign_word_speakers( | |
diarize_segments, result | |
) | |
for segment in result_diarize["segments"]: | |
if "speaker" not in segment: | |
segment["speaker"] = "SPEAKER_00" | |
logger.warning( | |
f"No speaker detected in {segment['start']}. First TTS " | |
f"will be used for the segment text: {segment['text']} " | |
) | |
del diarize_model | |
gc.collect() | |
torch.cuda.empty_cache() # noqa | |
else: | |
result_diarize = result | |
result_diarize["segments"] = [ | |
{**item, "speaker": "SPEAKER_00"} | |
for item in result_diarize["segments"] | |
] | |
return reencode_speakers(result_diarize) | |