"""VidQuest: generate quiz questions from the audio track of a video URL.

Pipeline (all steps are defined in this file):
  1. ``UrlToAudio`` downloads the audio with ``youtube-dl`` and hands the WAV
     file to ``Process_audio``, which transcribes it in fixed-size chunks.
  2. ``summarizer`` condenses the transcript with a T5 model.
  3. ``get_keywords`` picks answer candidates that appear in the summary.
  4. ``get_question`` generates one question per answer; distractors come
     from WordNet or from GloVe nearest neighbours.
  5. ``generate_question`` glues everything together for the Gradio UI.
"""
import html
import os
import random
import re
import string
import subprocess
import tempfile
import traceback
import warnings
from collections import OrderedDict
from pathlib import Path

import gensim
import gradio as gr
import nltk
import numpy as np
import speech_recognition as sr
import torch
from flashtext import KeywordProcessor
from gensim.models import KeyedVectors
from gensim.scripts.glove2word2vec import glove2word2vec
from gensim.test.utils import datapath, get_tmpfile
from pydub import AudioSegment
from pydub.utils import make_chunks
from sklearn.metrics.pairwise import cosine_similarity
from transformers import T5ForConditionalGeneration, T5Tokenizer

# The NLTK data must be present before the corpus readers below are used.
nltk.download('punkt')
nltk.download('brown')
nltk.download('wordnet')
nltk.download('stopwords')

from nltk.corpus import stopwords
from nltk.corpus import wordnet as wn
from nltk.tokenize import sent_tokenize
from textwrap3 import wrap
import pke
import spacy

warnings.filterwarnings("ignore")

# File that carries the transcript from Process_audio to generate_question.
TRANSCRIPT_PATH = "The_audio.txt"
# Directory that receives the exported audio chunks.
CHUNK_DIR = "chunked"
# Length of each audio chunk sent to the speech recogniser, in milliseconds.
CHUNK_LENGTH_MS = 8000
# Message shown in the UI whenever the pipeline fails.
ERROR_MESSAGE = "Something Went Wrong...Please Check Link or try Again"

# Create the chunk directory up front; exist_ok replaces the old
# try/except-pass around os.makedirs.
os.makedirs(CHUNK_DIR, exist_ok=True)


def Process_audio(fileName):
    """Transcribe a WAV file chunk by chunk into ``The_audio.txt``.

    The audio is split into ``CHUNK_LENGTH_MS`` pieces, each piece is
    exported under ``./chunked/`` and sent to ``recognize_google``.
    Recognised text is appended to the transcript file followed by ``"."``;
    chunks that cannot be recognised are reported on stdout and skipped.

    Args:
        fileName: Path of the WAV file to transcribe.
    """
    os.makedirs(CHUNK_DIR, exist_ok=True)
    # Only the base name goes into the chunk name so that an input path with
    # directories does not point outside (or into a missing subfolder of)
    # the chunk directory.
    base_name = os.path.basename(fileName)
    recognizer = sr.Recognizer()

    # Opening first (mode "w+") truncates any transcript from a previous run,
    # even if loading the audio fails afterwards.
    with open(TRANSCRIPT_PATH, "w+") as transcript:
        audio = AudioSegment.from_wav(fileName)
        for index, chunk in enumerate(make_chunks(audio, CHUNK_LENGTH_MS)):
            chunk_name = './chunked/' + base_name + "_{0}.wav".format(index)
            print("I am Exporting", chunk_name)
            chunk.export(chunk_name, format="wav")

            with sr.AudioFile(chunk_name) as source:
                # NOTE(review): listen() may stop at the first pause inside
                # the chunk; record() would capture the full chunk - confirm
                # which behaviour is wanted before changing it.
                audio_listened = recognizer.listen(source)

            try:
                recognised = recognizer.recognize_google(audio_listened)
            except sr.UnknownValueError:
                print("I dont recognize your audio")
            except sr.RequestError:
                print("could not get result")
            else:
                transcript.write(recognised + ".")


def UrlToAudio(VideoUrl):
    """Download the audio of ``VideoUrl`` as WAV and transcribe it.

    The download runs inside a fresh temporary directory so that a WAV file
    left over from an earlier request can never be picked up by mistake.

    Args:
        VideoUrl: URL passed to ``youtube-dl``.

    Raises:
        ValueError: If the URL is empty.
        RuntimeError: If no WAV file was produced by the download.
    """
    url = VideoUrl.strip()
    if not url:
        raise ValueError("No video URL given")

    with tempfile.TemporaryDirectory() as download_dir:
        # An argument list (no shell) keeps the user-supplied URL from being
        # interpreted as shell syntax; "--" stops it being read as an option.
        subprocess.run(
            ["youtube-dl", "-x", "--audio-format", "wav", "--", url],
            cwd=download_dir,
            check=False,
        )
        wav_file_path = next(Path(download_dir).glob("*.wav"), None)
        if wav_file_path is None:
            raise RuntimeError("youtube-dl did not produce a WAV file")
        Process_audio(str(wav_file_path))


summary_model = T5ForConditionalGeneration.from_pretrained('t5-base')
summary_tokenizer = T5Tokenizer.from_pretrained('t5-base')

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
summary_model = summary_model.to(device)


def set_seed(seed: int):
    """Seed the Python, NumPy and PyTorch (CPU and CUDA) random generators."""
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)


def postprocesstext(content):
    """Capitalise every sentence of ``content``.

    Returns:
        The sentences, each passed through ``str.capitalize`` and each
        preceded by a single space (so the result starts with a space
        unless ``content`` has no sentences).
    """
    return "".join(" " + sent.capitalize() for sent in sent_tokenize(content))


def summarizer(text, model, tokenizer):
    """Summarise ``text`` with a T5 model.

    Args:
        text: Text to summarise; newlines are replaced by spaces.
        model: Model whose ``generate`` method produces the summary.
        tokenizer: Tokenizer matching ``model``.

    Returns:
        The first generated sequence, sentence-capitalised and stripped.
    """
    prompt = "summarize: " + text.strip().replace("\n", " ")

    # Input is truncated to 512 tokens.
    encoding = tokenizer.encode_plus(
        prompt,
        max_length=512,
        pad_to_max_length=False,
        truncation=True,
        return_tensors="pt",
    ).to(device)

    outs = model.generate(
        input_ids=encoding["input_ids"],
        attention_mask=encoding["attention_mask"],
        early_stopping=True,
        num_beams=3,
        num_return_sequences=1,
        no_repeat_ngram_size=2,
        min_length=75,
        max_length=300,
    )

    decoded = [tokenizer.decode(ids, skip_special_tokens=True) for ids in outs]
    return postprocesstext(decoded[0]).strip()


def get_nouns_multipartite(content):
    """Extract up to 15 noun keyphrases from ``content`` with MultipartiteRank.

    Returns:
        The keyphrase strings, best first; an empty list if extraction
        fails (the traceback is printed in that case).
    """
    try:
        extractor = pke.unsupervised.MultipartiteRank()

        # Candidates must not contain punctuation marks or stopwords.
        stoplist = list(string.punctuation)
        stoplist += ['-lrb-', '-rrb-', '-lcb-', '-rcb-', '-lsb-', '-rsb-']
        stoplist += stopwords.words('english')

        extractor.load_document(input=content, language='en',
                                stoplist=stoplist,
                                normalization=None)
        extractor.candidate_selection(pos={'PROPN', 'NOUN'})
        # Build the Multipartite graph and rank candidates using random walk;
        # alpha controls the weight adjustment mechanism, see TopicRank for
        # threshold/method parameters.
        extractor.candidate_weighting(alpha=1.1,
                                      threshold=0.75,
                                      method='average')
        return [phrase for phrase, _score in extractor.get_n_best(n=15)]
    except Exception:
        traceback.print_exc()
        return []


def get_keywords(originaltext, summarytext):
    """Return up to four keyphrases of the original text found in the summary.

    Keyphrases are extracted from ``originaltext`` and kept only when
    ``KeywordProcessor`` finds them in ``summarytext``; the ranking order of
    the extraction is preserved.
    """
    keywords = get_nouns_multipartite(originaltext)
    print("keywords unsummarized: ", keywords)

    keyword_processor = KeywordProcessor()
    for keyword in keywords:
        keyword_processor.add_keyword(keyword)

    keywords_found = list(set(keyword_processor.extract_keywords(summarytext)))
    print("keywords_found in summarized: ", keywords_found)

    found = set(keywords_found)
    important_keywords = [kw for kw in keywords if kw in found]
    return important_keywords[:4]


question_model = T5ForConditionalGeneration.from_pretrained('ramsrigouthamg/t5_squad_v1')
question_tokenizer = T5Tokenizer.from_pretrained('ramsrigouthamg/t5_squad_v1')
question_model = question_model.to(device)


def get_question(context, answer, model, tokenizer):
    """Generate a question about ``context`` whose answer is ``answer``.

    Args:
        context: Text the question should be about.
        answer: Expected answer to the generated question.
        model: Model whose ``generate`` method produces the question.
        tokenizer: Tokenizer matching ``model``.

    Returns:
        The first generated sequence with any ``"question:"`` marker removed
        and surrounding whitespace stripped.
    """
    text = "context: {} answer: {}".format(context, answer)
    encoding = tokenizer.encode_plus(
        text,
        max_length=384,
        pad_to_max_length=False,
        truncation=True,
        return_tensors="pt",
    ).to(device)

    outs = model.generate(
        input_ids=encoding["input_ids"],
        attention_mask=encoding["attention_mask"],
        early_stopping=True,
        num_beams=5,
        num_return_sequences=1,
        no_repeat_ngram_size=2,
        max_length=72,
    )

    decoded = [tokenizer.decode(ids, skip_special_tokens=True) for ids in outs]
    return decoded[0].replace("question:", "").strip()


def get_distractors_wordnet(word):
    """Return WordNet co-hyponyms of ``word`` as distractor candidates.

    The first noun synset of ``word`` is looked up, and the first lemma of
    every hyponym of its first hypernym is collected (title-cased, without
    duplicates, excluding ``word`` itself).

    Returns:
        A list of distractor strings; empty when ``word`` has no noun synset,
        no hypernym, or the lookup fails.
    """
    distractors = []
    try:
        # WordNet lemma names use underscores for multi-word entries, so the
        # lookup key is normalised *before* querying.
        lookup = word.lower().replace(" ", "_")
        answer_key = lookup.replace("_", " ")

        synsets = wn.synsets(lookup, 'n')
        if not synsets:
            print("Wordnet distractors not found")
            return distractors

        hypernyms = synsets[0].hypernyms()
        if not hypernyms:
            return distractors

        for item in hypernyms[0].hyponyms():
            name = item.lemmas()[0].name().replace("_", " ")
            # Compare in the same normalised form, otherwise multi-word or
            # capitalised answers would be offered as their own distractor.
            if name.lower() == answer_key:
                continue
            name = " ".join(part.capitalize() for part in name.split())
            if name not in distractors:
                distractors.append(name)
    except Exception:
        print("Wordnet distractors not found")
    return distractors


glove_file = '/home/user/app/glove.6B.300d.txt'
tmp_file = '/home/user/app/word2vec-glove.6B.300d.txt'

glove2word2vec(glove_file, tmp_file)
model = KeyedVectors.load_word2vec_format(tmp_file)


def generate_distractors(answer, count):
    """Return the ``count`` words closest to ``answer`` in the GloVe vectors.

    Returns:
        A list of at most ``count`` words; empty when the lookup fails, e.g.
        because the lower-cased answer is not in the vocabulary.
    """
    answer = answer.lower()
    try:
        closest_words = model.most_similar(positive=[answer], topn=count)
    except Exception:
        # Word not in the vocabulary, or another problem with the embeddings.
        return []
    return [word for word, _similarity in closest_words][:count]


context1 = gr.inputs.Textbox(lines=10, placeholder="Enter link here...")
output = gr.outputs.HTML(label="Question and Answers")
radiobutton = gr.inputs.Radio(["Wordnet", "Gensim"])


def _highlight_keywords(text, keywords):
    """Return ``text`` HTML-escaped, with every keyword occurrence in bold.

    Each keyword is matched both as given and in its ``capitalize()`` form.
    Matching is done in one pass over the raw text, so a keyword can never
    match inside markup that was inserted for another keyword.
    """
    forms = {form for kw in keywords for form in (kw, kw.capitalize()) if form}
    if not forms:
        return html.escape(text)

    # Longest alternatives first so that a longer phrase wins over a keyword
    # it contains.
    alternatives = "|".join(
        re.escape(form) for form in sorted(forms, key=len, reverse=True)
    )
    # With a capturing group, re.split returns non-matches at even indices
    # and the matched keywords at odd indices.
    pieces = re.split("(" + alternatives + ")", text)
    return "".join(
        "<b>" + html.escape(piece) + "</b>" if index % 2 else html.escape(piece)
        for index, piece in enumerate(pieces)
    )


def generate_question(context1, radiobutton):
    """Build the HTML quiz for the video at ``context1``.

    Args:
        context1: Video URL entered in the UI.
        radiobutton: ``"Wordnet"`` selects WordNet distractors; any other
            value selects the GloVe based ``generate_distractors``.

    Returns:
        An HTML string with one question, its answer and up to four
        distractors per keyword, followed by the summary with the keywords
        in bold. On any failure a fixed error message is returned instead.
    """
    try:
        UrlToAudio(context1)
        with open(TRANSCRIPT_PATH, "r") as transcript:
            context = transcript.read()
        if not context.strip():
            # Nothing was recognised, so there is nothing to summarise.
            return ERROR_MESSAGE

        summary_text = summarizer(context, summary_model, summary_tokenizer)
        for line in wrap(summary_text, 150):
            print(line)

        keywords = get_keywords(context, summary_text)
        print("\n\nNoun phrases", keywords)

        parts = []
        for answer in keywords:
            question = get_question(summary_text, answer,
                                    question_model, question_tokenizer)
            if radiobutton == "Wordnet":
                distractors = get_distractors_wordnet(answer)
            else:
                distractors = generate_distractors(answer.capitalize(), 3)
            print(distractors)

            # Generated/transcribed text is escaped before it is embedded in
            # the HTML shown by the UI.
            parts.append("<b style='color:blue;'>"
                         + html.escape(question) + "</b> ")
            parts.append("<b style='color:green;'>Ans: "
                         + html.escape(answer.capitalize()) + "</b>")
            parts.extend(
                " <b style='color:red;'>" + html.escape(distractor) + "</b>"
                for distractor in distractors[:4]
            )
            parts.append("<br>")

        summary = _highlight_keywords("Summary: " + summary_text, keywords)
        parts.append("<p>" + summary + "</p>")
        return "".join(parts)
    except Exception:
        # Keep the UI responsive but leave a trace of the cause in the log.
        traceback.print_exc()
        return ERROR_MESSAGE


iface = gr.Interface(
    fn=generate_question,
    inputs=[context1, radiobutton],
    title="VidQuest",
    examples=[["https://www.youtube.com/watch?v=WSbgixdC9g8", "Gensim"]],
    description="Keep in mind that it might take some minutes. Correct answers appear in green, while incorrect choices appear in red. Use the Gensim tool to find the most appropriate distractions.",
    outputs=output)

iface.launch(debug=True)