Spaces:
Paused
Paused
import spaces | |
import os | |
import gradio as gr | |
import torch | |
import torchaudio | |
from transformers import pipeline | |
from pytube import YouTube | |
import re | |
import numpy as np | |
from scipy.signal import wiener | |
from io import BytesIO | |
import noisereduce as nr | |
import soundfile as sf | |
# Load the Kabardian speech-recognition pipeline once, at import time.
# Use the first CUDA device when one is available and fall back to CPU
# (device=-1) otherwise, instead of unconditionally requiring a GPU.
pipe = pipeline(
    model="anzorq/w2v-bert-2.0-kbd-v2",
    device=0 if torch.cuda.is_available() else -1,
)
# Kabardian letter sequences paired with the stand-in symbols used for
# transcription (presumably the form the model emits, since the reverse
# mapping below is applied to its output -- confirm against the model card).
replacements = [
    ('гъ', 'ɣ'), ('дж', 'j'), ('дз', 'ӡ'), ('жь', 'ʐ'), ('кӏ', 'қ'),
    ('кхъ', 'qҳ'), ('къ', 'q'), ('лъ', 'ɬ'), ('лӏ', 'ԯ'), ('пӏ', 'ԥ'),
    ('тӏ', 'ҭ'), ('фӏ', 'ჶ'), ('хь', 'h'), ('хъ', 'ҳ'), ('цӏ', 'ҵ'),
    ('щӏ', 'ɕ'), ('я', 'йа')
]

# Reverse mapping: stand-in symbol -> original Kabardian spelling.
reverse_replacements = {v: k for k, v in replacements}

# Regex alternation picks the first alternative that matches, so longer keys
# are listed first. This guarantees that a multi-character key such as 'qҳ'
# always wins over its single-character prefix 'q', regardless of the order
# in which pairs appear in `replacements`.
reverse_pattern = re.compile(
    '|'.join(
        re.escape(key)
        for key in sorted(reverse_replacements, key=len, reverse=True)
    )
)
def replace_symbols_back(text):
    """Return *text* with every stand-in symbol restored to its Kabardian spelling.

    Each match of ``reverse_pattern`` is looked up in ``reverse_replacements``
    and substituted; all other characters are left untouched.
    """
    def _restore(match):
        # The matched text is always one of the keys the pattern was built from.
        symbol = match.group(0)
        return reverse_replacements[symbol]

    return reverse_pattern.sub(_restore, text)
def preprocess_audio(audio_tensor, original_sample_rate, apply_normalization):
    """Convert audio to mono float32 at 16 kHz, optionally peak-normalized.

    Args:
        audio_tensor: Audio samples. Dimension 0 is averaged away, so this
            assumes a (channels, samples) layout -- TODO confirm with callers.
        original_sample_rate: Sample rate of ``audio_tensor`` in Hz.
        apply_normalization: When truthy, scale the signal so that its peak
            absolute amplitude is 1.

    Returns:
        A float32 tensor whose leading dimension has size 1, at 16 kHz.
    """
    target_sample_rate = 16000

    audio_tensor = audio_tensor.to(dtype=torch.float32)
    audio_tensor = torch.mean(audio_tensor, dim=0, keepdim=True)  # Convert to mono

    if apply_normalization and audio_tensor.numel() > 0:
        peak = torch.max(torch.abs(audio_tensor))
        # A silent clip has a peak of 0; dividing by it would fill the
        # signal with NaNs, so leave such audio untouched.
        if peak > 0:
            audio_tensor = audio_tensor / peak

    # Only invoke the resampler when the rate actually differs.
    if original_sample_rate != target_sample_rate:
        audio_tensor = torchaudio.functional.resample(
            audio_tensor,
            orig_freq=original_sample_rate,
            new_freq=target_sample_rate,
        )
    return audio_tensor
def spectral_gating(audio_tensor):
    """Run noisereduce's ``reduce_noise`` over the audio and return a tensor.

    The samples are handed to ``nr.reduce_noise`` with a sample rate of
    16 kHz, and the result is wrapped in a new tensor that keeps the dtype
    of ``audio_tensor``.
    """
    denoised = nr.reduce_noise(
        y=audio_tensor.numpy(),
        sr=16_000,
    )
    return torch.tensor(denoised, dtype=audio_tensor.dtype)
def wiener_filter(audio_tensor):
    """Apply a Wiener filter along the last (time) axis of the audio.

    Args:
        audio_tensor: CPU tensor of audio samples; the last dimension is
            treated as time.

    Returns:
        A new tensor with the same shape and dtype as ``audio_tensor``.
    """
    audio_data = audio_tensor.numpy()

    # scipy's default window is 3 in *every* dimension. For a (1, N) array
    # that is a 3x3 window zero-padded across the channel axis, which skews
    # the local mean/variance estimates. Restrict the window to the time axis.
    window = [1] * (audio_data.ndim - 1) + [3]

    # Perfectly flat stretches have zero local variance, which makes scipy
    # divide by zero internally; silence the resulting floating-point warnings.
    with np.errstate(divide='ignore', invalid='ignore'):
        filtered_audio = wiener(audio_data, mysize=window)

    # Fully silent input yields NaN (0/0); keep the original samples there.
    filtered_audio = np.where(np.isfinite(filtered_audio), filtered_audio, audio_data)
    return torch.tensor(filtered_audio, dtype=audio_tensor.dtype)
def transcribe_speech(audio_path, progress=gr.Progress()):
    """Transcribe a recorded or uploaded audio file to Kabardian text.

    Args:
        audio_path: Path of the audio file to transcribe, or None when no
            audio was supplied.
        progress: Gradio progress tracker used to report the current stage.

    Returns:
        The transcription, or a short message when no audio was received.
    """
    if audio_path is None:
        # Return a single value: the click handler for this function is wired
        # to exactly one output component, so a 2-tuple would be rejected.
        return "No audio received."

    progress(0.1, desc="Preprocessing audio...")
    audio_tensor, original_sample_rate = torchaudio.load(audio_path)
    # The original passed the module-level name `apply_normalization`, which
    # at call time is the YouTube tab's Checkbox component rather than a
    # boolean. A component object is presumably truthy, so normalization was
    # effectively always on; that behavior is kept, but stated explicitly.
    audio_tensor = preprocess_audio(audio_tensor, original_sample_rate, True)

    progress(0.7, desc="Transcribing audio...")
    audio_np = audio_tensor.numpy().squeeze()
    transcription = pipe(audio_np, chunk_length_s=10)['text']
    return replace_symbols_back(transcription)
def transcribe_from_youtube(url, apply_wiener_filter, apply_normalization, apply_spectral_gating, progress=gr.Progress()):
    """Download a YouTube video's audio, clean it up and transcribe it.

    Args:
        url: YouTube video URL.
        apply_wiener_filter: When truthy, run ``wiener_filter`` on the audio.
        apply_normalization: Forwarded to ``preprocess_audio``.
        apply_spectral_gating: When truthy, run ``spectral_gating`` on the audio.
        progress: Gradio progress tracker used to report the current stage.

    Returns:
        ``(transcription, wav_path)`` on success, where ``wav_path`` points to
        the processed 16 kHz audio; ``(message, None)`` on failure.
    """
    import tempfile

    if not url or not url.strip():
        return "No YouTube URL provided.", None

    progress(0, "Downloading YouTube audio...")
    try:
        yt = YouTube(url)
        stream = yt.streams.filter(only_audio=True).first()
        if stream is None:
            # `.first()` yields None when the query matched nothing.
            return "No audio stream found for this video.", None

        audio_data = BytesIO()
        stream.stream_to_buffer(audio_data)
        audio_data.seek(0)

        audio_tensor, original_sample_rate = torchaudio.load(audio_data)
        audio_tensor = preprocess_audio(audio_tensor, original_sample_rate, apply_normalization)

        if apply_wiener_filter:
            progress(0.4, "Applying Wiener filter...")
            audio_tensor = wiener_filter(audio_tensor)

        if apply_spectral_gating:
            progress(0.6, "Applying Spectral Gating filter...")
            audio_tensor = spectral_gating(audio_tensor)

        progress(0.8, "Transcribing audio...")
        audio_np = audio_tensor.numpy().squeeze()
        transcription = pipe(audio_np, chunk_length_s=10)['text']
        transcription = replace_symbols_back(transcription)

        # Write to a unique file per request: a fixed name in the working
        # directory would be overwritten by concurrent requests. The file is
        # intentionally not deleted here because its path is returned.
        with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as tmp:
            output_path = tmp.name
        sf.write(output_path, audio_np, 16000, subtype='PCM_16')
    except Exception as e:
        # UI boundary: surface the failure as text instead of crashing.
        return str(e), None

    return transcription, output_path
def populate_metadata(url):
    """Look up the thumbnail URL and title for a YouTube video URL.

    This handler is attached to the URL textbox's change event, so it is
    called with empty and partially typed URLs as well as complete ones.

    Args:
        url: Text currently in the YouTube URL box.

    Returns:
        ``(thumbnail_url, title)``, or ``(None, None)`` when the URL is empty
        or the lookup fails.
    """
    if not url or not url.strip():
        return None, None
    try:
        yt = YouTube(url)
        return yt.thumbnail_url, yt.title
    except Exception:
        # UI boundary: an incomplete or invalid URL is expected while the user
        # is still typing, so show nothing rather than raising an error.
        return None, None
# Build the Gradio interface: a page header followed by one tab for
# microphone/upload input and one tab for YouTube URLs.
with gr.Blocks(theme=gr.themes.Soft()) as demo:
    gr.HTML(
        """
        <div style="text-align: center; max-width: 500px; margin: 0 auto;">
            <div>
                <h1>Kabardian Speech Transcription</h1>
            </div>
            <p style="margin-bottom: 10px; font-size: 94%">
                Kabardian speech to text transcription using a fine-tuned Wav2Vec2-BERT model
            </p>
        </div>
        """
    )

    # --- Tab 1: record or upload audio, then transcribe it ---
    with gr.Tab("Microphone Input"):
        gr.Markdown("## Transcribe speech from microphone")
        mic_audio = gr.Audio(
            sources=['microphone', 'upload'],
            type="filepath",
            label="Record or upload an audio",
        )
        transcribe_button = gr.Button("Transcribe")
        transcription_output = gr.Textbox(label="Transcription")

        transcribe_button.click(
            fn=transcribe_speech,
            inputs=[mic_audio],
            outputs=[transcription_output],
        )

    # --- Tab 2: fetch audio from a YouTube video, then transcribe it ---
    with gr.Tab("YouTube URL"):
        gr.Markdown("## Transcribe speech from YouTube video")
        youtube_url = gr.Textbox(label="Enter YouTube video URL")

        # Optional clean-up steps, all off by default.
        with gr.Accordion("Audio Improvements", open=False):
            apply_normalization = gr.Checkbox(
                label="Normalize audio volume",
                value=False,
            )
            apply_spectral_gating = gr.Checkbox(
                label="Apply Spectral Gating filter",
                info="Noise reduction",
                value=False,
            )
            apply_wiener = gr.Checkbox(
                label="Apply Wiener filter",
                info="Noise reduction",
                value=False,
            )

        # Video preview, filled in by populate_metadata.
        with gr.Row():
            img = gr.Image(label="Thumbnail", height=240, width=240, scale=1)
            title = gr.Label(label="Video Title", scale=2)

        # These two names are rebound here; the first tab's click handler was
        # already registered against the earlier components.
        transcribe_button = gr.Button("Transcribe")
        transcription_output = gr.Textbox(
            label="Transcription",
            placeholder="Transcription Output",
            lines=10,
        )
        audio_output = gr.Audio(label="Processed Audio")

        transcribe_button.click(
            fn=transcribe_from_youtube,
            inputs=[youtube_url, apply_wiener, apply_normalization, apply_spectral_gating],
            outputs=[transcription_output, audio_output],
        )
        # Refresh the thumbnail and title whenever the URL text changes.
        youtube_url.change(populate_metadata, inputs=[youtube_url], outputs=[img, title])

demo.launch()