import torch import gradio as gr import os from pyannote.audio import Pipeline from pydub import AudioSegment # 获取 Hugging Face 认证令牌 HF_TOKEN = os.environ.get("HUGGINGFACE_READ_TOKEN") pipeline = None # 尝试加载 pyannote 模型 try: pipeline = Pipeline.from_pretrained( "pyannote/speaker-diarization-3.1", use_auth_token=HF_TOKEN ) device = torch.device("cuda" if torch.cuda.is_available() else "cpu") pipeline.to(device) except Exception as e: print(f"Error initializing pipeline: {e}") pipeline = None # 时间戳转换为秒 def timestamp_to_seconds(timestamp): h, m, s = map(float, timestamp.split(':')) return 3600 * h + 60 * m + s # 音频拼接函数:拼接目标音频和混合音频,返回目标音频的起始时间和结束时间作为字典 def combine_audio_with_time(target_audio, mixed_audio): if pipeline is None: return "错误: 模型未初始化" # 打印文件路径,确保文件正确传递 print(f"目标音频文件路径: {target_audio}") print(f"混合音频文件路径: {mixed_audio}") # 加载目标说话人的样本音频 try: target_audio_segment = AudioSegment.from_wav(target_audio) except Exception as e: return f"加载目标音频时出错: {e}" # 加载混合音频 try: mixed_audio_segment = AudioSegment.from_wav(mixed_audio) except Exception as e: return f"加载混合音频时出错: {e}" # 记录目标说话人音频的时间点(精确到0.01秒) target_start_time = len(mixed_audio_segment) / 1000 # 秒为单位,精确到 0.01 秒 # 目标音频的结束时间(拼接后的音频长度) target_end_time = target_start_time + len(target_audio_segment) / 1000 # 秒为单位 # 将目标说话人的音频片段添加到混合音频的最后 final_audio = mixed_audio_segment + target_audio_segment final_audio.export("final_output.wav", format="wav") # 返回目标音频的起始时间和结束时间 return {"start_time": target_start_time, "end_time": target_end_time} # 使用 pyannote/speaker-diarization 对拼接后的音频进行说话人分离 @gr.Interface(duration=60 * 2) # 使用 GPU 加速,限制执行时间为 120 秒 def diarize_audio(temp_file): if pipeline is None: return "错误: 模型未初始化" try: diarization = pipeline(temp_file) print("说话人分离结果:") for turn, _, speaker in diarization.itertracks(yield_label=True): print(f"[{turn.start:.3f} --> {turn.end:.3f}] {speaker}") return diarization except Exception as e: return f"处理音频时出错: {e}" # 获取目标说话人的时间段并替换指定的SPEAKER_00 def get_speaker_segments(diarization, target_start_time, target_end_time, final_audio_length): speaker_segments = {} # 遍历所有说话人时间段 for turn, _, speaker in diarization.itertracks(yield_label=True): start = turn.start end = turn.end # 如果是目标说话人 if speaker == 'SPEAKER_00': # 替换目标音频的时间段 if start < target_end_time and end > target_start_time: # 目标音频时间段被截断,重新计算其时间段 new_start = max(start, target_start_time) new_end = min(end, target_end_time) speaker_segments.setdefault(speaker, []).append((new_start, new_end)) else: # 完全不与目标音频重叠的时间段 if end <= target_start_time or start >= target_end_time: speaker_segments.setdefault(speaker, []).append((start, end)) return speaker_segments # 处理音频文件并返回输出 def process_audio(target_audio, mixed_audio): print(f"处理音频:目标音频: {target_audio}, 混合音频: {mixed_audio}") # 进行音频拼接并返回目标音频的起始和结束时间(作为字典) time_dict = combine_audio_with_time(target_audio, mixed_audio) # 如果音频拼接出错,返回错误信息 if isinstance(time_dict, str): return time_dict # 执行说话人分离 diarization_result = diarize_audio("final_output.wav") if isinstance(diarization_result, str) and diarization_result.startswith("错误"): return diarization_result # 出错时返回错误信息 else: # 获取拼接后的音频长度 final_audio_length = len(AudioSegment.from_wav("final_output.wav")) / 1000 # 秒为单位 # 获取目标说话人的时间段(已排除和截断目标音频时间段) speaker_segments = get_speaker_segments( diarization_result, time_dict['start_time'], time_dict['end_time'], final_audio_length ) if speaker_segments and 'SPEAKER_00' in speaker_segments: # 返回目标说话人的时间段(已排除和截断目标音频时间段) return { 'segments': speaker_segments['SPEAKER_00'], 'total_duration': sum(end - start for start, end in speaker_segments['SPEAKER_00']) } else: return "没有找到SPEAKER_00的时间段。" # Gradio 接口 with gr.Blocks() as demo: gr.Markdown(""" # 🗣️ 音频拼接与说话人分类 🗣️ 上传目标音频和混合音频,拼接并进行说话人分类。 结果包括目标说话人(SPEAKER_00)的时间段,已排除和截断目标录音时间段。 """) mixed_audio_input = gr.Audio(type="filepath", label="上传混合音频") target_audio_input = gr.Audio(type="filepath", label="上传目标说话人音频") process_button = gr.Button("处理音频") # 输出结果 diarization_output = gr.Textbox(label="说话人时间段") # 点击按钮时触发处理音频 process_button.click( fn=process_audio, inputs=[target_audio_input, mixed_audio_input], outputs=[diarization_output] ) demo.launch(share=True)