Spaces:
Runtime error
Runtime error
#import gradio as gr | |
#import cv2 | |
#def to_black(image): | |
# output = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY) | |
# return output | |
#interface = gr.Interface(fn=to_black, inputs="image", outputs="image") | |
#print('here') | |
#interface.launch() | |
#print(share_url) | |
#print(local_url) | |
#print(app) | |
#interface.launch(inbrowser =True, share=True, port=8888) | |
#url = interface.share() | |
#print(url) | |
from langchain.chains import RetrievalQA | |
from langchain.document_loaders import UnstructuredFileLoader, TextLoader, CSVLoader | |
from langchain.document_loaders import CSVLoader | |
from langchain.document_loaders import TextLoader | |
from langchain.vectorstores import DocArrayInMemorySearch | |
from langchain.indexes import VectorstoreIndexCreator | |
from langchain.prompts import PromptTemplate | |
from langchain.embeddings.huggingface import HuggingFaceEmbeddings | |
from langchain import HuggingFacePipeline | |
import torch | |
from langchain.vectorstores import FAISS | |
from langchain.text_splitter import RecursiveCharacterTextSplitter | |
from langchain.chains.base import Chain | |
from langchain.chains import ConversationalRetrievalChain | |
from langchain.chains.summarize import load_summarize_chain | |
import gradio as gr | |
from typing import List | |
from tqdm import tqdm | |
import logging | |
import argparse | |
import os | |
import string | |
CHUNK_SIZE=600 | |
CHUNK_OVERLAP = 100 | |
SEARCH_TOP_K = 5 | |
logger = logging.getLogger("bio_LLM_logger") | |
def tree(filepath, ignore_dir_names=None, ignore_file_names=None): | |
"""返回两个列表,第一个列表为 filepath 下全部文件的完整路径, 第二个为对应的文件名""" | |
if ignore_dir_names is None: | |
ignore_dir_names = [] | |
if ignore_file_names is None: | |
ignore_file_names = [] | |
ret_list = [] | |
if isinstance(filepath, str): | |
if not os.path.exists(filepath): | |
print("路径不存在") | |
return None, None | |
elif os.path.isfile(filepath) and os.path.basename(filepath) not in ignore_file_names: | |
return [filepath], [os.path.basename(filepath)] | |
elif os.path.isdir(filepath) and os.path.basename(filepath) not in ignore_dir_names: | |
for file in os.listdir(filepath): | |
fullfilepath = os.path.join(filepath, file) | |
if os.path.isfile(fullfilepath) and os.path.basename(fullfilepath) not in ignore_file_names: | |
ret_list.append(fullfilepath) | |
if os.path.isdir(fullfilepath) and os.path.basename(fullfilepath) not in ignore_dir_names: | |
ret_list.extend(tree(fullfilepath, ignore_dir_names, ignore_file_names)[0]) | |
return ret_list, [os.path.basename(p) for p in ret_list] | |
def load_file(file_path, chunk_size=CHUNK_SIZE, chunk_overlap=CHUNK_OVERLAP): | |
if file_path.lower().endswith(".pdf"): | |
loader = UnstructuredFileLoader(file_path, mode="elements") | |
text_splitter = RecursiveCharacterTextSplitter(chunk_size=chunk_size, chunk_overlap= chunk_overlap) | |
docs = loader.load_and_split(text_splitter=text_splitter) | |
elif file_path.lower().endswith(".txt"): | |
loader = TextLoader(file_path, autodetect_encoding=True) | |
text_splitter = RecursiveCharacterTextSplitter(chunk_size=chunk_size, chunk_overlap= chunk_overlap) | |
docs = loader.load_and_split(text_splitter=text_splitter) | |
elif file_path.lower().endswith(".csv"): | |
loader = CSVLoader(file_path) | |
text_splitter = RecursiveCharacterTextSplitter(chunk_size=chunk_size, chunk_overlap= chunk_overlap) | |
docs = loader.load_and_split(text_splitter=text_splitter) | |
else: | |
print("unsupported the file format") | |
return docs | |
#class summary_chain: | |
# def init_cfg(self, | |
# llm_model: Chain, | |
def summary(model, chain_type, PROMPT, REFINE_PROMPT,docs): | |
if chain_type == "stuff": | |
chain = load_summarize_chain(model, chain_type="stuff", prompt=PROMPT) | |
elif chain_type == "refine": | |
chain = load_summarize_chain(model, chain_type="refine", question_prompt=PROMPT, refine_prompt=REFINE_PROMPT) | |
print(chain.run(docs)) | |
class QA_Localdb: | |
llm_model_chain: Chain = None | |
embeddings: object = None | |
top_k: int = SEARCH_TOP_K | |
chunk_size: int = CHUNK_SIZE | |
def init_cfg(self, | |
llm_model: Chain, | |
embedding_model: str, | |
#embedding_device: str, | |
top_k = SEARCH_TOP_K, | |
): | |
self.llm_model_chain = llm_model | |
self.embeddings = HuggingFaceEmbeddings(model_name = embedding_model) | |
self.top_k = top_k | |
def init_knowledge_vector_store(self, | |
file_path: str or List[str], | |
vectorstore_path: str or os.PathLike = None, | |
): | |
loaded_files = [] | |
failed_files = [] | |
if isinstance(file_path, str): | |
if not os.path.exists(file_path): | |
print("unknown path") | |
return None | |
elif os.path.isfile(file_path): | |
file = os.path.split(file_path)[-1] | |
try: | |
docs = load_file(file_path) | |
logger.info(f"{file} sucessful loaded") | |
loaded_files.append(file_path) | |
except Exception as e: | |
logger.error(e) | |
logger.info(f"{file} unsucessful loaded") | |
return None | |
elif os.path.isdir(file_path): | |
docs=[] | |
for fullfilepath, file in tqdm(zip(*tree(file_path, ignore_dir_names=['tmp_files'])), desc="load file"): | |
try: | |
docs += load_file(fullfilepath) | |
loaded_files.append(fullfilepath) | |
except Exception as e: | |
logger.error(e) | |
failed_files.append(file) | |
if len(failed_files) > 0: | |
logger.info('unloaded files are as follows') | |
for file in failed_files: | |
logger.info(f"{file}\n") | |
else: | |
docs = [] | |
for file in file_path: | |
try: | |
docs += load_file(file) | |
logger.info(f"{file} sucessful loaded") | |
loaded_files.append(file) | |
except Exception as e: | |
logger.error(e) | |
logger.info(f"{file} unsucessful loaded") | |
if len(docs) > 0: | |
logger.info("sucessful loaded, generating vector store") | |
if vectorstore_path and os.path.isdir(vectorstore_path) and "index.faiss" in os.listdir(vectorstore_path): | |
print("temp") | |
# vector_store = load_vector_store(vectorstore_path, self.embeddings) | |
# vector_store.add_documents(docs) | |
# torch_gc() | |
else: | |
if not vectorstore_path: | |
vectorstore_path = "" | |
vector_store = FAISS.from_documents(docs, self.embeddings) | |
#vector_store.save_local(vectorstore_path) | |
return vector_store, loaded_files | |
else: | |
logger.info("file load failed") | |
''' | |
def delete_file_from_vector_store(self, | |
filepath: str or List[str], | |
vs_path): | |
vector_store = load_vector_store(vs_path, self.embeddings) | |
status = vector_store.delete_doc(filepath) | |
return status | |
def update_file_from_vector_store(self, | |
filepath: str or List[str], | |
vs_path, | |
docs: List[Document], ): | |
vector_store = load_vector_store(vs_path, self.embeddings) | |
status = vector_store.update_doc(filepath, docs) | |
return status | |
def list_file_from_vector_store(self, | |
vs_path, | |
fullpath=False): | |
vector_store = load_vector_store(vs_path, self.embeddings) | |
docs = vector_store.list_docs() | |
if fullpath: | |
return docs | |
else: | |
return [os.path.split(doc)[-1] for doc in docs] | |
''' | |
def QA_model(): | |
# file_path = "/mnt/petrelfs/lvying/LLM/BoMA/data/test/OPUS-DSD.pdf" | |
file_path = "OPUS-BioLLM-v1/data/test/Interageting-Prior-into-DA.pdf" | |
# file_path = "/mnt/petrelfs/lvying/LLM/BoMA/data/test/Interageting-Prior-into-DA.pdf" | |
# file_path = "/mnt/petrelfs/lvying/LLM/BoMA/data/test/" | |
model_path = "/mnt/petrelfs/lvying/LLM/BoMA/models/LLM/Llama-2-13b-chat-hf" | |
embedding_path = "/mnt/petrelfs/lvying/LLM/BoMA/text2vec/instructor-xl/" | |
model = HuggingFacePipeline.from_model_id(model_id="daryl149/llama-2-7b-chat-hf", | |
task="text-generation", | |
model_kwargs={ | |
"torch_dtype" : torch.float16, | |
"low_cpu_mem_usage" : True, | |
"temperature": 0.2, | |
"max_length": 2048, | |
#"device_map": "auto", | |
"repetition_penalty":1.1} | |
) | |
print(model.model_id) | |
QA = QA_Localdb() | |
QA.init_cfg(llm_model=model, embedding_model = "sentence-transformers/paraphrase-MiniLM-L6-v2") | |
vector_store, _ =QA.init_knowledge_vector_store(file_path) | |
retriever = vector_store.as_retriever(search_kwargs={"k": 3}) | |
print("loading LLM...") | |
prompt_template = ("Below is an instruction that describes a task. " | |
"Write a response that appropriately completes the request.\n\n" | |
"### Instruction:\n{context}\n{question}\n\n### Response: ") | |
PROMPT = PromptTemplate( | |
template=prompt_template, input_variables=["context", "question"] | |
) | |
chain_type_kwargs = {"prompt": PROMPT} | |
#print(chain_type_kwargs) | |
''' | |
qa_stuff = RetrievalQA.from_chain_type( | |
llm = model, | |
chain_type="stuff", | |
retriever = retriever, | |
chain_type_kwargs = chain_type_kwargs, | |
# verbose = True | |
) | |
while True: | |
print("Input Qusetion:") | |
query = input() | |
if len(query.strip())==0: | |
break | |
print(qa_stuff.run(query)) | |
''' | |
''' | |
qa = ConversationalRetrievalChain.from_llm( | |
llm = QA.llm_model_chain, | |
chain_type="stuff", | |
retriever = retriever, | |
combine_docs_chain_kwargs = chain_type_kwargs, | |
# verbose = True | |
) | |
''' | |
qa = RetrievalQA.from_chain_type( | |
llm = QA.llm_model_chain, | |
chain_type="stuff", | |
retriever = retriever, | |
chain_type_kwargs = chain_type_kwargs, | |
# verbose = True | |
) | |
return qa | |
qa_temp = QA_model() | |
def temp(query): | |
return qa_temp.run(query) | |
def answer_question(query): | |
print(query) | |
chat_history = [] | |
threshold_history = 10 # Remembered historical conversations | |
i = 0 | |
if i>threshold_history: | |
chat_history = [] | |
print("Send a Message:") | |
#query = context | |
#if len(query.strip())==0: | |
# break | |
result = qa_temp({"question":query, "chat_history": chat_history}) | |
print(type(result["answer"])) | |
chat_history.append((query, result["answer"])) | |
i = i + 1 | |
resp = result["answer"] | |
return str(resp) | |
iface = gr.Interface( | |
fn = temp, | |
inputs="text", | |
outputs="text",) | |
#title="问答界面", | |
#description="输入问题和相关文本,得到问题的答案。", | |
#article="这里是相关的文本。可以输入一些段落或者问题的背景。", | |
#examples=[ | |
# ["Gradio是什么?", "Gradio是一个用于构建和部署机器学习模型的开源库。"], | |
# ["Python的创始人是谁?", "Python的创始人是Guido van Rossum。"] | |
#]) | |
#print(iface.launch(share=True)) | |
#print("======Finish======") | |
#share_url = iface.share() | |
#print(share_url) | |
iface.launch() | |