import json
import time
from uuid import uuid4

from langchain_core.documents import Document
from langchain_huggingface import HuggingFaceEmbeddings
from langchain_qdrant import QdrantVectorStore
from qdrant_client import QdrantClient
from qdrant_client.http.models import Distance, VectorParams


class SemanticCache:
    """Semantic question/answer cache backed by an in-memory Qdrant collection.

    Questions are embedded with a Vietnamese sentence-embedding model; a query
    counts as a cache hit when its cosine similarity to a stored question
    reaches the configured threshold.
    """

    def __init__(self, threshold=0.9):
        # Embedding model producing 768-dimensional Vietnamese sentence vectors.
        self.embeddings = HuggingFaceEmbeddings(model_name="dangvantuan/vietnamese-embedding")

        # In-memory Qdrant instance holding the cache collection.
        self.cache_client = QdrantClient(":memory:")
        self.cache_collection_name = "cache"
        self.cache_client.create_collection(
            collection_name=self.cache_collection_name,
            vectors_config=VectorParams(size=768, distance=Distance.COSINE),
        )
        self.vector_store = QdrantVectorStore(
            client=self.cache_client,
            collection_name=self.cache_collection_name,
            embedding=self.embeddings,
        )

        # Minimum cosine similarity for a lookup to count as a cache hit.
        self.threshold = threshold
        self.cache_file = 'new_cache_file.json'  # reserved for persisting new entries; not used yet
        self.init_cache_file = '/home/justtuananh/AI4TUAN/DOAN2024/offical/pipelines/semantic_cache/cache_init.json'
        self._load_cache_from_json()

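    # Expected layout of cache_init.json, inferred from the keys read below
    # (the concrete contents shown here are illustrative):
    # [
    #   {"question": "...", "answer": "..."},
    #   ...
    # ]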
    def _load_cache_from_json(self):
        """Load cached question/answer pairs from the JSON file into the vector store."""
        try:
            with open(self.init_cache_file, 'r', encoding='utf-8') as f:
                data = json.load(f)
            documents, ids = [], []
            for entry in data:
                question = entry.get('question')
                answer = entry.get('answer')
                if question and answer:
                    # The question is what gets embedded; the answer rides along as metadata.
                    documents.append(Document(page_content=question, metadata={"answer": answer}))
                    ids.append(str(uuid4()))
            if documents:
                # One batched call instead of one add_documents call per entry.
                self.vector_store.add_documents(documents=documents, ids=ids)
            print(f"Loaded {len(documents)} entries into the vector store!")
        except FileNotFoundError:
            print(f"Cache file {self.init_cache_file} not found, starting with empty cache.")
        except json.JSONDecodeError:
            print("Error parsing the JSON file. Please check the file format.")

    def search_cache(self, query):
        # The vector store is already bound to the cache collection, so no
        # collection_name argument is needed here (passing one would be
        # forwarded to the underlying client call and raise a duplicate
        # keyword-argument error).
        search_result = self.vector_store.similarity_search_with_score(
            query=query,
            score_threshold=self.threshold,
            k=1,
        )
        for doc, score in search_result:
            print(f"* [SIM={score:.3f}] {doc.page_content} [{doc.metadata}]")
        if search_result:
            first_doc, first_score = search_result[0]
            return first_doc, first_score
        return None, None

    def add_to_cache(self, question, response_text):
        document = Document(
            page_content=question,
            metadata={"answer": response_text},
        )
        doc_id = str(uuid4())
        self.vector_store.add_documents(documents=[document], ids=[doc_id])
        print(f"Question: {question} added to cache db!")

    def checker(self, question):
        # Look the question up in the cache; return the stored metadata
        # ({"answer": ...}) on a hit, or None on a miss.
        start_time = time.time()
        first_doc, first_score = self.search_cache(question)
        if first_score and first_score >= self.threshold:
            print('cache-hit')
            print(f'Found cache with score {first_score:.3f}')
            elapsed_time = time.time() - start_time
            print(f'Match with {first_doc.page_content}')
            print(f"Time taken: {elapsed_time:.3f} seconds")
            return first_doc.metadata

        print("No answer found in Cache or Database")
        elapsed_time = time.time() - start_time
        print(f"Time taken: {elapsed_time:.3f} seconds")
        return None
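
# --- Usage sketch (illustrative addition, not part of the original module) ---
# A minimal example of how the class above can be driven end to end, assuming
# the embedding model can be downloaded and cache_init.json is present. The
# question and answer strings are hypothetical placeholders.
if __name__ == "__main__":
    cache = SemanticCache(threshold=0.9)

    # First lookup: a miss unless cache_init.json seeded a similar question.
    result = cache.checker("hypothetical question")
    if result is None:
        # In a real pipeline the answer would come from the LLM or database;
        # store a placeholder so the next lookup can hit the cache.
        cache.add_to_cache("hypothetical question", "hypothetical answer")

    # Repeating the same question should now return the cached metadata.
    print(cache.checker("hypothetical question"))  # {'answer': 'hypothetical answer'}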