import io import os import json import logging import secrets import gradio as gr import numpy as np import openai import pandas as pd from google.oauth2.service_account import Credentials from googleapiclient.discovery import build from googleapiclient.http import MediaIoBaseDownload, MediaFileUpload from openai.embeddings_utils import distances_from_embeddings from .gpt_processor import QuestionAnswerer from .work_flow_controller import WorkFlowController OPENAI_API_KEY = os.getenv("OPENAI_API_KEY") openai.api_key = OPENAI_API_KEY class Chatbot: def __init__(self) -> None: self.history = [] self.upload_state = "waiting" self.uid = self.__generate_uid() self.g_drive_service = self.__init_drive_service() self.knowledge_base = None self.context = None self.context_page_num = None self.context_file_name = None def build_knowledge_base(self, files, upload_mode="once"): work_flow_controller = WorkFlowController(files, self.uid) self.csv_result_path = work_flow_controller.csv_result_path self.json_result_path = work_flow_controller.json_result_path if upload_mode == "Upload to Database": self.__get_db_knowledge_base() else: self.__get_local_knowledge_base() def __get_db_knowledge_base(self): filename = "knowledge_base.csv" db = self.__read_db(self.g_drive_service) cur_content = pd.read_csv(self.csv_result_path) for _ in range(10): try: self.__write_into_db(self.g_drive_service, db, cur_content) break except Exception as e: logging.error(e) logging.error("Failed to upload to database, retrying...") continue self.knowledge_base = db self.upload_state = "done" def __get_local_knowledge_base(self): with open(self.csv_result_path, "r", encoding="UTF-8") as fp: knowledge_base = pd.read_csv(fp) knowledge_base["page_embedding"] = ( knowledge_base["page_embedding"].apply(eval).apply(np.array) ) self.knowledge_base = knowledge_base self.upload_state = "done" def __write_into_db(self, service, db: pd.DataFrame, cur_content: pd.DataFrame): db = pd.concat([db, cur_content], ignore_index=True) db.to_csv(f"{self.uid}_knowledge_base.csv", index=False) media = MediaFileUpload(f"{self.uid}_knowledge_base.csv", resumable=True) request = ( service.files() .update(fileId="1m3ozrphHP221hhdCFMFX9-10nzSDfNyW", media_body=media) .execute() ) def __init_drive_service(self): SCOPES = ["https://www.googleapis.com/auth/drive"] SERVICE_ACCOUNT_INFO = os.getenv("CREDENTIALS") service_account_info_dict = json.loads(SERVICE_ACCOUNT_INFO) creds = Credentials.from_service_account_info( service_account_info_dict, scopes=SCOPES ) return build("drive", "v3", credentials=creds) def __read_db(self, service): request = service.files().get_media(fileId="1m3ozrphHP221hhdCFMFX9-10nzSDfNyW") fh = io.BytesIO() downloader = MediaIoBaseDownload(fh, request) done = False while done is False: status, done = downloader.next_chunk() print(f"Download {int(status.progress() * 100)}%.") # file_content = fh.getvalue().decode('utf-8') fh.seek(0) return pd.read_csv(fh) def __read_file(self, service, filename) -> pd.DataFrame: query = f"name='{filename}'" results = service.files().list(q=query).execute() files = results.get("files", []) file_id = files[0]["id"] request = service.files().get_media(fileId=file_id) fh = io.BytesIO() downloader = MediaIoBaseDownload(fh, request) done = False while done is False: status, done = downloader.next_chunk() print(f"Download {int(status.progress() * 100)}%.") # file_content = fh.getvalue().decode('utf-8') fh.seek(0) return pd.read_csv(fh) def __upload_file(self, service): results = service.files().list(pageSize=10).execute() items = results.get("files", []) if not items: print("No files found.") else: print("Files:") for item in items: print(f"{item['name']} ({item['id']})") media = MediaFileUpload(self.csv_result_path, resumable=True) filename_prefix = "ex_bot_database_" filename = filename_prefix + self.uid + ".csv" request = ( service.files() .create( media_body=media, body={ "name": filename, "parents": [ "1Lp21EZlVlqL-c27VQBC6wTbUC1YpKMsG" ], # Optional, to place the file in a specific folder }, ) .execute() ) def clear_state(self): self.context = None self.context_page_num = None self.context_file_name = None self.knowledge_base = None self.upload_state = "waiting" self.history = [] def send_system_nofification(self): if self.upload_state == "waiting": conversation = [["已上傳文件", "文件處理中(摘要、翻譯等),結束後將自動回覆"]] return conversation elif self.upload_state == "done": conversation = [["已上傳文件", "文件處理完成,請開始提問"]] return conversation def change_md(self): content = self.__construct_summary() return gr.Markdown.update(content, visible=True) def __construct_summary(self): with open(self.json_result_path, "r", encoding="UTF-8") as fp: knowledge_base = json.load(fp) context = """""" for key in knowledge_base.keys(): file_name = knowledge_base[key]["file_name"] total_page = knowledge_base[key]["total_pages"] summary = knowledge_base[key]["summarized_content"] file_context = f""" ### 文件摘要 {file_name} (共 {total_page} 頁)

{summary}

""" context += file_context return context def user(self, message): self.history += [[message, None]] return "", self.history def bot(self): user_message = self.history[-1][0] print(f"user_message: {user_message}") if self.knowledge_base is None: response = [ [user_message, "請先上傳文件"], ] self.history = response return self.history else: self.__get_index_file(user_message) if self.context is None: response = [ [user_message, "無法找到相關文件,請重新提問"], ] self.history = response return self.history else: qa_processor = QuestionAnswerer() bot_message = qa_processor.answer_question( self.context, self.context_page_num, self.context_file_name, self.history, ) print(f"bot_message: {bot_message}") response = [ [user_message, bot_message], ] self.history[-1] = response[0] return self.history def __get_index_file(self, user_message): user_message_embedding = openai.Embedding.create( input=user_message, engine="text-embedding-ada-002" )["data"][0]["embedding"] self.knowledge_base["distance"] = distances_from_embeddings( user_message_embedding, self.knowledge_base["page_embedding"].values, distance_metric="cosine", ) self.knowledge_base = self.knowledge_base.sort_values( by="distance", ascending=True ) if self.knowledge_base["distance"].values[0] > 0.2: self.context = None else: self.context = self.knowledge_base["page_content"].values[0] self.context_page_num = self.knowledge_base["page_num"].values[0] self.context_file_name = self.knowledge_base["file_name"].values[0] def __generate_uid(self): return secrets.token_hex(8) class VideoChatbot: def __init__(self) -> None: self.metadata_keys = ["標題", "逐字稿", "摘要", "關鍵字"] self.metadata = { "c2fK-hxnPSY": { "標題": "可汗學院的創新教學:學生與老師模式解析", "逐字稿": "0:00\n這裡是一個關於西班牙美洲戰爭和AP美國歷史的練習\n0:04\n在可汗學院,我們以學生模式開始,並注意到如果學生要求解釋\n0:11\n它不只是給出答案,它會像一個好的導師一樣,只是試圖引導\n0:15\n學生朝正確的方向前進,並且還注意到老師可以看到\n0:21\n學生正在互動的內容作為安全措施,現在如果我們關閉學生模式,我們\n0:27\n進入老師模式,我們看到當老師要求解釋時,它非常不同,就像\n0:32\n有了老師的指南,它會給出如你所見的非常詳細的解釋,如果老師\n0:39\n想要它的教案,他們只需要要求,他們就會得到一個非常詳細的\n0:44\n教案,包括目標、活動和家庭作業要做的事情,然後如果老師\n0:52\n說太好了,Khanmigo,你說給一個講義或者作為家庭作業給一個反思\n0:58\n實際上給了反思作業,然後它會再次為老師構建那個\n1:03\n如果老師喜歡,他們可以要求自定義這些教案或這些提示或者這些\n1:08\n反思,讓它們更符合他們的學生正在做的事情,這是老師們通常花費\n1:13\n每天好幾個小時工作的事情,我們希望能夠節省\n1:17\n他們很多時間和精力,以利他們自己的健康和他們的學生。", "摘要": "這段文字描述了一個關於西班牙美洲戰爭和AP美國歷史的教學練習。練習首先展示學生模式,強調良好的教導方式並提到教師可以監控學生互動情況作為安全措施。隨後,進入老師模式,提供了詳細的解釋和教案,包括目標、活動和家庭作業。另外,還有一個自定義教案的選項,使其更符合學生的需求。整個過程旨在節省教師的時間和精力,並有助於他們的健康和學生的學習。", "關鍵字": ["AP美國歷史", "學生模式", "老師模式", "教案設計", "自定義教學"], } } def answer_question(self, user_message): self.video_id = "c2fK-hxnPSY" index = self.compute_similariy(user_message) if index is None: return "無法找到相關資訊,請重新提問" context = self.metadata[self.video_id][index] system_prompt = """ 你是一個知識檢索系統,我會給你一份文件,請幫我依照文件內容回答問題,並用繁體中文回答。以下是文件內容 """ messages = [ {"role": "system", "content": f"{system_prompt} + '\n' '''{context}'''"}, {"role": "user", "content": user_message}, ] try: response = openai.ChatCompletion.create( model="gpt-3.5-turbo", messages=messages, temperature=1, max_tokens=2048, frequency_penalty=0, presence_penalty=0.6, ) bot_answer = response["choices"][0]["message"]["content"] return bot_answer except Exception as e: logging.error(e) logging.error("Failed to answer question") def compute_similariy(self, user_message): threshold = 0.5 user_message_embedding = openai.Embedding.create( input=user_message, engine="text-embedding-ada-002" )["data"][0]["embedding"] index_embedding = {} for index in self.metadata_keys: index_embedding[index] = openai.Embedding.create( input=self.metadata[self.video_id][index], engine="text-embedding-ada-002", )["data"][0]["embedding"] # turn index_embedding into a dataframe index_embedding = pd.DataFrame( { "title": [list(index_embedding.keys())[0]], "embedding": [list(index_embedding.values())[0]], } ) index_embedding["distance"] = distances_from_embeddings( user_message_embedding, index_embedding["embedding"].values, distance_metric="cosine", ) index_embedding = index_embedding.sort_values(by="distance", ascending=True) if index_embedding["distance"].values[0] > threshold: return None else: return index_embedding["title"][0]