|
import gradio as gr |
|
import shutil |
|
|
|
from chains.local_doc_qa import LocalDocQA |
|
from configs.model_config import * |
|
import nltk |
|
import models.shared as shared |
|
from models.loader.args import parser |
|
from models.loader import LoaderCheckPoint |
|
import os |
|
import pandas as pd |
|
|
|
# Prepend NLTK_DATA_PATH so it is searched before NLTK's existing data locations.
# NOTE(review): NLTK_DATA_PATH presumably comes from the `configs.model_config` star import.
nltk.data.path = [NLTK_DATA_PATH] + nltk.data.path
|
|
|
|
|
def get_vs_list():
    """Return the knowledge-base names offered to the UI.

    The result always starts with the default entry ``"python_bot"``. When
    ``KB_ROOT_PATH`` exists, its directory entries are appended in sorted
    order. Entries are not de-duplicated against the default entry.
    """
    names = ["python_bot"]
    if os.path.exists(KB_ROOT_PATH):
        # An empty listing sorts to [], so it leaves the default list untouched.
        names += sorted(os.listdir(KB_ROOT_PATH))
    return names
|
|
|
|
|
# Names of the configured embedding models and LLMs (the keys of the config dicts).
embedding_model_dict_list = list(embedding_model_dict.keys())

llm_model_dict_list = list(llm_model_dict.keys())

# Single LocalDocQA instance shared by every handler in this file.
local_doc_qa = LocalDocQA()

# CSV logger; get_answer() calls .flag() on it after each query.
flag_csv_logger = gr.CSVLogger()

# Name of the most recently authenticated user; overwritten by login().
# NOTE(review): this is one module-level value, so concurrent sessions share it.
user = "None"

# NOTE(review): plaintext credentials committed to source. login() builds its own local
# `users` from student(), so this list appears unused in the visible code — confirm
# before removing.
users = [
    ("wsy", "123456"),
    ("wdy", "654321"),
    ("lhj", "123456"),
    ("hhy", "123456"),
    ("yl", "123456"),
    ("hy", "123456"),
]

# NOTE(review): hard-coded absolute path; the name is rebound to a gr.State inside the
# `with gr.Blocks(...)` section further down.
vs_path = "/home/wsy/Langchain-chat/Langchain-Chatchat/knowledge_base"
|
|
|
def get_answer(query, vs_path, history, mode, score_threshold=VECTOR_SEARCH_SCORE_THRESHOLD,
               vector_search_top_k=VECTOR_SEARCH_TOP_K, chunk_conent: bool = True,
               chunk_size=CHUNK_SIZE, streaming: bool = STREAMING):
    """Stream chat-history updates for one query, then log the exchange.

    When ``mode`` is ``"知识库问答"`` and ``vs_path`` is an existing directory
    containing ``index.faiss``, the answer comes from
    ``local_doc_qa.get_knowledge_based_answer`` and the source documents are
    appended to the last history entry as collapsible ``<details>`` blocks.
    In every other case the query goes straight to ``local_doc_qa.llm_model_chain``.

    ``score_threshold``, ``vector_search_top_k``, ``chunk_conent`` and
    ``chunk_size`` are accepted but not used by this body.

    Yields:
        ``(history, "")`` after every streamed step; the empty string is the
        second output of the submit handler (the query textbox).
    """
    kb_ready = (
        mode == "知识库问答"
        and vs_path is not None
        and os.path.exists(vs_path)
        and "index.faiss" in os.listdir(vs_path)
    )
    if not kb_ready:
        llm_stream = local_doc_qa.llm_model_chain(
            {"prompt": query, "history": history, "streaming": streaming})
        for step in llm_stream['answer_result_stream']:
            answer_text = step.llm_output["answer"]
            history = step.history
            history[-1][-1] = answer_text
            yield history, ""
    else:
        kb_stream = local_doc_qa.get_knowledge_based_answer(
            query=query, vs_path=vs_path, chat_history=history, streaming=streaming)
        for resp, history in kb_stream:
            # One collapsible block per source document, labelled with the file's base name.
            cited = "".join(
                f"""<details> <summary>出处 [{i + 1}] {os.path.split(doc.metadata["source"])[-1]}</summary>\n"""
                f"""{doc.page_content}\n"""
                f"""</details>"""
                for i, doc in enumerate(resp["source_documents"]))
            history[-1][-1] += "\n\n" + cited
            yield history, ""
    # Runs once the stream is exhausted: record the exchange under the global `user`.
    logger.info(f"flagging: username={user},query={query},vs_path={vs_path},mode={mode},history={history}")
    flag_csv_logger.flag([query, vs_path, history, mode], username=user)
|
|
|
|
|
def init_model():
    """Load the LLM from the command-line arguments and send it one test prompt.

    Parses the CLI arguments, stores a ``LoaderCheckPoint`` on ``shared``,
    loads the LLM, configures ``local_doc_qa`` with it and sends the prompt
    ``"你好"``, printing each streamed output.

    Returns:
        A status message: the success text, or the failure text when
        configuring or querying the model raised.
    """
    cli_options = vars(parser.parse_args())
    shared.loaderCheckPoint = LoaderCheckPoint(cli_options)
    llm = shared.loaderLLM()
    llm.history_len = LLM_HISTORY_LEN
    try:
        local_doc_qa.init_cfg(llm_model=llm)
        probe = local_doc_qa.llm_model_chain(
            {"prompt": "你好", "history": [], "streaming": False})
        for result in probe['answer_result_stream']:
            print(result.llm_output)
        reply = """模型已成功加载,可以开始对话"""
        logger.info(reply)
        return reply
    except Exception as e:
        logger.error(e)
        reply = """模型未成功加载,请到页面左上角"模型配置"选项卡中重新选择后点击"加载模型"按钮"""
        on_macos = str(e) == "Unknown platform: darwin"
        if not on_macos:
            logger.info(reply)
        else:
            # This specific error gets a macOS-specific hint instead of the generic reply.
            logger.info("该报错可能因为您使用的是 macOS 操作系统,需先下载模型至本地后执行 Web UI,具体方法请参考项目 README 中本地部署方法及常见问题:"
                        " https://github.com/imClumsyPanda/langchain-ChatGLM")
        return reply
|
|
|
|
|
def reinit_model(llm_model, embedding_model, llm_history_len, no_remote_model, use_ptuning_v2, use_lora, top_k,
                 history):
    """Reload the LLM and reconfigure ``local_doc_qa`` from the given settings.

    ``use_lora`` is accepted but not used by this body.

    Returns:
        ``history`` extended with one ``[None, status]`` entry reporting
        whether the reload succeeded.
    """
    try:
        reloaded_llm = shared.loaderLLM(llm_model, no_remote_model, use_ptuning_v2)
        reloaded_llm.history_len = llm_history_len
        local_doc_qa.init_cfg(llm_model=reloaded_llm,
                              embedding_model=embedding_model,
                              top_k=top_k)
        model_status = """模型已成功重新加载"""
    except Exception as e:
        logger.error(e)
        model_status = """模型未成功重新加载,请到页面左上角"模型配置"选项卡中重新选择后点击"加载模型"按钮"""
    # Either outcome is logged once and appended to the chat as a system message.
    logger.info(model_status)
    return history + [[None, model_status]]
|
|
|
|
|
def get_vector_store(vs_id, files, sentence_size, history, one_conent, one_content_segmentation):
    """Add uploaded files, or a single piece of text, to knowledge base ``vs_id``.

    When ``files`` is a list, each upload is moved into the knowledge base's
    ``content`` directory and the vector store is built from the moved files.
    Otherwise ``files`` is handed to ``local_doc_qa.one_knowledge_add`` together
    with ``one_conent`` and ``one_content_segmentation``.

    Returns:
        ``(vs_path, None, history + [[None, status]], dropdown_update)``;
        ``vs_path`` is ``None`` when the model or embeddings are not loaded.
    """
    vs_path = os.path.join(KB_ROOT_PATH, vs_id, "vector_store")
    if not (local_doc_qa.llm_model_chain and local_doc_qa.embeddings):
        file_status = "模型未完成加载,请先在加载模型后再导入文件"
        vs_path = None
    else:
        if isinstance(files, list):
            stored_paths = []
            for upload in files:
                target = os.path.join(KB_ROOT_PATH, vs_id, "content", os.path.split(upload.name)[-1])
                shutil.move(upload.name, target)
                stored_paths.append(target)
            vs_path, loaded_files = local_doc_qa.init_knowledge_vector_store(stored_paths, vs_path, sentence_size)
        else:
            vs_path, loaded_files = local_doc_qa.one_knowledge_add(vs_path, files, one_conent,
                                                                   one_content_segmentation,
                                                                   sentence_size)
        if len(loaded_files):
            loaded_names = [os.path.split(i)[-1] for i in loaded_files if i]
            file_status = f"已添加 {'、'.join(loaded_names)} 内容至知识库,并已加载知识库,请开始提问"
        else:
            file_status = "文件未成功加载,请重新上传文件"
    logger.info(file_status)
    updated_history = history + [[None, file_status]]
    file_choices = local_doc_qa.list_file_from_vector_store(vs_path) if vs_path else []
    return vs_path, None, updated_history, gr.update(choices=file_choices)
|
|
|
|
|
def change_vs_name_input(vs_id, history):
    """Update the UI after the selected knowledge base changes.

    Args:
        vs_id: Selected dropdown entry; the sentinel ``"新建知识库"`` means
            "create a new knowledge base".
        history: Current chat history.

    Returns:
        A 7-tuple of handler outputs: three visibility updates, the vector
        store path (``None`` for the sentinel), the chat history, an update
        for the file list, and a final visibility update.
    """
    if vs_id == "新建知识库":
        return (gr.update(visible=True), gr.update(visible=True), gr.update(visible=False), None, history,
                gr.update(choices=[]), gr.update(visible=False))

    vs_path = os.path.join(KB_ROOT_PATH, vs_id, "vector_store")
    # The vector_store directory may not exist yet. os.listdir would raise in that case,
    # so a missing directory is treated the same as a directory without an index.
    has_index = os.path.isdir(vs_path) and "index.faiss" in os.listdir(vs_path)
    if has_index:
        file_status = f"已加载知识库{vs_id},请开始提问"
        return (gr.update(visible=False), gr.update(visible=False), gr.update(visible=True),
                vs_path, history + [[None, file_status]],
                gr.update(choices=local_doc_qa.list_file_from_vector_store(vs_path), value=[]),
                gr.update(visible=True))

    file_status = f"已选择知识库{vs_id},当前知识库中未上传文件,请先上传文件后,再开始提问"
    return (gr.update(visible=False), gr.update(visible=False), gr.update(visible=True),
            vs_path, history + [[None, file_status]],
            gr.update(choices=[], value=[]), gr.update(visible=True, value=[]))
|
|
|
|
|
# Notice shown in the chat when the knowledge-base test mode is selected.
# Every numbered item except the last ends with a blank line; item 2 was missing
# its "\n\n", which made items 2 and 3 run together.
knowledge_base_test_mode_info = ("【注意】\n\n"
                                 "1. 您已进入知识库测试模式,您输入的任何对话内容都将用于进行知识库查询,"
                                 "并仅输出知识库匹配出的内容及相似度分值和及输入的文本源路径,查询的内容并不会进入模型查询。\n\n"
                                 "2. 知识相关度 Score 经测试,建议设置为 500 或更低,具体设置情况请结合实际使用调整。\n\n"
                                 """3. 使用"添加单条数据"添加文本至知识库时,内容如未分段,则内容越多越会稀释各查询内容与之关联的score阈值。\n\n"""
                                 "4. 单条内容长度建议设置在100-150左右。\n\n"
                                 "5. 本界面用于知识入库及知识匹配相关参数设定,但当前版本中,"
                                 "本界面中修改的参数并不会直接修改对话界面中参数,仍需前往`configs/model_config.py`修改后生效。"
                                 "相关参数将在后续版本中支持本界面直接修改。")
|
|
|
|
|
def change_mode(mode, history):
    """Return two visibility updates and the chat history for the chosen mode.

    ``"知识库问答"`` shows the first target only. ``"知识库测试"`` shows both and
    replaces the chat with the test-mode notice. Any other mode hides both.
    """
    if mode == "知识库测试":
        notice = [[None, knowledge_base_test_mode_info]]
        return gr.update(visible=True), gr.update(visible=True), notice

    # The remaining modes differ only in whether the first target is shown.
    show_first = mode == "知识库问答"
    return gr.update(visible=show_first), gr.update(visible=False), history
|
|
|
|
|
def change_chunk_conent(mode, label_conent, history):
    """Toggle a dependent control and announce the change in the chat.

    ``label_conent`` selects the feature name used in the message (by
    substring match); ``mode`` is the new on/off state.

    Returns:
        ``(visibility_update, history + [[None, message]])``.
    """
    feature_labels = (
        ("chunk_conent", "搜索结果上下文关联"),
        ("one_content_segmentation", "内容分段入库"),
    )
    # First matching key wins; unknown labels produce an empty feature name.
    conent = next((text for key, text in feature_labels if key in label_conent), "")

    if not mode:
        return gr.update(visible=False), history + [[None, f"【已关闭{conent}】"]]
    return gr.update(visible=True), history + [[None, f"【已开启{conent}】"]]
|
|
|
|
|
def add_vs_name(vs_name, chatbot):
    """Create a new knowledge base named ``vs_name`` and update the UI.

    Empty names and names already returned by ``get_vs_list()`` are rejected
    with a chat message. Otherwise the ``content`` and ``vector_store``
    directories are created under ``KB_ROOT_PATH/vs_name`` if missing.

    Returns:
        A 6-tuple: four component updates, the extended chat history and a
        final visibility update.
    """
    rejection = None
    if vs_name is None or vs_name.strip() == "":
        rejection = "知识库名称不能为空,请重新填写知识库名称"
    elif vs_name in get_vs_list():
        rejection = "与已有知识库名称冲突,请重新选择其他名称后提交"

    if rejection is not None:
        chatbot = chatbot + [[None, rejection]]
        return (gr.update(visible=True), gr.update(visible=True), gr.update(visible=True),
                gr.update(visible=False), chatbot, gr.update(visible=False))

    # Storage for uploaded files first, then storage for the vector index.
    for subdir in ("content", "vector_store"):
        target = os.path.join(KB_ROOT_PATH, vs_name, subdir)
        if not os.path.exists(target):
            os.makedirs(target)

    vs_status = f"""已新增知识库"{vs_name}",将在上传文件并载入成功后进行存储。请在开始对话前,先完成文件上传。 """
    chatbot = chatbot + [[None, vs_status]]
    return (gr.update(visible=True, choices=get_vs_list(), value=vs_name),
            gr.update(visible=False), gr.update(visible=False), gr.update(visible=True),
            chatbot, gr.update(visible=True))
|
|
|
|
|
|
|
def reinit_vector_store(vs_id, history):
    """Rebuild the vector store of knowledge base ``vs_id`` from its ``content`` directory.

    Args:
        vs_id: Knowledge base name (a directory under ``KB_ROOT_PATH``).
        history: Current chat history.

    Returns:
        ``history`` extended with one ``[None, status]`` entry reporting
        whether the rebuild succeeded. Exceptions are logged, not raised.
    """
    try:
        vs_path = os.path.join(KB_ROOT_PATH, vs_id, "vector_store")
        # A knowledge base that has never been indexed has nothing to remove;
        # that must not abort the rebuild.
        if os.path.isdir(vs_path):
            shutil.rmtree(vs_path)
        # Pass the configured integer itself. The earlier code constructed a
        # gr.Number UI component here and passed that object as the sentence size.
        vs_path, _loaded_files = local_doc_qa.init_knowledge_vector_store(
            os.path.join(KB_ROOT_PATH, vs_id, "content"), vs_path, SENTENCE_SIZE)
        model_status = """知识库构建成功"""
    except Exception as e:
        logger.error(e)
        model_status = """知识库构建未成功"""
        logger.info(model_status)
    return history + [[None, model_status]]
|
|
|
|
|
def refresh_vs_list():
    """Return two dropdown updates, each carrying a freshly read knowledge-base list."""
    first_dropdown = gr.update(choices=get_vs_list())
    second_dropdown = gr.update(choices=get_vs_list())
    return first_dropdown, second_dropdown
|
|
|
|
|
def delete_file(vs_id, files_to_delete, chatbot):
    """Remove files from knowledge base ``vs_id``: first from the vector store, then from disk.

    The files on disk are deleted only when the vector-store deletion status
    does not contain ``"fail"``.

    Returns:
        ``(file_list_update, chatbot + [[None, status_message]])``.
    """
    kb_dir = os.path.join(KB_ROOT_PATH, vs_id)
    vs_path = os.path.join(kb_dir, "vector_store")
    content_path = os.path.join(kb_dir, "content")
    docs_path = [os.path.join(content_path, file) for file in files_to_delete]

    status = local_doc_qa.delete_file_from_vector_store(vs_path=vs_path,
                                                        filepath=docs_path)
    failed = "fail" in status
    if not failed:
        for doc_path in docs_path:
            if not os.path.exists(doc_path):
                continue
            os.remove(doc_path)

    rested_files = local_doc_qa.list_file_from_vector_store(vs_path)
    if failed:
        vs_status = "文件删除失败。"
    elif len(rested_files) > 0:
        vs_status = "文件删除成功。"
    else:
        vs_status = f"文件删除成功,知识库{vs_id}中无已上传文件,请先上传文件后,再开始提问。"

    logger.info(",".join(files_to_delete) + vs_status)
    chatbot = chatbot + [[None, vs_status]]
    # The list is read again for the dropdown, as in the original flow.
    remaining_choices = local_doc_qa.list_file_from_vector_store(vs_path)
    return gr.update(choices=remaining_choices, value=[]), chatbot
|
|
|
|
|
def delete_vs(vs_id, chatbot):
    """Delete the whole directory of knowledge base ``vs_id`` and update the UI.

    Returns:
        A 6-tuple of handler outputs. On success the first update carries the
        refreshed knowledge-base list with its first entry selected; on any
        exception the error is logged and a failure message is appended to
        the chat instead.
    """
    try:
        shutil.rmtree(os.path.join(KB_ROOT_PATH, vs_id))
        status = f"成功删除知识库{vs_id}"
        logger.info(status)
        chatbot = chatbot + [[None, status]]
        remaining = get_vs_list()
        return (gr.update(choices=remaining, value=remaining[0]),
                gr.update(visible=True),
                gr.update(visible=True),
                gr.update(visible=False),
                chatbot,
                gr.update(visible=False))
    except Exception as e:
        logger.error(e)
        status = f"删除知识库{vs_id}失败"
        chatbot = chatbot + [[None, status]]
        return (gr.update(visible=True),
                gr.update(visible=False),
                gr.update(visible=False),
                gr.update(visible=True),
                chatbot,
                gr.update(visible=True))
|
|
|
|
|
# Extra CSS passed to gr.Blocks: gradient styling (normal and hover) for the
# `.importantButton` class.
block_css = """.importantButton {
    background: linear-gradient(45deg, #7e0570,#5d1c99, #6e00ff) !important;
    border: none !important;
}
.importantButton:hover {
    background: linear-gradient(45deg, #ff00e0,#8500ff, #6e00ff) !important;
    border: none !important;
}"""

# Markdown heading rendered at the top of the page.
webui_title = """
# 🎉Welcome Python bot🎉
"""

# First chatbot message. The f-prefix has no placeholders to fill.
init_message = f"""欢迎使用 Python bot!

在下侧对话框输入问题后,按下Shift+回车即可换行继续输入,按下回车即可获得回复!


若想询问程序报错相关问题,将报错信息最后的报错原因贴上来即可。

"""
|
|
|
|
|
# Load the model at import time; the returned status text is shown as the second
# chatbot message when the UI is built.
model_status = init_model()

# Font overrides passed to gr.themes.Default.
default_theme_args = dict(
    font=["Source Sans Pro", 'ui-sans-serif', 'system-ui', 'sans-serif'],
    font_mono=['IBM Plex Mono', 'ui-monospace', 'Consolas', 'monospace'],
)
|
|
|
# Build the page: a title, one chat tab with the chatbot, the query box and the mode selector.
# NOTE(review): the widget nesting below is a reconstruction — confirm the layout
# against the running app.
with gr.Blocks(css=block_css, theme=gr.themes.Default(**default_theme_args)) as demo:
    # State holders. These rebind the module-level names vs_path and model_status.
    # vs_path starts at the first knowledge base's vector store when get_vs_list()
    # has more than one entry, otherwise at "".
    vs_path, file_status, model_status = gr.State(
        os.path.join(KB_ROOT_PATH, get_vs_list()[0], "vector_store") if len(get_vs_list()) > 1 else ""), gr.State(
        ""), gr.State(
        model_status)
    gr.Markdown(webui_title)
    with gr.Tab("对话"):
        with gr.Row():
            with gr.Column(scale=10):
                # Chat window pre-filled with the welcome text and the model load status.
                chatbot = gr.Chatbot([[None, init_message], [None, model_status.value]],
                                     elem_id="chat-box",
                                     show_label=False).style(height=750)
                query = gr.Textbox(show_label=False,
                                   placeholder="请输入提问内容,按回车进行提交").style(container=False)

                # Only the knowledge-base QA mode is offered.
                mode = gr.Radio(["知识库问答"],
                                show_label=False,
                                value="知识库问答")
                # Prepare the CSV logger that get_answer() flags to, under "student_log".
                flag_csv_logger.setup([query, vs_path, chatbot, mode], "student_log")
                # Submitting the textbox streams get_answer() into the chatbot and
                # writes its second yielded value ("") back to the textbox.
                query.submit(get_answer,
                             [query, vs_path, chatbot, mode],
                             [chatbot, query])
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def gradio_callback(inputs, outputs):
    """Print the ``'username'`` entry of ``inputs``; ``outputs`` is not used."""
    print("Current username:", inputs['username'])
|
|
|
def student():
    """Load the student rosters and return them as login credentials.

    Reads seven Excel workbooks and takes the ``姓名`` and ``学号`` columns
    from each.

    Returns:
        A list of ``(姓名, str(学号))`` tuples in workbook order.
    """
    roster_paths = (
        "/home/wsy/Langchain-chat/Langchain-Chatchat/stuendt/hy_student1.xlsx",
        "/home/wsy/Langchain-chat/Langchain-Chatchat/stuendt/hy_student2.xlsx",
        "/home/wsy/Langchain-chat/Langchain-Chatchat/stuendt/lhj_student.xlsx",
        "/home/wsy/Langchain-chat/Langchain-Chatchat/stuendt/ygc_student.xlsx",
        "/home/wsy/Langchain-chat/Langchain-Chatchat/stuendt/yl_student.xlsx",
        "/home/wsy/Langchain-chat/Langchain-Chatchat/stuendt/zsg_student1.xlsx",
        "/home/wsy/Langchain-chat/Langchain-Chatchat/stuendt/zsg_student2.xlsx",
    )
    # Read every workbook before extracting any columns, as the original did.
    frames = [pd.DataFrame(pd.read_excel(path)) for path in roster_paths]

    entries = []
    for frame in frames:
        entries.extend(frame[['姓名', '学号']].apply(tuple, axis=1))

    # The second field is compared against the typed password, so it is stored as text.
    return [(entry[0], str(entry[1])) for entry in entries]
|
|
|
def login(x, y):
    """Check username ``x`` and password ``y`` against the roster from ``student()``.

    On a match the global ``user`` is set to the username.

    Returns:
        ``(x, y)`` on a match (truthy), otherwise ``None``.
    """
    global user
    for username, password in student():
        if not (username == x and password == y):
            continue
        user = username
        return x, y
    return None
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Start the app at import time: queue with concurrency_count=30, listen on
# 0.0.0.0:7860, and gate access with login().
(demo
 .queue(concurrency_count=30)
 .launch(server_name='0.0.0.0',
         server_port=7860,
         show_api=False,
         share=False,
         inbrowser=False,
         auth=login)
 )
|
|
|
|
|
|
|
|