import gradio as gr
import os
import zipfile
import uuid
from langchain.embeddings import HuggingFaceEmbeddings
from langchain.document_loaders import WhatsAppChatLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from dotenv import load_dotenv
from pinecone import Pinecone, ServerlessSpec

load_dotenv()

# Initialize Pinecone and the index outside the function
pinecone_key = os.getenv("PINECONE_API_KEY")
pc = Pinecone(api_key=pinecone_key)
index_name = "whatsapp-chat-index"
if index_name not in pc.list_indexes().names():
    pc.create_index(
        name=index_name,
        dimension=384,  # change as per embedding model
        metric="cosine",
        spec=ServerlessSpec(
            cloud='aws',
            region='us-east-1'
        )
    )
index = pc.Index(index_name)

# Initialize Hugging Face embeddings
embeddings = HuggingFaceEmbeddings(model_name="sentence-transformers/all-MiniLM-L6-v2")

# Maximum allowed upsert request size in bytes (4MB)
MAX_CHUNK_SIZE = 4 * 1024 * 1024


def load_chat_content(file) -> str:
    """Load chat content from the uploaded ZIP file and store it in Pinecone."""
    if file is None:
        return "No file uploaded. Please upload a valid ZIP file to process."

    # Ensure the uploaded file is a ZIP file
    if not zipfile.is_zipfile(file.name):
        return "Uploaded file is not a valid ZIP file. Please upload a ZIP file."

    # Extract the ZIP file into a temporary directory
    temp_dir = 'temp_extracted_files'
    os.makedirs(temp_dir, exist_ok=True)
    try:
        with zipfile.ZipFile(file.name, 'r') as z:
            z.extractall(temp_dir)
    except zipfile.BadZipFile:
        return "Error reading ZIP file. The file may be corrupted."

    chat_files = [f for f in os.listdir(temp_dir) if f.endswith('.txt')]
    if not chat_files:
        return "No chat files found in the ZIP archive."

    # Parse the exported chat with LangChain's WhatsAppChatLoader
    chat_file_path = os.path.join(temp_dir, chat_files[0])
    loader = WhatsAppChatLoader(path=chat_file_path)
    messages = list(loader.lazy_load())
    chat_content = "\n".join(doc.page_content for doc in messages)

    # Split the chat into overlapping chunks for embedding
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=1000,
        chunk_overlap=200,
    )
    chunks = text_splitter.create_documents([chat_content])

    # Embed each chunk and pair it with a unique ID for upserting
    vectors_to_upsert = []
    for chunk in chunks:
        vector = embeddings.embed_documents([chunk.page_content])[0]
        unique_id = str(uuid.uuid4())  # Generate a unique ID
        vectors_to_upsert.append((unique_id, vector, {"text": chunk.page_content}))

    # Upsert in batches of 100, splitting further if a batch exceeds the size limit
    for i in range(0, len(vectors_to_upsert), 100):
        batch = vectors_to_upsert[i:i + 100]
        # Estimate the batch payload size from the metadata text
        batch_size = sum(len(vector[2]["text"].encode('utf-8')) for vector in batch)
        if batch_size > MAX_CHUNK_SIZE:
            # Further split the batch if it exceeds the limit
            for j in range(0, len(batch), 10):
                sub_batch = batch[j:j + 10]
                index.upsert(sub_batch)
        else:
            index.upsert(batch)

    return "All chat content has been successfully upserted to Pinecone."


# Define the Gradio interface
interface = gr.Interface(
    fn=load_chat_content,
    inputs=[
        gr.File(label="Upload WhatsApp Chat Zip File")
    ],
    outputs="text",
    title="WhatsApp Chat Upsert to Pinecone",
    description="Upload a ZIP file containing a WhatsApp chat export and upsert its content to Pinecone.",
)

if __name__ == "__main__":
    interface.launch()