ChenyuRabbitLove's picture
fix/fix build KB function
a779e10
import io
import os
import json
import logging
import secrets
import gradio as gr
import numpy as np
import openai
import pandas as pd
from google.oauth2.service_account import Credentials
from googleapiclient.discovery import build
from googleapiclient.http import MediaIoBaseDownload, MediaFileUpload
from openai.embeddings_utils import distances_from_embeddings
from .gpt_processor import QuestionAnswerer
from .work_flow_controller import WorkFlowController
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
openai.api_key = OPENAI_API_KEY
class Chatbot:
def __init__(self) -> None:
self.history = []
self.upload_state = "waiting"
self.uid = self.__generate_uid()
self.g_drive_service = self.__init_drive_service()
self.knowledge_base = None
self.context = None
self.context_page_num = None
self.context_file_name = None
def build_knowledge_base(self, files, upload_mode="once"):
work_flow_controller = WorkFlowController(files, self.uid)
self.csv_result_path = work_flow_controller.csv_result_path
self.json_result_path = work_flow_controller.json_result_path
if upload_mode == "Upload to Database":
self.__get_db_knowledge_base()
else:
self.__get_local_knowledge_base()
def __get_db_knowledge_base(self):
filename = "knowledge_base.csv"
db = self.__read_db(self.g_drive_service)
cur_content = pd.read_csv(self.csv_result_path)
for _ in range(10):
try:
self.__write_into_db(self.g_drive_service, db, cur_content)
break
except Exception as e:
logging.error(e)
logging.error("Failed to upload to database, retrying...")
continue
self.knowledge_base = db
self.upload_state = "done"
def __get_local_knowledge_base(self):
with open(self.csv_result_path, "r", encoding="UTF-8") as fp:
knowledge_base = pd.read_csv(fp)
knowledge_base["page_embedding"] = (
knowledge_base["page_embedding"].apply(eval).apply(np.array)
)
self.knowledge_base = knowledge_base
self.upload_state = "done"
def __write_into_db(self, service, db: pd.DataFrame, cur_content: pd.DataFrame):
db = pd.concat([db, cur_content], ignore_index=True)
db.to_csv(f"{self.uid}_knowledge_base.csv", index=False)
media = MediaFileUpload(f"{self.uid}_knowledge_base.csv", resumable=True)
request = (
service.files()
.update(fileId="1m3ozrphHP221hhdCFMFX9-10nzSDfNyW", media_body=media)
.execute()
)
def __init_drive_service(self):
SCOPES = ["https://www.googleapis.com/auth/drive"]
SERVICE_ACCOUNT_INFO = os.getenv("CREDENTIALS")
service_account_info_dict = json.loads(SERVICE_ACCOUNT_INFO)
creds = Credentials.from_service_account_info(
service_account_info_dict, scopes=SCOPES
)
return build("drive", "v3", credentials=creds)
def __read_db(self, service):
request = service.files().get_media(fileId="1m3ozrphHP221hhdCFMFX9-10nzSDfNyW")
fh = io.BytesIO()
downloader = MediaIoBaseDownload(fh, request)
done = False
while done is False:
status, done = downloader.next_chunk()
print(f"Download {int(status.progress() * 100)}%.")
# file_content = fh.getvalue().decode('utf-8')
fh.seek(0)
return pd.read_csv(fh)
def __read_file(self, service, filename) -> pd.DataFrame:
query = f"name='{filename}'"
results = service.files().list(q=query).execute()
files = results.get("files", [])
file_id = files[0]["id"]
request = service.files().get_media(fileId=file_id)
fh = io.BytesIO()
downloader = MediaIoBaseDownload(fh, request)
done = False
while done is False:
status, done = downloader.next_chunk()
print(f"Download {int(status.progress() * 100)}%.")
# file_content = fh.getvalue().decode('utf-8')
fh.seek(0)
return pd.read_csv(fh)
def __upload_file(self, service):
results = service.files().list(pageSize=10).execute()
items = results.get("files", [])
if not items:
print("No files found.")
else:
print("Files:")
for item in items:
print(f"{item['name']} ({item['id']})")
media = MediaFileUpload(self.csv_result_path, resumable=True)
filename_prefix = "ex_bot_database_"
filename = filename_prefix + self.uid + ".csv"
request = (
service.files()
.create(
media_body=media,
body={
"name": filename,
"parents": [
"1Lp21EZlVlqL-c27VQBC6wTbUC1YpKMsG"
], # Optional, to place the file in a specific folder
},
)
.execute()
)
def clear_state(self):
self.context = None
self.context_page_num = None
self.context_file_name = None
self.knowledge_base = None
self.upload_state = "waiting"
self.history = []
def send_system_nofification(self):
if self.upload_state == "waiting":
conversation = [["已上傳文件", "文件處理中(摘要、翻譯等),結束後將自動回覆"]]
return conversation
elif self.upload_state == "done":
conversation = [["已上傳文件", "文件處理完成,請開始提問"]]
return conversation
def change_md(self):
content = self.__construct_summary()
return gr.Markdown.update(content, visible=True)
def __construct_summary(self):
with open(self.json_result_path, "r", encoding="UTF-8") as fp:
knowledge_base = json.load(fp)
context = """"""
for key in knowledge_base.keys():
file_name = knowledge_base[key]["file_name"]
total_page = knowledge_base[key]["total_pages"]
summary = knowledge_base[key]["summarized_content"]
file_context = f"""
### 文件摘要
{file_name} (共 {total_page} 頁)<br><br>
{summary}<br><br>
"""
context += file_context
return context
def user(self, message):
self.history += [[message, None]]
return "", self.history
def bot(self):
user_message = self.history[-1][0]
print(f"user_message: {user_message}")
if self.knowledge_base is None:
response = [
[user_message, "請先上傳文件"],
]
self.history = response
return self.history
else:
self.__get_index_file(user_message)
if self.context is None:
response = [
[user_message, "無法找到相關文件,請重新提問"],
]
self.history = response
return self.history
else:
qa_processor = QuestionAnswerer()
bot_message = qa_processor.answer_question(
self.context,
self.context_page_num,
self.context_file_name,
self.history,
)
print(f"bot_message: {bot_message}")
response = [
[user_message, bot_message],
]
self.history[-1] = response[0]
return self.history
def __get_index_file(self, user_message):
user_message_embedding = openai.Embedding.create(
input=user_message, engine="text-embedding-ada-002"
)["data"][0]["embedding"]
self.knowledge_base["distance"] = distances_from_embeddings(
user_message_embedding,
self.knowledge_base["page_embedding"].values,
distance_metric="cosine",
)
self.knowledge_base = self.knowledge_base.sort_values(
by="distance", ascending=True
)
if self.knowledge_base["distance"].values[0] > 0.2:
self.context = None
else:
self.context = self.knowledge_base["page_content"].values[0]
self.context_page_num = self.knowledge_base["page_num"].values[0]
self.context_file_name = self.knowledge_base["file_name"].values[0]
def __generate_uid(self):
return secrets.token_hex(8)
class VideoChatbot:
def __init__(self) -> None:
self.metadata_keys = ["標題", "逐字稿", "摘要", "關鍵字"]
self.metadata = {
"c2fK-hxnPSY": {
"標題": "可汗學院的創新教學:學生與老師模式解析",
"逐字稿": "0:00\n這裡是一個關於西班牙美洲戰爭和AP美國歷史的練習\n0:04\n在可汗學院,我們以學生模式開始,並注意到如果學生要求解釋\n0:11\n它不只是給出答案,它會像一個好的導師一樣,只是試圖引導\n0:15\n學生朝正確的方向前進,並且還注意到老師可以看到\n0:21\n學生正在互動的內容作為安全措施,現在如果我們關閉學生模式,我們\n0:27\n進入老師模式,我們看到當老師要求解釋時,它非常不同,就像\n0:32\n有了老師的指南,它會給出如你所見的非常詳細的解釋,如果老師\n0:39\n想要它的教案,他們只需要要求,他們就會得到一個非常詳細的\n0:44\n教案,包括目標、活動和家庭作業要做的事情,然後如果老師\n0:52\n說太好了,Khanmigo,你說給一個講義或者作為家庭作業給一個反思\n0:58\n實際上給了反思作業,然後它會再次為老師構建那個\n1:03\n如果老師喜歡,他們可以要求自定義這些教案或這些提示或者這些\n1:08\n反思,讓它們更符合他們的學生正在做的事情,這是老師們通常花費\n1:13\n每天好幾個小時工作的事情,我們希望能夠節省\n1:17\n他們很多時間和精力,以利他們自己的健康和他們的學生。",
"摘要": "這段文字描述了一個關於西班牙美洲戰爭和AP美國歷史的教學練習。練習首先展示學生模式,強調良好的教導方式並提到教師可以監控學生互動情況作為安全措施。隨後,進入老師模式,提供了詳細的解釋和教案,包括目標、活動和家庭作業。另外,還有一個自定義教案的選項,使其更符合學生的需求。整個過程旨在節省教師的時間和精力,並有助於他們的健康和學生的學習。",
"關鍵字": ["AP美國歷史", "學生模式", "老師模式", "教案設計", "自定義教學"],
}
}
def answer_question(self, user_message):
self.video_id = "c2fK-hxnPSY"
index = self.compute_similariy(user_message)
if index is None:
return "無法找到相關資訊,請重新提問"
context = self.metadata[self.video_id][index]
system_prompt = """
你是一個知識檢索系統,我會給你一份文件,請幫我依照文件內容回答問題,並用繁體中文回答。以下是文件內容
"""
messages = [
{"role": "system", "content": f"{system_prompt} + '\n' '''{context}'''"},
{"role": "user", "content": user_message},
]
try:
response = openai.ChatCompletion.create(
model="gpt-3.5-turbo",
messages=messages,
temperature=1,
max_tokens=2048,
frequency_penalty=0,
presence_penalty=0.6,
)
bot_answer = response["choices"][0]["message"]["content"]
return bot_answer
except Exception as e:
logging.error(e)
logging.error("Failed to answer question")
def compute_similariy(self, user_message):
threshold = 0.5
user_message_embedding = openai.Embedding.create(
input=user_message, engine="text-embedding-ada-002"
)["data"][0]["embedding"]
index_embedding = {}
for index in self.metadata_keys:
index_embedding[index] = openai.Embedding.create(
input=self.metadata[self.video_id][index],
engine="text-embedding-ada-002",
)["data"][0]["embedding"]
# turn index_embedding into a dataframe
index_embedding = pd.DataFrame(
{
"title": [list(index_embedding.keys())[0]],
"embedding": [list(index_embedding.values())[0]],
}
)
index_embedding["distance"] = distances_from_embeddings(
user_message_embedding,
index_embedding["embedding"].values,
distance_metric="cosine",
)
index_embedding = index_embedding.sort_values(by="distance", ascending=True)
if index_embedding["distance"].values[0] > threshold:
return None
else:
return index_embedding["title"][0]