# NOTE(review): the three lines here ("Spaces:" / "Runtime error" / "Runtime
# error") are Hugging Face Spaces page chrome left over from scraping, not
# part of the program — kept only as a comment so the file parses.
import os
import shutil
import tempfile
import uuid
import zipfile

import gradio as gr
from dotenv import load_dotenv
from langchain.document_loaders import WhatsAppChatLoader
from langchain.embeddings import HuggingFaceEmbeddings
from langchain.text_splitter import RecursiveCharacterTextSplitter
from pinecone import Pinecone, ServerlessSpec
load_dotenv()

# --- Module-level setup: Pinecone index + embedding model (runs once at import) ---
pinecone_key = os.getenv("PINECONE_API_KEY")
pc = Pinecone(api_key=pinecone_key)

index_name = "whatsapp-chat-index-1"

# BUG FIX: the original checked the literal string 'index_name' against the
# existing index names, which is never present, so create_index() ran on every
# startup and raised once the index already existed. Check the variable.
if index_name not in pc.list_indexes().names():
    pc.create_index(
        name=index_name,
        dimension=384,  # must match the embedding model's output size (all-MiniLM-L6-v2 -> 384)
        metric="cosine",
        spec=ServerlessSpec(
            cloud='aws',
            region='us-east-1'
        )
    )
index = pc.Index(index_name)

# Sentence-transformer used to embed chat chunks into 384-dim vectors.
embeddings = HuggingFaceEmbeddings(model_name="sentence-transformers/all-MiniLM-L6-v2")

# Maximum allowed upsert payload per request in bytes (Pinecone's ~4MB limit).
MAX_CHUNK_SIZE = 4 * 1024 * 1024
def load_chat_content(file) -> str:
    """Ingest a WhatsApp chat export (ZIP) and upsert its chunks into Pinecone.

    Args:
        file: Gradio upload object exposing the on-disk temp path via
            ``.name``, or ``None`` when nothing was uploaded.

    Returns:
        A human-readable status message shown in the Gradio UI (both for
        success and for every validation failure).
    """
    if file is None:
        return "No file uploaded. Please upload a valid ZIP file to process."

    # Gradio hands us a temp-file wrapper; ``.name`` is the on-disk path.
    # (The original mixed ``file.name`` and ``file``; use the path throughout.)
    zip_path = file.name
    if not zipfile.is_zipfile(zip_path):
        return "Uploaded file is not a valid ZIP file. Please upload a ZIP file."

    # Extract into a UNIQUE temp directory so successive/concurrent uploads
    # never see each other's files. The original reused one shared directory
    # ('temp_extracted_files') and never cleaned it, so stale .txt files from
    # a previous upload could be ingested instead of the new one.
    temp_dir = tempfile.mkdtemp(prefix="whatsapp_chat_")
    try:
        try:
            with zipfile.ZipFile(zip_path, 'r') as z:
                z.extractall(temp_dir)
        except zipfile.BadZipFile:
            return "Error reading ZIP file. The file may be corrupted."

        chat_files = [f for f in os.listdir(temp_dir) if f.endswith('.txt')]
        if not chat_files:
            return "No chat files found in the zip archive."

        # Only the first .txt is processed — WhatsApp exports contain one.
        chat_file_path = os.path.join(temp_dir, chat_files[0])
        loader = WhatsAppChatLoader(path=chat_file_path)
        messages = list(loader.lazy_load())
        chat_content = "\n".join(doc.page_content for doc in messages)

        text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=1000,
            chunk_overlap=200,
        )
        chunks = text_splitter.create_documents([chat_content])

        # Embed every chunk in ONE model call (the original called
        # embed_documents once per chunk, which is much slower), then pair
        # each vector with a fresh UUID and its source text as metadata.
        texts = [chunk.page_content for chunk in chunks]
        vectors = embeddings.embed_documents(texts) if texts else []
        vectors_to_upsert = [
            (str(uuid.uuid4()), vector, {"text": text})
            for text, vector in zip(texts, vectors)
        ]

        # Upsert in batches of 100; if a batch's metadata text alone would
        # exceed Pinecone's request-size limit, fall back to batches of 10.
        for start in range(0, len(vectors_to_upsert), 100):
            batch = vectors_to_upsert[start:start + 100]
            batch_bytes = sum(len(item[2]["text"].encode('utf-8')) for item in batch)
            if batch_bytes > MAX_CHUNK_SIZE:
                for sub_start in range(0, len(batch), 10):
                    index.upsert(batch[sub_start:sub_start + 10])
            else:
                index.upsert(batch)

        return "All chat content has been successfully upserted to Pinecone."
    finally:
        # Always remove the extracted files, even on early error returns.
        shutil.rmtree(temp_dir, ignore_errors=True)
# --- Gradio UI ---
# One file input -> one text status output; all the work happens in
# load_chat_content.
chat_zip_input = gr.File(label="Upload WhatsApp Chat Zip File")

interface = gr.Interface(
    fn=load_chat_content,
    inputs=[chat_zip_input],
    outputs="text",
    title="WhatsApp Chat Upsert to Pinecone",
    description=(
        "Upload a zip file containing a WhatsApp chat file and upsert "
        "its content to Pinecone."
    ),
)

if __name__ == "__main__":
    interface.launch()