Spaces:
Running
Running
import os | |
import uuid | |
import json | |
import re | |
from bs4 import BeautifulSoup | |
import requests | |
import streamlit as st | |
from PyPDF2 import PdfReader | |
from langchain.text_splitter import RecursiveCharacterTextSplitter | |
from langchain_community.vectorstores import FAISS | |
from langchain_community.llms import Ollama | |
from langchain.chains.question_answering import load_qa_chain | |
from langchain.prompts import PromptTemplate | |
from dotenv import load_dotenv | |
from langchain_community.embeddings import HuggingFaceEmbeddings | |
import nltk | |
from urllib.parse import urljoin, urlparse | |
from langchain.memory import ConversationBufferMemory | |
# Load environment variables (if needed for API keys) | |
load_dotenv() | |
# Initialize HuggingFace Embeddings | |
embeddings = HuggingFaceEmbeddings(model_name="sentence-transformers/all-mpnet-base-v2") | |
# Download NLTK stopwords | |
nltk.download('stopwords') | |
from nltk.corpus import stopwords | |
STOPWORDS = set(stopwords.words('english')) | |
# Text Preprocessing Function | |
def preprocess_text(text): | |
text = re.sub(r'[^A-Za-z\s]', '', text) # Remove special characters | |
text = re.sub(r'\s+', ' ', text).strip() # Remove extra spaces | |
text = text.lower() # Convert to lowercase | |
tokens = text.split() | |
cleaned_text = " ".join([word for word in tokens if word not in STOPWORDS]) # Remove stopwords | |
return cleaned_text | |
# Function to Save Processed Data to a Document | |
def save_data_to_document(data, filename="processed_data.json"): | |
with open(filename, 'w') as f: | |
json.dump(data, f, indent=4) | |
st.success(f"Data has been saved to {filename}") | |
# Scrape Website with BeautifulSoup | |
def scrape_website(url): | |
visited_urls = set() | |
scraped_data = {} | |
def scrape_page(url): | |
if url in visited_urls: | |
return | |
visited_urls.add(url) | |
try: | |
headers = { | |
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36', | |
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,/;q=0.8', | |
'Accept-Language': 'en-US,en;q=0.5', | |
'Connection': 'keep-alive', | |
} | |
response = requests.get(url, headers=headers) | |
except requests.RequestException as e: | |
st.error(f"Failed to retrieve {url}: {e}") | |
return | |
soup = BeautifulSoup(response.content, 'html.parser') | |
# Extract relevant content | |
relevant_tags = ['p', 'strong', 'h1', 'h2', 'h3', 'h4', 'h5', 'h6', 'li', 'span', 'div'] | |
content = [] | |
for tag in relevant_tags: | |
for element in soup.find_all(tag): | |
text = element.get_text(strip=True) | |
if text: | |
content.append(text) | |
if content: | |
scraped_data[url] = " ".join(content) | |
# Find and process all internal links on the page | |
for link in soup.find_all('a', href=True): | |
next_url = urljoin(url, link['href']) | |
if urlparse(next_url).netloc == urlparse(url).netloc and next_url not in visited_urls: | |
scrape_page(next_url) | |
scrape_page(url) | |
return scraped_data | |
# PDF Text Extraction | |
def get_pdf_text(pdf_docs): | |
text = "" | |
for pdf in pdf_docs: | |
pdf_reader = PdfReader(pdf) | |
for page in pdf_reader.pages: | |
text += page.extract_text() or "" # Handle None | |
return preprocess_text(text) | |
# Split Text into Manageable Chunks | |
def get_text_chunks(text): | |
text_splitter = RecursiveCharacterTextSplitter(chunk_size=15000, chunk_overlap=1000) | |
chunks = text_splitter.split_text(text) | |
return chunks | |
# Create FAISS Vector Store with UUID | |
def create_faiss_with_uuid(text_chunks): | |
# Generate a unique UUID for this document | |
unique_id = str(uuid.uuid4()) # Generate unique identifier | |
# Create a new FAISS index for the document | |
vector_store = FAISS.from_texts(text_chunks, embeddings) # Create FAISS from chunks | |
# Define a directory to store the FAISS index (using the UUID as part of the directory name) | |
faiss_directory = f'./faiss_index_{unique_id}' | |
os.makedirs(faiss_directory, exist_ok=True) | |
# Save the FAISS index in a directory with the UUID | |
vector_store.save_local(faiss_directory) # Save locally with a unique directory name | |
return unique_id, faiss_directory # Return the UUID and the directory path | |
# Build Conversational Chain | |
def get_conversational_chain(memory): | |
prompt_template = """ | |
Answer the question as detailed as possible from the provided context. If the answer is not in | |
provided context, just say, "answer is not available in the context." Don't provide the wrong answer.\n\n | |
Context:\n {context}\n | |
Question: \n{question}\n | |
Answer: | |
""" | |
model = Ollama(model="phi") # Initialize LLaMA model | |
prompt = PromptTemplate(template=prompt_template, input_variables=["context", "question"]) | |
# Add memory to the chain | |
chain = load_qa_chain(model, chain_type="stuff", prompt=prompt, memory=memory) | |
return chain | |
# Handle User Input and Process Questions with UUID-based FAISS Index | |
def user_input(user_question, faiss_directory, memory): | |
# Load the FAISS index based on the given directory (UUID-based) | |
new_db = FAISS.load_local(faiss_directory, embeddings, allow_dangerous_deserialization=True) | |
# Perform similarity search and answer the user's question | |
docs = new_db.similarity_search(user_question) | |
chain = get_conversational_chain(memory) | |
# Update memory with the question and response | |
response = chain({"input_documents": docs, "question": user_question}, return_only_outputs=True) | |
memory.save_context({"input": user_question}, {"output": response["output_text"]}) | |
st.write("Reply: ", response["output_text"]) | |
# Main Function for Streamlit App | |
def main(): | |
st.set_page_config("Chat PDF & URL", layout="wide") | |
st.header("Chat with PDF or URL using Ollama π") | |
# Initialize memory for conversation history | |
memory = ConversationBufferMemory(memory_key="chat_history", return_messages=True) | |
user_question = st.text_input("Ask a Question from the Processed Data") | |
if user_question and 'faiss_directory' in st.session_state: | |
faiss_directory = st.session_state['faiss_directory'] | |
user_input(user_question, faiss_directory, memory) | |
with st.sidebar: | |
st.title("Menu:") | |
# User selects between PDF or URL | |
option = st.radio("Choose input type:", ("PDF", "URL")) | |
if option == "PDF": | |
pdf_docs = st.file_uploader("Upload PDF Files:", accept_multiple_files=True) | |
if st.button("Submit & Process"): | |
with st.spinner("Processing..."): | |
if pdf_docs: | |
raw_text = get_pdf_text(pdf_docs) | |
text_chunks = get_text_chunks(raw_text) | |
unique_id, faiss_directory = create_faiss_with_uuid(text_chunks) | |
st.session_state['faiss_directory'] = faiss_directory | |
# Save the cleaned PDF data to a document | |
save_data_to_document({"pdf_data": raw_text}, f"pdf_data_{unique_id}.json") | |
st.success("PDF data is ready for queries!") | |
else: | |
st.error("No PDF files were uploaded.") | |
elif option == "URL": | |
url_input = st.text_input("Enter a URL to scrape text:") | |
if st.button("Submit & Process"): | |
with st.spinner("Processing..."): | |
if url_input: | |
try: | |
# Run BeautifulSoup and get scraped data | |
scraped_data = scrape_website(url_input) | |
# Combine and preprocess scraped data | |
raw_text = preprocess_text(" ".join(scraped_data.values())) | |
# Split text into chunks and index in FAISS | |
text_chunks = get_text_chunks(raw_text) | |
unique_id, faiss_directory = create_faiss_with_uuid(text_chunks) | |
st.session_state['faiss_directory'] = faiss_directory | |
# Save the cleaned URL data to a document | |
save_data_to_document({"url_data": scraped_data}, f"url_data_{unique_id}.json") | |
st.success("Scraped data is ready for queries!") | |
except Exception as e: | |
st.error(f"Failed to scrape or process data: {e}") | |
else: | |
st.error("No URL was provided.") | |
if __name__ == "__main__": | |
main() | |