# -*- coding: utf-8 -*-
#@title scripts
"""Retrieval + extractive question-answering chatbot served through Gradio.

A user message is embedded with a sentence-embedding model, the closest stored
question is looked up (Faiss, brute-force Euclidean distance, or
sentence-transformers semantic search), and either the stored answer is
returned directly or a question-answering model extracts the answer span from
the context paired with that question.

Importing this module loads the dataset and models and launches the Gradio app.
"""
import pickle
import time

# Third-party imports keep their original relative order (torch before faiss).
import numpy as np
import pandas as pd
import torch
import faiss
from sklearn.preprocessing import normalize
from transformers import AutoTokenizer, AutoModelForQuestionAnswering
from sentence_transformers import SentenceTransformer, util
from pythainlp import Tokenizer
import evaluate
from sklearn.metrics.pairwise import cosine_similarity, euclidean_distances
import gradio as gr

print(torch.cuda.is_available())

# Model keys that were previously (mis)stored in ``__all__``.
SUPPORTED_MODELS = [
    "mdeberta",
    "wangchanberta-hyp",  # Best model
]

# ``__all__`` must name real module attributes; listing model keys here made
# ``from <module> import *`` raise AttributeError.
__all__ = [
    "Chatbot",
    "ChatbotModel",
    "SUPPORTED_MODELS",
    "predict_method",
    "DEFAULT_MODEL",
    "DEFAULT_SENTENCE_EMBEDDING_MODEL",
    "MODEL_DICT",
    "DATA_PATH",
    "EMBEDDINGS_PATH",
    "EXAMPLE_PATH",
    "bot",
    "demo",
]

# Names accepted by ``Chatbot.answer_question(..., method=...)``.
predict_method = [
    "faiss",
    "faissWithModel",
    "cosineWithModel",
    "semanticSearchWithModel",
]

DEFAULT_MODEL = 'wangchanberta-hyp'
DEFAULT_SENTENCE_EMBEDDING_MODEL = 'intfloat/multilingual-e5-base'

# Short model key -> Hugging Face repository id.
MODEL_DICT = {
    'wangchanberta': 'Chananchida/wangchanberta-th-wiki-qa_ref-params',
    'wangchanberta-hyp': 'Chananchida/wangchanberta-th-wiki-qa_hyp-params',
    'mdeberta': 'Chananchida/mdeberta-v3-th-wiki-qa_ref-params',
    'mdeberta-hyp': 'Chananchida/mdeberta-v3-th-wiki-qa_hyp-params',
}

DATA_PATH = 'models/dataset.xlsx'
EMBEDDINGS_PATH = 'models/embeddings.pkl'


class ChatbotModel:
    """Facade that builds a fully initialised :class:`Chatbot`."""

    def __init__(self, model=DEFAULT_MODEL):
        """Load data, models, stored embeddings and build the search index.

        Args:
            model: Key of ``MODEL_DICT`` selecting the question-answering model.
        """
        self._chatbot = Chatbot()
        self._chatbot.load_data()
        self._chatbot.load_model(model)
        self._chatbot.load_embedding_model(DEFAULT_SENTENCE_EMBEDDING_MODEL)
        self._chatbot.set_vectors()
        self._chatbot.set_index()

    def chat(self, question):
        """Return the chatbot's answer to ``question``."""
        return self._chatbot.answer_question(question)

    def eval(self, model, predict_method):
        """Delegate evaluation to the wrapped chatbot.

        NOTE(review): ``Chatbot.eval`` is not defined anywhere in this file, so
        this call raises AttributeError unless it is provided elsewhere. The
        evaluation logic cannot be reconstructed from what is visible here.
        """
        return self._chatbot.eval(model_name=model, predict_method=predict_method)


class Chatbot:
    """Holds the dataset, models and index, and implements the predictors."""

    def __init__(self):
        # Everything starts empty; the load_*/set_* methods fill these in.
        self.df = None
        self.test_df = None
        self.model = None
        self.model_name = None
        self.tokenizer = None
        self.embedding_model = None
        self.vectors = None
        self.index = None
        self.k = 1  # top k most similar
        # Lazily computed tensor of question embeddings for semantic search.
        self._corpus_embeddings = None

    def load_data(self, path: str = DATA_PATH):
        """Read the dataset workbook.

        The 'Default' sheet becomes ``self.df``; its 'Context' column is
        replaced by the 'Context' column of the 'mdeberta' sheet.
        """
        # One call with a list of sheet names opens the workbook only once.
        sheets = pd.read_excel(path, sheet_name=['Default', 'mdeberta'])
        self.df = sheets['Default']
        self.df['Context'] = sheets['mdeberta']['Context']
        self._corpus_embeddings = None  # cached embeddings belong to old data
        print('Load data done')

    def load_model(self, model_name: str = DEFAULT_MODEL):
        """Load the question-answering model and tokenizer for ``model_name``.

        Raises:
            KeyError: If ``model_name`` is not a key of ``MODEL_DICT``.
        """
        repo_id = MODEL_DICT[model_name]
        self.model = AutoModelForQuestionAnswering.from_pretrained(repo_id)
        self.tokenizer = AutoTokenizer.from_pretrained(repo_id)
        self.model_name = model_name
        print('Load model done')

    def load_embedding_model(self, model_name: str = DEFAULT_SENTENCE_EMBEDDING_MODEL):
        """Load the sentence-embedding model.

        When CUDA is available the model is explicitly pinned to the CPU;
        otherwise the library picks its default device.
        NOTE(review): pinning to CPU on CUDA machines looks deliberate, but the
        reason is not visible here - confirm before changing.
        """
        device = 'cpu' if torch.cuda.is_available() else None
        self.embedding_model = SentenceTransformer(model_name, device=device)
        self._corpus_embeddings = None  # cached embeddings belong to old model
        print('Load sentence embedding model done')

    def set_vectors(self):
        """Load the stored embeddings and keep them as a normalised matrix."""
        self.vectors = self.prepare_sentences_vector(self.load_embeddings(EMBEDDINGS_PATH))

    def set_index(self):
        """Build a flat L2 Faiss index over ``self.vectors``.

        The index is moved to GPU 0 when CUDA is available and the installed
        Faiss build exposes GPU support; otherwise it stays on the CPU.
        """
        index = faiss.IndexFlatL2(self.vectors.shape[1])
        # A CPU-only Faiss build has no StandardGpuResources even when torch
        # sees a GPU, so check for it instead of crashing.
        if torch.cuda.is_available() and hasattr(faiss, 'StandardGpuResources'):
            gpu_resources = faiss.StandardGpuResources()
            index = faiss.index_cpu_to_gpu(gpu_resources, 0, index)
        index.add(self.vectors)
        self.index = index

    def get_embeddings(self, text_list):
        """Encode a string or list of strings with the embedding model."""
        return self.embedding_model.encode(text_list)

    def prepare_sentences_vector(self, encoded_list):
        """Stack embeddings into a float32 matrix and normalise each row."""
        rows = [vector.reshape(1, -1) for vector in encoded_list]
        matrix = np.vstack(rows).astype('float32')
        return normalize(matrix)

    def store_embeddings(self, embeddings, path: str = EMBEDDINGS_PATH):
        """Pickle ``embeddings`` together with the dataset questions.

        Args:
            embeddings: Embeddings to persist.
            path: Destination file; defaults to ``EMBEDDINGS_PATH``.
        """
        payload = {'sentences': self.df['Question'], 'embeddings': embeddings}
        with open(path, "wb") as fOut:
            pickle.dump(payload, fOut, protocol=pickle.HIGHEST_PROTOCOL)
        print('Store embeddings done')

    def load_embeddings(self, file_path):
        """Return the 'embeddings' entry of the pickle at ``file_path``."""
        # SECURITY: pickle.load executes arbitrary code from the file; only
        # load embedding files produced by store_embeddings / trusted sources.
        with open(file_path, "rb") as fIn:
            stored_data = pickle.load(fIn)
        print('Load (questions) embeddings done')
        return stored_data['embeddings']

    def model_pipeline(self, question, similar_context):
        """Extract the answer span for ``question`` from ``similar_context``.

        The start and end positions are the argmax of the model's start/end
        logits; the tokens in between (inclusive) of the first sequence are
        decoded and returned.
        """
        inputs = self.tokenizer(question, similar_context, return_tensors="pt")
        with torch.no_grad():
            outputs = self.model(**inputs)
        start = outputs.start_logits.argmax()
        end = outputs.end_logits.argmax()
        answer_tokens = inputs.input_ids[0, start: end + 1]
        return self.tokenizer.decode(answer_tokens)

    def faiss_search(self, question_vector):
        """Look up the ``self.k`` nearest stored questions.

        Returns:
            ``(similar_questions, similar_contexts, distances, indices)`` where
            the first two are lists taken from the dataset and the last two are
            the raw Faiss search results.
        """
        distances, indices = self.index.search(question_vector, self.k)
        nearest = indices[0][:self.k]
        similar_questions = [self.df['Question'][row] for row in nearest]
        similar_contexts = [self.df['Context'][row] for row in nearest]
        return similar_questions, similar_contexts, distances, indices

    def _embed_message(self, message):
        """Strip ``message`` and return its normalised (1, dim) embedding."""
        vector = self.get_embeddings(message.strip())
        return self.prepare_sentences_vector([vector])

    def predict_faiss(self, message, history=None):
        """Return the stored answer of the nearest question (no QA model).

        Args:
            message: User message.
            history: Chat history passed by ``gr.ChatInterface``; unused.
        """
        _, _, _, indices = self.faiss_search(self._embed_message(message))
        answers = [self.df['Answer'][row] for row in indices[0]]
        return answers[0]

    # Function to predict using BERT embedding
    def predict_bert_embedding(self, message, history=None):
        """Retrieve with Faiss, then extract the answer with the QA model.

        The retrieved question(s) and context(s) - not the raw message - are
        fed to the model, as lists; only the first sequence is decoded.

        Args:
            message: User message.
            history: Chat history passed by ``gr.ChatInterface``; unused.
        """
        similar_questions, similar_contexts, _, _ = self.faiss_search(
            self._embed_message(message))
        return self.model_pipeline(similar_questions, similar_contexts)

    def _get_corpus_embeddings(self):
        """Return the question embeddings as a tensor, encoding them once."""
        if self._corpus_embeddings is None:
            self._corpus_embeddings = self.embedding_model.encode(
                self.df['Question'].tolist(), convert_to_tensor=True)
        return self._corpus_embeddings

    def predict_semantic_search(self, message, history=None):
        """Retrieve with ``util.semantic_search``, then run the QA model.

        Args:
            message: User message.
            history: Chat history passed by ``gr.ChatInterface``; unused.
        """
        message = message.strip()
        query_embedding = self.embedding_model.encode([message], convert_to_tensor=True)[0]
        query_embedding = query_embedding.to('cpu')
        # The corpus is encoded once and reused instead of on every query.
        hits = util.semantic_search(query_embedding, self._get_corpus_embeddings(), top_k=1)
        best_hit = hits[0][0]
        context = self.df['Context'][best_hit['corpus_id']]
        return self.model_pipeline(message, context)

    def predict_without_faiss(self, message, history=None):
        """Brute-force nearest-question search, then run the QA model.

        Scans ``self.vectors`` in order, tracking the smallest Euclidean
        distance, and stops early once a new best match is within a fixed
        threshold.

        Args:
            message: User message.
            history: Chat history passed by ``gr.ChatInterface``; unused.
        """
        most_similar_context = ""
        min_distance = 1000
        message = message.strip(' \t\n')
        question_vector = self.prepare_sentences_vector(self.get_embeddings([message]))
        for row, stored_vector in enumerate(self.vectors):
            distance = euclidean_distances(question_vector, stored_vector.reshape(1, -1))[0][0]
            if distance < min_distance:
                min_distance = distance
                most_similar_context = self.df['Context'][row]
                # Close enough to count as a match: stop scanning.
                if distance <= 0.02469331026:
                    break
        predicted = self.model_pipeline(message, most_similar_context)
        # The original replaced the empty string "" with "@", which inserts
        # "@" between every character. NOTE(review): "<unk>" is the presumed
        # original token (likely stripped as an HTML tag) - confirm.
        return predicted.strip().replace("<unk>", "@")

    def answer_question(self, question, method="faissWithModel"):
        """Answer ``question`` with the predictor selected by ``method``.

        NOTE(review): this method was called by ``ChatbotModel.chat`` but was
        missing from the file. The name-to-predictor mapping follows the order
        of the Gradio tabs and the default is an assumption - confirm both.

        Args:
            question: User question.
            method: One of the names in the module-level ``predict_method``.

        Raises:
            ValueError: If ``method`` is not a known predictor name.
        """
        predictors = {
            "faiss": self.predict_faiss,
            "faissWithModel": self.predict_bert_embedding,
            "cosineWithModel": self.predict_without_faiss,
            "semanticSearchWithModel": self.predict_semantic_search,
        }
        try:
            predictor = predictors[method]
        except KeyError:
            raise ValueError(
                f"Unknown predict method {method!r}; expected one of {sorted(predictors)}"
            ) from None
        return predictor(question)


bot = ChatbotModel()

# Gradio
EXAMPLE_PATH = ["หลิน ไห่เฟิง มีชื่อเรียกอีกชื่อว่าอะไร" , "ใครเป็นผู้ตั้งสภาเศรษฐกิจโลกขึ้นในปี พ.ศ. 2514 โดยทุกปีจะมีการประชุมที่ประเทศสวิตเซอร์แลนด์", "โปรดิวเซอร์ของอัลบั้มตลอดกาล ของวงคีรีบูนคือใคร", "สกุลเดิมของหม่อมครูนุ่ม นวรัตน ณ อยุธยา คืออะไร"]

# One chat tab per retrieval strategy; each fn receives (message, history).
demoFaiss = gr.ChatInterface(fn=bot._chatbot.predict_faiss, examples=EXAMPLE_PATH)
demoBert = gr.ChatInterface(fn=bot._chatbot.predict_bert_embedding, examples=EXAMPLE_PATH)
demoSemantic = gr.ChatInterface(fn=bot._chatbot.predict_semantic_search, examples=EXAMPLE_PATH)
demoWithoutFiss = gr.ChatInterface(fn=bot._chatbot.predict_without_faiss, examples=EXAMPLE_PATH)
demo = gr.TabbedInterface([demoFaiss, demoWithoutFiss, demoBert, demoSemantic],
                          ["Faiss", "Model", "Faiss & Model", "Semantic Search & Model"])
demo.launch()