notivai-backend / app.py
Dhrumit1314's picture
Update app.py
90de860 verified
raw
history blame
11.3 kB
import os
import re
import time
from concurrent.futures import ThreadPoolExecutor
import matplotlib.pyplot as plt
import moviepy.editor as mp
import requests
import spacy
import speech_recognition as sr
import tensorflow as tf
from flask import Flask, jsonify, request
from flask_cors import CORS
from io import BytesIO
from requests import get
from string import punctuation
from tqdm import tqdm
from transformers import BartTokenizer, T5ForConditionalGeneration, T5Tokenizer, TFBartForConditionalGeneration
from youtube_transcript_api import YouTubeTranscriptApi as yta
from wordcloud import WordCloud
from heapq import nlargest
from werkzeug.utils import secure_filename
from spacy.lang.en.stop_words import STOP_WORDS
import huggingface_hub as hf_hub
# Change the directory to the backend folder
# os.chdir("E:/Centennial/SEMESTER 6/Software Development Project/backend/")
# Create a Flask app
app = Flask(__name__)
CORS(app)
# Function to extract video ID from YouTube link
def extract_video_id(youtube_link):
pattern = re.compile(r'(?<=v=)[a-zA-Z0-9_-]+(?=&|\b|$)')
match = pattern.search(youtube_link)
if match:
return match.group()
else:
return None
@app.route('/', methods=['GET'])
def hello():
hostname = request.host
domain = request.url_root
user_agent = request.user_agent.string
h = f"<h1>Hello World</h1><p>Hostname: {hostname}</p><p>Domain: {domain}</p><p>User Agent: {user_agent}</p>"
return h
@app.route('/get_humiliated',methods=['GET'])
def hello2():
h = "<h1>Hello this is testing!!</h1>"
return h
# Route for uploading video files
@app.route('/upload_video', methods=['POST'])
def upload_video():
start_time = time.time()
if 'video' not in request.files:
return jsonify({'error': 'No video file found in the request'})
video = request.files['video']
if video.filename == '':
return jsonify({'error': 'No selected video file'})
if video.mimetype.split('/')[0] != 'video':
return jsonify({'error': 'The file uploaded is not a video'})
model_name = request.form.get('modelName')
print("MODEL:", model_name)
try:
# Save the uploaded video to a temporary directory
video_path = os.path.join(os.getcwd(), 'temp', secure_filename(video.filename))
video.save(video_path)
# Save the video file to Hugging Face Spaces
video_file_id = hf_hub.upload(video_path, name=video.filename)
# Transcribe audio from the uploaded video
transcript = transcribe_audio(video_path)
# Save the transcribed audio to a temporary WAV file
audio_path = os.path.join(os.getcwd(), 'temp', 'transcribed_audio.wav')
with open(audio_path, 'w') as audio_file:
audio_file.write(transcript)
# Save the audio file to Hugging Face Spaces
audio_file_id = hf_hub.upload(audio_path, name='transcribed_audio.wav')
# Summarize the transcribed text using the selected model
summary = ""
if model_name == 'T5':
summary = summarize_text_t5(transcript)
elif model_name == 'BART':
summary = summarize_text_bart(transcript)
else:
summary = summarizer(transcript)
# Calculate the elapsed time and print a success message
end_time = time.time()
elapsed_time = end_time - start_time
print(f"Video saved successfully. Time taken: {elapsed_time} seconds")
# Return the transcript, summary, model name, video file ID, and audio file ID in the response
return jsonify({'message': 'successful', 'transcript': transcript, 'summary': summary, 'modelName': model_name,
'videoFileId': video_file_id, 'audioFileId': audio_file_id})
except Exception as e:
# If an error occurs during processing, return an error message
return jsonify({'error': str(e)})
# def upload_video():
# start_time = time.time()
# if 'video' not in request.files:
# return jsonify({'error': 'No video file found in the request'})
# video = request.files['video']
# if video.mimetype.split('/')[0] != 'video':
# return jsonify({'error': 'The file uploaded is not a video'})
# model_name = request.form.get('modelName')
# print("MODEL:", model_name)
# # backend_folder = 'backend_videos'
# # if not os.path.exists(backend_folder):
# # os.makedirs(backend_folder)
# video_path = os.path.join(os.getcwd(), secure_filename(video.filename))
# video.save(video_path)
# transcript = transcribe_audio(video_path)
# summary = ""
# if model_name == 'T5':
# summary = summarize_text_t5(transcript)
# elif model_name == 'BART':
# summary = summarize_text_bart(transcript)
# else:
# summary = summarizer(transcript)
# end_time = time.time()
# elapsed_time = end_time - start_time
# print(f"Video saved successfully. Time taken: {elapsed_time} seconds")
# return jsonify({'message': 'successful', 'transcript': transcript, 'summary': summary, 'modelName': model_name})
# Route for uploading YouTube video links
@app.route('/youtube_upload_video', methods=['POST'])
def upload_youtube_video():
start_time = time.time()
transcript = "Testing text"
summary = "Testing text"
model_name = request.form.get('modelName')
youtube_link = request.form.get('link')
print('link', youtube_link)
video_id = extract_video_id(youtube_link)
if video_id is None:
return jsonify({'message': 'successful', 'transcript': "error with youtube link", 'summary': "error with youtube link", 'modelName': model_name})
transcript = generate_and_save_transcript_with_visuals(video_id)
summary = ""
if model_name == 'T5':
summary = summarize_text_t5(transcript)
elif model_name == 'BART':
summary = summarize_text_bart(transcript)
else:
summary = summarizer(transcript)
end_time = time.time()
elapsed_time = end_time - start_time
print(f"Video saved successfully. Time taken: {elapsed_time} seconds")
return jsonify({'message': 'successful', 'transcript': transcript, 'summary': summary, 'modelName': model_name})
# Function to generate transcript and visuals for YouTube videos
def generate_and_save_transcript_with_visuals(video_id, file_name="yt_generated_transcript.txt"):
try:
data = yta.get_transcript(video_id)
transcript = ''
for value in tqdm(data, desc="Downloading Transcript", unit=" lines"):
for key, val in value.items():
if key == 'text':
transcript += val + ' '
transcript = transcript.strip()
return transcript
except Exception as e:
print(f"Error: {str(e)}")
# Transcribe audio from video
def transcribe_audio(file_path, chunk_duration=30):
video = mp.VideoFileClip(file_path)
audio = video.audio
audio.write_audiofile("sample_audio.wav", codec='pcm_s16le')
r = sr.Recognizer()
with sr.AudioFile("sample_audio.wav") as source:
audio = r.record(source)
total_duration = len(audio.frame_data) / audio.sample_rate
total_chunks = int(total_duration / chunk_duration) + 1
all_text = []
def transcribe_chunk(start):
nonlocal all_text
chunk = audio.get_segment(start * 1000, (start + chunk_duration) * 1000)
try:
text = r.recognize_google(chunk)
all_text.append(text)
print(f" Chunk {start}-{start+chunk_duration}: {text}")
except sr.UnknownValueError:
all_text.append("")
except sr.RequestError as e:
all_text.append(f"[Error: {e}]")
num_threads = min(total_chunks, total_chunks + 5)
with ThreadPoolExecutor(max_workers=num_threads) as executor:
list(tqdm(executor.map(transcribe_chunk, range(0, int(total_duration), chunk_duration)),
total=total_chunks, desc="Transcribing on multithreading: "))
wordcloud = WordCloud(width=800, height=400, background_color="white").generate(' '.join(all_text))
plt.figure(figsize=(10, 5))
plt.imshow(wordcloud, interpolation='bilinear')
plt.axis("off")
plt.show()
return ' '.join(all_text)
# Load pre-trained models and tokenizers
tokenizer_bart = BartTokenizer.from_pretrained('facebook/bart-large')
tokenizer_t5 = T5Tokenizer.from_pretrained('t5-small')
with tf.device('/CPU:0'):
model_t5 = T5ForConditionalGeneration.from_pretrained("Dhrumit1314/T5_TextSummary")
model_bart = TFBartForConditionalGeneration.from_pretrained("Dhrumit1314/BART_TextSummary")
# Function to summarize text using T5 model
def summarize_text_t5(text):
start_time = time.time()
t5_prepared_Text = "summarize: "+text
tokenized_text = tokenizer_t5.encode(t5_prepared_Text, return_tensors="pt")
summary_ids = model_t5.generate(tokenized_text,
num_beams=4,
no_repeat_ngram_size=2,
min_length=256,
max_length=512,
early_stopping=True)
output = tokenizer_t5.decode(summary_ids[0], skip_special_tokens=True)
end_time = time.time()
print(f"Execution time for T5 Model: {end_time - start_time} seconds")
return output
def summarize_text_bart(text):
start_time = time.time()
inputs = tokenizer_bart([text], max_length=1024, return_tensors='tf')
summary_ids = model_bart.generate(inputs['input_ids'], num_beams=4, max_length=256, early_stopping=True)
output = [tokenizer_bart.decode(g, skip_special_tokens=True, clean_up_tokenization_spaces=False) for g in summary_ids]
end_time = time.time()
print(f"Execution time for BART Model: {end_time - start_time} seconds")
return output[0]
# Spacy summarizer
def summarizer(rawdocs):
stopwords = list(STOP_WORDS)
nlp = spacy.load('en_core_web_sm')
doc = nlp(rawdocs)
tokens = [token.text for token in doc]
word_freq = {}
for word in doc:
if word.text.lower() not in stopwords and word.text.lower() not in punctuation:
if word.text not in word_freq.keys():
word_freq[word.text] = 1
else:
word_freq[word.text] += 1
max_freq = max(word_freq.values())
for word in word_freq.keys():
word_freq[word] = word_freq[word]/max_freq
sent_tokens = [sent for sent in doc.sents]
sent_scores = {}
for sent in sent_tokens:
for word in sent:
if word.text in word_freq.keys():
if sent not in sent_scores.keys():
sent_scores[sent] = word_freq[word.text]
else:
sent_scores[sent] += word_freq[word.text]
select_len = int(len(sent_tokens) * 0.3)
summary = nlargest(select_len, sent_scores, key=sent_scores.get)
final_summary = [word.text for word in summary]
summary = ' '.join(final_summary)
return summary
# Main run function
if __name__ == '__main__':
# os.chdir("E:/Centennial/SEMESTER 6/Software Development Project/backend/")
app.run(debug=True, port=7860, host='0.0.0.0', use_reloader=False)