"""Gradio app that dubs a short video into another language.

Pipeline: resize the video, extract and filter its audio, transcribe it with
``insanely-fast-whisper``, translate the text with ``googletrans``, synthesize
speech with ``edge_tts`` and finally either lip-sync the result with Wav2Lip
or simply mux the new audio track onto the video with ffmpeg.
"""

import os
import stat
import uuid
import asyncio
import subprocess
import json
from zipfile import ZipFile

import gradio as gr
import ffmpeg
import cv2
import edge_tts
from googletrans import Translator
from huggingface_hub import HfApi
import moviepy.editor as mp
import spaces

# Constants and initialization
HF_TOKEN = os.environ.get("HF_TOKEN")
REPO_ID = "artificialguybr/video-dubbing"
MAX_VIDEO_DURATION = 60  # seconds

# Extensions treated as video containers by transcribe_audio().
VIDEO_EXTENSIONS = ('.mp4', '.avi', '.mov', '.flv')

# UI language name -> (googletrans language code, edge-tts voice name).
# Kept at module level because both process_video() and the Dropdown built
# below need it.
LANGUAGE_MAPPING = {
    'English': ('en', 'en-US-EricNeural'),
    'Spanish': ('es', 'es-ES-AlvaroNeural'),
    'French': ('fr', 'fr-FR-HenriNeural'),
    'German': ('de', 'de-DE-ConradNeural'),
    'Italian': ('it', 'it-IT-DiegoNeural'),
    'Portuguese': ('pt', 'pt-PT-DuarteNeural'),
    'Polish': ('pl', 'pl-PL-MarekNeural'),
    'Turkish': ('tr', 'tr-TR-AhmetNeural'),
    'Russian': ('ru', 'ru-RU-DmitryNeural'),
    'Dutch': ('nl', 'nl-NL-MaartenNeural'),
    'Czech': ('cs', 'cs-CZ-AntoninNeural'),
    'Arabic': ('ar', 'ar-SA-HamedNeural'),
    'Chinese (Simplified)': ('zh-CN', 'zh-CN-YunxiNeural'),
    'Japanese': ('ja', 'ja-JP-KeitaNeural'),
    'Korean': ('ko', 'ko-KR-InJoonNeural'),
    'Hindi': ('hi', 'hi-IN-MadhurNeural'),
    'Swedish': ('sv', 'sv-SE-MattiasNeural'),
    'Danish': ('da', 'da-DK-JeppeNeural'),
    'Finnish': ('fi', 'fi-FI-HarriNeural'),
    'Greek': ('el', 'el-GR-NestorasNeural'),
}

# Markdown shown under the interface. Lines are flush-left so the bullets are
# never interpreted as an indented code block.
NOTES_MARKDOWN = """
**Note:**
- Video limit is 1 minute. It will dubbing all people using just one voice.
- Generation may take up to 5 minutes.
- The tool uses open-source models for all models. It's an alpha version.
- Quality can be improved but would require more processing time per video. For scalability and hardware limitations, speed was chosen, not just quality.
- If you need more than 1 minute, duplicate the Space and change the limit on app.py.
- If you incorrectly mark the 'Video has a close-up face' checkbox, the dubbing may not work as expected.
"""

api = HfApi(token=HF_TOKEN)

# Extract and set permissions for ffmpeg
with ZipFile("ffmpeg.zip") as ffmpeg_archive:
    ffmpeg_archive.extractall()
# S_IEXEC lives in the ``stat`` module; ``os.stat`` is a function and has no
# such attribute.
os.chmod('ffmpeg', os.stat('ffmpeg').st_mode | stat.S_IEXEC)

print("Starting the program...")


def generate_unique_filename(extension: str) -> str:
    """Return a random UUID4-based file name ending with *extension*."""
    return f"{uuid.uuid4()}{extension}"


def cleanup_files(*files) -> None:
    """Delete every existing path in *files*; falsy entries are skipped.

    Removal is best-effort: a failure to delete one file is reported and does
    not prevent the remaining files from being removed.
    """
    for file in files:
        if file and os.path.exists(file):
            try:
                os.remove(file)
            except OSError as e:
                print(f"Could not remove file {file}: {e}")
            else:
                print(f"Removed file: {file}")


def check_for_faces(video_path: str) -> bool:
    """Return True as soon as any frame of the video contains a frontal face.

    Frames are read sequentially and run through OpenCV's frontal-face Haar
    cascade. Returns False when the video ends (or cannot be read) without a
    detection.
    """
    face_cascade = cv2.CascadeClassifier(
        cv2.data.haarcascades + 'haarcascade_frontalface_default.xml'
    )
    cap = cv2.VideoCapture(video_path)
    try:
        while True:
            ret, frame = cap.read()
            if not ret:
                return False
            gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
            faces = face_cascade.detectMultiScale(gray, 1.1, 4)
            # detectMultiScale yields an empty tuple or a NumPy array; the
            # truth value of a multi-element array is ambiguous, so test the
            # number of detections instead.
            if len(faces) > 0:
                return True
    finally:
        cap.release()


@spaces.GPU(duration=90)
def transcribe_audio(file_path: str) -> str:
    """Transcribe an audio or video file with ``insanely-fast-whisper``.

    If *file_path* has a known video extension its audio track is first
    written to a temporary WAV file. The transcript JSON produced by the CLI
    is read back and the temporary files are removed, whether or not the
    transcription succeeds.

    Returns:
        The ``"text"`` field of the transcript, or the chunk texts joined by
        spaces when that field is absent.

    Raises:
        subprocess.CalledProcessError: the whisper CLI exited with an error.
        json.JSONDecodeError: the transcript file is not valid JSON.
        Exception: whatever moviepy raises while extracting the audio.
    """
    print(f"Starting transcription of file: {file_path}")
    temp_audio = None
    output_file = generate_unique_filename(".json")

    try:
        if file_path.lower().endswith(VIDEO_EXTENSIONS):
            print("Video file detected. Extracting audio...")
            temp_audio = generate_unique_filename(".wav")
            try:
                video = mp.VideoFileClip(file_path)
                try:
                    video.audio.write_audiofile(temp_audio)
                finally:
                    video.close()
            except Exception as e:
                print(f"Error extracting audio from video: {e}")
                raise
            file_path = temp_audio

        command = [
            "insanely-fast-whisper",
            "--file-name", file_path,
            "--device-id", "0",
            "--model-name", "openai/whisper-large-v3",
            "--task", "transcribe",
            "--timestamp", "chunk",
            "--transcript-path", output_file,
        ]

        try:
            completed = subprocess.run(command, check=True, capture_output=True, text=True)
            print(f"Transcription output: {completed.stdout}")
        except subprocess.CalledProcessError as e:
            print(f"Error running insanely-fast-whisper: {e}")
            print(f"insanely-fast-whisper stderr: {e.stderr}")
            raise

        try:
            with open(output_file, "r", encoding="utf-8") as f:
                transcription = json.load(f)
        except json.JSONDecodeError as e:
            print(f"Error decoding JSON: {e}")
            raise

        if "text" in transcription:
            return transcription["text"]
        return " ".join(chunk["text"] for chunk in transcription.get("chunks", []))
    finally:
        cleanup_files(output_file, temp_audio)


async def text_to_speech(text, voice, output_file):
    """Synthesize *text* with the given edge-tts *voice* into *output_file*."""
    communicate = edge_tts.Communicate(text, voice)
    await communicate.save(output_file)


def _probe_duration(video_path: str) -> float:
    """Return the duration in seconds reported by ffprobe for *video_path*.

    The first stream's duration is used when present; otherwise the
    container-level duration is used, since not every stream carries one.
    """
    info = ffmpeg.probe(video_path)
    streams = info.get('streams') or [{}]
    duration = streams[0].get('duration')
    if duration is None:
        duration = info.get('format', {}).get('duration')
    if duration is None:
        raise ValueError(f"Could not determine the duration of {video_path}.")
    return float(duration)


def _mux_audio(video_path: str, audio_path: str, output_path: str) -> None:
    """Copy the video stream of *video_path* and attach *audio_path* as AAC."""
    subprocess.run(
        [
            "ffmpeg", "-y",  # overwrite any partial output left by Wav2Lip
            "-i", video_path,
            "-i", audio_path,
            "-c:v", "copy",
            "-c:a", "aac",
            "-strict", "experimental",
            "-map", "0:v:0",
            "-map", "1:a:0",
            output_path,
        ],
        check=True,
    )


@spaces.GPU
def process_video(radio, video, target_language, has_closeup_face):
    """Dub *video* into *target_language* and return the result.

    Args:
        radio: Selected input source from the UI; not used by the pipeline.
        video: Path of the uploaded or recorded video.
        target_language: A key of ``LANGUAGE_MAPPING``.
        has_closeup_face: When true, Wav2Lip is used without running face
            detection first.

    Returns:
        ``(output_video_path, "")`` on success, or ``(None, "Error: ...")``
        when any step fails. Exceptions are never propagated to the caller.
        Intermediate files are removed in both cases.
    """
    run_uuid = uuid.uuid4().hex[:6]
    resized_video = f"{run_uuid}_resized_video.mp4"
    raw_audio = f"{run_uuid}_output_audio.wav"
    filtered_audio = f"{run_uuid}_output_audio_final.wav"
    synth_audio = f"{run_uuid}_output_synth.wav"
    output_video_path = f"{run_uuid}_output_video.mp4"

    try:
        # Validate inputs before doing any expensive work.
        if target_language is None:
            raise ValueError("Please select a Target Language for Dubbing.")
        if target_language not in LANGUAGE_MAPPING:
            raise ValueError(f"Unsupported target language: {target_language}")
        if not video:
            raise ValueError("Please upload or record a video.")

        ffmpeg.input(video).output(resized_video, vf='scale=-2:720').run()
        video_path = resized_video

        if not os.path.exists(video_path):
            raise FileNotFoundError(f"Error: {video_path} does not exist.")

        video_duration = _probe_duration(video_path)
        if video_duration > MAX_VIDEO_DURATION:
            raise ValueError(f"Video duration exceeds {MAX_VIDEO_DURATION} seconds. Please upload a shorter video.")

        ffmpeg.input(video_path).output(raw_audio, acodec='pcm_s24le', ar=48000, map='a').run()

        # Band-limit the audio before transcription. An argument list is used
        # instead of a shell string so no path is ever parsed by a shell.
        subprocess.run(
            [
                "ffmpeg", "-y",
                "-i", raw_audio,
                "-af", "lowpass=3000,highpass=100",
                filtered_audio,
            ],
            check=True,
        )

        whisper_text = transcribe_audio(filtered_audio)
        print(f"Transcription successful: {whisper_text}")

        target_language_code, voice = LANGUAGE_MAPPING[target_language]
        translator = Translator()
        translated_text = translator.translate(whisper_text, dest=target_language_code).text
        print(translated_text)

        asyncio.run(text_to_speech(translated_text, voice, synth_audio))

        if has_closeup_face or check_for_faces(video_path):
            try:
                subprocess.run(
                    [
                        "python", "Wav2Lip/inference.py",
                        "--checkpoint_path", "Wav2Lip/checkpoints/wav2lip_gan.pth",
                        "--face", video_path,
                        "--audio", synth_audio,
                        "--pads", "0", "15", "0", "0",
                        "--resize_factor", "1",
                        "--nosmooth",
                        "--outfile", output_video_path,
                    ],
                    check=True,
                )
            except subprocess.CalledProcessError:
                # Fall back to plain audio replacement when lip-sync fails.
                gr.Warning("Wav2lip didn't detect a face. Please try again with the option disabled.")
                _mux_audio(video_path, synth_audio, output_video_path)
        else:
            _mux_audio(video_path, synth_audio, output_video_path)

        if not os.path.exists(output_video_path):
            raise FileNotFoundError(f"Error: {output_video_path} was not generated.")

        return output_video_path, ""

    except Exception as e:
        # Top-level UI boundary: report the failure in the error textbox.
        print(f"Error in process_video: {str(e)}")
        return None, f"Error: {str(e)}"
    finally:
        # Intermediates are useless after the run, successful or not.
        cleanup_files(resized_video, raw_audio, filtered_audio, synth_audio)


def swap(radio):
    """Return a component update switching the video source to upload or webcam."""
    return gr.update(source="upload" if radio == "Upload" else "webcam")


# Gradio interface setup
video = gr.Video()
radio = gr.Radio(["Upload", "Record"], value="Upload", show_label=False)

iface = gr.Interface(
    fn=process_video,
    inputs=[
        radio,
        video,
        gr.Dropdown(choices=list(LANGUAGE_MAPPING.keys()), label="Target Language for Dubbing", value="Spanish"),
        gr.Checkbox(label="Video has a close-up face. Use Wav2lip.", value=False, info="Say if video have close-up face. For Wav2lip. Will not work if checked wrongly.")
    ],
    outputs=[
        gr.Video(label="Processed Video"),
        gr.Textbox(label="Error Message")
    ],
    live=False,
    title="AI Video Dubbing",
    description="""This tool was developed by [@artificialguybr](https://twitter.com/artificialguybr) using entirely open-source tools. Special thanks to Hugging Face for the GPU support. Thanks [@yeswondwer](https://twitter.com/@yeswondwerr) for original code.
Test the [Video Transcription and Translate](https://huggingface.co/spaces/artificialguybr/VIDEO-TRANSLATION-TRANSCRIPTION) space!""",
    allow_flagging=False
)

with gr.Blocks() as demo:
    iface.render()
    radio.change(swap, inputs=[radio], outputs=video)
    gr.Markdown(NOTES_MARKDOWN)

print("Launching Gradio interface...")
demo.queue()
demo.launch()