import os import uuid import json import re from bs4 import BeautifulSoup import requests import streamlit as st from PyPDF2 import PdfReader from langchain.text_splitter import RecursiveCharacterTextSplitter from langchain_community.vectorstores import FAISS from langchain_community.llms import Ollama from langchain.chains.question_answering import load_qa_chain from langchain.prompts import PromptTemplate from dotenv import load_dotenv from langchain_community.embeddings import HuggingFaceEmbeddings import nltk from urllib.parse import urljoin, urlparse from langchain.memory import ConversationBufferMemory # Load environment variables (if needed for API keys) load_dotenv() # Initialize HuggingFace Embeddings embeddings = HuggingFaceEmbeddings(model_name="sentence-transformers/all-mpnet-base-v2") # Download NLTK stopwords nltk.download('stopwords') from nltk.corpus import stopwords STOPWORDS = set(stopwords.words('english')) # Text Preprocessing Function def preprocess_text(text): text = re.sub(r'[^A-Za-z\s]', '', text) # Remove special characters text = re.sub(r'\s+', ' ', text).strip() # Remove extra spaces text = text.lower() # Convert to lowercase tokens = text.split() cleaned_text = " ".join([word for word in tokens if word not in STOPWORDS]) # Remove stopwords return cleaned_text # Function to Save Processed Data to a Document def save_data_to_document(data, filename="processed_data.json"): with open(filename, 'w') as f: json.dump(data, f, indent=4) st.success(f"Data has been saved to {filename}") # Scrape Website with BeautifulSoup def scrape_website(url): visited_urls = set() scraped_data = {} def scrape_page(url): if url in visited_urls: return visited_urls.add(url) try: headers = { 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36', 'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,/;q=0.8', 'Accept-Language': 'en-US,en;q=0.5', 'Connection': 'keep-alive', } response = requests.get(url, headers=headers) except requests.RequestException as e: st.error(f"Failed to retrieve {url}: {e}") return soup = BeautifulSoup(response.content, 'html.parser') # Extract relevant content relevant_tags = ['p', 'strong', 'h1', 'h2', 'h3', 'h4', 'h5', 'h6', 'li', 'span', 'div'] content = [] for tag in relevant_tags: for element in soup.find_all(tag): text = element.get_text(strip=True) if text: content.append(text) if content: scraped_data[url] = " ".join(content) # Find and process all internal links on the page for link in soup.find_all('a', href=True): next_url = urljoin(url, link['href']) if urlparse(next_url).netloc == urlparse(url).netloc and next_url not in visited_urls: scrape_page(next_url) scrape_page(url) return scraped_data # PDF Text Extraction def get_pdf_text(pdf_docs): text = "" for pdf in pdf_docs: pdf_reader = PdfReader(pdf) for page in pdf_reader.pages: text += page.extract_text() or "" # Handle None return preprocess_text(text) # Split Text into Manageable Chunks def get_text_chunks(text): text_splitter = RecursiveCharacterTextSplitter(chunk_size=15000, chunk_overlap=1000) chunks = text_splitter.split_text(text) return chunks # Create FAISS Vector Store with UUID def create_faiss_with_uuid(text_chunks): # Generate a unique UUID for this document unique_id = str(uuid.uuid4()) # Generate unique identifier # Create a new FAISS index for the document vector_store = FAISS.from_texts(text_chunks, embeddings) # Create FAISS from chunks # Define a directory to store the FAISS index (using the UUID as part of the directory name) faiss_directory = f'./faiss_index_{unique_id}' os.makedirs(faiss_directory, exist_ok=True) # Save the FAISS index in a directory with the UUID vector_store.save_local(faiss_directory) # Save locally with a unique directory name return unique_id, faiss_directory # Return the UUID and the directory path # Build Conversational Chain def get_conversational_chain(memory): prompt_template = """ Answer the question as detailed as possible from the provided context. If the answer is not in provided context, just say, "answer is not available in the context." Don't provide the wrong answer.\n\n Context:\n {context}\n Question: \n{question}\n Answer: """ model = Ollama(model="phi") # Initialize LLaMA model prompt = PromptTemplate(template=prompt_template, input_variables=["context", "question"]) # Add memory to the chain chain = load_qa_chain(model, chain_type="stuff", prompt=prompt, memory=memory) return chain # Handle User Input and Process Questions with UUID-based FAISS Index def user_input(user_question, faiss_directory, memory): # Load the FAISS index based on the given directory (UUID-based) new_db = FAISS.load_local(faiss_directory, embeddings, allow_dangerous_deserialization=True) # Perform similarity search and answer the user's question docs = new_db.similarity_search(user_question) chain = get_conversational_chain(memory) # Update memory with the question and response response = chain({"input_documents": docs, "question": user_question}, return_only_outputs=True) memory.save_context({"input": user_question}, {"output": response["output_text"]}) st.write("Reply: ", response["output_text"]) # Main Function for Streamlit App def main(): st.set_page_config("Chat PDF & URL", layout="wide") st.header("Chat with PDF or URL using Ollama 💁") # Initialize memory for conversation history memory = ConversationBufferMemory(memory_key="chat_history", return_messages=True) user_question = st.text_input("Ask a Question from the Processed Data") if user_question and 'faiss_directory' in st.session_state: faiss_directory = st.session_state['faiss_directory'] user_input(user_question, faiss_directory, memory) with st.sidebar: st.title("Menu:") # User selects between PDF or URL option = st.radio("Choose input type:", ("PDF", "URL")) if option == "PDF": pdf_docs = st.file_uploader("Upload PDF Files:", accept_multiple_files=True) if st.button("Submit & Process"): with st.spinner("Processing..."): if pdf_docs: raw_text = get_pdf_text(pdf_docs) text_chunks = get_text_chunks(raw_text) unique_id, faiss_directory = create_faiss_with_uuid(text_chunks) st.session_state['faiss_directory'] = faiss_directory # Save the cleaned PDF data to a document save_data_to_document({"pdf_data": raw_text}, f"pdf_data_{unique_id}.json") st.success("PDF data is ready for queries!") else: st.error("No PDF files were uploaded.") elif option == "URL": url_input = st.text_input("Enter a URL to scrape text:") if st.button("Submit & Process"): with st.spinner("Processing..."): if url_input: try: # Run BeautifulSoup and get scraped data scraped_data = scrape_website(url_input) # Combine and preprocess scraped data raw_text = preprocess_text(" ".join(scraped_data.values())) # Split text into chunks and index in FAISS text_chunks = get_text_chunks(raw_text) unique_id, faiss_directory = create_faiss_with_uuid(text_chunks) st.session_state['faiss_directory'] = faiss_directory # Save the cleaned URL data to a document save_data_to_document({"url_data": scraped_data}, f"url_data_{unique_id}.json") st.success("Scraped data is ready for queries!") except Exception as e: st.error(f"Failed to scrape or process data: {e}") else: st.error("No URL was provided.") if __name__ == "__main__": main()