|
import gradio as gr |
|
import os |
|
os.system("python -m unidic download") |
|
from pytubefix import YouTube |
|
import torch |
|
from openvoice import se_extractor |
|
from openvoice.api import ToneColorConverter |
|
import whisper |
|
from moviepy.editor import * |
|
from pydub import AudioSegment |
|
from df.enhance import enhance, init_df, load_audio, save_audio |
|
import translators as ts |
|
from melo.api import TTS |
|
from concurrent.futures import ThreadPoolExecutor |
|
import ffmpeg |
|
import nltk |
|
nltk.download('averaged_perceptron_tagger_eng') |
|
|
|
def process_upload(video_file, language_choice): |
|
if language_choice == None: |
|
return None, "Language not selected." |
|
elif video_file == None: |
|
return None, "Video not uploaded." |
|
else: |
|
return process_video(video_file, language_choice) |
|
|
|
def process_youtube(youtube_url, language_choice): |
|
if language_choice == None: |
|
return None, "Language not selected." |
|
elif youtube_url == None: |
|
return None, "YouTube URL not entered." |
|
else: |
|
yt = YouTube(youtube_url) |
|
yt.streams.filter(progressive=True, file_extension='mp4').first().download(filename="original.mp4") |
|
video_file = "original.mp4" |
|
return process_video(video_file, language_choice) |
|
|
|
def process_video(video_file, language_choice): |
|
|
|
ckpt_converter = 'checkpoints_v2/converter' |
|
device = "cuda:0" if torch.cuda.is_available() else "cpu" |
|
output_dir = 'outputs_v2' |
|
os.makedirs(output_dir, exist_ok=True) |
|
|
|
tone_color_converter = ToneColorConverter(f'{ckpt_converter}/config.json', device=device) |
|
tone_color_converter.load_ckpt(f'{ckpt_converter}/checkpoint.pth') |
|
|
|
|
|
reference_video = VideoFileClip(video_file) |
|
reference_audio = os.path.join(output_dir, "reference_audio.wav") |
|
reference_video.audio.write_audiofile(reference_audio) |
|
audio = AudioSegment.from_file(reference_audio) |
|
resampled_audio = audio.set_frame_rate(48000) |
|
resampled_audio.export(reference_audio, format="wav") |
|
|
|
|
|
model, df_state, _ = init_df() |
|
audio, _ = load_audio(reference_audio, sr=df_state.sr()) |
|
enhanced = enhance(model, df_state, audio) |
|
save_audio(reference_audio, enhanced, df_state.sr()) |
|
audio_clip = AudioFileClip(reference_audio) |
|
|
|
src_path = os.path.join(output_dir, "tmp.wav") |
|
|
|
|
|
speed = 1.0 |
|
|
|
|
|
sttmodel = whisper.load_model("base") |
|
sttresult = sttmodel.transcribe(reference_audio, verbose=True) |
|
|
|
|
|
print(sttresult["text"]) |
|
print(sttresult["language"]) |
|
|
|
|
|
segments = sttresult['segments'] |
|
|
|
if sttresult["language"] == language_choice[0:2]: |
|
print("Chosen language is the same as the video's original language. Only adding subtitles.") |
|
segments = sttresult['segments'] |
|
|
|
|
|
srt_path = os.path.join(output_dir, 'subtitles.srt') |
|
with open(srt_path, 'w', encoding='utf-8') as srt_file: |
|
for i, segment in enumerate(segments): |
|
start = segment['start'] |
|
end = segment['end'] |
|
text = segment['text'] |
|
|
|
start_hours, start_minutes = divmod(int(start), 3600) |
|
start_minutes, start_seconds = divmod(start_minutes, 60) |
|
start_milliseconds = int((start * 1000) % 1000) |
|
|
|
end_hours, end_minutes = divmod(int(end), 3600) |
|
end_minutes, end_seconds = divmod(end_minutes, 60) |
|
end_milliseconds = int((end * 1000) % 1000) |
|
|
|
srt_file.write(f"{i+1}\n") |
|
srt_file.write(f"{start_hours:02}:{start_minutes:02}:{start_seconds:02},{start_milliseconds:03} --> " |
|
f"{end_hours:02}:{end_minutes:02}:{end_seconds:02},{end_milliseconds:03}\n") |
|
srt_file.write(f"{text}\n\n") |
|
|
|
|
|
final_video_with_subs_path = os.path.join(output_dir, f'final_video_with_subs.mp4') |
|
try: |
|
( |
|
ffmpeg |
|
.input(video_file) |
|
.output(final_video_with_subs_path, vf=f"subtitles={srt_path}") |
|
.run(overwrite_output=True) |
|
) |
|
except ffmpeg.Error as e: |
|
print('ffmpeg error:', e) |
|
print(e.stderr.decode('utf-8')) |
|
|
|
print(f"Final video with subtitles saved to: {final_video_with_subs_path}") |
|
return final_video_with_subs_path, "Video language and language selection are the same, audio not changed." |
|
else: |
|
|
|
language = 'EN_NEWEST' |
|
match language_choice[0:2]: |
|
case 'en': |
|
language = 'EN_NEWEST' |
|
case 'es': |
|
language = 'ES' |
|
case 'fr': |
|
language = 'FR' |
|
case 'zh': |
|
language = 'ZH' |
|
case 'ja': |
|
language = 'JP' |
|
case 'ko': |
|
language = 'KR' |
|
case _: |
|
language = 'EN_NEWEST' |
|
|
|
|
|
def translate_segment(segment): |
|
return segment["start"], segment["end"], ts.translate_text(query_text=segment["text"], translator="google", to_language=language_choice) |
|
|
|
|
|
batch_size = 2 |
|
translation_segments = [] |
|
for i in range(0, len(segments), batch_size): |
|
batch = segments[i:i + batch_size] |
|
with ThreadPoolExecutor(max_workers=5) as executor: |
|
batch_translations = list(executor.map(translate_segment, batch)) |
|
translation_segments.extend(batch_translations) |
|
|
|
|
|
model = TTS(language=language, device=device) |
|
speaker_ids = model.hps.data.spk2id |
|
|
|
def generate_segment_audio_batch(translation_batch, speaker_id): |
|
segment_files = [] |
|
for segment in translation_batch: |
|
start, end, translated_text = segment |
|
start = round(start, 2) |
|
end = min(round(end, 2), audio_clip.duration) |
|
segment_path = os.path.join(output_dir, f'segment_{start}_{end}.wav') |
|
model.tts_to_file(translated_text, speaker_id, segment_path, speed=speed) |
|
|
|
reference_speaker = AudioFileClip.subclip(audio_clip, start, end) |
|
reference_speaker.write_audiofile(f'reference_speaker_{start}_{end}.wav') |
|
try: |
|
target_se, audio_name = se_extractor.get_se(f'reference_speaker_{start}_{end}.wav', tone_color_converter, vad=False) |
|
except NotImplementedError: |
|
target_se, audio_name = se_extractor.get_se(reference_audio, tone_color_converter, vad=False) |
|
|
|
encode_message = "@MyShell" |
|
tone_color_converter.convert( |
|
audio_src_path=segment_path, |
|
src_se=source_se, |
|
tgt_se=target_se, |
|
output_path=segment_path, |
|
message=encode_message |
|
) |
|
|
|
segment_files.append((segment_path, start, end, translated_text)) |
|
return segment_files |
|
|
|
for speaker_key in speaker_ids.keys(): |
|
speaker_id = speaker_ids[speaker_key] |
|
speaker_key = speaker_key.lower().replace('_', '-') |
|
|
|
source_se = torch.load(f'checkpoints_v2/base_speakers/ses/{speaker_key}.pth', map_location=device) |
|
|
|
segment_files = [] |
|
subtitle_entries = [] |
|
for i in range(0, len(translation_segments), batch_size): |
|
batch = translation_segments[i:i + batch_size] |
|
with ThreadPoolExecutor(max_workers=5) as executor: |
|
batch_segment_files = list(executor.map(generate_segment_audio_batch, [batch] * len(speaker_ids), [speaker_id] * len(batch))) |
|
batch_segment_files = [item for sublist in batch_segment_files for item in sublist] |
|
|
|
for segment_file, start, end, translated_text in batch_segment_files: |
|
segment_files.append((segment_file, start, end, translated_text)) |
|
|
|
|
|
combined_audio = AudioSegment.empty() |
|
video_segments = [] |
|
previous_end = 0 |
|
subtitle_counter = 1 |
|
for segment_file, start, end, translated_text in segment_files: |
|
segment_audio = AudioSegment.from_file(segment_file) |
|
combined_audio += segment_audio |
|
|
|
|
|
audio_duration = len(segment_audio) / 1000.0 |
|
|
|
|
|
subtitle_entries.append((subtitle_counter, previous_end, previous_end + audio_duration, translated_text)) |
|
subtitle_counter += 1 |
|
|
|
|
|
video_segment = ( |
|
ffmpeg |
|
.input(reference_video.filename, ss=start, to=end) |
|
.filter('setpts', f'PTS / {(end - start) / audio_duration}') |
|
) |
|
video_segments.append((video_segment, ffmpeg.input(segment_file))) |
|
previous_end += audio_duration |
|
|
|
save_path = os.path.join(output_dir, f'output_v2_{speaker_key}.wav') |
|
combined_audio.export(save_path, format="wav") |
|
|
|
|
|
video_and_audio_files = [item for sublist in video_segments for item in sublist] |
|
joined = ( |
|
ffmpeg |
|
.concat(*video_and_audio_files, v=1, a=1) |
|
.node |
|
) |
|
|
|
final_video_path = os.path.join(output_dir, f'final_video_{speaker_key}.mp4') |
|
try: |
|
( |
|
ffmpeg |
|
.output(joined[0], joined[1], final_video_path, vcodec='libx264', acodec='aac') |
|
.run(overwrite_output=True) |
|
) |
|
except ffmpeg.Error as e: |
|
print('ffmpeg error:', e) |
|
print(e.stderr.decode('utf-8')) |
|
|
|
print(f"Final video without subtitles saved to: {final_video_path}") |
|
|
|
|
|
srt_path = os.path.join(output_dir, 'subtitles.srt') |
|
with open(srt_path, 'w', encoding='utf-8') as srt_file: |
|
for entry in subtitle_entries: |
|
index, start, end, text = entry |
|
start_hours, start_minutes = divmod(int(start), 3600) |
|
start_minutes, start_seconds = divmod(start_minutes, 60) |
|
start_milliseconds = int((start * 1000) % 1000) |
|
|
|
end_hours, end_minutes = divmod(int(end), 3600) |
|
end_minutes, end_seconds = divmod(end_minutes, 60) |
|
end_milliseconds = int((end * 1000) % 1000) |
|
|
|
srt_file.write(f"{index}\n") |
|
srt_file.write(f"{start_hours:02}:{start_minutes:02}:{start_seconds:02},{start_milliseconds:03} --> " |
|
f"{end_hours:02}:{end_minutes:02}:{end_seconds:02},{end_milliseconds:03}\n") |
|
srt_file.write(f"{text}\n\n") |
|
|
|
|
|
final_video_with_subs_path = os.path.join(output_dir, f'final_video_with_subs_{speaker_key}.mp4') |
|
try: |
|
( |
|
ffmpeg |
|
.input(final_video_path) |
|
.output(final_video_with_subs_path, vf=f"subtitles={srt_path}") |
|
.run(overwrite_output=True) |
|
) |
|
except ffmpeg.Error as e: |
|
print('ffmpeg error:', e) |
|
print(e.stderr.decode('utf-8')) |
|
|
|
print(f"Final video with subtitles saved to: {final_video_with_subs_path}") |
|
|
|
return final_video_with_subs_path, "Video successfully translated." |
|
|
|
|
|
language_choices = ts.get_languages("google")["en"] |
|
language_choices.remove("auto") |
|
|
|
uploaded_translator = gr.Interface( |
|
fn=process_upload, |
|
inputs=[ |
|
gr.Video(label="Upload a video from your device storage", sources=['upload']), |
|
gr.Dropdown(choices=language_choices, label="Choose Language for Translation (Expressed in ISO 639-1 code)") |
|
], |
|
outputs=[ |
|
gr.Video(label="Translated Video", format='mp4'), |
|
gr.Textbox(show_label=False) |
|
], |
|
title="Video Translation and Voice Cloning", |
|
description="Upload a video, choose a language to translate the audio, and download the processed video with translated audio." |
|
) |
|
|
|
youtube_translator = gr.Interface( |
|
fn=process_youtube, |
|
inputs=[ |
|
gr.Textbox(label="Enter a YouTube video URL"), |
|
gr.Dropdown(choices=language_choices, label="Choose Language for Translation (Expressed in ISO 639-1 code)") |
|
], |
|
outputs=[ |
|
gr.Video(label="Translated Video", format='mp4'), |
|
gr.Textbox(show_label=False) |
|
], |
|
title="Video Translation and Voice Cloning", |
|
description="Upload a video, choose a language to translate the audio, and download the processed video with translated audio." |
|
) |
|
|
|
gr.TabbedInterface([uploaded_translator, youtube_translator], ["Upload video from device", "YouTube URL"]).launch() |
|
|