KhangPTT373pdf_qa / utils.py
KhangPTT373's picture
Upload folder using huggingface_hub
683c41b verified
import chromadb
# from .encoder import get_embedding
import uuid
from tqdm import tqdm
from config import settings
import os
import json
from logs.logger_config import logger
import requests
def get_embedding(content=""):
response = requests.post(
f"{settings.ENCODER_INFERENCE_ENDPOINT}/v1/embeddings",
json={"input": content, "model": settings.ENCODER_INFERENCE_MODEL},
)
# Check if the response is successful
if response.status_code == 200:
logger.bind(logger_name="encoder_inference").info("Encode done!")
else:
logger.bind(logger_name="encoder_inference").error(
"Error: {response.status_code} , {response.text} "
)
return response.json()["data"][0]["embedding"]
def generate_id():
return str(uuid.uuid4())
def init_pdfs(
CHROMA_PERSISTENT_DISK=settings.CHROMA_PERSISTENT_DISK,
CHROMA_ENDPOINT=settings.CHROMA_ENDPOINT,
):
if CHROMA_PERSISTENT_DISK != None:
chroma_client = chromadb.PersistentClient(path=CHROMA_PERSISTENT_DISK)
else:
chroma_client = chromadb.HttpClient(host=CHROMA_ENDPOINT)
logger.bind(logger_name="system").info(chroma_client.heartbeat())
chroma_client.get_or_create_collection(
settings.PDF_COLLECTION_NAME, metadata={"hnsw:space": "cosine"}
)
logger.bind(logger_name="system").info(
f"Collection {settings.PDF_COLLECTION_NAME} initial successfully"
)
def add_pdfs(
pdfs=[],
chroma_client=None,
CHROMA_PERSISTENT_DISK=settings.CHROMA_PERSISTENT_DISK,
CHROMA_ENDPOINT=settings.CHROMA_ENDPOINT,
):
try:
if chroma_client == None:
if CHROMA_PERSISTENT_DISK != None:
chroma_client = chromadb.PersistentClient(path=CHROMA_PERSISTENT_DISK)
else:
chroma_client = chromadb.HttpClient(host=CHROMA_ENDPOINT)
logger.bind(logger_name="system").info(chroma_client.heartbeat())
collection = chroma_client.get_collection(settings.PDF_COLLECTION_NAME)
except Exception as e:
logger.bind(logger_name="system").info("ERROR: ", e)
return "Fail"
for pdf in tqdm(pdfs):
try:
chunk = f"""Filename: {pdf["name_file"]}\n\nType: {pdf["type"]}\n\nContent: {pdf["content"]}"""
chunk_id = f"PDF-{generate_id()}"
embedding = get_embedding(chunk)
metadata = {"Filename": str(pdf["name_file"]),
"Type": str(pdf["type"]),
"Content": str(pdf["content"])
}
# print('metadata here: ')
# print([pdf])
collection.add(
embeddings=[embedding],
metadatas=[metadata],
# documents = documents,
ids=[chunk_id],
)
except Exception as e:
logger.bind(logger_name="system").info(f"ERROR: {e}")
logger.bind(logger_name="system").info(f"DROP: {pdf}")
return "Suck seed"
def format_pdfs(metadatas):
logger.bind(logger_name="system").info(metadatas)
# print(metadatas[0].keys())
outputs = ''
for pdf in metadatas:
try:
outputs += "\n\n" + f"""Filename: {pdf.get("Filename","None")}\nType: {pdf.get("Content","None")}\nContent: {pdf.get("Content","None")}"""
except:
print('Exception raised: ',pdf.keys())
logger.bind(logger_name="system").info(outputs)
return outputs
def query_pdfs(
query="",
chroma_client=None,
CHROMA_PERSISTENT_DISK=settings.CHROMA_PERSISTENT_DISK,
CHROMA_ENDPOINT=settings.CHROMA_ENDPOINT,
):
try:
if chroma_client == None:
if CHROMA_PERSISTENT_DISK != None:
chroma_client = chromadb.PersistentClient(path=CHROMA_PERSISTENT_DISK)
else:
chroma_client = chromadb.HttpClient(host=CHROMA_ENDPOINT)
logger.bind(logger_name="system").info(chroma_client.heartbeat())
collection = chroma_client.get_collection(name=settings.PDF_COLLECTION_NAME)
except:
return f"Collection {settings.PDF_COLLECTION_NAME} not found"
query = "Represent this sentence for searching relevant passages: \n" + query
query_results = collection.query(query_embeddings=get_embedding(query), n_results=30)
logger.bind(logger_name="system").info(query_results)
return format_pdfs(query_results["metadatas"][0])
def list_collections(
chroma_client=None,
CHROMA_PERSISTENT_DISK=settings.CHROMA_PERSISTENT_DISK,
CHROMA_ENDPOINT=settings.CHROMA_ENDPOINT,
):
try:
if chroma_client == None:
if CHROMA_PERSISTENT_DISK != None:
chroma_client = chromadb.PersistentClient(path=CHROMA_PERSISTENT_DISK)
else:
chroma_client = chromadb.HttpClient(host=CHROMA_ENDPOINT)
collections = chroma_client.list_collections()
logger.bind(logger_name="system").info("Available collections:")
for collection in collections:
logger.bind(logger_name="system").info(f"- Name: {collection.name}, Count: {collection.count()}")
return collections
except Exception as e:
logger.bind(logger_name="system").error(f"Error listing collections: {e}")
return []
def delete_collection(
collection_name,
chroma_client=None,
CHROMA_PERSISTENT_DISK=settings.CHROMA_PERSISTENT_DISK,
CHROMA_ENDPOINT=settings.CHROMA_ENDPOINT,
):
try:
if chroma_client == None:
if CHROMA_PERSISTENT_DISK != None:
chroma_client = chromadb.PersistentClient(path=CHROMA_PERSISTENT_DISK)
else:
chroma_client = chromadb.HttpClient(host=CHROMA_ENDPOINT)
chroma_client.delete_collection(name=collection_name)
logger.bind(logger_name="system").info(f"Collection '{collection_name}' deleted successfully")
return True
except Exception as e:
logger.bind(logger_name="system").error(f"Error deleting collection '{collection_name}': {e}")
return False
# logger.bind(logger_name="system").info(query_faqs("tell me about distilled ai?"))
# init_faqs()
# add_status = add_faqs(faqs)
# logger.bind(logger_name="system").info("add_status: ", add_status)
if __name__ == "__main__":
# init_pdfs()
parsed_dir = 'parsed'
pdf_dir = 'pdf'
if not os.path.exists(parsed_dir):
raise FileNotFoundError(f"Directory '{parsed_dir}' not found")
if not os.path.exists(pdf_dir):
os.makedirs(pdf_dir)
for filename in os.listdir(parsed_dir):
if filename.endswith('.json'):
try:
json_path = os.path.join(parsed_dir, filename)
pdf_filename = filename.replace('.json', '.pdf')
relative_pdf_path = os.path.join(pdf_dir, pdf_filename)
with open(json_path, 'r', encoding='utf-8') as f:
metadata = json.load(f)
prepared_data = prepare_data(metadata, relative_pdf_path)
add_status = add_pdfs(prepared_data)
if add_status:
print(f"Successfully processed {filename}")
else:
print(f"Failed to process {filename}")
except Exception as e:
print(f"Error processing file {filename}: {str(e)}")
continue