import datetime
import gc
import os
import time

import gradio as gr
import librosa
import torch
from transformers import Wav2Vec2ForCTC, Wav2Vec2ProcessorWithLM

# load pretrained model and processor with the custom Amharic kenLM
model = Wav2Vec2ForCTC.from_pretrained("facebook/mms-1b-all")
processor = Wav2Vec2ProcessorWithLM.from_pretrained("jlonsako/mms-1b-all-AmhLM")

#Define Functions

#convert time into .sbv format
def format_time(seconds):
    # datetime.timedelta renders h:mm:ss(.ffffff); use a comma before any fractional seconds
    return str(datetime.timedelta(seconds=seconds)).replace('.', ',')

#Convert Video/Audio into 16K wav file
def preprocessAudio(audioFile):
    os.system(f'ffmpeg -y -i "{audioFile.name}" -ar 16000 ./audioToConvert.wav')

#Transcribe!!!
def Transcribe(file):
    device = "cuda:0" if torch.cuda.is_available() else "cpu"
    start_time = time.time()

    model.load_adapter("amh")
    if device != "cpu":
        # Use fp16 on GPU only; half precision is poorly supported on CPU
        model.half()
    preprocessAudio(file)

    block_size = 30   # seconds of audio per segment
    batch_size = 22   # segments per forward pass; tune to available GPU memory
    transcripts = []
    speech_segments = []

    # Stream the wav file in 30-second blocks (frame/hop of 16000 samples = 1 s at 16 kHz)
    stream = librosa.stream(
        "./audioToConvert.wav",
        block_length=block_size,
        frame_length=16000,
        hop_length=16000
    )

    model.to(device)
    print(f"Model loaded to {device}: entering transcription phase")

    #Code for timestamping
    encoding_start = 0
    encoding_end = 0
    sbv_file = open("subtitle.sbv", "w")

    for speech_segment in stream:
        # Downmix stereo to mono by summing the two channels
        if len(speech_segment.shape) > 1:
            speech_segment = speech_segment[:, 0] + speech_segment[:, 1]
        speech_segments.append(speech_segment)

        if len(speech_segments) == batch_size:
            input_values = processor(
                speech_segments, sampling_rate=16_000, return_tensors="pt", padding=True
            ).input_values.to(device)
            if device != "cpu":
                input_values = input_values.half()
            with torch.no_grad():
                logits = model(input_values).logits
                if len(logits.shape) == 1:
                    logits = logits.unsqueeze(0)
                transcriptions = processor.batch_decode(logits.cpu().numpy()).text
                transcripts.extend(transcriptions)

            # Write this batch to the .sbv file
            for transcription in transcriptions:
                encoding_start = encoding_end  # Maintain the 'encoding_start' across batches
                encoding_end = encoding_start + block_size
                formatted_start = format_time(encoding_start)
                formatted_end = format_time(encoding_end)
                sbv_file.write(f"{formatted_start},{formatted_end}\n")
                sbv_file.write(f"{transcription}\n\n")

            # Clear the batch
            speech_segments = []

            # Freeing up memory
            del input_values
            del logits
            del transcriptions
            torch.cuda.empty_cache()
            gc.collect()

    # Transcribe whatever remains after the last full batch
    if speech_segments:
        input_values = processor(
            speech_segments, sampling_rate=16_000, return_tensors="pt", padding=True
        ).input_values.to(device)
        if device != "cpu":
            input_values = input_values.half()
        with torch.no_grad():
            logits = model(input_values).logits
        transcriptions = processor.batch_decode(logits.cpu().numpy()).text
        transcripts.extend(transcriptions)

        for i in range(len(speech_segments)):
            encoding_start = encoding_end  # Continue from the end of the previous segment
            encoding_end = encoding_start + block_size
            formatted_start = format_time(encoding_start)
            formatted_end = format_time(encoding_end)
            sbv_file.write(f"{formatted_start},{formatted_end}\n")
            sbv_file.write(f"{transcriptions[i]}\n\n")

        # Freeing up memory
        del input_values
        del logits
        del transcriptions
        torch.cuda.empty_cache()
        gc.collect()

    # Join all transcripts into a single transcript
    transcript = ' '.join(transcripts)
    sbv_file.close()

    end_time = time.time()
    print(f"The script ran for {end_time - start_time} seconds.")

    return "./subtitle.sbv"

demo = gr.Interface(
    fn=Transcribe,
    inputs=gr.File(label="Upload an audio file of Amharic content"),
    outputs=gr.File(label="Download .sbv transcription"),
    title="Amharic Audio Transcription",
    description=(
        "This application uses Meta MMS and a custom kenLM model to transcribe "
        "Amharic audio files of arbitrary length into .sbv files. Upload an Amharic "
        "audio file and get your transcription!"
    )
)
demo.launch()
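# For reference, the subtitle.sbv written by Transcribe() contains one cue per
# 30-second block, with timestamps produced by format_time(). The text lines
# below are placeholders, not real output:
#
#   0:00:00,0:00:30
#   <transcribed Amharic text for the first block>
#
#   0:00:30,0:01:00
#   <transcribed Amharic text for the second block>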