File size: 6,799 Bytes
5449492 cfdb7b7 5449492 cfdb7b7 5449492 5b7bbbc 5449492 cfdb7b7 5449492 5b7bbbc 5449492 cfdb7b7 5449492 7ba780f 5449492 8543e0a 5449492 7ba780f cfdb7b7 7ba780f cfdb7b7 ae1f715 cfdb7b7 7ba780f cfdb7b7 5449492 ae1f715 5449492 cfdb7b7 ae1f715 cfdb7b7 5449492 ae1f715 5449492 ae1f715 5449492 ae1f715 5449492 ae1f715 5449492 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 |
import logging
import shutil
from pathlib import Path
# Configure the root logger at INFO level and create this module's logger,
# registered under the name "chunking".
logging.basicConfig(level=logging.INFO)
_logger = logging.getLogger("chunking")
from operator import itemgetter
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables.base import RunnableSequence
from langchain_core.vectorstores import VectorStore
from langchain.retrievers.multi_query import MultiQueryRetriever
from langchain_community.vectorstores import Qdrant
from langchain.schema.output_parser import StrOutputParser
from langchain.schema.runnable import RunnablePassthrough
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.document_loaders import PyMuPDFLoader
from langchain_experimental.text_splitter import SemanticChunker
from globals import (
embeddings,
gpt35_model,
gpt4_model,
META_10K_FILE_PATH,
META_SEMANTIC_COLLECTION,
VECTOR_STORE_PATH
)
# Backend toggle: when truthy the Qdrant client is opened on ":memory:",
# otherwise it is opened on VECTOR_STORE_PATH.
USE_MEMORY = True

from qdrant_client import QdrantClient

# Module-level client; the store factory passes it to the Qdrant wrapper when
# it attaches to an already-populated collection.
qclient: QdrantClient = (
    QdrantClient(":memory:") if USE_MEMORY else QdrantClient(path=VECTOR_STORE_PATH)
)
# Prompt template for the RAG chain. `{context}` and `{question}` are the two
# placeholders substituted when the prompt is formatted.
RAG_PROMPT = """
Reply the user's query thoughtfully and clearly.
You should only respond to user's query if the context is related to the query.
If you are not sure how to answer, please reply "I don't know".
Respond with structure in markdown.
CONTEXT:
{context}
QUERY:
{question}
YOUR REPLY: """
# Chat prompt object built from the template string above.
rag_prompt = ChatPromptTemplate.from_template(RAG_PROMPT)
class SemanticStoreFactory:
    """Build and cache the semantic-chunk Qdrant vector store.

    The store is produced lazily by :meth:`get_semantic_store` on its first
    call and the same instance is handed back on every later call.
    """

    # Cached store; stays ``None`` until the first build or load succeeds.
    _semantic_vectorstore: VectorStore = None

    @classmethod
    def __load_semantic_store(cls) -> VectorStore:
        """Attach to the stored collection, or build it when nothing is stored.

        Returns:
            A ``Qdrant`` wrapper around the shared ``qclient`` when
            ``VECTOR_STORE_PATH`` is a non-empty directory; otherwise the
            result of ``__create_semantic_store``.
        """
        path = Path(VECTOR_STORE_PATH)
        # ``is_dir()`` is already False for a missing path, so no separate
        # ``exists()`` check is needed.
        if path.is_dir() and any(path.iterdir()):
            _logger.info("\tQdrant loading ...")
            # NOTE(review): nothing here verifies that the collection actually
            # exists in the directory -- confirm that a missing collection is
            # reported at this point rather than on the first query.
            return Qdrant(
                client=qclient,
                embeddings=embeddings,
                collection_name=META_SEMANTIC_COLLECTION,
            )

        _logger.info("\tQdrant creating ...")
        return cls.__create_semantic_store()

    @classmethod
    def __create_semantic_store(cls) -> VectorStore:
        """Load the source PDF, chunk it semantically and index the chunks.

        The PDF at ``META_10K_FILE_PATH`` is read with ``PyMuPDFLoader``, the
        page texts are split by a ``SemanticChunker`` using the "percentile"
        breakpoint type, and the resulting chunks are written to the
        ``META_SEMANTIC_COLLECTION`` collection (recreated on every call).

        Returns:
            The freshly populated vector store.
        """
        if USE_MEMORY:
            _logger.info("creating semantic vector store: %s", USE_MEMORY)
        else:
            _logger.info("creating semantic vector store: %s", VECTOR_STORE_PATH)
            path = Path(VECTOR_STORE_PATH)
            if not path.exists():
                path.mkdir(parents=True, exist_ok=True)
                _logger.info("Directory '%s' created.", path)

        _logger.info("loading %s", META_10K_FILE_PATH)
        documents = PyMuPDFLoader(META_10K_FILE_PATH).load()
        _logger.info("\tload %d docs", len(documents))

        semantic_chunker = SemanticChunker(
            embeddings=embeddings,
            breakpoint_threshold_type="percentile",
        )
        # Only the page text is forwarded; the chunker builds new documents.
        semantic_chunks = semantic_chunker.create_documents(
            [d.page_content for d in documents]
        )
        _logger.info("created semantic_chunks: %d", len(semantic_chunks))

        # Arguments shared by both storage modes.
        common_kwargs = {
            "collection_name": META_SEMANTIC_COLLECTION,
            "force_recreate": True,
        }
        if USE_MEMORY:
            _logger.info("\t==> creating memory vectorstore ...")
            semantic_chunk_vectorstore = Qdrant.from_documents(
                semantic_chunks,
                embeddings,
                location=":memory:",
                **common_kwargs,
            )
            _logger.info("\t==> finished constructing vectorstore")
        else:
            # NOTE(review): in this mode the module-level ``qclient`` is
            # already opened on the same VECTOR_STORE_PATH -- confirm that
            # ``from_documents(path=...)`` can open that location a second
            # time within the same process.
            semantic_chunk_vectorstore = Qdrant.from_documents(
                semantic_chunks,
                embeddings,
                path=VECTOR_STORE_PATH,
                **common_kwargs,
            )

        _logger.info("\t==> return vectorstore")
        return semantic_chunk_vectorstore

    @classmethod
    def get_semantic_store(cls) -> VectorStore:
        """Return the cached semantic vector store, building it on first use.

        In memory mode the store is always created from the source PDF. In
        persistent mode a load is attempted first and any failure falls back
        to creating the store.

        Returns:
            The shared vector store instance.
        """
        _logger.info("get_semantic_store")
        if cls._semantic_vectorstore is None:
            if USE_MEMORY:
                cls._semantic_vectorstore = cls.__create_semantic_store()
                _logger.info("received semantic_vectorstore")
            else:
                _logger.info(
                    "Loading semantic vectorstore %s from: %s",
                    META_SEMANTIC_COLLECTION,
                    VECTOR_STORE_PATH,
                )
                try:
                    # First try to load the existing store.
                    cls._semantic_vectorstore = cls.__load_semantic_store()
                except Exception as e:
                    # Deliberate best-effort: any load failure triggers a
                    # rebuild instead of propagating.
                    _logger.warning("cannot load: %s", e)
                    cls._semantic_vectorstore = cls.__create_semantic_store()

        _logger.info("RETURNING get_semantic_store")
        return cls._semantic_vectorstore
class SemanticRAGChainFactory:
    """Build and cache the RAG chain backed by the semantic vector store."""

    # Cached chain; stays ``None`` until it has been constructed.
    _chain: RunnableSequence = None

    @classmethod
    def get_semantic_rag_chain(cls) -> RunnableSequence:
        """Return the shared RAG chain, constructing it on the first call.

        The chain is invoked with ``{"question": "<user question>"}`` and
        yields a dict with two keys: ``"response"`` (output of ``gpt4_model``
        for the formatted ``rag_prompt``) and ``"context"`` (what the
        multi-query retriever returned for the question).

        Returns:
            The cached chain, or ``None`` if no vector store was available.
        """
        if cls._chain is not None:
            return cls._chain

        _logger.info("creating SemanticRAGChainFactory")
        semantic_store = SemanticStoreFactory.get_semantic_store()
        _logger.info("\treceived semantic_store")
        if semantic_store is None:
            # Nothing to build on; the cache stays empty.
            return cls._chain

        semantic_chunk_retriever = semantic_store.as_retriever()
        semantic_mquery_retriever = MultiQueryRetriever.from_llm(
            retriever=semantic_chunk_retriever,
            llm=gpt4_model,
        )

        cls._chain = (
            # Step 1: "context" is the "question" value piped through the
            # multi-query retriever; "question" is carried over unchanged.
            {
                "context": itemgetter("question") | semantic_mquery_retriever,
                "question": itemgetter("question"),
            }
            # Step 2: pass everything through, re-assigning "context" to the
            # value it already holds from step 1.
            | RunnablePassthrough.assign(context=itemgetter("context"))
            # Step 3: "response" is the prompt formatted from "context" and
            # "question" and piped into the model; "context" is forwarded so
            # callers can inspect what was retrieved.
            | {"response": rag_prompt | gpt4_model, "context": itemgetter("context")}
        )
        _logger.info("\t_chain constructed")
        return cls._chain