import logging
from operator import itemgetter
from pathlib import Path

from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables.base import RunnableSequence
from langchain_core.vectorstores import VectorStore
from langchain.retrievers.multi_query import MultiQueryRetriever
from langchain.schema.runnable import RunnablePassthrough
from langchain_community.document_loaders import PyMuPDFLoader
from langchain_community.vectorstores import Qdrant
from langchain_experimental.text_splitter import SemanticChunker
from qdrant_client import QdrantClient

from globals import (
    embeddings,
    gpt4_model,
    META_10K_FILE_PATH,
    META_SEMANTIC_COLLECTION,
    VECTOR_STORE_PATH,
)

logging.basicConfig(level=logging.INFO)
_logger = logging.getLogger("chunking")

# Toggle between an in-memory Qdrant instance and one persisted at VECTOR_STORE_PATH.
USE_MEMORY = True

qclient: QdrantClient
if USE_MEMORY:
    qclient = QdrantClient(":memory:")
else:
    qclient = QdrantClient(path=VECTOR_STORE_PATH)

RAG_PROMPT = """
Answer the user's query thoughtfully and clearly.
Only answer the query if the provided context is related to it.
If you are not sure how to answer, reply "I don't know".
Structure your response in markdown.

CONTEXT:
{context}

QUERY:
{question}

YOUR REPLY: """

rag_prompt = ChatPromptTemplate.from_template(RAG_PROMPT)
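
# Example (a sketch, not exercised elsewhere in this module): the template fills
# {context} and {question} at run time; the strings below are hypothetical.
#   rag_prompt.format_messages(
#       context="(retrieved 10-K excerpts would appear here)",
#       question="What was Meta's total revenue?",
#   )
# returns a list containing one human message, ready for a chat model.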


class SemanticStoreFactory:
    _semantic_vectorstore: VectorStore = None

    @classmethod
    def __load_semantic_store(cls) -> VectorStore:
        path = Path(VECTOR_STORE_PATH)
        if path.exists() and path.is_dir() and any(path.iterdir()):
            _logger.info("\tQdrant loading ...")
            # Wrap the existing on-disk collection; no re-embedding needed.
            store = Qdrant(
                client=qclient,
                embeddings=embeddings,
                collection_name=META_SEMANTIC_COLLECTION,
            )
        else:
            _logger.info("\tQdrant creating ...")
            store = cls.__create_semantic_store()
        return store

    @classmethod
    def __create_semantic_store(cls) -> VectorStore:
        if USE_MEMORY:
            _logger.info(f"creating semantic vector store in memory (USE_MEMORY={USE_MEMORY})")
        else:
            _logger.info(f"creating semantic vector store at: {VECTOR_STORE_PATH}")
            path = Path(VECTOR_STORE_PATH)
            if not path.exists():
                path.mkdir(parents=True, exist_ok=True)
                _logger.info(f"Directory '{path}' created.")

        _logger.info(f"loading {META_10K_FILE_PATH}")
        documents = PyMuPDFLoader(META_10K_FILE_PATH).load()
        _logger.info(f"\tloaded {len(documents)} docs")
        # Chunk on semantic boundaries: a split occurs wherever the embedding
        # distance between adjacent sentences exceeds the percentile threshold.
        semantic_chunker = SemanticChunker(
            embeddings=embeddings,
            breakpoint_threshold_type="percentile",
        )
        semantic_chunks = semantic_chunker.create_documents(
            [d.page_content for d in documents]
        )
        _logger.info(f"created semantic_chunks: {len(semantic_chunks)}")
        if USE_MEMORY:
            _logger.info("\t==> creating in-memory vectorstore ...")
            semantic_chunk_vectorstore = Qdrant.from_documents(
                semantic_chunks,
                embeddings,
                location=":memory:",
                collection_name=META_SEMANTIC_COLLECTION,
                force_recreate=True,
            )
            _logger.info("\t==> finished constructing vectorstore")
        else:
            semantic_chunk_vectorstore = Qdrant.from_documents(
                semantic_chunks,
                embeddings,
                path=VECTOR_STORE_PATH,
                collection_name=META_SEMANTIC_COLLECTION,
                force_recreate=True,
            )
        _logger.info("\t==> return vectorstore")
        return semantic_chunk_vectorstore

    @classmethod
    def get_semantic_store(cls) -> VectorStore:
        _logger.info("get_semantic_store")
        if cls._semantic_vectorstore is None:
            if USE_MEMORY:
                # An in-memory store cannot be reloaded; rebuild it on first use.
                cls._semantic_vectorstore = cls.__create_semantic_store()
                _logger.info("created semantic_vectorstore")
            else:
                _logger.info(
                    f"Loading semantic vectorstore {META_SEMANTIC_COLLECTION} from: {VECTOR_STORE_PATH}"
                )
                try:
                    cls._semantic_vectorstore = cls.__load_semantic_store()
                except Exception as e:
                    # Fall back to building the store from scratch if loading fails.
                    _logger.warning(f"cannot load: {e}")
                    cls._semantic_vectorstore = cls.__create_semantic_store()

        _logger.info("RETURNING get_semantic_store")
        return cls._semantic_vectorstore
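
# Sanity-check sketch (hypothetical, not called anywhere in this module): the
# store can also be queried directly, bypassing the chain, to verify indexing.
#   store = SemanticStoreFactory.get_semantic_store()
#   docs = store.similarity_search("What are Meta's main revenue sources?", k=3)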


class SemanticRAGChainFactory:
    _chain: RunnableSequence = None

    @classmethod
    def get_semantic_rag_chain(cls) -> RunnableSequence:
        if cls._chain is None:
            _logger.info("creating SemanticRAGChainFactory")
            semantic_store = SemanticStoreFactory.get_semantic_store()
            _logger.info("\treceived semantic_store")
            if semantic_store is not None:
                semantic_chunk_retriever = semantic_store.as_retriever()
                # Generate several rephrasings of each question and merge the
                # retrieved documents, improving recall over a single query.
                semantic_mquery_retriever = MultiQueryRetriever.from_llm(
                    retriever=semantic_chunk_retriever,
                    llm=gpt4_model,
                )
                cls._chain = (
                    # Retrieve context for the question; keep the question itself.
                    {
                        "context": itemgetter("question") | semantic_mquery_retriever,
                        "question": itemgetter("question"),
                    }
                    # Carry the retrieved context forward unchanged.
                    | RunnablePassthrough.assign(context=itemgetter("context"))
                    # Answer with the RAG prompt; return the context alongside.
                    | {
                        "response": rag_prompt | gpt4_model,
                        "context": itemgetter("context"),
                    }
                )
                _logger.info("\t_chain constructed")

        return cls._chain
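

# Minimal usage sketch (an assumption, not part of the original module): builds
# the chain and asks one question. Assumes `globals` supplies working
# `embeddings` and a chat model, and that META_10K_FILE_PATH points to the PDF.
if __name__ == "__main__":
    chain = SemanticRAGChainFactory.get_semantic_rag_chain()
    result = chain.invoke({"question": "What was Meta's total revenue in 2023?"})
    # `response` is a chat-model message; `context` holds the retrieved chunks.
    print(result["response"].content)
    _logger.info(f"answered using {len(result['context'])} context documents")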