tan-z-tan's picture
Commit
9b5cb27
raw
history blame contribute delete
No virus
6.55 kB
import gradio as gr
import numpy as np
import pandas as pd
import torch
import torchaudio
from datetime import datetime
from lang_id import identify_languages
from whisper import transcribe
# アプリケーションの状態を保持する変数
data = []
data_df = pd.DataFrame()
current_chunk = []
SAMPLING_RATE = 16000
CHUNK_DURATION = 5 # 初期値としての5秒
def normalize_audio(audio):
# 音量の正規化(最大振幅が1になるようにスケーリング)
audio = audio / np.max(np.abs(audio))
return audio
def resample_audio(audio, orig_sr, target_sr=16000):
if orig_sr != target_sr:
print(f"Resampling audio from {orig_sr} to {target_sr}")
audio = audio.astype(np.float32)
resampler = torchaudio.transforms.Resample(orig_freq=orig_sr, new_freq=target_sr)
audio = resampler(torch.from_numpy(audio).unsqueeze(0)).squeeze(0).numpy()
return audio
def process_chunk(chunk, language_set) -> pd.DataFrame:
print(f"Processing audio chunk of length {len(chunk)}")
rms = np.sqrt(np.mean(chunk**2))
db_level = 20 * np.log10(rms + 1e-9) # 加えた小さな値で-inf値を防ぐ
# 音量の正規化
chunk = normalize_audio(chunk)
length = len(chunk) / SAMPLING_RATE # 音声データの長さ(秒)
s = datetime.now()
selected_scores, all_scores = identify_languages(chunk, language_set)
lang_id_time = (datetime.now() - s).total_seconds()
# 日本語と英語の確率値を取得
ja_prob = selected_scores['Japanese']
en_prob = selected_scores['English']
ja_en = 'ja' if ja_prob > en_prob else 'en'
# Top 3言語を取得
top3_languages = ", ".join([f"{lang} ({all_scores[lang]:.2f})" for lang in sorted(all_scores, key=all_scores.get, reverse=True)[:3]])
# テキストの認識
s = datetime.now()
transcription = transcribe(chunk, language=ja_en)
transcribe_time = (datetime.now() - s).total_seconds()
return pd.DataFrame({
"Length (s)": [length],
"db_level": [db_level],
"Japanese_English": [f"{ja_en} ({ja_prob:.2f}, {en_prob:.2f})"] if db_level > 50 else ["Silent"],
"Language": [top3_languages],
"Lang ID Time": [lang_id_time],
"Transcribe Time": [transcribe_time],
"Text": [transcription],
})
def process_audio_stream(audio, chunk_duration, language_set):
global data_df, current_chunk, SAMPLING_RATE
print("Process_audio_stream")
if audio is None:
return None, data_df
sr, audio_data = audio
# language_set
language_set = [lang.strip() for lang in language_set.split(",")]
print(audio_data.shape, audio_data.dtype)
# 一番最初にSampling rateを揃えておく
audio_data = resample_audio(audio_data, sr, target_sr=SAMPLING_RATE)
audio_sec = 0
current_chunk.append(audio_data)
total_chunk = np.concatenate(current_chunk)
# CHUNK_DURATIONを超えていたら処理
if len(total_chunk) >= SAMPLING_RATE * chunk_duration:
chunk = total_chunk[:SAMPLING_RATE * chunk_duration]
total_chunk = total_chunk[SAMPLING_RATE * chunk_duration:]
audio_sec += chunk_duration
# Check if the audio in the window is too quiet
# rms = np.sqrt(np.mean(chunk**2))
# db_level = 20 * np.log10(rms + 1e-9) # 加えた小さな値で-inf値を防ぐ
# print(db_level)
df = process_chunk(chunk, language_set)
# add db_level
# df["dB Level"] = db_level
data_df = pd.concat([data_df, df], ignore_index=True)
current_chunk = [total_chunk]
return (SAMPLING_RATE, chunk), data_df
else:
return (SAMPLING_RATE, total_chunk), data_df
def process_audio(audio, chunk_duration, language_set):
global data, data_df, current_chunk, SAMPLING_RATE
# reset state
data = []
data_df = pd.DataFrame()
current_chunk = []
print("Process_audio")
print(audio)
if audio is None:
return
sr, audio_data = audio
# language_set
language_set = [lang.strip() for lang in language_set.split(",")]
print(audio_data.shape, audio_data.dtype)
# 一番最初にSampling rateを揃えておく
audio_data = resample_audio(audio_data, sr, target_sr=SAMPLING_RATE)
audio_sec = 0
# Check if the audio in the window is too quiet
rms = np.sqrt(np.mean(audio_data**2))
db_level = 20 * np.log10(rms + 1e-9) # 加えた小さな値で-inf値を防ぐ
print(db_level)
# 音量の正規化
audio_data = normalize_audio(audio_data)
# 新しいデータを現在のチャンクに追加
current_chunk.append(audio_data)
total_chunk = np.concatenate(current_chunk)
while len(total_chunk) >= SAMPLING_RATE * chunk_duration:
chunk = total_chunk[:SAMPLING_RATE * chunk_duration]
total_chunk = total_chunk[SAMPLING_RATE * chunk_duration:] # 処理済みの部分を削除
audio_sec += chunk_duration
print(f"Processing audio chunk of length {len(chunk)}")
df = process_chunk(chunk, language_set)
data_df = pd.concat([data_df, df], ignore_index=True)
yield (SAMPLING_RATE, chunk), data_df
# 未処理の残りのデータを保持
current_chunk = [total_chunk]
# パラメータの入力コンポーネント
chunk_duration_input = gr.Number(value=5, label="Chunk Duration (seconds)")
language_set_input = gr.Textbox(value="Japanese,English", label="Language Set (comma-separated)")
inputs_file = [gr.Audio(sources=["upload"], type="numpy"), chunk_duration_input, language_set_input]
inputs_stream = [gr.Audio(sources=["microphone"], type="numpy", streaming=True), chunk_duration_input, language_set_input]
outputs = [gr.Audio(type="numpy"), gr.DataFrame(headers=["Time", "Volume", "Length (s)"])]
with gr.Blocks() as demo:
with gr.TabItem("Upload"):
gr.Interface(
fn=process_audio,
inputs=inputs_file,
outputs=outputs,
live=False,
title="File Audio Processing",
description="Upload an audio file to see the processing results."
)
with gr.TabItem("Microphone"):
gr.Interface(
fn=process_audio_stream,
inputs=inputs_stream,
outputs=outputs,
live=True,
title="Real-time Audio Processing",
description="Speak into the microphone and see real-time audio processing results."
)
if __name__ == "__main__":
demo.launch()