dubbing / app.py
lele1894's picture
Upload 6 files
718902f verified
import gradio as gr
import whisper
import yt_dlp
import os
from deep_translator import GoogleTranslator
from edge_tts import Communicate
import asyncio
import logging
import tempfile
import re
import requests
import json
from urllib.parse import parse_qs, urlparse
import torch
# Configure logging for the whole application
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Directory where the Whisper model weights are cached
CACHE_DIR = "model_cache"

# Create the model cache and every working folder up front
for _folder in (CACHE_DIR, "uploads", "subtitles", "audio", "output"):
    os.makedirs(_folder, exist_ok=True)

# moviepy is mandatory: log the outcome and abort start-up when it is missing
try:
    from moviepy.editor import (
        VideoFileClip,
        AudioFileClip,
        CompositeVideoClip,
        concatenate_audioclips,
        CompositeAudioClip,
    )
except ImportError as e:
    logger.error(f"Error importing moviepy: {e}")
    raise
else:
    logger.info("Successfully imported moviepy")
# Whisper model instance; stays None until the first transcription needs it
model = None


def get_model():
    """Return the shared Whisper "base" model, loading it on the first call.

    The model is stored in the module-level ``model`` variable so later calls
    reuse the same instance. Weights are downloaded into ``CACHE_DIR``.
    """
    global model
    if model is not None:
        return model
    logger.info("Loading Whisper model...")
    model = whisper.load_model("base", download_root=CACHE_DIR)
    logger.info("Whisper model loaded")
    return model
# Shared translator instance: English source text -> Simplified Chinese (zh-CN)
translator = GoogleTranslator(source='en', target='zh-CN')
# Chinese voices offered for Edge TTS.
# Keys are the voice IDs passed to edge_tts.Communicate; values are the labels
# shown in the UI dropdown (callers map a label back to its ID).
CHINESE_VOICES = {
    # Taiwan (zh-TW)
    "zh-TW-HsiaoChenNeural": "曉臻 - 女声 (台湾国语)",
    "zh-TW-YunJheNeural": "雲哲 - 男声 (台湾国语)",
    "zh-TW-HsiaoYuNeural": "曉雨 - 女声 (台湾国语)",
    # Mainland China (zh-CN)
    "zh-CN-YunxiNeural": "云希 - 男声 (大陆标准普通话)",
    "zh-CN-YunjianNeural": "云健 - 男声 (大陆标准普通话)",
    "zh-CN-XiaoyiNeural": "晓伊 - 女声 (大陆标准普通话)",
    "zh-CN-YunyangNeural": "云扬 - 男声 (大陆新闻播报)",
    "zh-CN-XiaochenNeural": "晓辰 - 女声 (大陆标准普通话)",
    "zh-CN-XiaohanNeural": "晓涵 - 女声 (大陆标准普通话)",
    "zh-CN-XiaomengNeural": "晓梦 - 女声 (大陆标准普通话)",
    "zh-CN-XiaomoNeural": "晓墨 - 女声 (大陆标准普通话)",
    "zh-CN-XiaoxuanNeural": "晓萱 - 女声 (大陆标准普通话)",
    "zh-CN-XiaoyanNeural": "晓颜 - 女声 (大陆标准普通话)",
    "zh-CN-XiaoyouNeural": "晓悠 - 女声 (大陆标准普通话)",
    # Hong Kong (zh-HK)
    "zh-HK-HiuGaaiNeural": "曉薇 - 女声 (香港粤语)",
    "zh-HK-HiuMaanNeural": "曉曼 - 女声 (香港粤语)",
    "zh-HK-WanLungNeural": "云龍 - 男声 (香港粤语)",
}
def extract_video_id(url):
    """Extract the video ID from a YouTube URL.

    Supported shapes:
      * ``youtu.be/<id>`` short links (query string and trailing slash ignored)
      * ``youtube.com/watch?v=<id>`` (any subdomain such as ``www.`` or ``m.``)
      * ``youtube.com/shorts/<id>``, ``/embed/<id>``, ``/live/<id>``, ``/v/<id>``

    A URL without a scheme (e.g. ``youtu.be/abc``) is accepted as well.

    Returns:
        The video ID string, or ``None`` when the input is empty or is not a
        recognisable YouTube video URL.
    """
    if not url:
        return None
    candidate = url.strip()
    # urlparse only fills in the host when a scheme is present
    if '://' not in candidate:
        candidate = 'https://' + candidate
    try:
        parsed = urlparse(candidate)
        host = (parsed.hostname or '').lower()
    except ValueError:
        # Malformed netloc (e.g. unbalanced IPv6 brackets)
        return None

    segments = [part for part in parsed.path.split('/') if part]

    if host == 'youtu.be' or host.endswith('.youtu.be'):
        # The ID is the first path segment; share links append ?si=... / ?t=...
        return segments[0] if segments else None

    if host == 'youtube.com' or host.endswith('.youtube.com'):
        video_id = parse_qs(parsed.query).get('v', [None])[0]
        if video_id:
            return video_id
        if len(segments) >= 2 and segments[0] in ('shorts', 'embed', 'live', 'v'):
            return segments[1]

    return None
# Column layout shared by the header and every row of the format table
_YT_FORMAT_ROW = "{:<6} {:<10} {:<16} {:<8} {:<12} {:<10} {:<8} {:<15}"


def _yt_field(fmt, *keys):
    """Return the first non-None value among *keys* in *fmt*, else 'N/A'."""
    for key in keys:
        value = fmt.get(key)
        if value is not None:
            return value
    return 'N/A'


def _yt_format_row(fmt, is_video):
    """Render one yt-dlp format dict as a fixed-width table row."""
    codec = str(_yt_field(fmt, 'vcodec' if is_video else 'acodec')).split('.')[0]
    # Fall back to the approximate size when the exact size is not reported
    size = fmt.get('filesize') or fmt.get('filesize_approx')
    size_text = f"{size / 1024 / 1024:.1f}M" if size else 'N/A'
    if is_video:
        detail = f"{_yt_field(fmt, 'width')}x{_yt_field(fmt, 'height')}"
        fps = f"{_yt_field(fmt, 'fps')}fps"
        bitrate = _yt_field(fmt, 'vbr', 'tbr')
    else:
        detail = f"{_yt_field(fmt, 'asr')}Hz"
        fps = "-"
        bitrate = _yt_field(fmt, 'abr', 'tbr')
    return _YT_FORMAT_ROW.format(
        str(_yt_field(fmt, 'format_id')),
        str(_yt_field(fmt, 'ext')),
        detail,
        fps,
        codec,
        f"{bitrate}k",
        size_text,
        str(fmt.get('format_note') or ''),
    )


def get_youtube_formats(url, cookies=None):
    """List the video-only and audio-only formats available for a YouTube video.

    Args:
        url: YouTube URL; the video ID is extracted and the canonical watch URL
            is queried.
        cookies: Optional cookie file content (Netscape format). It is written
            to a temporary file for yt-dlp and removed afterwards.

    Returns:
        A text table with one section for video-only streams and one for
        audio-only streams, followed by usage hints. Formats that carry both
        audio and video, or neither, are not listed.

    Raises:
        gr.Error: if the URL is invalid or the format list cannot be fetched.
    """
    video_id = extract_video_id(url)
    if not video_id:
        raise gr.Error("无效的YouTube链接")
    ydl_opts = {
        'quiet': True,
        'no_warnings': True,
        'format': 'best',
    }
    cookie_path = None
    try:
        if cookies:
            with tempfile.NamedTemporaryFile(
                mode='w', delete=False, suffix='.txt', encoding='utf-8'
            ) as f:
                f.write(cookies)
                cookie_path = f.name
            ydl_opts['cookiefile'] = cookie_path

        url = f"https://www.youtube.com/watch?v={video_id}"
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            info = ydl.extract_info(url, download=False)
        if info is None:
            raise gr.Error("无法获取视频信息,请检查链接或尝试使用cookies")

        # Split the formats into video-only and audio-only streams
        video_formats = []
        audio_formats = []
        for f in info.get('formats') or []:
            has_video = f.get('vcodec') != 'none'
            has_audio = f.get('acodec') != 'none'
            if has_video and not has_audio:
                video_formats.append(_yt_format_row(f, True))
            elif has_audio and not has_video:
                audio_formats.append(_yt_format_row(f, False))

        format_header = _YT_FORMAT_ROW.format(
            "ID", "格式", "分辨率/采样率", "帧率", "编码", "比特率", "大小", "说明"
        )
        separator = "-" * 90

        # Assemble the final display text
        formats_text = "【视频格式】(仅视频流)\n"
        formats_text += format_header + "\n" + separator + "\n"
        formats_text += "\n".join(video_formats)
        formats_text += "\n\n【音频格式】(仅音频流)\n"
        formats_text += format_header + "\n" + separator + "\n"
        formats_text += "\n".join(audio_formats)
        formats_text += "\n\n提示:\n"
        formats_text += "1. 下载时需要组合视频和音频格式,用+号连接,例如:137+140\n"
        formats_text += "2. 建议选择相近大小的视频和音频格式组合\n"
        formats_text += "3. 一般选择 mp4+m4a 或 webm+webm 的组合\n"
        formats_text += "4. 数字越大通常表示质量越好"
        return formats_text
    except gr.Error:
        # Already a user-facing error; do not wrap it a second time
        raise
    except Exception as e:
        logger.error(f"获取格式列表失败: {e}")
        raise gr.Error(f"获取格式列表失败: {str(e)}") from e
    finally:
        # The cookie file may hold session credentials; never leave it behind
        if cookie_path and os.path.exists(cookie_path):
            os.unlink(cookie_path)
def download_youtube(url, format_id=None, cookies=None):
    """Download a YouTube video into the ``uploads`` folder.

    Args:
        url: YouTube URL (validated with ``extract_video_id``).
        format_id: Optional yt-dlp format selection such as ``"137+140"`` or
            ``"137,140"``; anything after a ``:`` is ignored. ``None`` or
            ``"best"`` keeps yt-dlp's ``best`` format.
        cookies: Optional cookie file content (Netscape format). It is written
            to a temporary file for yt-dlp and removed afterwards.

    Returns:
        Path of the downloaded file inside ``uploads``.

    Raises:
        gr.Error: if the URL is invalid or the download fails.
    """
    # Local import: shutil is not among the file-level imports
    import shutil

    video_id = extract_video_id(url)
    if not video_id:
        raise gr.Error("无效的YouTube链接")
    ydl_opts = {
        'format': 'best',
        'quiet': True,
        'no_warnings': True,
        # A watch URL carrying &list=... must still download a single video
        'noplaylist': True,
    }
    if format_id and format_id != "best":
        format_ids = [
            part.strip()
            for part in re.split(r'[,+]', format_id.split(':')[0])
        ]
        format_ids = [part for part in format_ids if part]
        if format_ids:
            ydl_opts['format'] = '+'.join(format_ids)

    cookie_path = None
    try:
        if cookies:
            with tempfile.NamedTemporaryFile(
                mode='w', delete=False, suffix='.txt', encoding='utf-8'
            ) as f:
                f.write(cookies)
                cookie_path = f.name
            ydl_opts['cookiefile'] = cookie_path

        with tempfile.TemporaryDirectory() as temp_dir:
            ydl_opts['outtmpl'] = os.path.join(temp_dir, '%(title)s.%(ext)s')
            with yt_dlp.YoutubeDL(ydl_opts) as ydl:
                info = ydl.extract_info(url, download=True)
                if info is None:
                    raise gr.Error("无法获取视频信息,请检查链接或尝试使用cookies")
                # Prefer the path yt-dlp reports for the finished (possibly
                # merged) download; fall back to the templated name.
                downloads = info.get('requested_downloads') or []
                filename = downloads[0].get('filepath') if downloads else None
                if not filename:
                    filename = ydl.prepare_filename(info)
            os.makedirs("uploads", exist_ok=True)
            final_path = os.path.join("uploads", os.path.basename(filename))
            # shutil.move also works when the temp dir is on another filesystem,
            # where os.rename raises OSError
            shutil.move(filename, final_path)
            return final_path
    except gr.Error:
        # Already a user-facing error; do not wrap it a second time
        raise
    except Exception as e:
        logger.error(f"下载视频失败: {e}")
        raise gr.Error(f"下载视频失败: {str(e)}") from e
    finally:
        # The cookie file may hold session credentials; never leave it behind
        if cookie_path and os.path.exists(cookie_path):
            os.unlink(cookie_path)
def generate_subtitles(video_path):
    """Transcribe the English speech of *video_path* into an SRT file.

    The lazily loaded Whisper model is run with a beam size and candidate
    count of 1 (faster decoding), using fp16 only when CUDA is available.
    Each segment becomes one numbered cue in ``subtitles/english.srt``.

    Returns:
        The path of the written SRT file.
    """
    whisper_model = get_model()  # loaded on first use
    transcription = whisper_model.transcribe(
        video_path,
        language="en",
        beam_size=1,  # smaller beam for faster processing
        best_of=1,  # fewer candidates
        fp16=torch.cuda.is_available(),
    )
    srt_path = "subtitles/english.srt"
    with open(srt_path, "w", encoding="utf-8") as srt_file:
        srt_file.writelines(
            "{}\n{} --> {}\n{}\n\n".format(
                number,
                format_timestamp(segment["start"]),
                format_timestamp(segment["end"]),
                segment["text"].strip(),
            )
            for number, segment in enumerate(transcription["segments"], 1)
        )
    return srt_path
def translate_subtitles(en_srt):
    """Translate an English SRT file into ``subtitles/chinese.srt``.

    The input is expected in the four-line-per-cue layout written by
    ``generate_subtitles`` (index, time range, one text line, blank line).
    Index and time lines are copied unchanged; only the text line is sent to
    the module-level ``translator``.

    Args:
        en_srt: Path of the English SRT file.

    Returns:
        The path of the translated SRT file.
    """
    cn_srt = "subtitles/chinese.srt"
    with open(en_srt, "r", encoding="utf-8") as f:
        lines = f.readlines()
    with open(cn_srt, "w", encoding="utf-8") as f:
        i = 0
        while i < len(lines):
            # A cue needs its index, time and text lines; a truncated tail is
            # skipped instead of raising IndexError
            if lines[i].strip().isdigit() and i + 2 < len(lines):
                f.write(lines[i])  # cue index
                f.write(lines[i + 1])  # time range
                text = lines[i + 2].strip()
                # Nothing to translate for an empty cue
                translated = translator.translate(text) if text else ""
                if translated is None:
                    # Keep the source text when the translator yields nothing,
                    # rather than writing the literal string "None"
                    translated = text
                f.write(f"{translated}\n\n")
                i += 4
            else:
                i += 1
    return cn_srt
async def generate_speech(cn_srt, voice_id, rate=1.5, max_concurrency=4):
    """Synthesize one MP3 per subtitle cue with Edge TTS.

    Args:
        cn_srt: Path of the SRT file whose cue texts are spoken. The file is
            read in the four-line-per-cue layout (index, time, text, blank).
        voice_id: Edge TTS voice ID (a key of ``CHINESE_VOICES``).
        rate: Speed multiplier (the UI slider offers 0.5-3.0; default 1.5).
            It is converted to the signed percentage Edge TTS expects,
            e.g. 1.5 -> "+50%".
        max_concurrency: Maximum number of TTS requests in flight at once.

    Returns:
        Audio file paths in cue order (``audio/speech_<line index>.mp3``).
    """
    # Convert the multiplier into a signed percentage string
    rate_percent = f"{(rate - 1) * 100:+.0f}%"
    with open(cn_srt, "r", encoding="utf-8") as f:
        lines = f.readlines()

    # Collect (text, output path) for every cue before starting any request
    jobs = []
    i = 0
    while i < len(lines):
        if lines[i].strip().isdigit():
            text = lines[i + 2].strip()
            jobs.append((text, f"audio/speech_{i}.mp3"))
            i += 4
        else:
            i += 1

    limiter = asyncio.Semaphore(max(1, int(max_concurrency)))

    async def synthesize(text, audio_path):
        # The semaphore bounds how many requests run at the same time
        async with limiter:
            await Communicate(text, voice_id, rate=rate_percent).save(audio_path)
        return audio_path

    # gather() returns results in submission order, so paths stay cue-aligned
    results = await asyncio.gather(
        *(synthesize(text, audio_path) for text, audio_path in jobs)
    )
    return list(results)
def merge_video_audio(video_path, audio_files, subtitles, progress_callback=None, original_volume=0.1):
    """Mix the dubbed audio clips over the video and export the result.

    Args:
        video_path: Path of the source video.
        audio_files: Dubbed audio files, one per subtitle cue, in cue order.
        subtitles: SRT file whose cue start times position the audio clips.
            It is read in the four-line-per-cue layout (index, time, text,
            blank).
        progress_callback: Accepted for interface compatibility; not used.
        original_volume: Volume factor applied to the original audio track
            (default 0.1).

    Returns:
        Path of the exported video (``output/final_video.mp4``).

    Raises:
        Exception: wrapping any failure, with the original error chained.
    """
    # Every clip that must be released, whether the export succeeds or fails
    to_close = []
    try:
        # Load the video
        video = VideoFileClip(video_path)
        to_close.append(video)
        # Trim a little off the end to avoid problems at the last frames
        actual_duration = video.duration - 0.1
        video = video.subclip(0, actual_duration)
        to_close.append(video)

        # Keep the original audio, attenuated by original_volume
        if video.audio is not None:
            original_audio = video.audio.volumex(original_volume)
            to_close.append(original_audio)
        else:
            original_audio = None

        # Read the start time of every cue
        with open(subtitles, "r", encoding="utf-8") as f:
            lines = f.readlines()
        start_times = []
        i = 0
        while i < len(lines):
            if lines[i].strip().isdigit():
                times = lines[i + 1].strip().split(" --> ")
                start_times.append(parse_timestamp(times[0]))
                i += 4
            else:
                i += 1

        if len(start_times) > len(audio_files):
            raise ValueError(
                f"字幕条目数({len(start_times)})多于配音文件数({len(audio_files)})"
            )

        # Place each dubbed clip at the start time of its cue
        audio_clips = []
        for start, audio_file in zip(start_times, audio_files):
            audio = AudioFileClip(audio_file).set_start(start)
            to_close.append(audio)
            audio_clips.append(audio)

        # Mix everything, with the original audio first
        if original_audio is not None:
            audio_clips.insert(0, original_audio)
        final_audio = CompositeAudioClip(audio_clips)

        # Build and export the final video
        final_video = video.set_audio(final_audio)
        to_close.append(final_video)
        output_path = os.path.join("output", "final_video.mp4")
        final_video.write_videofile(
            output_path,
            codec='libx264',
            audio_codec='aac',
            fps=video.fps,
            audio_bitrate="192k",
            bitrate="4000k",
            threads=4,
            preset='medium'
        )
        return output_path
    except Exception as e:
        logger.error(f"合并视频音频失败: {e}")
        raise Exception(f"合并视频音频失败: {str(e)}") from e
    finally:
        # Release the clips even on failure; a cleanup error must not hide
        # the real result or exception
        for clip in reversed(to_close):
            try:
                clip.close()
            except Exception as close_error:
                logger.warning(f"Failed to close clip: {close_error}")
def format_timestamp(seconds):
    """Format a duration in seconds as an SRT timestamp ``HH:MM:SS,mmm``.

    The value is rounded to whole milliseconds first and then split into
    fields, so a value such as 59.9996 rolls over to ``00:01:00,000`` instead
    of producing an invalid ``60`` in the seconds field. Negative input is
    clamped to zero.
    """
    total_ms = max(0, int(round(seconds * 1000)))
    hours, remainder = divmod(total_ms, 3600000)
    minutes, remainder = divmod(remainder, 60000)
    secs, millis = divmod(remainder, 1000)
    return f"{hours:02d}:{minutes:02d}:{secs:02d},{millis:03d}"
def validate_youtube_url(url):
    """Loosely check that *url* looks like a YouTube link.

    Returns ``None`` for an empty value and the unchanged URL when it contains
    one of the known YouTube markers; otherwise raises ``gr.Error``.
    """
    if not url:
        return None
    markers = ("youtube.com", "youtu.be", "youtube", "youtu")
    for marker in markers:
        if marker in url:
            return url
    raise gr.Error("请输入有效的YouTube链接,例如: https://www.youtube.com/watch?v=xxxxx")
def parse_timestamp(timestamp):
    """Convert an SRT timestamp (``HH:MM:SS,mmm``) into seconds as a float."""
    normalized = timestamp.replace(",", ".")
    hours, minutes, seconds = (float(field) for field in normalized.split(":"))
    return hours * 3600 + minutes * 60 + seconds
def add_log(logs, message):
    """Append *message* to *logs* on a new line; start a new log when *logs* is empty."""
    return logs + "\n" + message if logs else message
def generate_and_translate_subtitles(video_input, youtube_url, format_id=None, cookies=None):
    """Produce English subtitles for a video and translate them to Chinese.

    Args:
        video_input: Path of an uploaded video; used when no URL is given.
        youtube_url: Optional YouTube URL; takes precedence over the upload.
        format_id: Optional yt-dlp format selection (text after ``:`` ignored).
        cookies: Optional cookie file content for the download.

    Returns:
        ``(en_srt, cn_srt, video_path, en_content, cn_content, all_files)``:
        both subtitle paths, the video path, both subtitle texts and the list
        of generated files.

    Raises:
        gr.Error: if neither input is provided or any step fails.
    """
    try:
        if youtube_url:
            logger.info("正在下载YouTube视频...")
            youtube_url = validate_youtube_url(youtube_url)
            video_path = download_youtube(
                youtube_url,
                format_id.split(':')[0] if format_id else None,
                cookies if cookies else None,
            )
        elif video_input:
            video_path = video_input
            logger.info("正在处理上传的视频...")
        else:
            # Same guard as process_video: without it None reaches Whisper
            raise gr.Error("请上传视频文件或提供YouTube链接")
        logger.info("正在识别语音...")
        en_srt = generate_subtitles(video_path)
        logger.info("正在翻译字幕...")
        cn_srt = translate_subtitles(en_srt)
        logger.info("字幕生成完成!")
        # Refresh the preview contents
        en_content = load_srt_content(en_srt)
        cn_content = load_srt_content(cn_srt)
        all_files = get_all_files()
        return en_srt, cn_srt, video_path, en_content, cn_content, all_files
    except Exception as e:
        logger.error(f"生成字幕失败: {e}")
        raise gr.Error(str(e))
def process_video(video_input, youtube_url, format_id=None, cookies=None, cn_srt=None, voice_name=None, rate=1.5):
    """Run the whole dubbing pipeline in one go.

    Steps: obtain the video (download or upload), obtain Chinese subtitles
    (uploaded file, or transcription plus translation), synthesize speech and
    merge it into the video.

    Args:
        video_input: Path of an uploaded video; used when no URL is given.
        youtube_url: Optional YouTube URL; takes precedence over the upload.
        format_id: Optional yt-dlp format selection (text after ``:`` ignored).
        cookies: Optional cookie file content for the download.
        cn_srt: Optional uploaded Chinese subtitle file object (its ``name``
            attribute is used as the path).
        voice_name: Display label of the voice (a value of ``CHINESE_VOICES``).
        rate: Speech speed multiplier.

    Returns:
        ``(en_srt, cn_srt_path, all_files, video_path, output_video)``;
        ``en_srt`` is ``None`` when uploaded subtitles were used.

    Raises:
        gr.Error: if no video is provided, the voice is unknown, or any step
            fails.
    """
    try:
        if not youtube_url and not video_input:
            raise gr.Error("请上传视频文件或提供YouTube链接")

        # Resolve the voice before any expensive work. A bare next() would
        # raise StopIteration, which surfaces as an empty error message.
        voice_id = next((k for k, v in CHINESE_VOICES.items() if v == voice_name), None)
        if voice_id is None:
            raise gr.Error(f"未知的配音声音: {voice_name}")

        if youtube_url:
            logger.info("正在下载YouTube视频...")
            youtube_url = validate_youtube_url(youtube_url)
            video_path = download_youtube(
                youtube_url,
                format_id.split(':')[0] if format_id else None,
                cookies if cookies else None,
            )
        else:
            video_path = video_input
            logger.info("正在处理上传的视频...")

        # Use the uploaded Chinese subtitles when present
        if cn_srt and hasattr(cn_srt, 'name'):
            cn_srt_path = cn_srt.name
            en_srt = None
            logger.info("使用上传的中文字幕...")
        else:
            logger.info("正在识别语音...")
            en_srt = generate_subtitles(video_path)
            logger.info("正在翻译字幕...")
            cn_srt_path = translate_subtitles(en_srt)

        logger.info("正在生成配音...")
        audio_files = asyncio.run(generate_speech(cn_srt_path, voice_id, rate))
        logger.info("正在合成视频...")
        output_video = merge_video_audio(video_path, audio_files, cn_srt_path)
        logger.info("处理完成!")
        all_files = get_all_files()
        return en_srt, cn_srt_path, all_files, video_path, output_video
    except Exception as e:
        logger.error(f"处理视频失败: {e}")
        raise gr.Error(str(e))
def load_srt_content(srt_file):
    """Return the text of a subtitle file, or ``""`` when no path is given.

    The file is decoded as ``utf-8-sig`` so a leading UTF-8 byte-order mark is
    dropped. A kept BOM would end up in the editable preview and, once saved,
    make the first cue index fail the ``isdigit()`` check used by the SRT
    readers in this file. Files without a BOM decode exactly as with plain
    UTF-8.
    """
    if not srt_file:
        return ""
    with open(srt_file, "r", encoding="utf-8-sig") as f:
        return f.read()
def save_srt_content(content, file_path):
    """Write *content* to *file_path* as UTF-8 text and return *file_path*."""
    with open(file_path, "w", encoding="utf-8") as srt_out:
        srt_out.write(content)
    return file_path
def apply_edit(cn_content, video_path, en_srt, voice_name, original_video, rate=1.5):
    """Save the edited Chinese subtitles, then dub and merge the video.

    Args:
        cn_content: Edited subtitle text from the preview box.
        video_path: Uploaded video path; preferred when set.
        en_srt: English subtitle file; accepted for the UI wiring, not used.
        voice_name: Display label of the voice (a value of ``CHINESE_VOICES``).
        original_video: Fallback video path when *video_path* is empty.
        rate: Speech speed multiplier.

    Returns:
        ``(edited_srt, all_files, output_video)``.

    Raises:
        gr.Error: if the subtitles or video are missing, the voice is
            unknown, or any processing step fails.
    """
    if not cn_content:
        raise gr.Error("请先编辑字幕")
    # Work out which video to use
    if video_path:
        actual_video_path = video_path
    elif original_video:
        actual_video_path = original_video
    else:
        raise gr.Error("请先上传视频")
    # A bare next() would raise StopIteration for an unknown label, which
    # surfaces as an empty error message; report it explicitly instead.
    voice_id = next((k for k, v in CHINESE_VOICES.items() if v == voice_name), None)
    if voice_id is None:
        raise gr.Error(f"未知的配音声音: {voice_name}")
    try:
        # Save the edited subtitles
        edited_srt = "subtitles/chinese_edited.srt"
        save_srt_content(cn_content, edited_srt)
        logger.info("字幕修改已保存,继续处理...")
        # Continue with dubbing and video merging
        logger.info("正在生成配音...")
        audio_files = asyncio.run(generate_speech(edited_srt, voice_id, rate))
        logger.info("正在合成视频...")
        output_video = merge_video_audio(actual_video_path, audio_files, edited_srt)
        logger.info("处理完成!")
        all_files = get_all_files()
        return edited_srt, all_files, output_video
    except Exception as e:
        logger.error(f"处理失败: {e}")
        raise gr.Error(str(e))
def preview_uploaded_srt(file):
    """Return the text of an uploaded subtitle file for the preview box.

    Returns ``None`` when nothing is uploaded; read failures are logged and
    re-raised as ``gr.Error``.
    """
    if file is None:
        return None
    try:
        srt_text = load_srt_content(file.name)
    except Exception as e:
        logger.error(f"预览字幕失败: {e}")
        raise gr.Error(f"预览字幕失败: {str(e)}")
    return srt_text
def get_all_files():
    """Collect the paths of every file in the working folders.

    The folders are visited in the order ``uploads``, ``subtitles``,
    ``audio``, ``output``; a missing folder is skipped. Entries appear in
    ``os.listdir`` order and only paths that exist are returned.
    """
    collected = []
    for folder in ("uploads", "subtitles", "audio", "output"):
        if not os.path.exists(folder):
            continue
        for entry in os.listdir(folder):
            candidate = os.path.join(folder, entry)
            # Same existence check as before (filters e.g. dangling symlinks)
            if os.path.exists(candidate):
                collected.append(candidate)
    return collected
async def generate_preview_speech(voice_id, rate=1.5):
    """Synthesize a short fixed sample sentence for previewing a voice.

    The speed multiplier is converted to the signed percentage Edge TTS
    expects (e.g. 1.5 -> "+50%"). The audio is written to
    ``audio/preview_speech.mp3`` and that path is returned; failures are
    logged and re-raised as ``gr.Error``.
    """
    audio_path = "audio/preview_speech.mp3"
    try:
        # Convert the multiplier into a percentage string
        rate_percent = f"{(rate - 1) * 100:+.0f}%"
        sample = Communicate(
            "这是一段试听音频,用于预览配音效果。",
            voice_id,
            rate=rate_percent,
        )
        await sample.save(audio_path)
    except Exception as e:
        logger.error(f"生成试听音频失败: {e}")
        raise gr.Error(f"生成试听音频失败: {str(e)}")
    return audio_path
def preview_voice(voice_name, rate):
    """Generate a preview clip for the selected voice and return its path.

    Args:
        voice_name: Display label of the voice (a value of ``CHINESE_VOICES``).
        rate: Speech speed multiplier.

    Raises:
        gr.Error: if the voice label is unknown or synthesis fails.
    """
    try:
        # A bare next() would raise StopIteration for an unknown label, which
        # surfaces as an empty error message; report it explicitly instead.
        voice_id = next((k for k, v in CHINESE_VOICES.items() if v == voice_name), None)
        if voice_id is None:
            raise gr.Error(f"未知的配音声音: {voice_name}")
        audio_path = asyncio.run(generate_preview_speech(voice_id, rate))
        return audio_path
    except Exception as e:
        logger.error(f"试听失败: {e}")
        raise gr.Error(str(e))
# Gradio UI: builds the layout, wires the button/change events to the
# functions above, and launches the server on 0.0.0.0:7860.
# NOTE(review): leading indentation is missing in this copy of the file, so the
# nesting of the Row/Column contexts below cannot be read from the text —
# restore it from the original source before running.
with gr.Blocks(css="""
.single-line-input textarea {
height: 2.5em !important;
min-height: 2.5em !important;
max-height: 2.5em !important;
overflow-y: hidden !important;
}
""") as app:
gr.Markdown("# 视频配音助手")
with gr.Row():
# Video input and YouTube download area
with gr.Column(scale=2):
# YouTube download area
with gr.Row():
youtube_url = gr.Textbox(label="或输入YouTube链接")
format_btn = gr.Button("获取可用格式列表", scale=0)
with gr.Row():
format_input = gr.Textbox(
label="输入格式ID",
placeholder="例如: 137+140",
value="137+140",
scale=1
)
cookies_input = gr.Textbox(
label="Cookies(可选)",
placeholder="Netscape格式",
lines=6,
max_lines=10,
show_label=True,
container=True,
scale=2,
)
format_info = gr.Textbox(
label="可用格式列表",
interactive=False,
lines=6
)
# Dubbing settings area
with gr.Row():
with gr.Column(scale=3):
voice_list = gr.Dropdown(
label="选择配音声音",
choices=list(CHINESE_VOICES.values()),
value="曉臻 - 女声 (台湾国语)",
interactive=True
)
voice_rate = gr.Slider(
label="语速调整",
minimum=0.5,
maximum=3.0,
value=1.5,
step=0.1,
interactive=True
)
with gr.Column(scale=1):
preview_btn = gr.Button("试听")
# Audio player for the voice preview
preview_audio = gr.Audio(
label="试听音频",
visible=True,
interactive=False
)
with gr.Row():
generate_btn = gr.Button("生成字幕并编辑", scale=1)
process_btn = gr.Button("直接开始处理", scale=1)
# Upload area
with gr.Column(scale=1):
video_input = gr.Video(label="上传视频文件")
cn_srt_upload = gr.File(label="上传中文字幕", file_types=[".srt"])
# Subtitle preview area
with gr.Row():
with gr.Column(scale=1):
en_preview = gr.TextArea(
label="英文字幕预览",
interactive=False,
lines=10
)
with gr.Column(scale=1):
cn_preview = gr.TextArea(
label="中文字幕预览",
interactive=True,
lines=10
)
# Edit button
with gr.Row():
edit_btn = gr.Button("应用字幕修改")
# Hidden subtitle file components
output_en_srt = gr.File(label="英文字幕", file_types=[".srt"], visible=False)
output_cn_srt = gr.File(label="中文字幕", file_types=[".srt"], visible=False)
# Output area
with gr.Row():
with gr.Column(scale=1):
original_video = gr.Video(label="原始视频")
all_files_output = gr.Files(label="所有文件")
with gr.Column(scale=1):
video_output = gr.Video(label="最终视频")
# "Generate subtitles and edit": transcribe + translate, then fill the previews
generate_btn.click(
fn=generate_and_translate_subtitles,
inputs=[video_input, youtube_url, format_input, cookies_input],
outputs=[output_en_srt, output_cn_srt, original_video, en_preview, cn_preview, all_files_output]
)
# "Start processing directly": run the whole pipeline in one step
process_btn.click(
fn=process_video,
inputs=[video_input, youtube_url, format_input, cookies_input, cn_srt_upload, voice_list, voice_rate],
outputs=[output_en_srt, output_cn_srt, all_files_output, original_video, video_output]
)
# Subtitle preview update handler
def update_preview(en_srt_file, cn_srt_file):
"""Refresh both subtitle previews; an unset file yields an empty string."""
en_content = load_srt_content(en_srt_file) if en_srt_file else ""
cn_content = load_srt_content(cn_srt_file) if cn_srt_file else ""
return en_content, cn_content
# Refresh the previews whenever either subtitle file changes
output_en_srt.change(
fn=update_preview,
inputs=[output_en_srt, output_cn_srt],
outputs=[en_preview, cn_preview]
)
output_cn_srt.change(
fn=update_preview,
inputs=[output_en_srt, output_cn_srt],
outputs=[en_preview, cn_preview]
)
# Edit button click handler
edit_btn.click(
fn=apply_edit,
inputs=[
cn_preview,
video_input,
output_en_srt,
voice_list,
original_video,
voice_rate
],
outputs=[
output_cn_srt,
all_files_output,
video_output
]
)
# Preview an uploaded Chinese subtitle file
cn_srt_upload.change(
fn=preview_uploaded_srt,
inputs=[cn_srt_upload],
outputs=[cn_preview]
)
# Fetch the available format list
format_btn.click(
fn=get_youtube_formats,
inputs=[youtube_url, cookies_input],
outputs=[format_info]
)
# Voice preview button click handler
preview_btn.click(
fn=preview_voice,
inputs=[voice_list, voice_rate],
outputs=[preview_audio]
)
app.launch(server_name="0.0.0.0", server_port=7860)