File size: 6,799 Bytes
5449492
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
cfdb7b7
 
5449492
cfdb7b7
 
 
 
 
 
5449492
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
5b7bbbc
5449492
cfdb7b7
5449492
 
 
 
5b7bbbc
5449492
 
 
 
 
 
 
 
cfdb7b7
 
 
 
 
 
 
 
5449492
7ba780f
5449492
8543e0a
5449492
 
 
 
 
7ba780f
cfdb7b7
7ba780f
cfdb7b7
 
 
 
 
 
 
ae1f715
cfdb7b7
 
 
 
 
 
 
 
7ba780f
cfdb7b7
5449492
 
 
 
 
 
ae1f715
5449492
cfdb7b7
 
ae1f715
cfdb7b7
 
 
 
 
 
 
 
5449492
ae1f715
5449492
 
 
 
 
 
 
 
 
 
ae1f715
5449492
ae1f715
5449492
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
ae1f715
5449492
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
import logging
import shutil
from pathlib import Path

# Module logger; basicConfig gives the root logger an INFO-level stderr handler
# unless the root logger already has handlers.
_logger = logging.getLogger(name="chunking")
logging.basicConfig(level="INFO")

from operator import itemgetter
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables.base import RunnableSequence
from langchain_core.vectorstores import VectorStore
from langchain.retrievers.multi_query import MultiQueryRetriever
from langchain_community.vectorstores import Qdrant
from langchain.schema.output_parser import StrOutputParser
from langchain.schema.runnable import RunnablePassthrough
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.document_loaders import PyMuPDFLoader
from langchain_experimental.text_splitter import SemanticChunker

from globals import (
    embeddings,
    gpt35_model,
    gpt4_model,
    META_10K_FILE_PATH,
    META_SEMANTIC_COLLECTION,
    VECTOR_STORE_PATH
)


# True: keep the vector store in process memory (it is rebuilt from the PDF on
# every start). False: persist it on disk under VECTOR_STORE_PATH.
USE_MEMORY = True
from qdrant_client import QdrantClient

# Shared Qdrant client for this module. In local (path-based) mode qdrant-client
# takes an exclusive lock on the storage folder, so code in this process should
# reuse this client instead of opening another one on VECTOR_STORE_PATH.
qclient: QdrantClient = (
    QdrantClient(":memory:") if USE_MEMORY else QdrantClient(path=VECTOR_STORE_PATH)
)


RAG_PROMPT = """
Reply the user's query thoughtfully and clearly.
You should only respond to user's query if the context is related to the query.  
If you are not sure how to answer, please reply "I don't know".
Respond with structure in markdown.

CONTEXT:
{context}

QUERY:
{question}

YOUR REPLY: """

rag_prompt = ChatPromptTemplate.from_template(RAG_PROMPT)


class SemanticStoreFactory:
    """Lazily build and cache the Qdrant store over semantic chunks of the 10-K PDF.

    With ``USE_MEMORY`` the store is rebuilt from ``META_10K_FILE_PATH`` in
    memory. Otherwise it is loaded through the shared ``qclient`` when
    ``META_SEMANTIC_COLLECTION`` already exists there, and built otherwise.
    """

    # Cached store handed out by get_semantic_store; None until first built.
    _semantic_vectorstore: VectorStore = None

    @classmethod
    def __has_collection(
        cls
    ) -> bool:
        """Return True if ``qclient`` already holds ``META_SEMANTIC_COLLECTION``."""
        names = {c.name for c in qclient.get_collections().collections}
        return META_SEMANTIC_COLLECTION in names

    @classmethod
    def __load_semantic_store(
        cls
    ) -> VectorStore:
        """Wrap the existing collection, or build it when it is missing.

        The collection is looked up through the client rather than by checking
        that VECTOR_STORE_PATH is non-empty: the local client creates its own
        metadata/lock files there when it opens the folder, so a non-empty
        directory does not mean the collection exists.
        """
        if cls.__has_collection():
            _logger.info("\tQdrant loading ...")
            return Qdrant(
                client=qclient,
                embeddings=embeddings,
                collection_name=META_SEMANTIC_COLLECTION,
            )
        _logger.info("\tQdrant creating ...")
        return cls.__create_semantic_store()

    @classmethod
    def __create_semantic_store(
        cls
    ) -> VectorStore:
        """Semantically chunk the 10-K PDF and index the chunks in Qdrant.

        Any existing ``META_SEMANTIC_COLLECTION`` is dropped and rebuilt.

        Returns:
            A ``Qdrant`` store over the freshly indexed collection.

        Raises:
            ValueError: if the PDF produces no chunks to index.
        """
        if USE_MEMORY:
            _logger.info(f"creating semantic vector store: {USE_MEMORY}")
        else:
            _logger.info(f"creating semantic vector store: {VECTOR_STORE_PATH}")
            path = Path(VECTOR_STORE_PATH)
            if not path.exists():
                path.mkdir(parents=True, exist_ok=True)
                _logger.info(f"Directory '{path}' created.")

        _logger.info(f"loading {META_10K_FILE_PATH}")
        documents = PyMuPDFLoader(META_10K_FILE_PATH).load()
        _logger.info(f"\tload {len(documents)} docs")
        semantic_chunker = SemanticChunker(
            embeddings=embeddings,
            breakpoint_threshold_type="percentile",
        )
        semantic_chunks = semantic_chunker.create_documents(
            [d.page_content for d in documents]
        )
        _logger.info(f"created semantic_chunks: {len(semantic_chunks)}")
        if not semantic_chunks:
            # Fail with a clear message instead of an IndexError deep inside
            # the vector-store construction (nothing to size or index).
            raise ValueError(f"no semantic chunks produced from {META_10K_FILE_PATH}")

        if USE_MEMORY:
            _logger.info("\t==> creating memory vectorstore ...")
            semantic_chunk_vectorstore = Qdrant.from_documents(
                semantic_chunks,
                embeddings,
                location=":memory:",
                collection_name=META_SEMANTIC_COLLECTION,
                force_recreate=True,
            )
            _logger.info("\t==> finished constructing vectorstore")
        else:
            # Index through the shared module-level client. qdrant-client's
            # local mode holds an exclusive lock on its storage folder, so
            # Qdrant.from_documents(path=VECTOR_STORE_PATH, ...) would open a
            # second client on the folder qclient already holds and fail.
            from qdrant_client import models

            # Size the collection from one embedding, mirroring langchain's
            # from_documents defaults (single unnamed vector, cosine distance).
            vector_size = len(
                embeddings.embed_documents([semantic_chunks[0].page_content])[0]
            )
            if cls.__has_collection():
                # Same effect as force_recreate=True: drop and rebuild.
                qclient.delete_collection(collection_name=META_SEMANTIC_COLLECTION)
            qclient.create_collection(
                collection_name=META_SEMANTIC_COLLECTION,
                vectors_config=models.VectorParams(
                    size=vector_size,
                    distance=models.Distance.COSINE,
                ),
            )
            semantic_chunk_vectorstore = Qdrant(
                client=qclient,
                embeddings=embeddings,
                collection_name=META_SEMANTIC_COLLECTION,
            )
            semantic_chunk_vectorstore.add_documents(semantic_chunks)
        _logger.info("\t==> return vectorstore")

        return semantic_chunk_vectorstore

    @classmethod
    def get_semantic_store(
        cls
    ) -> VectorStore:
        """Return the cached semantic store, building or loading it on first use.

        In persistent mode, any exception raised while loading is logged as a
        warning and the store is rebuilt from the PDF instead.
        """
        _logger.info("get_semantic_store")
        if cls._semantic_vectorstore is None:
            if USE_MEMORY:
                cls._semantic_vectorstore = cls.__create_semantic_store()
                _logger.info("received semantic_vectorstore")
            else:
                _logger.info(
                    f"Loading semantic vectorstore {META_SEMANTIC_COLLECTION} from: {VECTOR_STORE_PATH}"
                )
                try:
                    # first try to load the store
                    cls._semantic_vectorstore = cls.__load_semantic_store()
                except Exception as e:
                    _logger.warning(f"cannot load: {e}")
                    cls._semantic_vectorstore = cls.__create_semantic_store()

        _logger.info("RETURNING get_semantic_store")
        return cls._semantic_vectorstore

class SemanticRAGChainFactory:
    """Lazily build and cache the RAG chain over the semantic vector store.

    Invoke the chain with ``{"question": "<user question>"}``. It returns a
    dict holding the model's output under ``"response"`` and the retrieved
    documents under ``"context"``.
    """

    # Cached chain; stays None until a semantic store has been obtained.
    _chain: RunnableSequence = None

    @classmethod
    def get_semantic_rag_chain(
        cls
    ) -> RunnableSequence:
        """Return the cached RAG chain, constructing it on the first call.

        If the store factory yields None, nothing is cached and None is
        returned, so the next call tries again.
        """
        if cls._chain is not None:
            return cls._chain

        _logger.info("creating SemanticRAGChainFactory")
        store = SemanticStoreFactory.get_semantic_store()
        _logger.info("\treceived semantic_store")
        if store is None:
            return cls._chain

        # Per LangChain's docs, MultiQueryRetriever has the LLM rephrase the
        # question several ways and returns the union of the documents found.
        retriever = MultiQueryRetriever.from_llm(
            retriever=store.as_retriever(),
            llm=gpt4_model,
        )

        # Step 1: fan the input out into retrieved documents + the raw question.
        gather = {
            "context": itemgetter("question") | retriever,
            "question": itemgetter("question"),
        }
        # Step 2: reassigns "context" to itself, so the data is unchanged. It
        # is kept because `dict | Runnable` builds a RunnableSequence, whereas
        # `dict | dict` would not (on Python 3.9+ it is a plain dict merge).
        keep_context = RunnablePassthrough.assign(context=itemgetter("context"))
        # Step 3: format the prompt, call the model, and pass the documents on.
        answer = {
            "response": rag_prompt | gpt4_model,
            "context": itemgetter("context"),
        }
        cls._chain = gather | keep_context | answer
        _logger.info("\t_chain constructed")
        return cls._chain