import os
import random
import string
import subprocess
import traceback
import warnings
from pathlib import Path

import gradio as gr
import nltk
import numpy as np
import pke
import speech_recognition as sr
import torch
from flashtext import KeywordProcessor
from gensim.models import KeyedVectors
from gensim.scripts.glove2word2vec import glove2word2vec
from pydub import AudioSegment
from pydub.utils import make_chunks
from textwrap3 import wrap
from transformers import T5ForConditionalGeneration, T5Tokenizer

nltk.download('punkt')
nltk.download('brown')
nltk.download('wordnet')
nltk.download('stopwords')
nltk.download('omw-1.4')  # some NLTK versions require this for WordNet lookups
from nltk.corpus import stopwords
from nltk.corpus import wordnet as wn
from nltk.tokenize import sent_tokenize

warnings.filterwarnings("ignore")
###############################################
#                   Models                    #
###############################################
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# T5 summarization model
summary_model = T5ForConditionalGeneration.from_pretrained('t5-base').to(device)
summary_tokenizer = T5Tokenizer.from_pretrained('t5-base')

# GloVe vectors, converted once to word2vec format for gensim
glove_file = 'glove.6B.300d.txt'
tmp_file = 'word2vec-glove.6B.300d.txt'
glove2word2vec(glove_file, tmp_file)
model = KeyedVectors.load_word2vec_format(tmp_file)

# T5 fine-tuned on SQuAD v1 for question generation
question_model = T5ForConditionalGeneration.from_pretrained('ramsrigouthamg/t5_squad_v1').to(device)
question_tokenizer = T5Tokenizer.from_pretrained('ramsrigouthamg/t5_squad_v1')
###############################################
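# Speech-to-text: pydub slices the downloaded audio into 8-second chunks,
# since the free Google Web Speech API (recognize_google) is most reliable
# on short clips; the per-chunk transcripts are concatenated into one text.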
def Process_audio(fileName):
    """Split a wav file into 8-second chunks and transcribe each chunk."""
    text = ''
    myaudio = AudioSegment.from_wav(fileName)
    chunk_length_ms = 8000
    chunks = make_chunks(myaudio, chunk_length_ms)
    recognizer = sr.Recognizer()
    for i, chunk in enumerate(chunks):
        chunkName = './chunked/' + fileName + "_{0}.wav".format(i)
        print("Exporting", chunkName)
        chunk.export(chunkName, format="wav")
        with sr.AudioFile(chunkName) as source:
            # record() reads the whole chunk; listen() can stop early on silence
            audio_listened = recognizer.record(source)
        try:
            rec = recognizer.recognize_google(audio_listened)
            text += rec + "."
        except sr.UnknownValueError:
            print("Could not recognize audio in chunk", i)
        except sr.RequestError as e:
            print("Speech recognition request failed:", e)
    return text
os.makedirs("chunked", exist_ok=True)
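# yt-dlp extracts the video's audio track as a .wav file into the working
# directory; only the first .wav found is transcribed.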
def UrlToAudio(VideoUrl):
    """Download the video's audio track as wav with yt-dlp, then transcribe it."""
    text = []
    # pass the URL as a separate argument rather than via shell interpolation
    subprocess.run(["yt-dlp", "-x", "--audio-format", "wav", VideoUrl])
    for wav_file_path in Path(".").glob("*.wav"):
        text.append(Process_audio(str(wav_file_path)))
        break
    return ''.join(text)
def set_seed(seed: int):
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
def postprocesstext(content):
    final = ""
    for sent in sent_tokenize(content):
        final = final + " " + sent.capitalize()
    return final
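# Summarization: T5 is prompted with the task prefix "summarize: " and
# decoded with beam search (3 beams, 75-300 tokens, bigram repetition blocked).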
def summarizer(text, model, tokenizer):
    text = "summarize: " + text.strip().replace("\n", " ")
    max_len = 512
    encoding = tokenizer(text, max_length=max_len, truncation=True, return_tensors="pt").to(device)
    input_ids, attention_mask = encoding["input_ids"], encoding["attention_mask"]
    outs = model.generate(input_ids=input_ids,
                          attention_mask=attention_mask,
                          early_stopping=True,
                          num_beams=3,
                          num_return_sequences=1,
                          no_repeat_ngram_size=2,
                          min_length=75,
                          max_length=300)
    summary = tokenizer.decode(outs[0], skip_special_tokens=True)
    summary = postprocesstext(summary).strip()
    return summary
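# Keyphrase extraction: MultipartiteRank (Boudin, 2018) builds a graph over
# candidate noun phrases and ranks them with a random walk; the top-ranked
# phrases serve as answer candidates for question generation.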
def get_nouns_multipartite(content):
    out = []
    try:
        extractor = pke.unsupervised.MultipartiteRank()
        # select proper nouns and nouns as candidates; punctuation marks
        # and stopwords are excluded via the stoplist below
        pos = {'PROPN', 'NOUN'}
        stoplist = list(string.punctuation)
        stoplist += ['-lrb-', '-rrb-', '-lcb-', '-rcb-', '-lsb-', '-rsb-']
        stoplist += stopwords.words('english')
        extractor.load_document(input=content, language='en',
                                stoplist=stoplist,
                                normalization=None)
        extractor.candidate_selection(pos=pos)
        # build the Multipartite graph and rank candidates using random walk;
        # alpha controls the weight adjustment mechanism, see TopicRank for
        # threshold/method parameters
        extractor.candidate_weighting(alpha=1.1,
                                      threshold=0.75,
                                      method='average')
        keyphrases = extractor.get_n_best(n=15)
        out = [val[0] for val in keyphrases]
    except Exception:
        out = []
        traceback.print_exc()
    return out
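# An answer keyword is kept only if it survives summarization: flashtext
# scans the summary for the phrases extracted from the full transcript,
# and the intersection (at most four keywords) becomes the answer set.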
def get_keywords(originaltext, summarytext):
    keywords = get_nouns_multipartite(originaltext)
    print("keywords unsummarized: ", keywords)
    keyword_processor = KeywordProcessor()
    for keyword in keywords:
        keyword_processor.add_keyword(keyword)
    keywords_found = keyword_processor.extract_keywords(summarytext)
    keywords_found = list(set(keywords_found))
    print("keywords found in summarized: ", keywords_found)
    important_keywords = [keyword for keyword in keywords if keyword in keywords_found]
    return important_keywords[:4]
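# Question generation: a T5 model fine-tuned on SQuAD v1 takes the prompt
# "context: <summary> answer: <keyword>" and decodes a question whose answer
# is the given keyword. Illustrative only -- actual output depends on the model:
#   get_question("The mitochondria produce energy...", "mitochondria", ...)
#   ->  e.g. "What produces energy in the cell?"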
def get_question(context, answer, model, tokenizer):
    text = "context: {} answer: {}".format(context, answer)
    encoding = tokenizer(text, max_length=384, truncation=True, return_tensors="pt").to(device)
    input_ids, attention_mask = encoding["input_ids"], encoding["attention_mask"]
    outs = model.generate(input_ids=input_ids,
                          attention_mask=attention_mask,
                          early_stopping=True,
                          num_beams=5,
                          num_return_sequences=1,
                          no_repeat_ngram_size=2,
                          max_length=72)
    question = tokenizer.decode(outs[0], skip_special_tokens=True)
    question = question.replace("question:", "").strip()
    return question
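# WordNet distractors: climb from the answer's first noun synset to its
# hypernym, then collect that hypernym's other hyponyms (the answer's
# "siblings") as plausible wrong options -- e.g. for "guitar" the siblings
# under "stringed instrument", such as "banjo" or "violin".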
def get_distractors_wordnet(word):
    distractors = []
    try:
        word = word.lower()
        orig_word = word
        if len(word.split()) > 1:
            word = word.replace(" ", "_")
        syn = wn.synsets(word, 'n')[0]
        hypernym = syn.hypernyms()
        if len(hypernym) == 0:
            return distractors
        for item in hypernym[0].hyponyms():
            name = item.lemmas()[0].name().replace("_", " ")
            if name == orig_word:
                continue
            name = " ".join(w.capitalize() for w in name.split())
            if name not in distractors:
                distractors.append(name)
    except IndexError:
        # no noun synset exists for the word
        print("Wordnet distractors not found")
    return distractors
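# GloVe distractors: the answer's nearest neighbours in the embedding space
# are semantically related, hence plausible but (usually) wrong options.
# Illustrative only -- actual neighbours depend on the loaded vectors:
#   generate_distractors("guitar", 3)  ->  e.g. ['bass', 'piano', 'violin']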
def generate_distractors(answer, count):
    answer = answer.lower()
    # extract the closest words to the answer in the embedding space
    try:
        closestWords = model.most_similar(positive=[answer], topn=count)
    except KeyError:
        # the word is not in the embedding vocabulary
        return []
    distractors = [word for word, _ in closestWords][:count]
    return distractors
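# Gradio UI: a textbox for the YouTube link, a radio button to choose the
# distractor source, and two outputs (rendered MCQs as HTML + the summary).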
context1 = gr.Textbox(lines=10, placeholder="Enter link here...")
output = [gr.HTML(label="Question and Answers"), gr.Textbox(label="YT Video Summary")]
radiobutton = gr.Radio(["Wordnet", "Gensim"])
def generate_question(context1, radiobutton):
    context = UrlToAudio(context1)
    summary_text = summarizer(context, summary_model, summary_tokenizer)
    for wrp in wrap(summary_text, 150):
        print(wrp)
    keywords = get_keywords(context, summary_text)
    print("\n\nNoun phrases:", keywords)
    output = ""
    for answer in keywords:
        ques = get_question(summary_text, answer, question_model, question_tokenizer)
        if radiobutton == "Wordnet":
            distractors = get_distractors_wordnet(answer)
        else:
            distractors = generate_distractors(answer.capitalize(), 3)
        print(distractors)
        output = output + "<br><b style='color:blue;'>" + ques + "</b>"
        output = output + "<br><li><b style='color:green;'>" + "Ans: " + answer.capitalize() + "</b></li><br>"
        for distractor in distractors[:4]:
            output = output + "<li><b style='color:brown;'>" + distractor + "</b></li><br>"
        output = output + "<br>"
    return output, summary_text
iface = gr.Interface(
    fn=generate_question,
    inputs=[context1, radiobutton],
    title="VidQuest",
    examples=[["https://www.youtube.com/watch?v=J4Qsr93L1qs", "Gensim"]],
    description="This Space generates MCQs from a YouTube video. Keep in mind that it may take a few minutes. Correct answers appear in green, while distractors appear in brown. Use the Gensim option to generate the most suitable distractors.",
    outputs=output)
iface.launch(debug=True)