import json
import os
import uuid

import chromadb
import requests
from tqdm import tqdm

from config import settings
from logs.logger_config import logger

# from .encoder import get_embedding
def get_embedding(content=""): | |
response = requests.post( | |
f"{settings.ENCODER_INFERENCE_ENDPOINT}/v1/embeddings", | |
json={"input": content, "model": settings.ENCODER_INFERENCE_MODEL}, | |
) | |
# Check if the response is successful | |
if response.status_code == 200: | |
logger.bind(logger_name="encoder_inference").info("Encode done!") | |
else: | |
logger.bind(logger_name="encoder_inference").error( | |
"Error: {response.status_code} , {response.text} " | |
) | |
return response.json()["data"][0]["embedding"] | |
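
# Example (assumes the encoder service at ENCODER_INFERENCE_ENDPOINT exposes an
# OpenAI-compatible /v1/embeddings route, as the request above expects):
#   vector = get_embedding("hello world")  # -> list[float]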

def generate_id():
    return str(uuid.uuid4())

def init_pdfs(
    CHROMA_PERSISTENT_DISK=settings.CHROMA_PERSISTENT_DISK,
    CHROMA_ENDPOINT=settings.CHROMA_ENDPOINT,
):
    if CHROMA_PERSISTENT_DISK is not None:
        chroma_client = chromadb.PersistentClient(path=CHROMA_PERSISTENT_DISK)
    else:
        chroma_client = chromadb.HttpClient(host=CHROMA_ENDPOINT)
        logger.bind(logger_name="system").info(chroma_client.heartbeat())
    chroma_client.get_or_create_collection(
        settings.PDF_COLLECTION_NAME, metadata={"hnsw:space": "cosine"}
    )
    logger.bind(logger_name="system").info(
        f"Collection {settings.PDF_COLLECTION_NAME} initialized successfully"
    )

def add_pdfs(
    pdfs=[],
    chroma_client=None,
    CHROMA_PERSISTENT_DISK=settings.CHROMA_PERSISTENT_DISK,
    CHROMA_ENDPOINT=settings.CHROMA_ENDPOINT,
):
    try:
        if chroma_client is None:
            if CHROMA_PERSISTENT_DISK is not None:
                chroma_client = chromadb.PersistentClient(path=CHROMA_PERSISTENT_DISK)
            else:
                chroma_client = chromadb.HttpClient(host=CHROMA_ENDPOINT)
                logger.bind(logger_name="system").info(chroma_client.heartbeat())
        collection = chroma_client.get_collection(settings.PDF_COLLECTION_NAME)
    except Exception as e:
        logger.bind(logger_name="system").error(f"ERROR: {e}")
        return False
    for pdf in tqdm(pdfs):
        try:
            chunk = f"""Filename: {pdf["name_file"]}\n\nType: {pdf["type"]}\n\nContent: {pdf["content"]}"""
            chunk_id = f"PDF-{generate_id()}"
            embedding = get_embedding(chunk)
            metadata = {
                "Filename": str(pdf["name_file"]),
                "Type": str(pdf["type"]),
                "Content": str(pdf["content"]),
            }
            collection.add(
                embeddings=[embedding],
                metadatas=[metadata],
                ids=[chunk_id],
            )
        except Exception as e:
            logger.bind(logger_name="system").error(f"ERROR: {e}")
            logger.bind(logger_name="system").info(f"DROP: {pdf}")
    return True

def format_pdfs(metadatas):
    logger.bind(logger_name="system").info(metadatas)
    outputs = ""
    for pdf in metadatas:
        try:
            outputs += "\n\n" + f"""Filename: {pdf.get("Filename", "None")}\nType: {pdf.get("Type", "None")}\nContent: {pdf.get("Content", "None")}"""
        except Exception as e:
            logger.bind(logger_name="system").error(f"Exception raised: {e}, keys: {pdf.keys()}")
    logger.bind(logger_name="system").info(outputs)
    return outputs

def query_pdfs(
    query="",
    chroma_client=None,
    CHROMA_PERSISTENT_DISK=settings.CHROMA_PERSISTENT_DISK,
    CHROMA_ENDPOINT=settings.CHROMA_ENDPOINT,
):
    try:
        if chroma_client is None:
            if CHROMA_PERSISTENT_DISK is not None:
                chroma_client = chromadb.PersistentClient(path=CHROMA_PERSISTENT_DISK)
            else:
                chroma_client = chromadb.HttpClient(host=CHROMA_ENDPOINT)
                logger.bind(logger_name="system").info(chroma_client.heartbeat())
        collection = chroma_client.get_collection(name=settings.PDF_COLLECTION_NAME)
    except Exception as e:
        logger.bind(logger_name="system").error(f"Error getting collection: {e}")
        return f"Collection {settings.PDF_COLLECTION_NAME} not found"
    # Retrieval instruction prefix (BGE-style) prepended before embedding the query
    query = "Represent this sentence for searching relevant passages: \n" + query
    query_results = collection.query(
        query_embeddings=[get_embedding(query)], n_results=30
    )
    logger.bind(logger_name="system").info(query_results)
    return format_pdfs(query_results["metadatas"][0])
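
# Example (hypothetical query string; returns a formatted context block built
# from the metadata of the 30 nearest PDF chunks):
#   context = query_pdfs("What does the warranty cover?")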

def list_collections(
    chroma_client=None,
    CHROMA_PERSISTENT_DISK=settings.CHROMA_PERSISTENT_DISK,
    CHROMA_ENDPOINT=settings.CHROMA_ENDPOINT,
):
    try:
        if chroma_client is None:
            if CHROMA_PERSISTENT_DISK is not None:
                chroma_client = chromadb.PersistentClient(path=CHROMA_PERSISTENT_DISK)
            else:
                chroma_client = chromadb.HttpClient(host=CHROMA_ENDPOINT)
        collections = chroma_client.list_collections()
        logger.bind(logger_name="system").info("Available collections:")
        for collection in collections:
            logger.bind(logger_name="system").info(
                f"- Name: {collection.name}, Count: {collection.count()}"
            )
        return collections
    except Exception as e:
        logger.bind(logger_name="system").error(f"Error listing collections: {e}")
        return []

def delete_collection(
    collection_name,
    chroma_client=None,
    CHROMA_PERSISTENT_DISK=settings.CHROMA_PERSISTENT_DISK,
    CHROMA_ENDPOINT=settings.CHROMA_ENDPOINT,
):
    try:
        if chroma_client is None:
            if CHROMA_PERSISTENT_DISK is not None:
                chroma_client = chromadb.PersistentClient(path=CHROMA_PERSISTENT_DISK)
            else:
                chroma_client = chromadb.HttpClient(host=CHROMA_ENDPOINT)
        chroma_client.delete_collection(name=collection_name)
        logger.bind(logger_name="system").info(f"Collection '{collection_name}' deleted successfully")
        return True
    except Exception as e:
        logger.bind(logger_name="system").error(f"Error deleting collection '{collection_name}': {e}")
        return False

# logger.bind(logger_name="system").info(query_faqs("tell me about distilled ai?"))
# init_faqs()
# add_status = add_faqs(faqs)
# logger.bind(logger_name="system").info("add_status: ", add_status)
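
# The __main__ block below calls prepare_data(), which is neither defined nor
# imported in this module. The helper below is a minimal sketch, assuming the
# parsed JSON holds chunk dicts with "type" and "content" fields; it only fills
# the keys add_pdfs() reads ("name_file", "type", "content"). Swap in the
# project's real implementation if it lives elsewhere.
def prepare_data(metadata, relative_pdf_path):
    chunks = metadata if isinstance(metadata, list) else [metadata]
    prepared = []
    for chunk in chunks:
        if not isinstance(chunk, dict):
            chunk = {"content": str(chunk)}
        prepared.append(
            {
                "name_file": os.path.basename(relative_pdf_path),
                "type": chunk.get("type", "pdf"),
                "content": chunk.get("content", ""),
            }
        )
    return prepared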

if __name__ == "__main__":
    # init_pdfs()
    parsed_dir = 'parsed'
    pdf_dir = 'pdf'
    if not os.path.exists(parsed_dir):
        raise FileNotFoundError(f"Directory '{parsed_dir}' not found")
    if not os.path.exists(pdf_dir):
        os.makedirs(pdf_dir)
    for filename in os.listdir(parsed_dir):
        if filename.endswith('.json'):
            try:
                json_path = os.path.join(parsed_dir, filename)
                pdf_filename = filename.replace('.json', '.pdf')
                relative_pdf_path = os.path.join(pdf_dir, pdf_filename)
                with open(json_path, 'r', encoding='utf-8') as f:
                    metadata = json.load(f)
                prepared_data = prepare_data(metadata, relative_pdf_path)
                add_status = add_pdfs(prepared_data)
                if add_status:
                    print(f"Successfully processed {filename}")
                else:
                    print(f"Failed to process {filename}")
            except Exception as e:
                print(f"Error processing file {filename}: {str(e)}")
                continue