import torch
import spaces
import gradio as gr
import os
from pyannote.audio import Pipeline
from pydub import AudioSegment
# Get the Hugging Face authentication token
HF_TOKEN = os.environ.get("HUGGINGFACE_READ_TOKEN")
pipeline = None
# Try to load the pyannote diarization model
try:
    pipeline = Pipeline.from_pretrained(
        "pyannote/speaker-diarization-3.1", use_auth_token=HF_TOKEN
    )
    # Move the pipeline to GPU if one is available
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    pipeline.to(device)
except Exception as e:
    print(f"Error initializing pipeline: {e}")
    pipeline = None
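
# Note: pyannote/speaker-diarization-3.1 is a gated model on Hugging Face;
# the token in HUGGINGFACE_READ_TOKEN must belong to an account that has
# accepted the model's user conditions, or loading will fail.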

# Concatenate the target audio onto the end of the mixed audio and return
# the target segment's start and end times (in seconds) as a dict.
def combine_audio_with_time(target_audio, mixed_audio):
    if pipeline is None:
        return "Error: model not initialized"
    # Load the target speaker's sample audio
    target_audio_segment = AudioSegment.from_wav(target_audio)
    # Load the mixed audio
    mixed_audio_segment = AudioSegment.from_wav(mixed_audio)
    # pydub lengths are in milliseconds, so divide by 1000 for seconds;
    # the target segment starts exactly where the mixed audio ends
    target_start_time = len(mixed_audio_segment) / 1000
    # The target segment ends at the total length of the concatenated audio
    target_end_time = target_start_time + len(target_audio_segment) / 1000
    # Append the target speaker's audio to the end of the mixed audio and
    # export the result; diarize_audio() reads this file later
    final_audio = mixed_audio_segment + target_audio_segment
    final_audio.export("final_output.wav", format="wav")
    # Return a dict with the target segment's start and end times
    return {"start_time": target_start_time, "end_time": target_end_time}
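
# Example (hypothetical file paths and durations): with a 12.34 s mixed file
# and a 3.33 s target file, combine_audio_with_time("target.wav", "mixed.wav")
# returns {"start_time": 12.34, "end_time": 15.67} and writes final_output.wav.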

# Run pyannote speaker diarization on the concatenated audio file
@spaces.GPU(duration=60 * 2)  # GPU-accelerated; execution limited to 120 seconds
def diarize_audio(temp_file):
    if pipeline is None:
        return "Error: model not initialized"
    try:
        diarization = pipeline(temp_file)
    except Exception as e:
        return f"Error while processing audio: {e}"
    # Return the diarization output as plain text
    return str(diarization)
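
# str(diarization) yields one line per speech turn; the lines are assumed to
# follow pyannote's usual annotation text format, e.g.:
#   [ 00:00:00.497 -->  00:00:05.012] A SPEAKER_00
# generate_labels_from_diarization() below parses exactly this layout.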

# Generate a label file from the diarization text output
def generate_labels_from_diarization(diarization_output):
    labels_path = 'labels.txt'
    successful_lines = 0
    try:
        with open(labels_path, 'w') as outfile:
            lines = diarization_output.strip().split('\n')
            for line in lines:
                try:
                    # Strip the surrounding brackets, then split into start/end timestamps
                    parts = line.strip()[1:-1].split(' --> ')
                    start_time = parts[0].strip()
                    end_time = parts[1].split(']')[0].strip()
                    # The speaker label is the last whitespace-separated token
                    label = line.split()[-1].strip()
                    start_seconds = timestamp_to_seconds(start_time)
                    end_seconds = timestamp_to_seconds(end_time)
                    outfile.write(f"{start_seconds}\t{end_seconds}\t{label}\n")
                    successful_lines += 1
                except Exception as e:
                    print(f"Error processing line: '{line.strip()}'. Error: {e}")
        print(f"Successfully processed {successful_lines} lines.")
        return labels_path if successful_lines > 0 else None
    except Exception as e:
        print(f"Error writing file: {e}")
        return None
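
# The resulting tab-separated "start<TAB>end<TAB>label" lines match the label
# track format that Audacity's "Import Labels" feature expects.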

# Convert an "HH:MM:SS.mmm" timestamp to seconds
def timestamp_to_seconds(timestamp):
    try:
        h, m, s = map(float, timestamp.split(':'))
        return 3600 * h + 60 * m + s
    except ValueError as e:
        print(f"Error converting timestamp: '{timestamp}'. Error: {e}")
        return None
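
# Example: timestamp_to_seconds("00:01:30.500") -> 3600*0 + 60*1 + 30.5 = 90.5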

# Process the audio files and return the outputs for the UI
def process_audio(target_audio, mixed_audio):
    # Concatenate the audio and get the target segment's start/end times (a dict)
    time_dict = combine_audio_with_time(target_audio, mixed_audio)
    if isinstance(time_dict, str):
        return time_dict, None, None  # an error message was returned instead of a dict
    # Run speaker diarization on the concatenated file
    diarization_result = diarize_audio("final_output.wav")
    if diarization_result.startswith("Error"):
        return diarization_result, None, None  # propagate the error message
    else:
        # Generate the label file
        label_file = generate_labels_from_diarization(diarization_result)
        # Return the diarization result, the label file, and the target segment's time range
        return diarization_result, label_file, time_dict

# Gradio interface
with gr.Blocks() as demo:
    gr.Markdown("""
    # 🗣️ Audio Concatenation and Speaker Diarization 🗣️
    Upload a target speaker's audio and a mixed audio file; the two are concatenated
    and diarized. Outputs include the diarization result, a label file, and the
    target segment's time range.
    """)
    target_audio_input = gr.Audio(type="filepath", label="Upload target speaker audio")
    mixed_audio_input = gr.Audio(type="filepath", label="Upload mixed audio")
    process_button = gr.Button("Process audio")
    # Output components
    diarization_output = gr.Textbox(label="Diarization result")
    label_file_link = gr.File(label="Download label file")
    time_range_output = gr.Textbox(label="Target audio time range")
    # Run process_audio when the button is clicked
    process_button.click(
        fn=process_audio,
        inputs=[target_audio_input, mixed_audio_input],
        outputs=[diarization_output, label_file_link, time_range_output]
    )
demo.launch(share=True)