Spaces:
Runtime error
Runtime error
import os | |
import json | |
import uuid | |
import requests | |
from helperfunctions import * | |
from models import load_models | |
from languages import CODE2LANG | |
from s3_handler import S3Handler | |
from media_download import YoutubeDownloader | |
# from transcription import StableWhisper | |
# from translation import Translation | |
# from summarizer import Extract_Summary, AudioBookNarration | |
# from audiobook import AudioBook | |
from celery import Celery | |
# Celery application used to run the task functions in this module.
# The broker and result backend can be overridden through the environment
# (e.g. CELERY_BROKER_URL=pyamqp://guest:guest@0.0.0.0//) so credentials and
# hosts do not have to be edited in source; the defaults are the original
# hard-coded values.
celery = Celery(
    'api_functions.py',
    broker=os.environ.get('CELERY_BROKER_URL', 'pyamqp://guest:guest@localhost//'),
    backend=os.environ.get('CELERY_RESULT_BACKEND', 'rpc://'),
)

# Output Directory for Files Storage (one sub-folder per user_id is used below)
output_folder = 'Output'

# S3 Handler shared by every task for uploading generated files
s3 = S3Handler()

# Per-user session state, keyed by user_id.
# NOTE(review): this is a plain in-process dict, so it is only visible to the
# process that populated it — confirm this is acceptable for the way the
# tasks are deployed.
users_context = dict()
def get_media_metadata_task(user_id: str, url: str):
    """Fetch media metadata for ``url`` and start a session for ``user_id``.

    A ``YoutubeDownloader`` is created for the user's folder, its metadata is
    written to ``media_metadata.json`` in that folder and the file is uploaded
    through the shared S3 handler. When metadata was obtained, the user's
    entry in ``users_context`` is (re)initialised with the downloader and URL.

    Args:
        user_id: Identifier of the user; also the name of the user's folder.
        url: Media URL handed to ``YoutubeDownloader``.

    Returns:
        dict with ``status`` (1 if metadata is truthy, else 0), ``user_id``,
        ``media_metadata`` and ``media_metadata_path`` (value returned by
        ``s3.upload_file``).
    """
    metadata_filename = 'media_metadata.json'

    # Per-user working directory and the downloader bound to it.
    # NOTE(review): the folder must exist before the file is written below;
    # presumably YoutubeDownloader creates it — confirm.
    session_dir = os.path.join(output_folder, user_id)
    downloader = YoutubeDownloader(url, session_dir)
    metadata = downloader.get_media_metadata()

    # Persist locally first, then push the same file to S3.
    local_metadata_file = os.path.join(session_dir, metadata_filename)
    with open(local_metadata_file, "w") as handle:
        json.dump(metadata, handle)
    remote_metadata_file = s3.upload_file(user_id, metadata_filename, local_metadata_file)

    succeeded = 1 if metadata else 0
    if succeeded:
        # A fresh context replaces anything stored earlier for this user.
        users_context[user_id] = {'downloader': downloader, 'url': url}

    return {'status': succeeded,
            'user_id': user_id,
            'media_metadata': metadata,
            'media_metadata_path': remote_metadata_file}
def get_media_formats_task(user_id):
    """List the available media formats for the user's current media.

    Uses the downloader stored in ``users_context[user_id]``, writes the
    result to ``media_formats.json`` in the user's folder and uploads that
    file through the shared S3 handler.

    Args:
        user_id: Identifier of a user that already has a session entry.

    Returns:
        dict with ``status`` (1 if formats is truthy, else 0),
        ``media_formats`` and ``media_formats_path`` (value returned by
        ``s3.upload_file``).

    Raises:
        KeyError: if the user has no session or no stored downloader.
    """
    formats_filename = 'media_formats.json'
    session = users_context[user_id]
    formats = session['downloader'].get_media_formats()

    # Save a local copy in the user's folder, then mirror it to S3.
    local_formats_file = os.path.join(output_folder, user_id, formats_filename)
    with open(local_formats_file, "w") as handle:
        json.dump(formats, handle)
    remote_formats_file = s3.upload_file(user_id, formats_filename, local_formats_file)

    succeeded = 1 if formats else 0
    if succeeded:
        # Remember the formats for later steps of this user's session.
        session['media_formats'] = formats

    return {'status': succeeded,
            'media_formats': formats,
            'media_formats_path': remote_formats_file}
def download_media_task(user_id, media_type: str, media_format: str, media_quality: str):
    """Download the user's media and upload the resulting file to S3.

    Args:
        user_id: Identifier of a user that already has a stored downloader.
        media_type: Passed to the downloader; also the first part of the
            uploaded file name (lower-cased).
        media_format: Passed to the downloader; used as the file extension
            (lower-cased).
        media_quality: Passed to the downloader; second part of the uploaded
            file name (lower-cased).

    Returns:
        dict with ``status`` (1 if the downloader returned a truthy path,
        else 0) and ``media_path`` (value returned by ``s3.upload_file``).

    Raises:
        KeyError: if the user has no session or no stored downloader.
    """
    session = users_context[user_id]
    local_media = session['downloader'].download(media_type, media_format, media_quality)

    # Uploaded object is named "<type>_<quality>.<format>", all lower-case.
    remote_name = f"{media_type.lower()}_{media_quality.lower()}.{media_format.lower()}"
    remote_media = s3.upload_file(user_id, remote_name, local_media)

    if local_media:
        # Keep the local path and type so later tasks can reuse the download.
        session['media_path'] = local_media
        session['media_type'] = media_type
        outcome = 1
    else:
        outcome = 0

    return {'status': outcome, 'media_path': remote_media}
def get_transcript_task(user_id: str, subtitle_format: str = 'srt', word_level: bool = False):
    """Transcribe the user's media and upload the transcript and subtitles.

    If media was already downloaded in this session it is reused (with
    ``extract_audio`` applied when the stored media type is ``'video'``);
    otherwise an mp3 at 128kbps is downloaded for the purpose.

    Args:
        user_id: Identifier of a user that already has a session entry.
        subtitle_format: Passed to ``StableWhisper`` and used as the extension
            of the uploaded subtitles file.
        word_level: Passed through to ``StableWhisper``.

    Returns:
        dict with ``status`` (1 only if the transcript and both S3 paths are
        truthy), ``transcript``, ``transcript_path`` and ``subtitles_path``
        (the latter two are the values returned by ``s3.upload_file``).

    Raises:
        KeyError: if the user has no session, or required session keys are
            missing.
    """
    session = users_context[user_id]

    # Pick the audio source: reuse an earlier download or fetch audio now.
    if 'media_path' not in session:
        audio_source = session['downloader'].download('audio', 'mp3', '128kbps')
    else:
        audio_source = session['media_path']
        if session['media_type'] == 'video':
            # Video downloads need their audio track pulled out first.
            audio_source = extract_audio(audio_source)

    # Whisper based transcription
    # NOTE(review): StableWhisper and MODELS are not imported/defined in the
    # visible part of this file (the transcription import is commented out) —
    # confirm they are provided elsewhere, e.g. by `helperfunctions`.
    session_dir = os.path.join(output_folder, user_id)
    transcriber = StableWhisper(model=MODELS['transcription'],
                                media_path=audio_source,
                                output_path=session_dir,
                                subtitle_format=subtitle_format,
                                word_level=word_level)
    transcript = transcriber.generate_transcript()
    local_transcript = transcriber.save_transcript()
    local_subtitles = transcriber.save_subtitles()

    # Mirror both generated files to S3.
    remote_transcript = s3.upload_file(user_id, 'transcript.txt', local_transcript)
    remote_subtitles = s3.upload_file(user_id, f'subtitles.{subtitle_format}', local_subtitles)

    succeeded = 1 if all((transcript, remote_transcript, remote_subtitles)) else 0
    if succeeded:
        # Later tasks (translation, summary, rendering) read these entries.
        session.update({'transcript': transcript,
                        'transcript_path': local_transcript,
                        'subtitles_path': local_subtitles})

    return {'status': succeeded,
            'transcript': transcript,
            'transcript_path': remote_transcript,
            'subtitles_path': remote_subtitles}
def get_translation_task(user_id: str, target_language: str = 'en'):
    """Translate the user's transcript and upload the translated artefacts.

    The transcript stored in the user's session is translated with the
    ``Translation`` helper. The translated transcript is written to
    ``translated_transcript.txt`` and the translated subtitles to
    ``translated_subtitles.json`` in the user's folder; both files are then
    uploaded through the shared S3 handler.

    Args:
        user_id: Identifier of a user that already has a session entry.
        target_language: Target language passed to ``Translation``.

    Returns:
        ``{'status': 0, 'message': ...}`` when no transcript exists yet;
        otherwise a dict with ``status`` (1 if both translated transcript and
        subtitles are truthy), ``transcript``, ``subtitles``,
        ``transcript_path`` and ``subtitles_path`` (the last two are the
        values returned by ``s3.upload_file``).

    Raises:
        KeyError: if the user has no session entry at all.
    """
    # Translation needs the transcript produced earlier in this session.
    if 'transcript' not in users_context[user_id]:
        return {'status': 0, 'message': 'Transcript not generated yet'}
    transcript = users_context[user_id]['transcript']

    # NLLB based Translation
    # NOTE(review): Translation and MODELS are not imported/defined in the
    # visible part of this file (the translation import is commented out) —
    # confirm they are provided elsewhere, e.g. by `helperfunctions`.
    user_folder_path = os.path.join(output_folder, user_id)
    nllb_translator = Translation(model=MODELS['translation'],
                                  transcript_dict=transcript,
                                  source_lang=transcript['language'],
                                  target_lang=target_language,
                                  output_path=user_folder_path)
    translated_transcript = nllb_translator.get_translated_transcript()
    translated_subtitles = nllb_translator.get_translated_subtitles()

    # Storing Translated Transcript as TXT file in UTF-8 format
    translated_transcript_path = os.path.join(user_folder_path, 'translated_transcript.txt')
    with open(translated_transcript_path, 'w', encoding='utf-8') as file:
        file.write(translated_transcript)

    # Storing Translated Transcript to S3
    s3_transcript_path = s3.upload_file(user_id, 'translated_transcript.txt', translated_transcript_path)

    # TODO: Write Translated Transcript as SRT, VTT, ASS files
    # Storing Translated Subtitles as JSON file (For Now)
    translated_subtitles_path = os.path.join(user_folder_path, 'translated_subtitles.json')
    with open(translated_subtitles_path, "w", encoding='utf-8') as file:
        # Serialise the subtitles themselves. The previous code dumped
        # `translated_subtitles_path`, so the JSON file contained only its
        # own path string instead of the subtitle data.
        json.dump(translated_subtitles, file)

    # Storing Translated Subtitles to S3
    s3_subtitles_path = s3.upload_file(user_id, 'translated_subtitles.json', translated_subtitles_path)

    # Getting Status
    status = 1 if translated_transcript and translated_subtitles else 0
    if status:
        # Storing Translated Transcript Info in the context for this user's session
        users_context[user_id]['translated_transcript'] = translated_transcript
        users_context[user_id]['translated_transcript_path'] = translated_transcript_path
        users_context[user_id]['translated_subtitles'] = translated_subtitles
        users_context[user_id]['translated_subtitles_path'] = translated_subtitles_path

    return {'status': status,
            'transcript': translated_transcript,
            'subtitles': translated_subtitles,
            'transcript_path': s3_transcript_path,
            'subtitles_path': s3_subtitles_path}
def get_summary_task(user_id: str, Summary_type: str, Summary_strategy: str, Target_Person_type: str,
                     Response_length: str, Writing_style: str):
    """Produce a summary of the user's transcript.

    The transcript stored in the user's session is handed to
    ``Extract_Summary`` and ``define_chain`` is called with the given options
    and ``key_information=False``.

    Args:
        user_id: Identifier of a user that already has a session entry.
        Summary_type, Summary_strategy, Target_Person_type, Response_length,
        Writing_style: Forwarded unchanged to ``define_chain``.

    Returns:
        ``{'status': 0, 'message': ...}`` when no transcript exists yet;
        otherwise ``{'status': 1 or 0, 'summary': <define_chain output>}``.

    Raises:
        KeyError: if the user has no session entry at all.
    """
    session = users_context[user_id]

    # Nothing to summarise until a transcript has been generated.
    if 'transcript' not in session:
        return {'status': 0, 'message': 'Transcript not generated yet'}

    # NOTE(review): Extract_Summary is not imported in the visible part of
    # this file (the summarizer import is commented out) — confirm it is
    # provided elsewhere, e.g. by `helperfunctions`.
    chain_options = {
        'Summary_type': Summary_type,
        'Summary_strategy': Summary_strategy,
        'Target_Person_type': Target_Person_type,
        'Response_length': Response_length,
        'Writing_style': Writing_style,
        'key_information': False,
    }
    extractor = Extract_Summary(text_input=session['transcript'])
    summary = extractor.define_chain(**chain_options)

    if summary:
        # Cache the summary in the user's session.
        session['summary'] = summary
        outcome = 1
    else:
        outcome = 0

    return {'status': outcome, "summary": summary}
def get_key_info_task(user_id: str, Summary_type: str, Summary_strategy: str, Target_Person_type: str,
                      Response_length: str, Writing_style: str):
    """Extract key information from the user's transcript.

    Same flow as the summary task, except ``define_chain`` is called with
    ``key_information=True`` and the result is stored under ``'key_info'``.

    Args:
        user_id: Identifier of a user that already has a session entry.
        Summary_type, Summary_strategy, Target_Person_type, Response_length,
        Writing_style: Forwarded unchanged to ``define_chain``.

    Returns:
        ``{'status': 0, 'message': ...}`` when no transcript exists yet;
        otherwise ``{'status': 1 or 0, 'key_info': <define_chain output>}``.

    Raises:
        KeyError: if the user has no session entry at all.
    """
    user_state = users_context[user_id]

    # A transcript is a prerequisite for key-information extraction.
    transcript_ready = 'transcript' in user_state
    if not transcript_ready:
        return {'status': 0, 'message': 'Transcript not generated yet'}
    transcript_text = user_state['transcript']

    # NOTE(review): Extract_Summary is not imported in the visible part of
    # this file (the summarizer import is commented out) — confirm it is
    # provided elsewhere, e.g. by `helperfunctions`.
    key_info = Extract_Summary(text_input=transcript_text).define_chain(
        Summary_type=Summary_type,
        Summary_strategy=Summary_strategy,
        Target_Person_type=Target_Person_type,
        Response_length=Response_length,
        Writing_style=Writing_style,
        key_information=True,
    )

    result_status = 1 if key_info else 0
    if result_status == 1:
        # Keep the extracted key info available for this user's session.
        user_state['key_info'] = key_info

    return {'status': result_status, "key_info": key_info}
def get_audiobook_task(user_id: str, narration_style: str, speaker: str = "male",
                       audio_format: str = "mp3", audio_quality: str = "128kbps"):
    """Turn the user's transcript into an audiobook file and upload it.

    The stored transcript is passed through ``AudioBookNarration``, the
    resulting text is rendered to audio with ``AudioBook``, converted with
    ``convert_audio`` and uploaded through the shared S3 handler.

    Args:
        user_id: Identifier of a user that already has a session entry.
        narration_style: Forwarded to ``AudioBookNarration.define_chain``.
        speaker: Forwarded to ``generate_audio_from_text``.
        audio_format: Target format for ``convert_audio``; also the uploaded
            file's extension (lower-cased).
        audio_quality: Target quality for ``convert_audio``; also part of the
            uploaded file's name (lower-cased).

    Returns:
        ``{'status': 0, 'message': ...}`` when no transcript exists yet;
        otherwise ``{'status': 1 or 0, 'audiobook_path': <s3.upload_file
        result>}``, where status reflects whether the converted audio path is
        truthy.

    Raises:
        KeyError: if the user has no session entry at all.
    """
    session = users_context[user_id]
    if 'transcript' not in session:
        return {'status': 0, 'message': 'Transcript not generated yet'}
    source_text = session['transcript']

    # Step 1: narration text from the transcript.
    # NOTE(review): AudioBookNarration and AudioBook are not imported in the
    # visible part of this file (their imports are commented out) — confirm
    # they are provided elsewhere, e.g. by `helperfunctions`.
    narration_builder = AudioBookNarration(text_input=source_text)
    narration = narration_builder.define_chain(narration_style=narration_style)

    # Step 2: synthesise audio.
    # NOTE(review): the shared `output_folder` and a fixed filename are used
    # here rather than the per-user folder — confirm that is intended.
    generator = AudioBook(output_folder=output_folder)
    raw_audio = generator.generate_audio_from_text(narration, speaker=speaker, filename="output_audio")

    # Step 3: convert to the requested format/quality and upload.
    final_audio = convert_audio(raw_audio, audio_format, audio_quality)
    remote_name = f"audiobook_{audio_quality.lower()}.{audio_format.lower()}"
    remote_audio = s3.upload_file(user_id, remote_name, final_audio)

    if final_audio:
        # Remember the local audiobook path for this user's session.
        session['audiobook_path'] = final_audio
        outcome = 1
    else:
        outcome = 0

    return {'status': outcome, "audiobook_path": remote_audio}
def get_rendered_video_task(user_id: str, video_format: str,
                            video_quality: str, subtitles_type: str = 'original'):
    """Render the user's video with burned-in subtitles and upload it.

    The video is downloaded with the requested format and quality, the chosen
    subtitles are burned in with ``burn_subtitles`` and the rendered file is
    uploaded through the shared S3 handler.

    Args:
        user_id: Identifier of a user that already has a stored downloader.
        video_format: Passed to the downloader; also the uploaded file's
            extension (lower-cased).
        video_quality: Passed to the downloader; also part of the uploaded
            file's name (lower-cased).
        subtitles_type: Selects the subtitles. A value containing
            ``'original'`` (case-insensitive) uses the session's
            ``subtitles_path``; otherwise a value containing ``'translated'``
            uses the session's ``translated_subtitles_path``.

    Returns:
        ``{'status': 0, 'message': ...}`` when ``subtitles_type`` matches
        neither option; otherwise ``{'status': 1 or 0, 'rendered_video_path':
        <s3.upload_file result or None>}``. The S3 path is ``None`` when
        rendering produced no output.

    Raises:
        KeyError: if the user has no session, or the session lacks the
            downloader or the subtitles entry that was requested.
    """
    session = users_context[user_id]

    # Validate the subtitle choice before doing any expensive work.
    # Previously an unrecognised value left `subtitles_path` unassigned and
    # crashed with UnboundLocalError after the video had been downloaded.
    requested = subtitles_type.lower()
    use_original = 'original' in requested
    use_translated = 'translated' in requested
    if not (use_original or use_translated):
        return {'status': 0,
                'message': f"Unsupported subtitles_type: {subtitles_type!r}"}

    # Downloading Video with Required Video Parameters for User
    media_path = session['downloader'].download('video', video_format, video_quality)

    # Getting Required Subtitles ('original' takes precedence, as before)
    if use_original:
        subtitles_path = session['subtitles_path']
    else:
        # NOTE(review): the stored *path* of the translated subtitles JSON is
        # what gets passed to the helper (unchanged from the original code) —
        # confirm save_translated_subtitles expects a path, not the data.
        translated_subtitles_path = session['translated_subtitles_path']
        subtitles_path = save_translated_subtitles(translated_subtitles_path, media_path)

    # Burning Subtitles & Rendering Video
    rendered_video_path = burn_subtitles(media_path, subtitles_path)

    # Storing User's Rendered Video to S3.
    # Upload the rendered file: the previous code uploaded `media_path`, i.e.
    # the plain download without subtitles. Skip the upload entirely when
    # rendering produced nothing.
    media_file = f"subtitles_video_{video_quality.lower()}.{video_format.lower()}"
    if rendered_video_path:
        s3_path = s3.upload_file(user_id, media_file, rendered_video_path)
    else:
        s3_path = None

    # Getting Status
    status = 1 if rendered_video_path else 0
    return {'status': status, "rendered_video_path": s3_path}