import torch
import spaces
import gradio as gr
import os
from pydub import AudioSegment
from pyannote.audio.pipelines import SpeakerDiarization


# Initialize the pyannote/speaker-diarization pipeline
HF_TOKEN = os.environ.get("HUGGINGFACE_READ_TOKEN")
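# Note: pyannote/speaker-diarization-3.1 is a gated model, so the token must
# belong to an account that has accepted the model's terms on Hugging Face.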
pipeline = None
try:
    pipeline = SpeakerDiarization.from_pretrained(
        "pyannote/speaker-diarization-3.1", use_auth_token=HF_TOKEN
    )
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    pipeline.to(device)
except Exception as e:
    print(f"Error initializing pipeline: {e}")
    pipeline = None

# Audio helper: append the target speaker sample to the end of the mixed audio
def combine_audio_with_time(target_audio, mixed_audio):
    if pipeline is None:
        return "Error: pipeline not initialized"

    # gr.Audio(type="filepath") passes plain file path strings, so load them directly
    # Load the target speaker's sample audio
    target_audio_segment = AudioSegment.from_wav(target_audio)

    # Load the mixed audio
    mixed_audio_segment = AudioSegment.from_wav(mixed_audio)

    # Record the offset (in seconds) at which the target speaker's sample will start
    target_start_time = len(mixed_audio_segment) / 1000  # pydub lengths are in milliseconds

    # Append the target speaker's sample to the end of the mixed audio
    final_audio = mixed_audio_segment + target_audio_segment

    # Save the combined audio and return its path along with the start offset
    final_audio.export("final_output.wav", format="wav")

    return "final_output.wav", target_start_time

# Run pyannote/speaker-diarization on the combined audio
@spaces.GPU(duration=60 * 2)
def diarize_audio(temp_file):
    if pipeline is None:
        return "Error: pipeline not initialized"

    try:
        diarization = pipeline(temp_file)
    except Exception as e:
        return f"Error while processing the audio: {e}"

    # Return the diarization output as text
    return str(diarization)
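# Note: when the number of speakers is known in advance, pyannote pipelines
# also accept hints such as pipeline(temp_file, num_speakers=2), or the
# min_speakers=/max_speakers= bounds, which can improve the clustering step.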

# Parse the diarization output and generate a label file
def generate_labels_from_diarization(diarization_output):
    labels_path = 'labels.txt'
    successful_lines = 0

    try:
        with open(labels_path, 'w') as outfile:
            lines = diarization_output.strip().split('\n')
            for line in lines:
                try:
                    # Each line looks like: [ 00:00:00.497 -->  00:00:09.762] A SPEAKER_00
                    parts = line.strip()[1:-1].split(' --> ')
                    start_time = parts[0].strip()
                    end_time = parts[1].split(']')[0].strip()
                    label = line.split()[-1].strip()
                    start_seconds = timestamp_to_seconds(start_time)
                    end_seconds = timestamp_to_seconds(end_time)
                    if start_seconds is None or end_seconds is None:
                        continue  # skip lines whose timestamps could not be parsed
                    outfile.write(f"{start_seconds}\t{end_seconds}\t{label}\n")
                    successful_lines += 1
                except Exception as e:
                    print(f"Error processing line: '{line.strip()}'. Error: {e}")
        print(f"Successfully processed {successful_lines} lines.")
        return labels_path if successful_lines > 0 else None
    except Exception as e:
        print(f"Error writing file: {e}")
        return None
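# Example: the diarization line
#   [ 00:00:00.497 -->  00:00:09.762] A SPEAKER_00
# becomes the tab-separated label row
#   0.497\t9.762\tSPEAKER_00
# a format that tools such as Audacity can import (File > Import > Labels).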

# Convert an HH:MM:SS.mmm timestamp to seconds
def timestamp_to_seconds(timestamp):
    try:
        h, m, s = map(float, timestamp.split(':'))
        return 3600 * h + 60 * m + s
    except ValueError as e:
        print(f"Error converting timestamp: '{timestamp}'. Error: {e}")
        return None
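# Example: timestamp_to_seconds("00:01:02.500") == 62.5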
        
# End-to-end processing: combine the two uploads, diarize, and build labels
@spaces.GPU(duration=60 * 2)
def process_audio(target_audio, mixed_audio):
    # Append the target speaker sample to the mixed audio
    result = combine_audio_with_time(target_audio, mixed_audio)
    if isinstance(result, str):
        return result, None  # combine step failed; return the error message

    combined_path, _target_start_time = result
    diarization_result = diarize_audio(combined_path)
    if diarization_result.startswith("Error"):
        return diarization_result, None  # diarization failed; no label file
    label_file = generate_labels_from_diarization(diarization_result)
    return diarization_result, label_file

# Helper: copy an uploaded audio file to a local temporary path
def save_audio(audio):
    # gr.Audio(type="filepath") passes a plain file path string
    with open(audio, "rb") as f:
        audio_data = f.read()

    # Write the uploaded audio to a temporary location
    with open("temp.wav", "wb") as f:
        f.write(audio_data)

    return "temp.wav"

# Gradio interface
with gr.Blocks() as demo:
    gr.Markdown("""
    # 🗣️ Audio Concatenation & Speaker Diarization 🗣️
    Upload a sample of the target speaker and a mixed recording. The sample is
    appended to the end of the mixture before diarization, so the speaker label
    active in the final segment identifies the target speaker.
    """)

    audio_input = gr.Audio(type="filepath", label="Upload target speaker audio")
    mixed_audio_input = gr.Audio(type="filepath", label="Upload mixed audio")

    process_button = gr.Button("Process audio")
    diarization_output = gr.Textbox(label="Diarization result")
    label_file_link = gr.File(label="Download label file")

    # Run the full pipeline on both uploads
    process_button.click(
        fn=process_audio,
        inputs=[audio_input, mixed_audio_input],
        outputs=[diarization_output, label_file_link]
    )

demo.launch(share=False)
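# To run locally (a sketch, assuming the file is saved as app.py and that you
# are outside HF Spaces, where the `spaces` GPU decorator is a no-op):
#   export HUGGINGFACE_READ_TOKEN=<HF token with access to the gated models>
#   python app.py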