from langchain.document_loaders.unstructured import UnstructuredFileLoader from langchain.text_splitter import CharacterTextSplitter from langchain.embeddings import OpenAIEmbeddings from langchain.vectorstores import Chroma from langchain.chains import RetrievalQA from langchain.chat_models import ChatOpenAI from langchain.schema import AIMessage, HumanMessage, SystemMessage, Document from langchain.document_loaders import PyPDFLoader from transformers import AutoTokenizer, T5ForConditionalGeneration from retrieval.retrieval import Retrieval, BM25 from datetime import datetime import os, time, torch from torch.nn import Softmax import requests API_URL = "https://api-inference.huggingface.co/models/CreatorPhan/ViQA-small" headers = {"Authorization": "Bearer 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"} class Agent: def __init__(self, args=None) -> None: self.args = args self.choices = args.choices self.corpus = Retrieval(k=args.choices) self.context_value = "" self.use_context = False self.softmax = Softmax(dim=1) self.temp = [] self.replace_list = torch.load('retrieval/replace.pt') print("Model is loading...") self.model = T5ForConditionalGeneration.from_pretrained(args.model).to(args.device) self.tokenizer = AutoTokenizer.from_pretrained(args.tokenizer) print("Model loaded!") def load_context(self, doc_path): print('Loading file:', doc_path.name) if doc_path.name[-4:] == '.pdf': context = self.read_pdf(doc_path.name) else: # loader = UnstructuredFileLoader(doc_path.name) context = open(doc_path.name, encoding='utf-8').read() self.retrieval = Retrieval(docs=context) self.choices = self.retrieval.k self.use_context = True return f"Using file from {doc_path.name}" def API_call(self, prompt): response = requests.post(API_URL, headers=headers, json={"inputs": prompt}).json() if isinstance(response, list): return response[0]['generated_text'] else: time.sleep(3) return self.API_call(prompt) def asking(self, question): timestamp = datetime.now() timestamp = timestamp.strftime("[%Y-%m-%d %H:%M:%S]") print(timestamp, end=' ') s_query = time.time() if self.use_context: print("Answering with your context:", question) contexts = self.retrieval.get_context(question) else: print("Answering without your context:", question) contexts = self.corpus.get_context(question) prompts = [] for context in contexts: prompt = f"Trả lời câu hỏi: {question} Trong nội dung: {context['context']}" prompts.append(prompt) s_token = time.time() tokens = self.tokenizer(prompts, max_length=self.args.seq_len, truncation=True, padding='max_length', return_tensors='pt') s_gen = time.time() outputs = self.model.generate( input_ids=tokens.input_ids.to(self.args.device), attention_mask=tokens.attention_mask.to(self.args.device), max_new_tokens=self.args.out_len, output_scores=True, return_dict_in_generate=True ) s_de = time.time() results = [] scores = self.softmax(outputs.scores[0]) scores = scores.max(dim=1).values*100 # print(scores) for i in range(self.choices): result = contexts[i] score = round(scores[i].item()) result['score'] = score answer = self.tokenizer.decode(outputs.sequences[i], skip_special_tokens=True) result['answer'] = answer results.append(result) def get_score(record): return record['score']**2 * record['score_bm'] results.sort(key=get_score, reverse=True) self.temp = results t_mess = "t_query: {:.2f}\t t_token: {:.2f}\t t_gen: {:.2f}\t t_decode: {:.2f}\t".format( s_token-s_query, s_gen-s_token, s_de-s_gen, time.time()-s_de ) print(t_mess, len(self.temp)) if results[0]['score'] > 60: return results[0]['answer'] else: return f"Tôi không chắc nhưng câu trả lời có thể là: {results[0]['answer']}\nBạn có thể tham khảo các câu trả lời bên cạnh!" def get_context(self, context): self.context_value = context self.retrieval = Retrieval(k=self.choices, docs=context) self.choices = self.retrieval.k self.use_context = True return context def load_context_file(self, file): print('Loading file:', file.name) text = '' for line in open(file.name, 'r', encoding='utf8'): text += line self.context_value = text return text def clear_context(self): self.context_value = "" self.use_context = False self.choices = self.args.choices return "" def replace(self, text): for key, value in self.replace_list: text = text.replace(key, value) return text def read_pdf(self, file_path): loader = PyPDFLoader(file_path) pages = loader.load_and_split() text = '' for page in pages: page_content = page.page_content text += self.replace(page_content) return text