uzi007's picture
APIs updated & yt-dlp added
fe9dbf9
raw
history blame
12 kB
import os
import json
# import pytorch_test
import uvicorn
from fastapi import FastAPI, Request
from fastapi.middleware.cors import CORSMiddleware
from media_download import YoutubeDownloader
# from transcription import StableWhisper
# from summarizer import Extract_Summary, AudioBookNarration
# from audiobook import AudioBook
from helperfunctions import *
### API Configurations

# Application object; every route below registers itself on it.
app = FastAPI()

# Folder handed to the downloader as its output location.
output_folder = 'Output'

# Session store: maps a user key to a dict holding that user's state
# (downloader, media path, transcript, ...).
users_context = {}
# # CORS (Cross-Origin Resource Sharing)
# origins = [
# "http://localhost",
# "http://localhost:4200",
# ]
# app.add_middleware(
# CORSMiddleware,
# allow_origins=["*"], # origins,
# allow_credentials=True,
# allow_methods=["*"],
# allow_headers=["*"],
# )
### APIs
@app.get("/get_media_metadata")
async def get_media_metadata(request: Request, url: str):
    """Fetch metadata for the media at ``url`` and open a session for it.

    A fresh ``YoutubeDownloader`` is built for ``url``. When it yields
    metadata, the user's session is replaced with a new one holding the
    downloader and the URL.

    Args:
        request: Incoming request (unused while the client lookup is disabled).
        url: Address of the media to inspect.

    Returns:
        ``{'status': 1 | 0, 'media_metadata': <metadata>}``; ``status`` is 1
        only when the downloader returned a truthy metadata value.
    """
    # NOTE: the per-client key is disabled; every caller shares session key 1.
    # user_ip = request.client.host
    user_ip = 1

    downloader = YoutubeDownloader(url, output_folder)
    metadata = downloader.get_media_metadata()

    # Nothing usable came back: report failure and leave the sessions alone.
    if not metadata:
        return {'status': 0, 'media_metadata': metadata}

    # Start a brand-new session for this user (any previous state is dropped).
    # users_context[user_ip]['media_metadata'] = metadata
    users_context[user_ip] = {'downloader': downloader, 'url': url}
    return {'status': 1, 'media_metadata': metadata}
@app.get("/get_media_formats")
async def get_media_formats(request: Request):
    """List the formats offered for the media registered in this session.

    Uses the downloader stored in the session by ``/get_media_metadata``.

    Args:
        request: Incoming request (unused while the client lookup is disabled).

    Returns:
        ``{'status': 1 | 0, 'media_formats': <formats>}``. ``status`` is 0
        when the downloader reports no formats, or when no downloader has
        been stored for this session yet (``media_formats`` is then ``None``).
    """
    # Getting User's IP
    # user_ip = request.client.host
    user_ip = 1

    # A missing session used to raise KeyError (HTTP 500); report it through
    # the regular status flag instead.
    session = users_context.get(user_ip)
    downloader = session.get('downloader') if session else None
    if downloader is None:
        return {'status': 0, 'media_formats': None}

    # Querying the available formats
    media_formats = downloader.get_media_formats()

    # Getting Status
    status = 1 if media_formats else 0
    if status:
        # Storing Media Info in the context for this user's session
        session['media_formats'] = media_formats

    return {'status': status, 'media_formats': media_formats}
@app.get("/download_media")
async def download_media(request: Request, media_type: str, media_format: str, media_quality: str):
    """Download the session's media in the requested type, format and quality.

    Uses the downloader stored in the session by ``/get_media_metadata``.
    On success the resulting path and the media type are remembered in the
    session for the later endpoints.

    Args:
        request: Incoming request (unused while the client lookup is disabled).
        media_type: Passed through to the downloader; ``/get_transcript``
            later compares the stored value against ``'video'``.
        media_format: Passed through to the downloader.
        media_quality: Passed through to the downloader.

    Returns:
        ``{'status': 1 | 0, 'media_path': <path>}``. ``status`` is 0 when the
        downloader returns a falsy path, or when no downloader has been
        stored for this session yet (``media_path`` is then ``None``).
    """
    # Getting User's IP
    # user_ip = request.client.host
    user_ip = 1

    # A missing session used to raise KeyError (HTTP 500); report it through
    # the regular status flag instead.
    session = users_context.get(user_ip)
    downloader = session.get('downloader') if session else None
    if downloader is None:
        return {'status': 0, 'media_path': None}

    # Downloading Media for User
    media_path = downloader.download(media_type, media_format, media_quality)

    # Getting Status
    status = 1 if media_path else 0
    if status:
        # Storing Media Info in the context for this user's session
        session['media_path'] = media_path
        session['media_type'] = media_type

    return {'status': status, 'media_path': media_path}
@app.get("/get_transcript")
async def get_transcript(request: Request, subtitle_format: str = 'srt', word_level: bool = False):
    """Return the transcript for the media downloaded in this session.

    The Whisper-based transcription is currently disabled; the transcript is
    loaded from a pre-generated JSON file in the ``temp`` directory instead.

    Args:
        request: Incoming request (unused while the client lookup is disabled).
        subtitle_format: Only referenced by the disabled Whisper code path.
        word_level: Selects ``word_level_transcript.json`` when true,
            otherwise ``sentence_level_transcript.json``.

    Returns:
        ``{'status': 1 | 0, 'transcript': <parsed JSON>}``. ``status`` is 0
        when the loaded transcript is empty, or when no media has been
        downloaded for this session yet (``transcript`` is then ``None``).
    """
    # Getting User's IP
    # user_ip = request.client.host
    user_ip = 1

    # Retrieving the media_path from the context for this user's session.
    # A missing session/media used to raise KeyError (HTTP 500); report it
    # through the regular status flag instead.
    session = users_context.get(user_ip, {})
    media_path = session.get('media_path')
    if not media_path:
        return {'status': 0, "transcript": None}

    # Checking if the media_type is Video, then extract its audio
    if session.get('media_type') == 'video':
        media_path = extract_audio(media_path)

    # # Whisper based transcription
    # stable_whisper_transcript = StableWhisper(media_path, output_folder, subtitle_format=subtitle_format, word_level=word_level)
    # transcript = stable_whisper_transcript.generate_transcript()
    # transcript_path = stable_whisper_transcript.save_transcript()

    temp_dir = 'temp'
    # Both granularities are read the same way; only the file name differs.
    transcript_file = 'word_level_transcript.json' if word_level else 'sentence_level_transcript.json'
    transcript_path = os.path.join(temp_dir, transcript_file)
    # Explicit UTF-8 so the read does not depend on the platform default,
    # matching how the translation endpoint opens its files.
    with open(transcript_path, "r", encoding="utf-8") as json_file:
        transcript = json.load(json_file)

    # Getting Status
    status = 1 if transcript else 0
    if status:
        # Storing Transcript Info in the context for this user's session
        session['transcript'] = transcript
        session['transcript_path'] = transcript_path

    return {'status': status, "transcript": transcript}
@app.get("/get_translation")
async def get_translation(request: Request, target_language: str = 'en'):
    """Return the translated transcript and subtitles for this session.

    The NLLB-based translation is currently disabled; results are loaded
    from pre-generated files in the ``temp`` directory instead.

    Args:
        request: Incoming request (unused while the client lookup is disabled).
        target_language: Only referenced by the disabled NLLB code path.

    Returns:
        ``{'status': 1 | 0, 'transcript': <text>, 'subtitles': <parsed JSON>}``.
        ``status`` is 0 when either loaded value is empty, or when the
        session has no transcript yet (both payload fields are then ``None``).
    """
    # Getting User's IP
    # user_ip = request.client.host
    user_ip = 1

    # Retrieving the transcript from the context for this user's session.
    # A missing session/transcript used to raise KeyError (HTTP 500); report
    # it through the regular status flag instead.
    session = users_context.get(user_ip, {})
    transcript = session.get('transcript')
    if not transcript:
        return {'status': 0, "transcript": None, "subtitles": None}

    # # # NLLB based Translation
    # nllb_translator = Translation(transcript, transcript['language'], target_language, 'output_path')
    # translated_transcript = nllb_translator.get_translated_transcript()
    # translated_subtitles = nllb_translator.get_translated_subtitles()

    temp_dir = 'temp'
    translated_transcript_path = os.path.join(temp_dir, 'translated_transcript.txt')
    with open(translated_transcript_path, "r", encoding="utf-8") as f:
        translated_transcript = f.read()

    translated_subtitles_path = os.path.join(temp_dir, 'translated_subtitles.json')
    with open(translated_subtitles_path, "r", encoding="utf-8") as json_file:
        translated_subtitles = json.load(json_file)

    # Getting Status
    status = 1 if translated_transcript and translated_subtitles else 0
    if status:
        # Storing Translated Transcript Info in the context for this user's session
        session['translated_transcript'] = translated_transcript
        session['translated_subtitles'] = translated_subtitles
        # session['transcript_path'] = transcript_path

    return {'status': status, "transcript": translated_transcript, "subtitles": translated_subtitles}
@app.get("/get_summary")
async def get_summary(request: Request, Summary_type: str, Summary_strategy: str, Target_Person_type: str,
                      Response_length: str, Writing_style: str, text_input: str = None):
    """Return a summary of ``text_input`` or of the session's transcript.

    The summary extraction chain is currently disabled; the summary text is
    loaded from ``temp/summary.txt`` instead, so the style parameters are
    only referenced by the disabled code path.

    Args:
        request: Incoming request (unused while the client lookup is disabled).
        Summary_type: Only referenced by the disabled extraction code.
        Summary_strategy: Only referenced by the disabled extraction code.
        Target_Person_type: Only referenced by the disabled extraction code.
        Response_length: Only referenced by the disabled extraction code.
        Writing_style: Only referenced by the disabled extraction code.
        text_input: Text to summarise; falls back to the session transcript.

    Returns:
        ``{'status': 1 | 0, 'summary': <text>}``. ``status`` is 0 when the
        loaded summary is empty, or when neither ``text_input`` nor a session
        transcript is available (``summary`` is then ``None``).
    """
    # Getting User's IP
    # user_ip = request.client.host
    user_ip = 1

    # Getting Transcript if not provided. A missing session/transcript used
    # to raise KeyError (HTTP 500); report it through the status flag instead.
    if not text_input:
        text_input = users_context.get(user_ip, {}).get('transcript')
    if not text_input:
        return {'status': 0, "summary": None}

    # # Extracting Summary
    # summary_extractor = Extract_Summary(text_input=text_input)
    # output = summary_extractor.define_chain(Summary_type=Summary_type,
    #                                         Summary_strategy=Summary_strategy,
    #                                         Target_Person_type=Target_Person_type,
    #                                         Response_length=Response_length,
    #                                         Writing_style=Writing_style,
    #                                         key_information=False)

    temp_dir = 'temp'
    file_path = os.path.join(temp_dir, 'summary.txt')
    with open(file_path, 'r') as file:
        output = file.read()

    # Getting Status
    status = 1 if output else 0
    if status:
        # Storing Summary Info in the context for this user's session.
        # setdefault: a caller who supplied text_input may not have a session
        # yet, which previously raised KeyError here.
        users_context.setdefault(user_ip, {})['summary'] = output

    return {'status': status, "summary": output}
@app.get("/get_key_info")
async def get_key_info(request: Request, Summary_type: str, Summary_strategy: str, Target_Person_type: str,
                       Response_length: str, Writing_style: str, text_input: str = None):
    """Return key information for ``text_input`` or the session's transcript.

    The extraction chain is currently disabled; the key information is
    loaded from ``temp/key_info.txt`` instead, so the style parameters are
    only referenced by the disabled code path.

    Args:
        request: Incoming request (unused while the client lookup is disabled).
        Summary_type: Only referenced by the disabled extraction code.
        Summary_strategy: Only referenced by the disabled extraction code.
        Target_Person_type: Only referenced by the disabled extraction code.
        Response_length: Only referenced by the disabled extraction code.
        Writing_style: Only referenced by the disabled extraction code.
        text_input: Text to analyse; falls back to the session transcript.

    Returns:
        ``{'status': 1 | 0, 'key_info': <text>}``. ``status`` is 0 when the
        loaded text is empty, or when neither ``text_input`` nor a session
        transcript is available (``key_info`` is then ``None``).
    """
    # Getting User's IP
    # user_ip = request.client.host
    user_ip = 1

    # Getting Transcript if not provided. A missing session/transcript used
    # to raise KeyError (HTTP 500); report it through the status flag instead.
    if not text_input:
        text_input = users_context.get(user_ip, {}).get('transcript')
    if not text_input:
        return {'status': 0, "key_info": None}

    # # Extracting Summary
    # summary_extractor = Extract_Summary(text_input=text_input)
    # output = summary_extractor.define_chain(Summary_type=Summary_type,
    #                                         Summary_strategy=Summary_strategy,
    #                                         Target_Person_type=Target_Person_type,
    #                                         Response_length=Response_length,
    #                                         Writing_style=Writing_style,
    #                                         key_information=True)

    temp_dir = 'temp'
    file_path = os.path.join(temp_dir, 'key_info.txt')
    with open(file_path, 'r') as file:
        output = file.read()

    # Getting Status
    status = 1 if output else 0
    if status:
        # Storing Key Info in the context for this user's session.
        # setdefault: a caller who supplied text_input may not have a session
        # yet, which previously raised KeyError here.
        users_context.setdefault(user_ip, {})['key_info'] = output

    return {'status': status, "key_info": output}
# @app.get("/get_narration")
# async def get_narration(request: Request, narration_style: str, text_input: str = None):
# # Getting User's IP
# # user_ip = request.client.host
# user_ip = 1
# # Getting Transcript if not provided
# if not text_input:
# text_input = users_context[user_ip]['transcript']
# # # Extracting Narration
# # narrator = AudioBookNarration(text_input=text_input)
# # output = narrator.define_chain(narration_style=narration_style)
# temp_dir = 'temp'
# file_path = os.path.join(temp_dir, 'narration.txt')
# with open(file_path, 'r') as file:
# output = file.read()
# # Getting Status
# status = 1 if output else 0
# if status:
# # Storing Narration Info in the context for this user's session
# users_context[user_ip]['narration'] = output
# return {'status': status, "narration": output}
@app.get("/get_audiobook")
async def get_audiobook(request: Request, output_type : str, narration_style: str, speaker: str = "male", text_input: str = None):
    """Return the audiobook path for ``text_input`` or the session's transcript.

    Narration and audio generation are currently disabled; the endpoint
    returns the fixed placeholder path ``temp/narration.txt`` instead.

    Args:
        request: Incoming request (unused while the client lookup is disabled).
        output_type: Not used by the current implementation.
        narration_style: Only referenced by the disabled narration code.
        speaker: Only referenced by the disabled audio generation code.
        text_input: Text to narrate; falls back to the session transcript.

    Returns:
        ``{'status': 1 | 0, 'audiobook_path': <path>}``. ``status`` is 0 when
        neither ``text_input`` nor a session transcript is available
        (``audiobook_path`` is then ``None``).
    """
    # Getting User's IP
    # user_ip = request.client.host
    user_ip = 1

    # Getting Transcript if not provided. A missing session/transcript used
    # to raise KeyError (HTTP 500); report it through the status flag instead.
    if not text_input:
        text_input = users_context.get(user_ip, {}).get('transcript')
    if not text_input:
        return {'status': 0, "audiobook_path": None}

    # # Extracting Narration
    # narrator = AudioBookNarration(text_input=text_input)
    # output = narrator.define_chain(narration_style=narration_style)

    # # Generating Audiobook
    # audiobook = AudioBook(output_folder=output_folder)
    # audio_path = audiobook.generate_audio_from_text(output, speaker=speaker, filename="output_audio")

    temp_dir = 'temp'
    file_path = os.path.join(temp_dir, 'narration.txt')
    audio_path = file_path

    # Getting Status
    status = 1 if audio_path else 0
    if status:
        # Storing Audiobook path in the context for this user's session.
        # setdefault: a caller who supplied text_input may not have a session
        # yet, which previously raised KeyError here.
        users_context.setdefault(user_ip, {})['audiobook_path'] = audio_path

    return {'status': status, "audiobook_path": audio_path}
@app.get("/get_rendered_video")
async def get_rendered_video(request: Request, subtitles_type: str = 'original'):
    """Burn subtitles into the session's media and return the rendered path.

    Args:
        request: Incoming request (unused while the client lookup is disabled).
        subtitles_type: ``'original'`` uses the stored transcript path;
            ``'translated'`` first saves the stored translated subtitles via
            ``save_translated_subtitles`` and uses the path it returns.

    Returns:
        ``{'status': 1 | 0, 'rendered_video_path': <path>}``. ``status`` is 0
        when rendering yields a falsy path, when ``subtitles_type`` is not
        one of the two supported values, or when the session lacks the media
        or the requested subtitles (``rendered_video_path`` is then ``None``).
    """
    # Getting User's IP
    # user_ip = request.client.host
    user_ip = 1

    # Retrieving the media_path from the context for this user's session.
    # A missing session/media used to raise KeyError (HTTP 500); report it
    # through the regular status flag instead.
    session = users_context.get(user_ip, {})
    media_path = session.get('media_path')
    if not media_path:
        return {'status': 0, "rendered_video_path": None}

    # Getting Required Subtitles
    if subtitles_type == 'original':
        subtitles_path = session.get('transcript_path')
    elif subtitles_type == 'translated':
        # Getting Translated Subtitles from the context for this user's session
        translated_subtitles = session.get('translated_subtitles')
        # Saving Translated Subtitles (only when a translation exists)
        subtitles_path = (
            save_translated_subtitles(translated_subtitles, media_path)
            if translated_subtitles else None
        )
    else:
        # Any other value previously left subtitles_path unbound and crashed
        # with UnboundLocalError at the burn step.
        subtitles_path = None

    if not subtitles_path:
        return {'status': 0, "rendered_video_path": None}

    # Burning Subtitles & Rendering Video
    rendered_video_path = burn_subtitles(media_path, subtitles_path)

    # Getting Status
    status = 1 if rendered_video_path else 0

    return {'status': status, "rendered_video_path": rendered_video_path}
# Serve the app on 127.0.0.1:8000 when this file is executed directly.
if __name__ == "__main__":
    uvicorn.run(
        app,
        host="127.0.0.1",
        port=8000,
    )