# 悟了悟了RAG使用 ## 一、前言 ### RAG一般流程 - 将原始数据切分后向量化,制作成向量数据库 - 对用户输入的问题进行 embedding - 基于 embedding 结果在向量数据库中进行检索 - 对召回数据重排序(选择和问题更接近的结果) - 依据用户问题和召回数据生成最后的结果 悟了悟了默认`data`目录为txt数据源目录,开启RAG后,会使用bce-embedding-base_v1自动将`data`目录下的txt数据转为换chroma向量库数据,存放在`rag/chroma `目录下(如果该目录下已有数据库文件,则跳过数据库创建),然后使用bce-reranker-base_v1对检索到的信息重排序后,将问题和上下文一起给模型得到最终输出。`rag/simple_rag.py`里是一个简单的demo,参数配置见`configs/rag_cfg.yaml`。 [LangChain](https://python.langchain.com/docs/concepts/rag/)在这块的工具比较好,各种功能都有,本模型的RAG是基于LangChain进行开发的。 ## 二、数据库制作 数据库制作代码在`rag/chroma_db.py`中。首先会将txt文本切分成小块,类似此前的增量预训练数据制作,此部分代码不再赘述。 切分后的文本可以直接使用 langchain_community.vectorstores 中的 Chroma制作向量数据库,并将数据库做一个持久化 ``` # 加载数据库 vectordb = Chroma.from_documents( documents=split_docs, embedding=embeddings_model, persist_directory=persist_directory) vectordb.persist() #数据库做持久化 ``` 另外还有一个[Faiss](https://faiss.ai/)数据库,也是主流使用的。Faiss是一个用于高效相似性搜索和密集向量聚类的库。它包含的算法可以搜索任意大小的向量集。langchain已经整合过FAISS,[FAISS in Langchain](https://python.langchain.com/docs/integrations/vectorstores/faiss) ## 三、rag调用 基于LangChain的RAG实现比较简单,需要一个Embeddings和reranker模型,从数据库中提取和输入问题最相关的材料,再把输入问题和对应材料合在一起(prompt中),统一喂给基础的LLM生成最终的答案,prompt类似如下: ``` '材料:“{}”\n 问题:“{}” \n 请仔细阅读参考材料回答问题。' ``` 具体实现参考`rag/simple_rag.py`,核心部分是如下代码, self.llm 可以换成任意模型或者api接口,只要能输入文本,输出文字结果就行; ```python class WuleRAG(): """ 存储检索问答链的对象 """ def __init__(self, data_source_dir, db_persist_directory, base_mode, embeddings_model, reranker_model, rag_prompt_template): # 加载自定义 LLM self.llm = base_mode # 定义 Embeddings self.embeddings = HuggingFaceEmbeddings(model_name=embeddings_model, model_kwargs={"device": "cuda"}, encode_kwargs={"batch_size": 1, "normalize_embeddings": True}) reranker_args = {'model': reranker_model, 'top_n': 5, 'device': 'cuda', "use_fp16": True} self.reranker = BCERerank(**reranker_args) vectordb = get_chroma_db(data_source_dir, db_persist_directory, self.embeddings) # 创建基础检索器 # retriever = vectordb.as_retriever(search_type="similarity", search_kwargs={"score_threshold": 0.3, "k": 2}) self.retriever = vectordb.as_retriever(search_kwargs={"k": 3, "score_threshold": 0.6}, search_type="similarity_score_threshold" ) # 创建上下文压缩检索器 self.compression_retriever = ContextualCompressionRetriever( base_compressor=self.reranker, base_retriever=self.retriever ) # 定义包含 system prompt 的模板 self.PROMPT = PromptTemplate( template=rag_prompt_template, input_variables=["context", "question"] ) # 创建 RetrievalQA 链,包含自定义 prompt self.qa_chain = RetrievalQA.from_chain_type( llm=self.llm, chain_type="stuff", # "stuff"、"map_reduce"、"refine"、"map_rerank" retriever=self.compression_retriever, return_source_documents=True, chain_type_kwargs={"prompt": self.PROMPT} ) def query_stream(self, query: str) -> Iterator[str]: docs = self.compression_retriever.get_relevant_documents(query) context = "\n\n".join(doc.page_content for doc in docs) prompt = self.PROMPT.format(context=context, question=query) return self.llm.stream(prompt) def query(self, question): """ 调用问答链进行回答,如果没有找到相关文档,则使用模型自身的回答 #使用示例 question='黑神话悟空发售时间和团队?主要讲了什么故事?' result = self.qa_chain({"query": question}) print(result["result"]) """ if not question: return "请提供个有用的问题。" try: # 使用检索链来获取相关文档 result = self.qa_chain.invoke({"query": question}) # logging.info(f"Get rag res:\n{result}") if 'result' in result: answer = result['result'] final_answer = re.sub(r'^根据提供的信息,\s?', '', answer, flags=re.M).strip() return final_answer else: logging.error("Error: 'result' field not found in the result.") return "悟了悟了目前无法提供答案,请稍后再试。" except Exception as e: # 打印更详细的错误信息,包括traceback import traceback logging.error(f"An error occurred: {e}\n{traceback.format_exc()}") return "悟了悟了遇到了一些技术问题,正在修复中。" ```