import hashlib
import math
import os
import time
from datetime import datetime
from pathlib import Path

import gradio as gr
import nltk
import numpy as np
import pandas as pd
import textract
import torch
from nltk.tokenize import LineTokenizer
from nltk.tokenize import sent_tokenize
from scipy.special import softmax
from tqdm import tqdm
from transformers import AutoTokenizer, AutoModel, AutoModelForQuestionAnswering, pipeline
from transformers import MarianMTModel, MarianTokenizer

# Run every model on the first CUDA device when one is available, else on CPU.
device = "cuda:0" if torch.cuda.is_available() else "cpu"

# Fetch the Punkt data used by nltk's sent_tokenize.
nltk.download('punkt')

docs = None

# Model definitions.
# Translation: Spanish -> English ...
mname = "Helsinki-NLP/opus-mt-es-en"
tokenizer_es_en = MarianTokenizer.from_pretrained(mname)
model_es_en = MarianMTModel.from_pretrained(mname)
model_es_en.to(device)

# ... and English -> Spanish.
mname = "Helsinki-NLP/opus-mt-en-es"
tokenizer_en_es = MarianTokenizer.from_pretrained(mname)
model_en_es = MarianMTModel.from_pretrained(mname)
model_en_es.to(device)

# Splits a text into lines; each line is translated as one paragraph.
lt = LineTokenizer()

# Question answering: a sentence encoder to retrieve passages and an
# extractive QA model to read the answer out of the retrieved passage.
tokenizer = AutoTokenizer.from_pretrained("sentence-transformers/multi-qa-mpnet-base-dot-v1")
model = AutoModel.from_pretrained("sentence-transformers/multi-qa-mpnet-base-dot-v1").to(device).eval()
tokenizer_ans = AutoTokenizer.from_pretrained("deepset/roberta-large-squad2")
model_ans = AutoModelForQuestionAnswering.from_pretrained("deepset/roberta-large-squad2").to(device).eval()

if device == 'cuda:0':
    pipe = pipeline("question-answering", model_ans, tokenizer=tokenizer_ans, device=0)
else:
    pipe = pipeline("question-answering", model_ans, tokenizer=tokenizer_ans)


def validate_dataset(dataset):
    """Reset the cached ``docs`` and report whether documents are loaded.

    Args:
        dataset: pandas DataFrame whose first column holds file paths.

    Returns:
        The "ready" status string when the first cell of the last row is
        not an empty string, otherwise the "waiting for documents" string.
        An empty DataFrame counts as "waiting" instead of raising
        ``IndexError``.
    """
    global docs
    docs = None  # clear it out if dataset is modified
    docs_ready = (not dataset.empty) and dataset.iloc[-1, 0] != ""
    if docs_ready:
        return "✨Listo✨"
    return "⚠️Esperando documentos..."
def traducir_parrafos(parrafos, tokenizer, model, tam_bloque=8, ):
    """Translate paragraphs sentence by sentence with a seq2seq model.

    Args:
        parrafos: iterable of paragraph strings.
        tokenizer: tokenizer paired with ``model``; called on a list of
            sentences and used to decode the generated ids.
        model: model exposing ``generate``.
        tam_bloque: number of sentences sent to the model per batch.

    Returns:
        A list with one translated string per paragraph; the translated
        sentences of a paragraph are joined with single spaces.
    """
    parrafos_traducidos = []
    for parrafo in parrafos:
        frases = sent_tokenize(parrafo)
        traducido = []
        for inicio in range(0, len(frases), tam_bloque):
            bloque_enviado = frases[inicio:inicio + tam_bloque]
            model_inputs = tokenizer(bloque_enviado, return_tensors="pt",
                                     padding=True, truncation=True,
                                     max_length=500).to(device)
            with torch.no_grad():
                bloque_traducido = model.generate(**model_inputs)
            traducido.extend(
                tokenizer.decode(t, skip_special_tokens=True)
                for t in bloque_traducido
            )
        parrafos_traducidos.append(" ".join(traducido))
    return parrafos_traducidos


def traducir_es_en(texto):
    """Translate ``texto`` from Spanish to English, line by line."""
    parrafos = lt.tokenize(texto)
    par_tra = traducir_parrafos(parrafos, tokenizer_es_en, model_es_en)
    return "\n".join(par_tra)


def traducir_en_es(texto):
    """Translate ``texto`` from English to Spanish, line by line."""
    parrafos = lt.tokenize(texto)
    par_tra = traducir_parrafos(parrafos, tokenizer_en_es, model_en_es)
    return "\n".join(par_tra)


def request_pathname(files):
    """Return ``[path, basename]`` rows for uploaded files (``[[]]`` if None)."""
    if files is None:
        return [[]]
    return [[file.name, file.name.split('/')[-1]] for file in files]


def cls_pooling(model_output):
    """Return the last hidden state of the first token of every sequence."""
    return model_output.last_hidden_state[:, 0]


def encode_query(query):
    """Embed ``query`` with the retrieval encoder and return a CPU tensor."""
    encoded_input = tokenizer(query, truncation=True, return_tensors='pt').to(device)
    with torch.no_grad():
        model_output = model(**encoded_input, return_dict=True)
    embeddings = cls_pooling(model_output)
    return embeddings.cpu()


def encode_docs(docs, maxlen=64, stride=32):
    """Split a document into overlapping word windows and embed each one.

    Args:
        docs: ``(name, text)`` pair; ``name`` is used in the cache file names.
        maxlen: number of new words per window.
        stride: number of neighbouring words added to each window as overlap.

    Returns:
        ``(embeddings, spans, file_names)``: a float32 array of the stacked
        window embeddings, the window texts, and ``name`` repeated once per
        window. The three values are also written to ``emb_<name>.npy``,
        ``spans_<name>.npy`` and ``file_<name>.npy`` as index-keyed dicts.
    """
    name, text = docs
    words = text.split(" ")

    spans = []
    if len(words) < maxlen:
        # Short document: one window with the whole text. (The original
        # referenced an undefined ``temp_text`` here and raised NameError.)
        spans.append(" ".join(words))
    else:
        # ceil() instead of floor()+1: when the length is an exact multiple
        # of ``maxlen`` the extra window held only already-covered overlap.
        num_iters = math.ceil(len(words) / maxlen)
        for i in range(num_iters):
            if i == 0:
                # First window overlaps forward into the next one.
                window = words[:maxlen + stride]
            else:
                # Later windows start with the tail of the previous window.
                window = (words[(i - 1) * maxlen:i * maxlen][-stride:]
                          + words[i * maxlen:(i + 1) * maxlen])
            spans.append(" ".join(window))
    file_names = [name] * len(spans)

    embeddings = []
    with torch.no_grad():
        for span in tqdm(spans):
            encoded = tokenizer(span, return_tensors='pt', truncation=True).to(device)
            model_output = model(**encoded, return_dict=True)
            embeddings.append(cls_pooling(model_output))
    embeddings = torch.stack(embeddings).transpose(0, 1).cpu().numpy().astype(np.float32)

    # Same on-disk layout as before so existing cache files stay readable.
    np.save("emb_{}.npy".format(name), dict(zip(list(range(len(embeddings))), embeddings)))
    np.save("spans_{}.npy".format(name), dict(zip(list(range(len(spans))), spans)))
    np.save("file_{}.npy".format(name), dict(zip(list(range(len(file_names))), file_names)))
    return embeddings, spans, file_names


def _load_cached_answers(path):
    """Return the answer table stored at ``path``, or None if it is unusable."""
    try:
        # Read everything as text and keep "" as "": an empty answer must not
        # come back as a float NaN, which cannot be passed to the translator.
        df = pd.read_csv(path, dtype=str, keep_default_na=False)
    except (OSError, pd.errors.EmptyDataError, pd.errors.ParserError):
        return None
    if df.empty or not {"Contexto", "Respuesta", "Probabilidades"}.issubset(df.columns):
        return None
    return df


def _load_or_build_index(data, name_to_save):
    """Return ``(doc_emb, doc_text, file_names)`` for the uploaded document."""
    if os.path.isfile(name_to_save + ".txt"):
        # The document was already processed: reuse its saved embeddings.
        # NOTE(review): these files are pickled dicts and the file name comes
        # from the uploaded file's name; they are only safe to load as long
        # as nothing but encode_docs writes to this directory.
        doc_emb = np.load('emb_{}.npy'.format(name_to_save), allow_pickle=True).item()
        doc_text = np.load('spans_{}.npy'.format(name_to_save), allow_pickle=True).item()
        file_names_dicto = np.load('file_{}.npy'.format(name_to_save), allow_pickle=True).item()
        doc_emb = np.array(list(doc_emb.values())).reshape(-1, 768)
        return doc_emb, list(doc_text.values()), list(file_names_dicto.values())

    text = textract.process("{}".format(data.name)).decode('utf8')
    text = text.replace("\r", " ")
    text = text.replace("\n", " ")
    text = text.replace(" . ", " ")
    text = traducir_es_en(text)
    doc_emb, doc_text, file_names = encode_docs((name_to_save, text), maxlen=64, stride=32)
    doc_emb = doc_emb.reshape(-1, 768)
    # The .txt file doubles as the "already processed" marker checked above,
    # so it is written only after the embeddings were saved.
    with open("{}.txt".format(name_to_save), "w", encoding="utf-8") as f:
        f.write(text)
    return doc_emb, doc_text, file_names


def _first_output(df):
    """Translate the first row of the answer table back to Spanish."""
    best = df.iloc[0]
    return (traducir_en_es(best.Respuesta),
            traducir_en_es(best.Contexto),
            traducir_en_es(best.Probabilidades))


def predict(query, data):
    """Answer a Spanish question about an uploaded document.

    The question and the document are translated to English, the document is
    split into passages, the passages are ranked by dot product with the
    question embedding, and the QA pipeline reads an answer out of the top
    passages. Results are cached per (question, document) in a CSV file.

    Args:
        query: the question, in Spanish.
        data: uploaded file object exposing ``.name`` (its path), or None.

    Returns:
        ``(answer, context, probabilities)`` for the best passage, each
        translated to Spanish.
    """
    if data is None:
        # Nothing uploaded yet: report it instead of failing on ``data.name``.
        return "⚠️Esperando documentos...", "", ""

    query = traducir_es_en(query)
    # NOTE(review): the [:-8] presumably strips a random suffix that Gradio
    # appends to uploaded temp-file names -- confirm against the Gradio version.
    name_to_save = data.name.split("/")[-1].split(".")[0][:-8]
    k = 2
    st = str([query, name_to_save])
    st_hashed = str(hashlib.sha256(st.encode()).hexdigest())
    hist = st + " " + st_hashed
    current_time = datetime.now().strftime("%H:%M:%S")

    # Name the cache file after the SHA-256 digest. The built-in hash() of a
    # str is salted per process, so a name derived from it is never found
    # again after a restart.
    cache_path = "{}.csv".format(st_hashed)

    # If the same question was already asked for this document, reuse it.
    cached = _load_cached_answers(cache_path)
    if cached is not None:
        return _first_output(cached)
    print(st)

    doc_emb, doc_text, file_names = _load_or_build_index(data, name_to_save)

    # Once embeddings are available, run maximum inner product search.
    start = time.time()
    query_emb = encode_query(query)
    scores = np.matmul(query_emb.numpy(), doc_emb.transpose(1, 0))[0].tolist()
    doc_score_pairs = list(zip(doc_text, scores, file_names))
    doc_score_pairs = sorted(doc_score_pairs, key=lambda x: x[1], reverse=True)
    probs = softmax(sorted(scores, reverse=True)[:k])

    table = {"Contexto": [], "Respuesta": [], "Probabilidades": []}
    # Get an answer for each (question, top passage) pair.
    for i, (passage, _, _) in enumerate(doc_score_pairs[:k]):
        passage = passage.replace("\n", "")
        if probs[i] > 0.1 or (i < 3 and probs[i] > 0.05):
            # Generate answers only for sufficiently likely passages.
            ans = pipe({'question': query, 'context': passage})
            probabilities = "P(a|p): {}, P(a|p,q): {}, P(p|q): {}".format(round(ans["score"],5),
                                                                          round(ans["score"]*probs[i],5),
                                                                          round(probs[i],5))
            table["Contexto"].append(passage)
            table["Respuesta"].append(str(ans["answer"]).upper())
            table["Probabilidades"].append(probabilities)
        else:
            table["Contexto"].append(passage)
            table["Respuesta"].append("no_answer_calculated")
            table["Probabilidades"].append("P(p|q): {}".format(round(probs[i],5)))

    # Save the answers so the same question on the same document is not
    # recomputed.
    df = pd.DataFrame(table)
    print(df)
    print("time: "+ str(time.time()-start))
    with open("HISTORY.txt", "a", encoding="utf-8") as f:
        f.write("{} {}\n".format(hist, current_time))
    df.to_csv(cache_path, index=False)

    # Only the best passage is shown, so only that row is translated. This
    # also works when the document produced fewer than ``k`` passages.
    return _first_output(df)


with gr.Blocks() as demo:
    gr.Markdown("""
    # Document Question and Answer adaptado al castellano por Pablo Ascorbe.
    Este espacio ha sido clonado y adaptado de: https://huggingface.co/spaces/whitead/paper-qa
    La idea es utilizar un modelo preentrenado de HuggingFace como "distilbert-base-cased-distilled-squad"
    y responder las preguntas en inglés, para ello, será necesario hacer primero una traducción de los textos en castellano
    a inglés y luego volver a traducir en sentido contrario.

    ## Instrucciones:
    Adjunte su documento, ya sea en formato .txt o .pdf, y pregunte lo que desee.
    """)
    file = gr.File(
        label="Sus documentos subidos (PDF o txt)")
    # dataset = gr.Dataframe(
    #     headers=["filepath", "citation string"],
    #     datatype=["str", "str"],
    #     col_count=(2, "fixed"),
    #     interactive=True,
    #     label="Documentos y citas"
    # )
    # buildb = gr.Textbox("⚠️Esperando documentos...",
    #                     label="Estado", interactive=False, show_label=True)
    # dataset.change(validate_dataset, inputs=[
    #                dataset], outputs=[buildb])
    # uploaded_files.change(request_pathname, inputs=[
    #                       uploaded_files], outputs=[dataset])
    query = gr.Textbox(
        placeholder="Introduzca su pregunta aquí...", label="Pregunta")
    ask = gr.Button("Preguntar")
    gr.Markdown("## Respuesta")
    answer = gr.Markdown(label="Respuesta")
    prob = gr.Markdown(label="Probabilidades")
    with gr.Accordion("Contexto", open=False):
        gr.Markdown(
            "### Contexto\n\nEl siguiente contexto ha sido utilizado para generar la respuesta:")
        context = gr.Markdown(label="Contexto")
    # ask.click(fn=do_ask, inputs=[query, buildb,
    #                              dataset], outputs=[answer, context])
    ask.click(fn=predict, inputs=[query, file], outputs=[answer, context, prob])
    # gr.Examples needs the components the example values map to; one
    # example is one row with a value per input component.
    examples = gr.Examples(
        examples=[["¿Cuándo suelen comenzar las adicciones?", "Entrevista Miguel Ruiz.txt"]],
        inputs=[query, file])

demo.queue(concurrency_count=20)
demo.launch(show_error=True)

# iface = gr.Interface(fn =predict,
#                      inputs = [gr.inputs.Textbox(default="What is Open-domain question answering?"),
#                                gr.inputs.File(),
#                                ],
#                      outputs = [
#                          gr.outputs.Carousel(['text']),
#                      ],
#                      description=description,
#                      title = title,
#                      allow_flagging ="manual",flagging_options = ["correct","wrong"],
#                      allow_screenshot=False)
# iface.launch(enable_queue=True, show_error =True)

# Model definitions:
# Translation
# mname = "Helsinki-NLP/opus-mt-es-en"
# tokenizer_es_en = MarianTokenizer.from_pretrained(mname)
# model_es_en = MarianMTModel.from_pretrained(mname)
# model_es_en.to(device)
# mname = "Helsinki-NLP/opus-mt-en-es"
# tokenizer_en_es = MarianTokenizer.from_pretrained(mname)
# model_en_es = MarianMTModel.from_pretrained(mname)
model_en_es.to(device) # lt = LineTokenizer() # Responder preguntas # question_answerer = pipeline("question-answering", model='distilbert-base-cased-distilled-squad') # def request_pathname(files): # if files is None: # return [[]] # return [[file.name, file.name.split('/')[-1]] for file in files] # def traducir_parrafos(parrafos, tokenizer, model, tam_bloque=8, ): # parrafos_traducidos = [] # for parrafo in parrafos: # frases = sent_tokenize(parrafo) # batches = math.ceil(len(frases) / tam_bloque) # traducido = [] # for i in range(batches): # bloque_enviado = frases[i*tam_bloque:(i+1)*tam_bloque] # model_inputs = tokenizer(bloque_enviado, return_tensors="pt", # padding=True, truncation=True, # max_length=500).to(device) # with torch.no_grad(): # bloque_traducido = model.generate(**model_inputs) # traducido += bloque_traducido # traducido = [tokenizer.decode(t, skip_special_tokens=True) for t in traducido] # parrafos_traducidos += [" ".join(traducido)] # return parrafos_traducidos # def traducir_es_en(texto): # parrafos = lt.tokenize(texto) # par_tra = traducir_parrafos(parrafos, tokenizer_es_en, model_es_en) # return "\n".join(par_tra) # def traducir_en_es(texto): # parrafos = lt.tokenize(texto) # par_tra = traducir_parrafos(parrafos, tokenizer_en_es, model_en_es) # return "\n".join(par_tra) # def validate_dataset(dataset): # global docs # docs = None # clear it out if dataset is modified # docs_ready = dataset.iloc[-1, 0] != "" # if docs_ready: # return "✨Listo✨" # else: # return "⚠️Esperando documentos..." 
# def do_ask(question, button, dataset): # global docs # docs_ready = dataset.iloc[-1, 0] != "" # if button == "✨Listo✨" and docs_ready: # for _, row in dataset.iterrows(): # path = row['filepath'] # text = Path(f'{path}').read_text() # text_en = traducir_es_en(text) # QA_input = { # 'question': traducir_es_en(question), # 'context': text_en # } # return traducir_en_es(question_answerer(QA_input)['answer']) # else: # return "" # # def do_ask(question, button, dataset, progress=gr.Progress()): # # global docs # # docs_ready = dataset.iloc[-1, 0] != "" # # if button == "✨Listo✨" and docs_ready: # # if docs is None: # don't want to rebuild index if it's already built # # import paperqa # # docs = paperqa.Docs() # # # dataset is pandas dataframe # # for _, row in dataset.iterrows(): # # key = None # # if ',' not in row['citation string']: # # key = row['citation string'] # # docs.add(row['filepath'], row['citation string'], key=key) # # else: # # return "" # # progress(0, "Construyendo índices...") # # docs._build_faiss_index() # # progress(0.25, "Encolando...") # # result = docs.query(question) # # progress(1.0, "¡Hecho!") # # return result.formatted_answer, result.context # with gr.Blocks() as demo: # gr.Markdown(""" # # Document Question and Answer adaptado al castellano por Pablo Ascorbe. # Este espacio ha sido clonado y adaptado de: https://huggingface.co/spaces/whitead/paper-qa # La idea es utilizar un modelo preentrenado de HuggingFace como "distilbert-base-cased-distilled-squad" # y responder las preguntas en inglés, para ello, será necesario hacer primero una traducción de los textos en castellano # a inglés y luego volver a traducir en sentido contrario. # ## Instrucciones: # Adjunte su documento, ya sea en formato .txt o .pdf, y pregunte lo que desee. 
# """) # uploaded_files = gr.File( # label="Sus documentos subidos (PDF o txt)", file_count="multiple", ) # dataset = gr.Dataframe( # headers=["filepath", "citation string"], # datatype=["str", "str"], # col_count=(2, "fixed"), # interactive=True, # label="Documentos y citas" # ) # buildb = gr.Textbox("⚠️Esperando documentos...", # label="Estado", interactive=False, show_label=True) # dataset.change(validate_dataset, inputs=[ # dataset], outputs=[buildb]) # uploaded_files.change(request_pathname, inputs=[ # uploaded_files], outputs=[dataset]) # query = gr.Textbox( # placeholder="Introduzca su pregunta aquí...", label="Pregunta") # ask = gr.Button("Preguntar") # gr.Markdown("## Respuesta") # answer = gr.Markdown(label="Respuesta") # with gr.Accordion("Contexto", open=False): # gr.Markdown( # "### Contexto\n\nEl siguiente contexto ha sido utilizado para generar la respuesta:") # context = gr.Markdown(label="Contexto") # # ask.click(fn=do_ask, inputs=[query, buildb, # # dataset], outputs=[answer, context]) # ask.click(fn=do_ask, inputs=[query, buildb, # dataset], outputs=[answer]) # demo.queue(concurrency_count=20) # demo.launch(show_error=True)