Spaces:
Build error
Build error
import os | |
import gradio as gr | |
from pathlib import Path | |
from pydub import AudioSegment | |
from pydub.utils import make_chunks | |
import os | |
import warnings | |
import speech_recognition as sr | |
import torch | |
from transformers import T5ForConditionalGeneration,T5Tokenizer | |
import nltk | |
from flashtext import KeywordProcessor | |
from collections import OrderedDict | |
from sklearn.metrics.pairwise import cosine_similarity | |
nltk.download('punkt') | |
nltk.download('brown') | |
nltk.download('wordnet') | |
nltk.download('stopwords') | |
from nltk.corpus import wordnet as wn | |
from nltk.tokenize import sent_tokenize | |
from textwrap3 import wrap | |
import random | |
import numpy as np | |
from nltk.corpus import stopwords | |
import string | |
import pke | |
import traceback | |
warnings.filterwarnings("ignore") | |
def Process_audio(fileName): | |
txtf=open("The_audio.txt","w+") | |
myaudio=AudioSegment.from_wav(fileName) | |
chunks_length_ms=8000 | |
chunks=make_chunks(myaudio,chunks_length_ms) | |
for i, chunk in enumerate(chunks): | |
chunkName='./chunked/'+fileName+"_{0}.wav".format(i) | |
print("I am Exporting",chunkName) | |
chunk.export(chunkName,format="wav") | |
File=chunkName | |
r= sr.Recognizer() | |
with sr.AudioFile(File) as source: | |
audio_listened=r.listen(source) | |
try: | |
rec=r.recognize_google(audio_listened) | |
txtf.write(rec+".") | |
except sr.UnknownValueError: | |
print("I dont recognize your audio") | |
except sr.RequestError as e: | |
print("could not get result") | |
try: | |
os.makedirs("chunked") | |
except: | |
pass | |
def UrlToAudio(VideoUrl): | |
url=VideoUrl | |
os.system("youtube-dl -x --audio-format wav " + url) | |
# load audio and pad/trim it to fit 30 seconds | |
base_path = Path(r"") | |
for wav_file_path in base_path.glob("*.wav"): | |
Process_audio(str(wav_file_path)) | |
break | |
summary_model = T5ForConditionalGeneration.from_pretrained('t5-base') | |
summary_tokenizer = T5Tokenizer.from_pretrained('t5-base') | |
device = torch.device("cuda" if torch.cuda.is_available() else "cpu") | |
summary_model = summary_model.to(device) | |
def set_seed(seed: int): | |
random.seed(seed) | |
np.random.seed(seed) | |
torch.manual_seed(seed) | |
torch.cuda.manual_seed_all(seed) | |
def postprocesstext (content): | |
final="" | |
for sent in sent_tokenize(content): | |
sent = sent.capitalize() | |
final = final +" "+sent | |
return final | |
def summarizer(text,model,tokenizer): | |
text = text.strip().replace("\n"," ") | |
text = "summarize: "+text | |
# print (text) | |
max_len = 512 | |
encoding = tokenizer.encode_plus(text,max_length=max_len, pad_to_max_length=False,truncation=True, return_tensors="pt").to(device) | |
input_ids, attention_mask = encoding["input_ids"], encoding["attention_mask"] | |
outs = model.generate(input_ids=input_ids, | |
attention_mask=attention_mask, | |
early_stopping=True, | |
num_beams=3, | |
num_return_sequences=1, | |
no_repeat_ngram_size=2, | |
min_length = 75, | |
max_length=300) | |
dec = [tokenizer.decode(ids,skip_special_tokens=True) for ids in outs] | |
summary = dec[0] | |
summary = postprocesstext(summary) | |
summary= summary.strip() | |
return summary | |
def get_nouns_multipartite(content): | |
out=[] | |
try: | |
extractor = pke.unsupervised.MultipartiteRank() | |
# not contain punctuation marks or stopwords as candidates. | |
pos = {'PROPN','NOUN'} | |
#pos = {'PROPN','NOUN'} | |
stoplist = list(string.punctuation) | |
stoplist += ['-lrb-', '-rrb-', '-lcb-', '-rcb-', '-lsb-', '-rsb-'] | |
stoplist += stopwords.words('english') | |
extractor.load_document(input=content,language='en', | |
stoplist=stoplist, | |
normalization=None) | |
extractor.candidate_selection(pos=pos) | |
# 4. build the Multipartite graph and rank candidates using random walk, | |
# alpha controls the weight adjustment mechanism, see TopicRank for | |
# threshold/method parameters. | |
extractor.candidate_weighting(alpha=1.1, | |
threshold=0.75, | |
method='average') | |
keyphrases = extractor.get_n_best(n=15) | |
for val in keyphrases: | |
out.append(val[0]) | |
except: | |
out = [] | |
traceback.print_exc() | |
return out | |
def get_keywords(originaltext,summarytext): | |
keywords = get_nouns_multipartite(originaltext) | |
print ("keywords unsummarized: ",keywords) | |
keyword_processor = KeywordProcessor() | |
for keyword in keywords: | |
keyword_processor.add_keyword(keyword) | |
keywords_found = keyword_processor.extract_keywords(summarytext) | |
keywords_found = list(set(keywords_found)) | |
print ("keywords_found in summarized: ",keywords_found) | |
important_keywords =[] | |
for keyword in keywords: | |
if keyword in keywords_found: | |
important_keywords.append(keyword) | |
return important_keywords[:4] | |
question_model = T5ForConditionalGeneration.from_pretrained('ramsrigouthamg/t5_squad_v1') | |
question_tokenizer = T5Tokenizer.from_pretrained('ramsrigouthamg/t5_squad_v1') | |
question_model = question_model.to(device) | |
def get_question(context,answer,model,tokenizer): | |
text = "context: {} answer: {}".format(context,answer) | |
encoding = tokenizer.encode_plus(text,max_length=384, pad_to_max_length=False,truncation=True, return_tensors="pt").to(device) | |
input_ids, attention_mask = encoding["input_ids"], encoding["attention_mask"] | |
outs = model.generate(input_ids=input_ids, | |
attention_mask=attention_mask, | |
early_stopping=True, | |
num_beams=5, | |
num_return_sequences=1, | |
no_repeat_ngram_size=2, | |
max_length=72) | |
dec = [tokenizer.decode(ids,skip_special_tokens=True) for ids in outs] | |
Question = dec[0].replace("question:","") | |
Question= Question.strip() | |
return Question | |
def get_distractors_wordnet(word): | |
distractors=[] | |
try: | |
syn = wn.synsets(word,'n')[0] | |
word= word.lower() | |
orig_word = word | |
if len(word.split())>0: | |
word = word.replace(" ","_") | |
hypernym = syn.hypernyms() | |
if len(hypernym) == 0: | |
return distractors | |
for item in hypernym[0].hyponyms(): | |
name = item.lemmas()[0].name() | |
#print ("name ",name, " word",orig_word) | |
if name == orig_word: | |
continue | |
name = name.replace("_"," ") | |
name = " ".join(w.capitalize() for w in name.split()) | |
if name is not None and name not in distractors: | |
distractors.append(name) | |
except: | |
print ("Wordnet distractors not found") | |
return distractors | |
context1 = gr.inputs.Textbox(lines=10, placeholder="Enter link here...") | |
output = gr.outputs.HTML( label="Question and Answers") | |
radiobutton = gr.inputs.Radio(["Wordnet", "Gensim"]) | |
def generate_question(context1,radiobutton): | |
UrlToAudio(context1) | |
f = open("The_audio.txt", "r") | |
context=f.read() | |
summary_text = summarizer(context,summary_model,summary_tokenizer) | |
for wrp in wrap(summary_text, 150): | |
print (wrp) | |
# np = getnounphrases(summary_text,sentence_transformer_model,3) | |
np = get_keywords(context,summary_text) | |
print ("\n\nNoun phrases",np) | |
output="" | |
for answer in np: | |
ques = get_question(summary_text,answer,question_model,question_tokenizer) | |
if radiobutton=="Wordnet": | |
distractors = get_distractors_wordnet(answer) | |
#else: | |
#distractors = generate_distractors(answer.capitalize(),3) | |
#print(distractors) | |
# output= output + ques + "\n" + "Ans: "+answer.capitalize() + "\n\n" | |
output ="\n"+ output + "<b style='color:blue;'>" + ques + "</b>" | |
# output = output + "<br>" | |
output ="\n"+ output + "<b style='color:green;'>" + "Ans: " +answer.capitalize()+ "</b>" | |
if len(distractors)>0: | |
for distractor in distractors[:4]: | |
output = output + "<b style='color:brown;'>" + distractor+ "</b>\n" | |
output = output + "<br>" | |
summary ="Summary: "+ summary_text | |
for answer in np: | |
summary = summary.replace(answer,"<b>"+answer+"</b>") | |
summary = summary.replace(answer.capitalize(),"<b>"+answer.capitalize()+"</b>") | |
output = output + "<p>"+summary+"</p>" | |
return output | |
iface = gr.Interface( | |
fn=generate_question, | |
inputs=[context1,radiobutton], | |
title="VidQuest", | |
outputs=output) | |
iface.launch(debug=True) |