|
import os
from typing import Any, Dict, List, Optional, Tuple

import gradio as gr
import fitz
from sentence_transformers import SentenceTransformer
import numpy as np
import faiss
from google.generativeai import GenerativeModel, configure, types
|
|
|
|
|
# Pull the Gemini API key from the environment (None when the variable is
# unset) and register it with the google.generativeai SDK at import time.
GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY")
configure(api_key=GOOGLE_API_KEY)
|
|
|
|
|
class MyApp:
    """Holds PDF page texts and a FAISS index used to search them.

    The state is filled in stages: ``load_pdfs`` populates ``documents``,
    ``build_vector_db`` populates ``embeddings`` and ``index``, and
    ``search_documents`` reads all of them.
    """

    def __init__(self) -> None:
        # One dict per PDF page with the keys "file_name", "page", "content".
        self.documents: List[Dict] = []
        # Page embeddings produced by ``self.model``; None until built.
        self.embeddings = None
        # FAISS index over ``self.embeddings``; None until built.
        self.index = None
        self.model = SentenceTransformer('all-MiniLM-L6-v2')

    @staticmethod
    def _open_pdf(file: Any):
        """Open one uploaded item as a PyMuPDF document.

        NOTE(review): depending on the Gradio version and the ``type`` option
        of ``gr.File``, handlers appear to receive paths, temp-file wrappers
        exposing ``.name``, or raw bytes -- confirm against the installed
        Gradio. All three forms are accepted here.
        """
        if isinstance(file, (bytes, bytearray)):
            return fitz.open(stream=file, filetype="pdf")
        if isinstance(file, (str, os.PathLike)):
            path = file
        else:
            path = getattr(file, "name", None)
        if path is not None:
            return fitz.open(os.fspath(path), filetype="pdf")
        # Last resort: a file-like object that only offers read().
        return fitz.open(stream=file.read(), filetype="pdf")

    @staticmethod
    def _source_name(file: Any, position: int) -> str:
        """Return the label stored as "file_name" for an uploaded item."""
        if isinstance(file, (str, os.PathLike)):
            return os.fspath(file)
        name = getattr(file, "name", None)
        if name:
            return name
        # Raw bytes (or nameless streams) carry no name of their own.
        return f"upload_{position}.pdf"

    def load_pdfs(self, files: Optional[List[Any]]) -> str:
        """Extract the text of every page of the uploaded PDFs.

        Previously loaded pages are replaced. Any existing embeddings and
        index are discarded as well, because their row numbers would no
        longer line up with ``self.documents``.

        Args:
            files: The upload payload; ``None`` or an empty list (e.g. after
                the upload widget is cleared) is treated as "nothing uploaded".

        Returns:
            A status message suitable for display in the UI.
        """
        self.documents = []
        self.embeddings = None
        self.index = None

        if not files:
            return "No PDFs uploaded."

        for position, file in enumerate(files, start=1):
            source = self._source_name(file, position)
            # The context manager closes the document even if a page fails.
            with self._open_pdf(file) as pdf:
                for page_number, page in enumerate(pdf, start=1):
                    self.documents.append({
                        "file_name": source,
                        "page": page_number,
                        "content": page.get_text(),
                    })
        return f"Processed {len(files)} PDFs successfully!"

    def build_vector_db(self) -> str:
        """Embed every loaded page and index the vectors with FAISS.

        Returns:
            A status message; "No documents to process." when nothing has
            been loaded yet.
        """
        if not self.documents:
            return "No documents to process."

        page_texts = [entry["content"] for entry in self.documents]
        vectors = self.model.encode(page_texts, show_progress_bar=True)
        # FAISS works on contiguous float32 matrices; make that explicit.
        self.embeddings = np.ascontiguousarray(vectors, dtype="float32")

        # Publish the index only once it is fully populated.
        index = faiss.IndexFlatL2(self.embeddings.shape[1])
        index.add(self.embeddings)
        self.index = index
        return "Vector database built successfully!"

    def search_documents(self, query: str, k: int = 3) -> List[Dict]:
        """Return up to ``k`` page dicts most similar to ``query``.

        Args:
            query: Free-text search string.
            k: Maximum number of pages to return.

        Returns:
            Matching entries of ``self.documents``. When the index has not
            been built, or nothing usable comes back, a one-element list with
            a status dict that only has a "content" key.
        """
        if self.index is None:
            return [{"content": "Vector database is not built."}]

        query_vector = np.ascontiguousarray(
            self.model.encode([query], show_progress_bar=False), dtype="float32"
        )
        _, neighbour_ids = self.index.search(query_vector, k)

        # FAISS pads the result with -1 when fewer than k vectors exist; used
        # as a Python index that would silently select the last page.
        total = len(self.documents)
        hits = [self.documents[int(i)] for i in neighbour_ids[0] if 0 <= i < total]
        return hits if hits else [{"content": "No relevant documents found."}]
|
|
|
# Module-wide application state shared by the callback functions in this file.
app = MyApp()
|
|
|
def upload_files(files: List[gr.File]) -> str:
    """Hand the uploaded files to the shared app and return its status text."""
    status = app.load_pdfs(files)
    return status
|
|
|
def build_vector_db() -> str:
    """Ask the shared app to build its vector index and return its status text."""
    outcome = app.build_vector_db()
    return outcome
|
|
|
def _format_context(retrieved_docs: List[Dict], snippet_chars: int = 200) -> str:
    """Render retrieved pages as text, tolerating status-only entries."""
    sections = []
    for doc in retrieved_docs:
        content = doc.get("content", "")
        if "file_name" in doc and "page" in doc:
            sections.append(
                f"File: {doc['file_name']}, Page: {doc['page']}\n{content[:snippet_chars]}..."
            )
        else:
            # search_documents() reports "index not built" / "nothing found"
            # as dicts that only carry "content"; pass those through as-is
            # instead of failing on the missing keys.
            sections.append(content)
    return "\n".join(sections)


def _format_history(history: List[Tuple[str, str]]) -> str:
    """Render earlier (user, assistant) turns as a plain-text transcript."""
    lines = []
    for user_msg, assistant_msg in history:
        if user_msg:
            lines.append(f"User: {user_msg}")
        if assistant_msg:
            lines.append(f"Assistant: {assistant_msg}")
    return "\n".join(lines)


def respond(message: str, history: List[Tuple[str, str]]) -> Tuple[List[Tuple[str, str]], str]:
    """Answer ``message`` with Gemini, grounded in the retrieved PDF snippets.

    The prompt sent to the model consists of the system instructions, the
    retrieved snippets, the earlier conversation turns (if any) and the new
    question.

    Args:
        message: The user's new question.
        history: Earlier ``(user, assistant)`` turns; ``None`` is treated as
            an empty conversation.

    Returns:
        ``(history, "")``: the history with the new turn appended, and an
        empty string used to clear the input box. A blank ``message`` returns
        the history unchanged without calling the model.
    """
    if history is None:
        history = []
    if not message or not message.strip():
        return history, ""

    system_message = (
        "You are a helpful assistant designed to assist with studying and learning. "
        "You analyze uploaded PDF documents and provide clear, concise responses "
        "to any questions based on the content. You strive to be accurate, detailed, "
        "and educational in your responses."
    )

    retrieved_docs = app.search_documents(message)
    context = _format_context(retrieved_docs)
    transcript = _format_history(history)

    # Everything goes into a single text prompt so the system instructions
    # and previous turns actually reach the model.
    prompt_parts = [system_message, "Context:\n" + context]
    if transcript:
        prompt_parts.append("Conversation so far:\n" + transcript)
    prompt_parts.append("Question: " + message)
    prompt = "\n\n".join(prompt_parts)

    model = GenerativeModel("gemini-1.5-pro-latest")
    generation_config = types.GenerationConfig(
        temperature=0.7,
        max_output_tokens=1024,
    )

    try:
        response = model.generate_content([prompt], generation_config=generation_config)
        response_content = getattr(response, "text", None) or "No response generated."
    except Exception as e:
        # UI boundary: show the failure in the chat instead of crashing the handler.
        response_content = f"An error occurred while generating the response: {str(e)}"

    history.append((message, response_content))
    return history, ""
|
|
|
with gr.Blocks() as demo:
    gr.Markdown("# Study Assistant Chatbot")
    gr.Markdown("Upload your PDFs, build a vector database, and ask questions to learn efficiently.")

    with gr.Row():
        # First column: PDF upload and index building, each with a status box.
        with gr.Column():
            upload_btn = gr.File(
                label="Upload PDFs",
                file_types=[".pdf"],
                file_count="multiple",
            )
            upload_message = gr.Textbox(label="Upload Status", lines=2)
            build_db_btn = gr.Button("Build Vector Database")
            db_message = gr.Textbox(label="DB Build Status", lines=2)

        # Second column: the chat transcript, the question box and its button.
        with gr.Column():
            chatbot = gr.Chatbot(label="Chat Responses")
            query_input = gr.Textbox(label="Enter your query here")
            submit_btn = gr.Button("Submit")

    # Event wiring, registered in the same order as before:
    # upload -> build index -> chat.
    upload_btn.change(fn=upload_files, inputs=[upload_btn], outputs=[upload_message])
    build_db_btn.click(fn=build_vector_db, inputs=[], outputs=[db_message])
    submit_btn.click(fn=respond, inputs=[query_input, chatbot], outputs=[chatbot, query_input])

demo.launch()