import os
import re

import sentencepiece  # noqa: F401  (needed at runtime by MarianTokenizer)
import torch
from pytube import YouTube
from youtube_transcript_api import YouTubeTranscriptApi
from transformers import MarianMTModel, MarianTokenizer
from TTS.api import TTS
from moviepy.editor import AudioFileClip, CompositeVideoClip, TextClip, VideoFileClip
|
|
def tov(input_path, output_path, text):
    """Overlay caption text on a video and write the result.

    Note: TextClip requires ImageMagick to be installed.
    """
    video = VideoFileClip(input_path)
    txt_clip = (TextClip(text, fontsize=24, color='white')
                .set_position('center')
                .set_duration(video.duration))
    result = CompositeVideoClip([video, txt_clip])
    result.write_videofile(output_path, codec='libx264', audio_codec='aac',
                           remove_temp=True)
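
# A minimal usage sketch for tov; the file names below are hypothetical
# placeholders, not files shipped with this project:
#
#     tov('CS370-M5/videos/My_Video.mp4',
#         'CS370-M5/videos/My_Video_captioned.mp4',
#         'Bonjour tout le monde')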
|
|
def video_audio(video_file_path, new_audio_file_path, output_video_file_path):
    """Replace a video's audio track with a newly synthesized one."""
    video = VideoFileClip(video_file_path)
    new_audio = AudioFileClip(new_audio_file_path)
    # Trim to the shorter of the two durations so subclip() never runs past
    # the end of the synthesized audio.
    new_audio = new_audio.subclip(0, min(video.duration, new_audio.duration))
    video = video.set_audio(new_audio)

    try:
        video.write_videofile(output_video_file_path, codec="libx264",
                              audio_codec="aac", remove_temp=True)
    except Exception as e:
        print("Error writing video file:", e)
|
|
def tos(translated_text_file_path, video_path):
    """Synthesize French speech from a translated transcript and mux it into
    the video. Returns the path of the dubbed video."""
    device = "cuda" if torch.cuda.is_available() else "cpu"
    tts = TTS("tts_models/fr/mai/tacotron2-DDC").to(device)
    output_audio_dir = 'CS370-M5/audio'
    os.makedirs(output_audio_dir, exist_ok=True)

    file_name_only = os.path.splitext(os.path.basename(translated_text_file_path))[0]
    output_file_path = os.path.join(output_audio_dir, f"{file_name_only}.wav")

    with open(translated_text_file_path, 'r', encoding='utf-8') as file:
        translated_text = file.read()

    tts.tts_to_file(text=translated_text, file_path=output_file_path)

    output = f"CS370-M5/videos/{file_name_only}.mp4"
    video_audio(video_path, output_file_path, output)
    return output
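
# A minimal usage sketch for tos; the paths below are hypothetical
# placeholders:
#
#     dubbed = tos('CS370-M5/captions/My_Video_translated.txt',
#                  'CS370-M5/videos/My_Video.mp4')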
|
|
def translate(input_text_path, output_text_path, source_lang='en', target_lang='fr',
              model_name="Helsinki-NLP/opus-mt-en-fr", batch_size=8):
    """Translate a text file line by line with a MarianMT model.

    opus-mt-en-fr is a single-pair English-to-French model, so the input
    needs no language prefix; source_lang and target_lang are kept for the
    caller's bookkeeping.
    """
    model = MarianMTModel.from_pretrained(model_name)
    tokenizer = MarianTokenizer.from_pretrained(model_name)

    def translate_batch(sentences):
        # Pass the attention mask along with the input ids so that padded
        # positions are ignored during generation.
        inputs = tokenizer(sentences, return_tensors="pt", padding=True, truncation=True)
        translation_ids = model.generate(**inputs)
        return tokenizer.batch_decode(translation_ids, skip_special_tokens=True)

    def rtff(file_path):
        # Read the file as a list of lines without trailing newlines.
        with open(file_path, 'r', encoding='utf-8') as file:
            return [line.rstrip('\n') for line in file]

    def wtff(file_path, translated_texts):
        # Write one translated line per input line.
        with open(file_path, 'w', encoding='utf-8') as file:
            file.writelines([f"{line}\n" for line in translated_texts])

    translated_lines = []
    input_lines = rtff(input_text_path)

    for i in range(0, len(input_lines), batch_size):
        chunk = input_lines[i:i + batch_size]
        translated_lines.extend(translate_batch(chunk))

    wtff(output_text_path, translated_lines)
    return translated_lines
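
# A minimal usage sketch for translate; 'notes_en.txt' is a hypothetical
# input file, not part of the project:
#
#     french_lines = translate('notes_en.txt', 'notes_fr.txt')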
|
|
def downloading(video_url, target_lang='fr'):
    try:
        video_id = re.search(r"(?<=v=)[\w-]+", video_url)
        if video_id:
            video_id = video_id.group()
            yt = YouTube(video_url)
            stream = yt.streams.get_highest_resolution()

            modified_title = yt.title.replace(" ", "_")
            download_path = 'CS370-M5/videos'
            captions_path = 'CS370-M5/captions'

            os.makedirs(download_path, exist_ok=True)
            os.makedirs(captions_path, exist_ok=True)

            video_file = f'{download_path}/{modified_title}.mp4'
            stream.download(output_path=download_path, filename=modified_title + '.mp4')

            transcript_list = YouTubeTranscriptApi.list_transcripts(video_id)
            for available in transcript_list:
                print(f"Language: {available.language}")

            # Prefer an auto-generated English transcript; otherwise fall
            # back to the first transcript YouTube offers in any language.
            transcript = None
            source_lang = 'en'
            try:
                transcript = transcript_list.find_generated_transcript(['en']).fetch()
            except Exception:
                for candidate in transcript_list:
                    source_lang = candidate.language_code
                    transcript = candidate.fetch()
                    break

            if transcript is None:
                print("No transcript available for this video.")
                return None, None, None, None, None

            original_captions = ""
            for line in transcript:
                start_time = line['start']
                formatted_time = f"{int(start_time // 60):02d}:{int(start_time % 60):02d}"
                original_captions += f"{formatted_time} {line['text']}\n"

            original_filename = f'{captions_path}/{modified_title}_original.txt'
            with open(original_filename, 'w', encoding='utf-8') as file:
                file.write(original_captions)

            translated_text_filename = f'{captions_path}/{modified_title}_translated.txt'
            translated_captions = translate(original_filename,
                                            translated_text_filename,
                                            source_lang=source_lang,
                                            target_lang=target_lang)

            new_video_path = tos(translated_text_filename, video_file)
            return original_captions, translated_captions, original_filename, translated_text_filename, new_video_path
        else:
            print("Video ID not found.")
            return None, None, None, None, None
    except Exception as e:
        print("Error:", e)
        return None, None, None, None, None
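
# A minimal driver sketch, assuming the script is run directly; the URL below
# is a hypothetical placeholder, not one used by the project:
if __name__ == "__main__":
    url = "https://www.youtube.com/watch?v=VIDEO_ID"  # hypothetical video ID
    captions, translated, original_file, translated_file, dubbed_video = downloading(url)
    if dubbed_video is not None:
        print("Dubbed video written to:", dubbed_video)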
|
|