whispertube_backend / api_functions.py
uzi007's picture
Connection String Updated
7cc40be
import os
import json
import uuid
import requests
from helperfunctions import *
from models import load_models
from languages import CODE2LANG
from s3_handler import S3Handler
from media_download import YoutubeDownloader
# from transcription import StableWhisper
# from translation import Translation
# from summarizer import Extract_Summary, AudioBookNarration
# from audiobook import AudioBook
from celery import Celery
celery = Celery('api_functions.py', broker='pyamqp://guest:guest@localhost//', backend='rpc://')
# celery = Celery('api_functions.py', broker='pyamqp://guest:guest@0.0.0.0//', backend='rpc://')
# Output Directory for Files Storage
output_folder = 'Output'
# S3 Handler
s3 = S3Handler()
# Create a context variable to store the contexts for each user
users_context = dict()
@celery.task
def get_media_metadata_task(user_id: str, url: str):
# User Folder Path
user_folder_path = os.path.join(output_folder, user_id)
# Getting User's Youtube Downloader
youtube_downloader = YoutubeDownloader(url, user_folder_path)
# Getting Youtube Media Info
media_metadata = youtube_downloader.get_media_metadata()
# Storing User's Media Metadata to Directory
media_metadata_path = os.path.join(user_folder_path, 'media_metadata.json')
with open(media_metadata_path, "w") as outfile:
json.dump(media_metadata, outfile)
# Storing User's Media Metadata to S3
s3_path = s3.upload_file(user_id, 'media_metadata.json', media_metadata_path)
# Getting Status
status = 1 if media_metadata else 0
if status:
# Storing Info in the context for this user's session
users_context[user_id] = dict()
users_context[user_id]['downloader'] = youtube_downloader
# users_context[user_id]['media_metadata'] = media_metadata
users_context[user_id]['url'] = url
return {'status': status,
'user_id': user_id,
'media_metadata': media_metadata,
'media_metadata_path': s3_path}
@celery.task
def get_media_formats_task(user_id):
# Getting Media Formats for User
media_formats = users_context[user_id]['downloader'].get_media_formats()
# User Folder Path
user_folder_path = os.path.join(output_folder, user_id)
# Storing User's Media Formats to Directory
media_formats_path = os.path.join(user_folder_path, 'media_formats.json')
with open(media_formats_path, "w") as outfile:
json.dump(media_formats, outfile)
# Storing User's Media Formats to S3
s3_path = s3.upload_file(user_id, 'media_formats.json', media_formats_path)
# Getting Status
status = 1 if media_formats else 0
if status:
# Storing Media Info in the context for this user's session
users_context[user_id]['media_formats'] = media_formats
return {'status': status,
'media_formats': media_formats,
'media_formats_path': s3_path}
@celery.task
def download_media_task(user_id,media_type: str, media_format: str, media_quality: str):
# Downloading Media for User
media_path = users_context[user_id]['downloader'].download(media_type, media_format, media_quality)
# Storing User's Downloaded Media to S3
media_file = f"{media_type.lower()}_{media_quality.lower()}.{media_format.lower()}"
s3_path = s3.upload_file(user_id, media_file, media_path)
# Getting Status
status = 1 if media_path else 0
if status:
# Storing Media Info in the context for this user's session
users_context[user_id]['media_path'] = media_path
users_context[user_id]['media_type'] = media_type
return {'status': status, 'media_path': s3_path}
@celery.task
def get_transcript_task(user_id: str, subtitle_format: str = 'srt', word_level: bool = False):
# If Video Already Downloaded
if 'media_path' in users_context[user_id].keys():
# Retrieving the media_path from the context for this user's session
media_path = users_context[user_id]['media_path']
# Checking if the media_type is Video, then extract it's audio
media_type = users_context[user_id]['media_type']
if media_type == 'video':
media_path = extract_audio(media_path)
else:
# Downloading Audio for Transcription
media_path = users_context[user_id]['downloader'].download('audio', 'mp3', '128kbps')
# Whisper based transcription
user_folder_path = os.path.join(output_folder, user_id)
stable_whisper_transcript = StableWhisper(model=MODELS['transcription'],
media_path=media_path,
output_path=user_folder_path,
subtitle_format=subtitle_format,
word_level=word_level)
transcript = stable_whisper_transcript.generate_transcript()
transcript_path = stable_whisper_transcript.save_transcript()
subtitles_path = stable_whisper_transcript.save_subtitles()
# Storing User's Transcripts to S3
s3_transcript_path = s3.upload_file(user_id, 'transcript.txt', transcript_path)
s3_subtitles_path = s3.upload_file(user_id, f'subtitles.{subtitle_format}', subtitles_path)
# Getting Status
status = 1 if transcript and s3_transcript_path and s3_subtitles_path else 0
if status:
# Storing Transcript Info in the context for this user's session
users_context[user_id]['transcript'] = transcript
users_context[user_id]['transcript_path'] = transcript_path
users_context[user_id]['subtitles_path'] = subtitles_path
return {'status': status,
'transcript': transcript,
'transcript_path': s3_transcript_path,
'subtitles_path': s3_subtitles_path}
@celery.task
def get_translation_task(user_id: str, target_language: str = 'en'):
# If Transcript Available
if 'transcript' in users_context[user_id].keys():
# Retrieving the transcript from the context for this user's session
transcript = users_context[user_id]['transcript']
else:
return {'status': 0, 'message': 'Transcript not generated yet'}
# NLLB based Translation
user_folder_path = os.path.join(output_folder, user_id)
nllb_translator = Translation(model=MODELS['translation'],
transcript_dict=transcript,
source_lang=transcript['language'],
target_lang=target_language,
output_path=user_folder_path)
translated_transcript = nllb_translator.get_translated_transcript()
translated_subtitles = nllb_translator.get_translated_subtitles()
# Storing Translated Transcript as TXT file in UTF-8 format
translated_transcript_path = os.path.join(user_folder_path, 'translated_transcript.txt')
with open(translated_transcript_path, 'w', encoding='utf-8') as file:
file.write(translated_transcript)
# Storing Translated Transcript to S3
s3_transcript_path = s3.upload_file(user_id, 'translated_transcript.txt', translated_transcript_path)
# TODO: Write Translated Transcript as SRT, VTT, ASS files
# Storing Translated Subtitles as JSON file (For Now)
translated_subtitles_path = os.path.join(user_folder_path, 'translated_subtitles.json')
with open(translated_subtitles_path, "w", encoding='utf-8') as file:
json.dump(translated_subtitles_path, file)
# Storing Translated Subtitles to S3
s3_subtitles_path = s3.upload_file(user_id, 'translated_subtitles.json', translated_subtitles_path)
# Getting Status
status = 1 if translated_transcript and translated_subtitles else 0
if status:
# Storing Translated Transcript Info in the context for this user's session
users_context[user_id]['translated_transcript'] = translated_transcript
users_context[user_id]['translated_transcript_path'] = translated_transcript_path
users_context[user_id]['translated_subtitles'] = translated_subtitles
users_context[user_id]['translated_subtitles_path'] = translated_subtitles_path
return {'status': status,
'transcript': translated_transcript,
'subtitles': translated_subtitles,
'transcript_path': s3_transcript_path,
'subtitles_path': s3_subtitles_path}
@celery.task
def get_summary_task(user_id: str, Summary_type: str, Summary_strategy: str, Target_Person_type: str,
Response_length: str, Writing_style: str):
# If Transcript Available
if 'transcript' in users_context[user_id].keys():
# Retrieving the transcript from the context for this user's session
text_input = users_context[user_id]['transcript']
else:
return {'status': 0, 'message': 'Transcript not generated yet'}
# Extracting Summary
summary_extractor = Extract_Summary(text_input=text_input)
output = summary_extractor.define_chain(Summary_type=Summary_type,
Summary_strategy=Summary_strategy,
Target_Person_type=Target_Person_type,
Response_length=Response_length,
Writing_style=Writing_style,
key_information=False)
# Getting Status
status = 1 if output else 0
if status:
# Storing Summary Info in the context for this user's session
users_context[user_id]['summary'] = output
return {'status': status, "summary": output}
@celery.task
def get_key_info_task(user_id: str, Summary_type: str, Summary_strategy: str, Target_Person_type: str,
Response_length: str, Writing_style: str):
# If Transcript Available
if 'transcript' in users_context[user_id].keys():
# Retrieving the transcript from the context for this user's session
text_input = users_context[user_id]['transcript']
else:
return {'status': 0, 'message': 'Transcript not generated yet'}
# Extracting Key Value Info
summary_extractor = Extract_Summary(text_input=text_input)
output = summary_extractor.define_chain(Summary_type=Summary_type,
Summary_strategy=Summary_strategy,
Target_Person_type=Target_Person_type,
Response_length=Response_length,
Writing_style=Writing_style,
key_information=True)
# Getting Status
status = 1 if output else 0
if status:
# Storing Key Info in the context for this user's session
users_context[user_id]['key_info'] = output
return {'status': status, "key_info": output}
@celery.task
def get_audiobook_task(user_id: str, narration_style: str, speaker: str = "male",
audio_format: str = "mp3", audio_quality: str = "128kbps"):
# If Transcript Available
if 'transcript' in users_context[user_id].keys():
# Retrieving the transcript from the context for this user's session
text_input = users_context[user_id]['transcript']
else:
return {'status': 0, 'message': 'Transcript not generated yet'}
# Extracting Narration
narrator = AudioBookNarration(text_input=text_input)
output = narrator.define_chain(narration_style=narration_style)
# Generating Audiobook
audiobook = AudioBook(output_folder=output_folder)
audio_path = audiobook.generate_audio_from_text(output, speaker=speaker, filename="output_audio")
# Converting the Audio to Required Audio Parameters
audio_path = convert_audio(audio_path, audio_format, audio_quality)
# Storing User's Audiobook to S3
media_file = f"audiobook_{audio_quality.lower()}.{audio_format.lower()}"
s3_path = s3.upload_file(user_id, media_file, audio_path)
# Getting Status
status = 1 if audio_path else 0
if status:
# Storing Audiobook path in the context for this user's session
users_context[user_id]['audiobook_path'] = audio_path
return {'status': status, "audiobook_path": s3_path}
@celery.task
def get_rendered_video_task(user_id: str, video_format: str,
video_quality: str, subtitles_type: str = 'original'):
# # Retrieving the media_path from the context for this user's session
# media_path = users_context[user_id]['media_path']
# Downloading Video with Required Video Parameters for User
media_path = users_context[user_id]['downloader'].download('video', video_format, video_quality)
# Getting Required Subtitles
if 'original' in subtitles_type.lower():
subtitles_path = users_context[user_id]['subtitles_path']
elif 'translated' in subtitles_type.lower():
# Getting Translated Subtitles from the context for this user's session
translated_subtitles = users_context[user_id]['translated_subtitles_path']
# Saving Translated Subtitles
subtitles_path = save_translated_subtitles(translated_subtitles, media_path)
# Burning Subtitles & Rendering Video
rendered_video_path = burn_subtitles(media_path, subtitles_path)
# Storing User's Rendered Video to S3
media_file = f"subtitles_video_{video_quality.lower()}.{video_format.lower()}"
s3_path = s3.upload_file(user_id, media_file, media_path)
# Getting Status
status = 1 if rendered_video_path else 0
return {'status': status, "rendered_video_path": s3_path}