"""Gradio app that dubs English videos into Chinese.

Pipeline: (optional) YouTube download -> Whisper transcription -> Google
translation -> Edge TTS speech synthesis -> moviepy audio/video merge.
"""

import asyncio
import contextlib
import json
import logging
import os
import re
import shutil
import tempfile
from urllib.parse import parse_qs, urlparse

import gradio as gr
import requests
import torch
import whisper
import yt_dlp
from deep_translator import GoogleTranslator
from edge_tts import Communicate

# Logging setup
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Directory where the Whisper model weights are cached
CACHE_DIR = "model_cache"
os.makedirs(CACHE_DIR, exist_ok=True)

# Working directories used by the pipeline
os.makedirs("uploads", exist_ok=True)
os.makedirs("subtitles", exist_ok=True)
os.makedirs("audio", exist_ok=True)
os.makedirs("output", exist_ok=True)

try:
    from moviepy.editor import (
        VideoFileClip,
        AudioFileClip,
        CompositeVideoClip,
        concatenate_audioclips,
        CompositeAudioClip,
    )
    logger.info("Successfully imported moviepy")
except ImportError as e:
    logger.error(f"Error importing moviepy: {e}")
    raise

# The Whisper model is loaded lazily on first use (see get_model)
model = None


def get_model():
    """Return the shared Whisper model, loading it on the first call."""
    global model
    if model is None:
        logger.info("Loading Whisper model...")
        model = whisper.load_model("base", download_root=CACHE_DIR)
        logger.info("Whisper model loaded")
    return model


translator = GoogleTranslator(source='en', target='zh-CN')

# Chinese voices offered in the UI: Edge TTS voice id -> display label
CHINESE_VOICES = {
    # Taiwan
    "zh-TW-HsiaoChenNeural": "曉臻 - 女声 (台湾国语)",
    "zh-TW-YunJheNeural": "雲哲 - 男声 (台湾国语)",
    "zh-TW-HsiaoYuNeural": "曉雨 - 女声 (台湾国语)",
    # Mainland China
    "zh-CN-YunxiNeural": "云希 - 男声 (大陆标准普通话)",
    "zh-CN-YunjianNeural": "云健 - 男声 (大陆标准普通话)",
    "zh-CN-XiaoyiNeural": "晓伊 - 女声 (大陆标准普通话)",
    "zh-CN-YunyangNeural": "云扬 - 男声 (大陆新闻播报)",
    "zh-CN-XiaochenNeural": "晓辰 - 女声 (大陆标准普通话)",
    "zh-CN-XiaohanNeural": "晓涵 - 女声 (大陆标准普通话)",
    "zh-CN-XiaomengNeural": "晓梦 - 女声 (大陆标准普通话)",
    "zh-CN-XiaomoNeural": "晓墨 - 女声 (大陆标准普通话)",
    "zh-CN-XiaoxuanNeural": "晓萱 - 女声 (大陆标准普通话)",
    "zh-CN-XiaoyanNeural": "晓颜 - 女声 (大陆标准普通话)",
    "zh-CN-XiaoyouNeural": "晓悠 - 女声 (大陆标准普通话)",
    # Hong Kong
    "zh-HK-HiuGaaiNeural": "曉薇 - 女声 (香港粤语)",
    "zh-HK-HiuMaanNeural": "曉曼 - 女声 (香港粤语)",
    "zh-HK-WanLungNeural": "云龍 - 男声 (香港粤语)",
}

# Column layout shared by the header and every row of the format table
_FORMAT_ROW = "{:<6} {:<10} {:<16} {:<8} {:<12} {:<10} {:<8} {:<15}"

# Matches an SRT timing line such as "00:00:01,000 --> 00:00:02,500"
_TIMING_RE = re.compile(
    r"^\s*(\d+:\d+:\d+[,.]\d+)\s*-->\s*(\d+:\d+:\d+[,.]\d+)"
)

# Upper bound on simultaneous Edge TTS requests issued by generate_speech
_MAX_CONCURRENT_TTS = 5

_APP_CSS = """
.single-line-input textarea {
    height: 2.5em !important;
    min-height: 2.5em !important;
    max-height: 2.5em !important;
    overflow-y: hidden !important;
}
"""


@contextlib.contextmanager
def _cookie_file(cookies):
    """Yield the path of a temp file holding *cookies*, or None if empty.

    The file is removed when the context exits so cookie data does not
    accumulate in the temp directory.
    """
    if not cookies:
        yield None
        return
    with tempfile.NamedTemporaryFile(
        mode='w', delete=False, suffix='.txt', encoding='utf-8'
    ) as f:
        f.write(cookies)
        path = f.name
    try:
        yield path
    finally:
        with contextlib.suppress(OSError):
            os.remove(path)


def _field(fmt, *keys, default="N/A"):
    """Return the first value in *fmt* under *keys* that is not None."""
    for key in keys:
        value = fmt.get(key)
        if value is not None:
            return value
    return default


def _size_mb(fmt):
    """Return the format's size as a 'x.xM' string (0.0M when unknown)."""
    size = fmt.get('filesize') or fmt.get('filesize_approx') or 0
    return f"{size / 1024 / 1024:.1f}M"


def _uploaded_path(file):
    """Return the filesystem path behind a file value from the UI.

    Accepts a plain path (str / PathLike) or an object exposing ``.name``;
    returns None for an empty value.
    """
    if not file:
        return None
    if isinstance(file, (str, os.PathLike)):
        return os.fspath(file)
    return getattr(file, "name", None)


def _voice_id_for(voice_name):
    """Map a display label from CHINESE_VOICES back to its voice id.

    Raises:
        gr.Error: if the label is not one of the known voices.
    """
    for voice_id, label in CHINESE_VOICES.items():
        if label == voice_name:
            return voice_id
    raise gr.Error("请选择有效的配音声音")


def _parse_srt(path):
    """Parse an SRT file into a list of ``(start, end, text)`` tuples.

    ``start`` and ``end`` are the timestamp strings as written in the file.
    Multi-line cue text is joined with spaces. Cues without text are
    dropped. Missing blank lines between cues are tolerated, which matters
    for subtitles edited by hand in the UI.
    """
    with open(path, "r", encoding="utf-8-sig") as f:
        lines = f.read().splitlines()

    entries = []
    i = 0
    total = len(lines)
    while i < total:
        timing = _TIMING_RE.match(lines[i])
        if not timing:
            i += 1
            continue
        i += 1
        text_lines = []
        while i < total and lines[i].strip():
            if _TIMING_RE.match(lines[i]):
                break
            # A bare number directly followed by a timing line is the next
            # cue's index, not part of this cue's text.
            if (lines[i].strip().isdigit() and i + 1 < total
                    and _TIMING_RE.match(lines[i + 1])):
                break
            text_lines.append(lines[i].strip())
            i += 1
        text = " ".join(text_lines)
        if text:
            entries.append((timing.group(1), timing.group(2), text))
    return entries


def extract_video_id(url):
    """Extract the video id from a YouTube URL.

    Handles ``youtu.be/<id>``, ``youtube.com/watch?v=<id>`` and
    ``youtube.com/{shorts,embed,live}/<id>``. Returns None when no id can
    be found.
    """
    if not url:
        return None
    url = url.strip()
    # urlparse only fills in the host when a scheme is present.
    parsed = urlparse(url if "://" in url else "https://" + url)
    host = parsed.netloc.lower()
    path_parts = [part for part in parsed.path.split('/') if part]

    if 'youtu.be' in host:
        # Use the parsed path so "?t=10" style suffixes are not included.
        return path_parts[0] if path_parts else None
    if 'youtube.com' in host:
        video_id = parse_qs(parsed.query).get('v', [None])[0]
        if video_id:
            return video_id
        if len(path_parts) >= 2 and path_parts[0] in ('shorts', 'embed', 'live'):
            return path_parts[1]
    return None


def get_youtube_formats(url, cookies=None):
    """Return a text table of the video-only and audio-only formats.

    Args:
        url: YouTube link.
        cookies: optional cookie file content passed to yt-dlp.

    Raises:
        gr.Error: if the link is invalid or the format list cannot be
            fetched.
    """
    video_id = extract_video_id(url)
    if not video_id:
        raise gr.Error("无效的YouTube链接")

    ydl_opts = {
        'quiet': True,
        'no_warnings': True,
        'format': 'best',
    }

    try:
        with _cookie_file(cookies) as cookie_path:
            if cookie_path:
                ydl_opts['cookiefile'] = cookie_path
            url = f"https://www.youtube.com/watch?v={video_id}"
            with yt_dlp.YoutubeDL(ydl_opts) as ydl:
                info = ydl.extract_info(url, download=False)
        if info is None:
            raise gr.Error("无法获取视频信息,请检查链接或尝试使用cookies")

        video_formats = []
        audio_formats = []
        format_header = _FORMAT_ROW.format(
            "ID", "格式", "分辨率/采样率", "帧率", "编码", "比特率", "大小", "说明"
        )
        separator = "-" * 90

        for f in info.get('formats') or []:
            has_video = f.get('vcodec') != 'none'
            has_audio = f.get('acodec') != 'none'
            # Values may be present but None, so every field goes through
            # _field / _size_mb instead of dict.get with a default.
            if has_video and not has_audio:
                resolution = f"{_field(f, 'width')}x{_field(f, 'height')}"
                video_formats.append(_FORMAT_ROW.format(
                    str(_field(f, 'format_id')),
                    _field(f, 'ext'),
                    resolution,
                    f"{_field(f, 'fps')}fps",
                    str(_field(f, 'vcodec')).split('.')[0],
                    f"{_field(f, 'vbr', 'tbr')}k",
                    _size_mb(f),
                    _field(f, 'format_note', default=''),
                ))
            elif has_audio and not has_video:
                audio_formats.append(_FORMAT_ROW.format(
                    str(_field(f, 'format_id')),
                    _field(f, 'ext'),
                    f"{_field(f, 'asr')}Hz",
                    "-",
                    str(_field(f, 'acodec')).split('.')[0],
                    f"{_field(f, 'abr', 'tbr')}k",
                    _size_mb(f),
                    _field(f, 'format_note', default=''),
                ))

        return "\n".join([
            "【视频格式】(仅视频流)",
            format_header,
            separator,
            "\n".join(video_formats),
            "",
            "【音频格式】(仅音频流)",
            format_header,
            separator,
            "\n".join(audio_formats),
            "",
            "提示:",
            "1. 下载时需要组合视频和音频格式,用+号连接,例如:137+140",
            "2. 建议选择相近大小的视频和音频格式组合",
            "3. 一般选择 mp4+m4a 或 webm+webm 的组合",
            "4. 数字越大通常表示质量越好",
        ])
    except gr.Error:
        # Already a user-facing error; do not wrap it a second time.
        raise
    except Exception as e:
        logger.exception("获取格式列表失败: %s", e)
        raise gr.Error(f"获取格式列表失败: {str(e)}") from e


def download_youtube(url, format_id=None, cookies=None):
    """Download a YouTube video into ``uploads/`` and return its path.

    Args:
        url: YouTube link.
        format_id: yt-dlp format selector; ids may be separated by ``+`` or
            ``,`` (e.g. ``"137+140"``). ``None`` or ``"best"`` selects
            yt-dlp's ``best``.
        cookies: optional cookie file content passed to yt-dlp.

    Raises:
        gr.Error: if the link is invalid or the download fails.
    """
    video_id = extract_video_id(url)
    if not video_id:
        raise gr.Error("无效的YouTube链接")

    ydl_opts = {
        'format': 'best',
        'quiet': True,
        'no_warnings': True,
    }

    if format_id and format_id != "best":
        format_ids = [
            part.strip()
            for part in re.split(r'[,+]', format_id.split(':')[0])
            if part.strip()
        ]
        if format_ids:
            ydl_opts['format'] = '+'.join(format_ids)

    try:
        with _cookie_file(cookies) as cookie_path, \
                tempfile.TemporaryDirectory() as temp_dir:
            if cookie_path:
                ydl_opts['cookiefile'] = cookie_path
            ydl_opts['outtmpl'] = os.path.join(temp_dir, '%(title)s.%(ext)s')

            with yt_dlp.YoutubeDL(ydl_opts) as ydl:
                info = ydl.extract_info(url, download=True)
                if info is None:
                    raise gr.Error("无法获取视频信息,请检查链接或尝试使用cookies")
                filename = ydl.prepare_filename(info)

            if not os.path.exists(filename):
                # The file on disk can differ from the predicted name (for
                # example after merging separate streams); fall back to
                # the largest file that was actually produced.
                produced = [
                    os.path.join(temp_dir, name)
                    for name in os.listdir(temp_dir)
                ]
                produced = [p for p in produced if os.path.isfile(p)]
                if not produced:
                    raise gr.Error("下载视频失败: 未找到下载的文件")
                filename = max(produced, key=os.path.getsize)

            final_path = os.path.join("uploads", os.path.basename(filename))
            # shutil.move also works when the temp dir and uploads/ live on
            # different filesystems, where os.rename would fail.
            shutil.move(filename, final_path)
            return final_path
    except gr.Error:
        raise
    except Exception as e:
        logger.exception("下载视频失败: %s", e)
        raise gr.Error(f"下载视频失败: {str(e)}") from e


def generate_subtitles(video_path):
    """Transcribe *video_path* with Whisper into ``subtitles/english.srt``.

    Returns the path of the written SRT file.
    """
    whisper_model = get_model()  # loaded lazily
    options = {
        "language": "en",
        "beam_size": 1,  # smaller beam for faster decoding
        "best_of": 1,    # fewer candidates
        "fp16": torch.cuda.is_available(),
    }
    result = whisper_model.transcribe(video_path, **options)

    srt_path = "subtitles/english.srt"
    with open(srt_path, "w", encoding="utf-8") as f:
        for i, seg in enumerate(result["segments"], 1):
            start = format_timestamp(seg["start"])
            end = format_timestamp(seg["end"])
            text = seg["text"].strip()
            f.write(f"{i}\n{start} --> {end}\n{text}\n\n")
    return srt_path


def translate_subtitles(en_srt):
    """Translate an English SRT file into ``subtitles/chinese.srt``.

    Cues are renumbered from 1. If the translator returns nothing for a
    cue, the source text is kept so the cue is not lost.

    Returns the path of the written SRT file.
    """
    cn_srt = "subtitles/chinese.srt"
    entries = _parse_srt(en_srt)

    with open(cn_srt, "w", encoding="utf-8") as f:
        for number, (start, end, text) in enumerate(entries, 1):
            translated = translator.translate(text) or text
            # Keep each cue on one line so the file stays easy to edit.
            translated = " ".join(translated.split("\n")).strip()
            f.write(f"{number}\n{start} --> {end}\n{translated}\n\n")
    return cn_srt


async def generate_speech(cn_srt, voice_id, rate=1.5):
    """Synthesize one MP3 per subtitle cue with Edge TTS.

    Args:
        cn_srt: path of the SRT file to read.
        voice_id: Edge TTS voice id.
        rate: speed multiplier (the UI offers 0.5-3.0), default 1.5.

    Returns:
        Audio file paths, in the same order as the cues in *cn_srt*.
    """
    # Convert the multiplier into a signed percentage string, e.g. "+50%"
    rate_percent = f"{(rate - 1) * 100:+.0f}%"
    entries = _parse_srt(cn_srt)
    semaphore = asyncio.Semaphore(_MAX_CONCURRENT_TTS)

    async def synthesize(number, text):
        audio_path = f"audio/speech_{number}.mp3"
        async with semaphore:
            communicate = Communicate(text, voice_id, rate=rate_percent)
            await communicate.save(audio_path)
        return audio_path

    # gather runs the requests concurrently and preserves cue order
    audio_files = await asyncio.gather(*(
        synthesize(number, text)
        for number, (_start, _end, text) in enumerate(entries, 1)
    ))
    return list(audio_files)


def merge_video_audio(video_path, audio_files, subtitles, progress_callback=None, original_volume=0.1):
    """Mix the dubbed audio onto the video and write ``output/final_video.mp4``.

    Args:
        video_path: source video.
        audio_files: one audio file per cue, in cue order.
        subtitles: SRT file whose cue start times position the audio.
        progress_callback: accepted for compatibility; not used.
        original_volume: volume factor applied to the original soundtrack.

    Returns:
        Path of the written video.

    Raises:
        Exception: wrapping any failure during loading, mixing or export.
    """
    video = None
    original_audio = None
    final_video = None
    speech_clips = []
    try:
        entries = _parse_srt(subtitles)
        if len(audio_files) < len(entries):
            raise ValueError("字幕条目数与配音文件数不一致")

        video = VideoFileClip(video_path)
        # Trim a short tail off the video to avoid problems at the very end
        actual_duration = video.duration - 0.1
        video = video.subclip(0, actual_duration)

        # Keep the original soundtrack at reduced volume
        if video.audio is not None:
            original_audio = video.audio.volumex(original_volume)

        # Place each speech clip at its cue's start time
        for (start, _end, _text), audio_file in zip(entries, audio_files):
            clip = AudioFileClip(audio_file).set_start(parse_timestamp(start))
            speech_clips.append(clip)

        tracks = list(speech_clips)
        if original_audio is not None:
            tracks.insert(0, original_audio)

        if tracks:
            final_video = video.set_audio(CompositeAudioClip(tracks))
        else:
            # Nothing to mix: export the video as it is
            final_video = video

        output_path = os.path.join("output", "final_video.mp4")
        final_video.write_videofile(
            output_path,
            codec='libx264',
            audio_codec='aac',
            fps=video.fps,
            audio_bitrate="192k",
            bitrate="4000k",
            threads=4,
            preset='medium',
        )
        return output_path
    except Exception as e:
        logger.exception("合并视频音频失败: %s", e)
        raise Exception(f"合并视频音频失败: {str(e)}") from e
    finally:
        # Release clips on success and on failure alike
        to_close = [original_audio, video, *speech_clips]
        if final_video is not None and final_video is not video:
            to_close.insert(0, final_video)
        for clip in to_close:
            if clip is None:
                continue
            try:
                clip.close()
            except Exception as close_error:
                logger.debug("Failed to close clip: %s", close_error)


def format_timestamp(seconds):
    """Format a number of seconds as an SRT timestamp ``HH:MM:SS,mmm``.

    Works in whole milliseconds so rounding carries into the next unit
    (59.9996 s becomes ``00:01:00,000``).
    """
    total_ms = int(round(max(seconds, 0) * 1000))
    hours, remainder = divmod(total_ms, 3_600_000)
    minutes, remainder = divmod(remainder, 60_000)
    secs, millis = divmod(remainder, 1000)
    return f"{hours:02d}:{minutes:02d}:{secs:02d},{millis:03d}"


def validate_youtube_url(url):
    """Return the trimmed URL if it looks like a YouTube link.

    Returns None for an empty value.

    Raises:
        gr.Error: if the text does not look like a YouTube link.
    """
    if not url or not url.strip():
        return None
    url = url.strip()
    # "youtu" is the common substring of youtube.com and youtu.be
    if "youtu" not in url:
        raise gr.Error("请输入有效的YouTube链接,例如: https://www.youtube.com/watch?v=xxxxx")
    return url


def parse_timestamp(timestamp):
    """Convert an SRT timestamp (``HH:MM:SS,mmm``) to seconds."""
    hours, minutes, seconds = timestamp.strip().replace(",", ".").split(":")
    return float(hours) * 3600 + float(minutes) * 60 + float(seconds)


def add_log(logs, message):
    """Append *message* to *logs* on a new line."""
    if not logs:
        return message
    return logs + "\n" + message


def _resolve_video_path(video_input, youtube_url, format_id, cookies):
    """Return the local video path, downloading from YouTube if a link is given.

    Raises:
        gr.Error: if neither a link nor an uploaded video is provided.
    """
    if youtube_url:
        logger.info("正在下载YouTube视频...")
        youtube_url = validate_youtube_url(youtube_url)
        return download_youtube(
            youtube_url,
            format_id.split(':')[0] if format_id else None,
            cookies if cookies else None,
        )
    if video_input:
        logger.info("正在处理上传的视频...")
        return video_input
    raise gr.Error("请上传视频文件或提供YouTube链接")


def generate_and_translate_subtitles(video_input, youtube_url, format_id=None, cookies=None):
    """Transcribe the video and translate the subtitles.

    Returns:
        ``(en_srt, cn_srt, video_path, en_content, cn_content, all_files)``.

    Raises:
        gr.Error: on any failure.
    """
    try:
        video_path = _resolve_video_path(video_input, youtube_url, format_id, cookies)

        logger.info("正在识别语音...")
        en_srt = generate_subtitles(video_path)

        logger.info("正在翻译字幕...")
        cn_srt = translate_subtitles(en_srt)

        logger.info("字幕生成完成!")

        # Contents for the preview boxes
        en_content = load_srt_content(en_srt)
        cn_content = load_srt_content(cn_srt)

        all_files = get_all_files()
        return en_srt, cn_srt, video_path, en_content, cn_content, all_files
    except gr.Error:
        raise
    except Exception as e:
        logger.exception("生成字幕失败: %s", e)
        raise gr.Error(str(e)) from e


def process_video(video_input, youtube_url, format_id=None, cookies=None, cn_srt=None, voice_name=None, rate=1.5):
    """Run the whole pipeline: subtitles, speech synthesis and final merge.

    If a Chinese subtitle file is uploaded it is used as-is and
    transcription/translation are skipped.

    Returns:
        ``(en_srt, cn_srt_path, all_files, video_path, output_video)``;
        ``en_srt`` is None when uploaded subtitles were used.

    Raises:
        gr.Error: on any failure.
    """
    try:
        video_path = _resolve_video_path(video_input, youtube_url, format_id, cookies)

        uploaded_srt = _uploaded_path(cn_srt)
        if uploaded_srt:
            cn_srt_path = uploaded_srt
            en_srt = None
            logger.info("使用上传的中文字幕...")
        else:
            logger.info("正在识别语音...")
            en_srt = generate_subtitles(video_path)

            logger.info("正在翻译字幕...")
            cn_srt_path = translate_subtitles(en_srt)

        logger.info("正在生成配音...")
        voice_id = _voice_id_for(voice_name)
        audio_files = asyncio.run(generate_speech(cn_srt_path, voice_id, rate))

        logger.info("正在合成视频...")
        output_video = merge_video_audio(video_path, audio_files, cn_srt_path)

        logger.info("处理完成!")
        all_files = get_all_files()
        return en_srt, cn_srt_path, all_files, video_path, output_video
    except gr.Error:
        raise
    except Exception as e:
        logger.exception("处理视频失败: %s", e)
        raise gr.Error(str(e)) from e


def load_srt_content(srt_file):
    """Return the text of a subtitle file, or "" when no file is given.

    *srt_file* may be a path or a file value coming from the UI.
    """
    path = _uploaded_path(srt_file)
    if not path:
        return ""
    with open(path, "r", encoding="utf-8") as f:
        return f.read()


def save_srt_content(content, file_path):
    """Write subtitle text to *file_path* and return the path."""
    with open(file_path, "w", encoding="utf-8") as f:
        f.write(content)
    return file_path


def apply_edit(cn_content, video_path, en_srt, voice_name, original_video, rate=1.5):
    """Save the edited Chinese subtitles and produce the dubbed video.

    Args:
        cn_content: edited subtitle text.
        video_path: uploaded video, preferred when present.
        en_srt: accepted for compatibility with the UI wiring; not used.
        voice_name: display label of the selected voice.
        original_video: fallback video (e.g. one downloaded earlier).
        rate: speech speed multiplier.

    Returns:
        ``(edited_srt, all_files, output_video)``.

    Raises:
        gr.Error: if there is nothing to process or any step fails.
    """
    if not cn_content:
        raise gr.Error("请先编辑字幕")

    actual_video_path = video_path or original_video
    if not actual_video_path:
        raise gr.Error("请先上传视频")

    try:
        edited_srt = "subtitles/chinese_edited.srt"
        save_srt_content(cn_content, edited_srt)
        logger.info("字幕修改已保存,继续处理...")

        logger.info("正在生成配音...")
        voice_id = _voice_id_for(voice_name)
        audio_files = asyncio.run(generate_speech(edited_srt, voice_id, rate))

        logger.info("正在合成视频...")
        output_video = merge_video_audio(actual_video_path, audio_files, edited_srt)

        logger.info("处理完成!")
        all_files = get_all_files()
        return edited_srt, all_files, output_video
    except gr.Error:
        raise
    except Exception as e:
        logger.exception("处理失败: %s", e)
        raise gr.Error(str(e)) from e


def preview_uploaded_srt(file):
    """Return the content of an uploaded subtitle file for the preview box."""
    if file is None:
        return None
    try:
        return load_srt_content(file)
    except Exception as e:
        logger.exception("预览字幕失败: %s", e)
        raise gr.Error(f"预览字幕失败: {str(e)}") from e


def get_all_files():
    """Return the paths of all files in the working directories.

    Directories are visited in the order uploads, subtitles, audio, output.
    """
    paths = []
    for folder in ("uploads", "subtitles", "audio", "output"):
        if not os.path.isdir(folder):
            continue
        for name in sorted(os.listdir(folder)):
            path = os.path.join(folder, name)
            if os.path.isfile(path):
                paths.append(path)
    return paths


async def generate_preview_speech(voice_id, rate=1.5):
    """Synthesize a short fixed sentence so the user can audition a voice.

    Returns the path of the generated audio file.
    """
    try:
        preview_text = "这是一段试听音频,用于预览配音效果。"
        audio_path = "audio/preview_speech.mp3"

        # Convert the multiplier into a signed percentage string
        rate_percent = f"{(rate - 1) * 100:+.0f}%"

        communicate = Communicate(preview_text, voice_id, rate=rate_percent)
        await communicate.save(audio_path)
        return audio_path
    except Exception as e:
        logger.exception("生成试听音频失败: %s", e)
        raise gr.Error(f"生成试听音频失败: {str(e)}") from e


def preview_voice(voice_name, rate):
    """Generate and return the audition audio for the selected voice."""
    try:
        voice_id = _voice_id_for(voice_name)
        return asyncio.run(generate_preview_speech(voice_id, rate))
    except gr.Error:
        raise
    except Exception as e:
        logger.exception("试听失败: %s", e)
        raise gr.Error(str(e)) from e


# Gradio UI
# NOTE(review): the source reached review with its indentation collapsed;
# the nesting of rows/columns below is reconstructed - confirm the layout.
with gr.Blocks(css=_APP_CSS) as app:
    gr.Markdown("# 视频配音助手")

    with gr.Row():
        # Video source and YouTube download controls
        with gr.Column(scale=2):
            with gr.Row():
                youtube_url = gr.Textbox(label="或输入YouTube链接")
                format_btn = gr.Button("获取可用格式列表", scale=0)

            with gr.Row():
                format_input = gr.Textbox(
                    label="输入格式ID",
                    placeholder="例如: 137+140",
                    value="137+140",
                    scale=1,
                )
                cookies_input = gr.Textbox(
                    label="Cookies(可选)",
                    placeholder="Netscape格式",
                    lines=6,
                    max_lines=10,
                    show_label=True,
                    container=True,
                    scale=2,
                )

            format_info = gr.Textbox(
                label="可用格式列表",
                interactive=False,
                lines=6,
            )

            # Voice settings
            with gr.Row():
                with gr.Column(scale=3):
                    voice_list = gr.Dropdown(
                        label="选择配音声音",
                        choices=list(CHINESE_VOICES.values()),
                        value="曉臻 - 女声 (台湾国语)",
                        interactive=True,
                    )
                    voice_rate = gr.Slider(
                        label="语速调整",
                        minimum=0.5,
                        maximum=3.0,
                        value=1.5,
                        step=0.1,
                        interactive=True,
                    )
                with gr.Column(scale=1):
                    preview_btn = gr.Button("试听")
                    # Player for the audition audio
                    preview_audio = gr.Audio(
                        label="试听音频",
                        visible=True,
                        interactive=False,
                    )

            with gr.Row():
                generate_btn = gr.Button("生成字幕并编辑", scale=1)
                process_btn = gr.Button("直接开始处理", scale=1)

        # Upload area
        with gr.Column(scale=1):
            video_input = gr.Video(label="上传视频文件")
            cn_srt_upload = gr.File(label="上传中文字幕", file_types=[".srt"])

    # Subtitle preview area
    with gr.Row():
        with gr.Column(scale=1):
            en_preview = gr.TextArea(
                label="英文字幕预览",
                interactive=False,
                lines=10,
            )
        with gr.Column(scale=1):
            cn_preview = gr.TextArea(
                label="中文字幕预览",
                interactive=True,
                lines=10,
            )

    # Apply-edit button
    with gr.Row():
        edit_btn = gr.Button("应用字幕修改")

    # Hidden components that carry the subtitle file paths between steps
    output_en_srt = gr.File(label="英文字幕", file_types=[".srt"], visible=False)
    output_cn_srt = gr.File(label="中文字幕", file_types=[".srt"], visible=False)

    # Output area
    with gr.Row():
        with gr.Column(scale=1):
            original_video = gr.Video(label="原始视频")
            all_files_output = gr.Files(label="所有文件")
        with gr.Column(scale=1):
            video_output = gr.Video(label="最终视频")

    generate_btn.click(
        fn=generate_and_translate_subtitles,
        inputs=[video_input, youtube_url, format_input, cookies_input],
        outputs=[output_en_srt, output_cn_srt, original_video, en_preview, cn_preview, all_files_output],
    )

    process_btn.click(
        fn=process_video,
        inputs=[video_input, youtube_url, format_input, cookies_input, cn_srt_upload, voice_list, voice_rate],
        outputs=[output_en_srt, output_cn_srt, all_files_output, original_video, video_output],
    )

    def update_preview(en_srt_file, cn_srt_file):
        """Reload both preview boxes from the current subtitle files."""
        en_content = load_srt_content(en_srt_file) if en_srt_file else ""
        cn_content = load_srt_content(cn_srt_file) if cn_srt_file else ""
        return en_content, cn_content

    # Refresh the previews whenever either subtitle file changes
    output_en_srt.change(
        fn=update_preview,
        inputs=[output_en_srt, output_cn_srt],
        outputs=[en_preview, cn_preview],
    )
    output_cn_srt.change(
        fn=update_preview,
        inputs=[output_en_srt, output_cn_srt],
        outputs=[en_preview, cn_preview],
    )

    # Apply the edited subtitles and continue with dubbing
    edit_btn.click(
        fn=apply_edit,
        inputs=[
            cn_preview,
            video_input,
            output_en_srt,
            voice_list,
            original_video,
            voice_rate,
        ],
        outputs=[
            output_cn_srt,
            all_files_output,
            video_output,
        ],
    )

    # Preview an uploaded Chinese subtitle file
    cn_srt_upload.change(
        fn=preview_uploaded_srt,
        inputs=[cn_srt_upload],
        outputs=[cn_preview],
    )

    # Fetch the list of available formats
    format_btn.click(
        fn=get_youtube_formats,
        inputs=[youtube_url, cookies_input],
        outputs=[format_info],
    )

    # Audition the selected voice
    preview_btn.click(
        fn=preview_voice,
        inputs=[voice_list, voice_rate],
        outputs=[preview_audio],
    )

app.launch(server_name="0.0.0.0", server_port=7860)