import os
import re
from typing import Any

import fitz
import gradio as gr
import openai
from langchain.chains import ConversationalRetrievalChain
from langchain_community.document_loaders import PyMuPDFLoader
from langchain_community.vectorstores import Chroma
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from PIL import Image

# The API key is read from the environment so that no secret lives in source
# control. NOTE: any key that was ever committed to this file must be treated
# as compromised and revoked.
openai.api_key = os.environ.get("OPENAI_API_KEY")


def add_text(history, text: str):
    """Append the user's message to the chat history with an empty reply slot.

    Args:
        history: Current chatbot history, a list of (user, bot) pairs.
        text: The message typed by the user.

    Returns:
        A new history list ending with ``(text, "")``.

    Raises:
        gr.Error: If ``text`` is empty.
    """
    if not text:
        raise gr.Error("Enter text")
    return history + [(text, "")]


class MyApp:
    """State holder for one PDF: loaded pages, retrieval chain and chat log."""

    def __init__(self) -> None:
        self.OPENAI_API_KEY = openai.api_key
        # ConversationalRetrievalChain, set by build_chain().
        self.chain = None
        # (question, answer) pairs passed back to the chain as context.
        self.chat_history: list = []
        # Zero-based page number of the most recent source document.
        self.N: int = 0
        # 0 until __call__ has built the chain; reset to 0 to force a rebuild.
        self.count: int = 0
        # Documents produced by PyMuPDFLoader in process_file().
        self.documents = None
        # Base name of the processed PDF; used for the Chroma collection name.
        self.file_name = None
        # Path the current ``documents`` were loaded from (detects stale data).
        self._source_path = None

    @staticmethod
    def _path_of(file) -> str:
        """Return the filesystem path for an upload (path string or file object)."""
        return getattr(file, "name", file)

    def _collection_name(self) -> str:
        """Derive a Chroma-compatible collection name from ``self.file_name``.

        Chroma only accepts 3-63 characters drawn from letters, digits, ``_``
        and ``-`` (plus single dots), starting and ending alphanumerically, so
        arbitrary upload names such as ``"my file (1).pdf"`` must be cleaned.
        """
        cleaned = re.sub(r"[^A-Za-z0-9_-]+", "-", self.file_name or "")
        cleaned = cleaned.strip("-_")[:63].rstrip("-_")
        if len(cleaned) < 3:
            cleaned = f"pdf-{cleaned}" if cleaned else "pdf-document"
        return cleaned

    def __call__(self, file) -> Any:
        """Return the retrieval chain, building it on the first call.

        Args:
            file: The uploaded PDF (path string or object with ``.name``).

        Returns:
            The chain stored in ``self.chain``.
        """
        if self.count == 0:
            # build_chain() stores the chain on ``self.chain`` and returns a
            # status string; assigning that return value to ``self.chain``
            # would replace the chain with a str.
            self.build_chain(file)
            self.count += 1
        return self.chain

    def process_file(self, file):
        """Load the PDF into documents and return an image of its first page.

        Args:
            file: The uploaded PDF (path string or object with ``.name``).

        Returns:
            A PIL image of page 0 rendered at 150 dpi.

        Raises:
            gr.Error: If no file has been uploaded.
        """
        if not file:
            raise gr.Error("Upload a PDF")
        path = self._path_of(file)
        self.documents = PyMuPDFLoader(path).load()
        self._source_path = path
        self.file_name = os.path.basename(path)

        # Render the first page for display; the context manager closes the
        # document once the pixel data has been copied into the image.
        with fitz.open(path) as doc:
            pix = doc[0].get_pixmap(dpi=150)
            return Image.frombytes("RGB", (pix.width, pix.height), pix.samples)

    def build_chain(self, file):
        """Embed the loaded documents and create the retrieval chain.

        The chain is stored on ``self.chain``. If the given file has not been
        processed yet (or a different file was processed), it is loaded first
        so the index never reflects a previous upload.

        Args:
            file: The uploaded PDF (path string or object with ``.name``).

        Returns:
            A human-readable status message.

        Raises:
            gr.Error: If no file has been uploaded or no API key is configured.
        """
        if not file:
            raise gr.Error("Upload a PDF")
        if not self.OPENAI_API_KEY:
            raise gr.Error("Set the OPENAI_API_KEY environment variable")
        if self.documents is None or self._source_path != self._path_of(file):
            self.process_file(file)

        embeddings = OpenAIEmbeddings(openai_api_key=self.OPENAI_API_KEY)
        pdfsearch = Chroma.from_documents(
            self.documents,
            embeddings,
            collection_name=self._collection_name(),
        )
        self.chain = ConversationalRetrievalChain.from_llm(
            ChatOpenAI(temperature=0.0, openai_api_key=self.OPENAI_API_KEY),
            retriever=pdfsearch.as_retriever(search_kwargs={"k": 1}),
            return_source_documents=True,
        )
        return "Vector database built successfully!"
def _render_page(file, page_number: int):
    """Render one page of the uploaded PDF as a PIL image at 150 dpi.

    Args:
        file: The uploaded PDF (path string or object with ``.name``).
        page_number: Zero-based page index.
    """
    path = getattr(file, "name", file)
    # The context manager closes the document once the pixels are copied.
    with fitz.open(path) as doc:
        pix = doc[page_number].get_pixmap(dpi=150)
        return Image.frombytes("RGB", (pix.width, pix.height), pix.samples)


def get_response(history, query, file):
    """Answer ``query`` from the PDF and stream the reply into the chat.

    Args:
        history: Chatbot history whose last entry holds the pending reply.
        query: The user's question.
        file: The uploaded PDF.

    Yields:
        ``(history, "", source_note)`` after each appended character, where
        the empty string clears the input box and ``source_note`` names the
        source page.

    Raises:
        gr.Error: If no PDF has been uploaded.
    """
    if not file:
        raise gr.Error(message="Upload a PDF")
    chain = app(file)
    result = chain(
        {"question": query, "chat_history": app.chat_history},
        return_only_outputs=True,
    )
    answer = result["answer"]
    app.chat_history += [(query, answer)]

    # Read the page from the document's metadata attribute rather than from
    # the positional order of its fields, which differs between versions.
    sources = result["source_documents"]
    if sources:
        app.N = sources[0].metadata.get("page", app.N)

    source_note = f"Page: {app.N + 1}, Highlight: {answer}"
    for char in answer:
        history[-1][-1] += char
        yield history, "", source_note


def render_file(file):
    """Render the page most recently cited as a source (``app.N``)."""
    return _render_page(file, app.N)


def purge_chat_and_render_first(file):
    """Reset chat state for a new upload and preview its first page.

    Returns:
        ``(first_page_image, [])`` - the preview and an emptied chatbot.
    """
    app.chat_history = []
    # Forces the chain to be rebuilt on the next question.
    app.count = 0
    return _render_page(file, 0), []


def refresh_chat():
    """Forget the conversation and return an empty chatbot history."""
    app.chat_history = []
    return []


app = MyApp()

with gr.Blocks() as demo:
    with gr.Tab("Step 1: Upload PDF"):
        btn = gr.UploadButton("📁 Upload a PDF", file_types=[".pdf"])
        show_img = gr.Image(label="Uploaded PDF")

    with gr.Tab("Step 2: Process File"):
        process_btn = gr.Button("Process PDF")
        show_img_processed = gr.Image(label="Processed PDF")

    with gr.Tab("Step 3: Build Vector Database"):
        build_vector_btn = gr.Button("Build Vector Database")
        status_text = gr.Textbox(label="Status", value="", interactive=False)

    with gr.Tab("Step 4: Ask Questions"):
        chatbot = gr.Chatbot(value=[], elem_id="chatbot")
        txt = gr.Textbox(
            show_label=False,
            placeholder="Enter text and press submit",
            scale=2,
        )
        # Receives the third value yielded by get_response (page + highlight).
        source_info = gr.Textbox(label="Source", value="", interactive=False)
        submit_btn = gr.Button("Submit", scale=1)
        refresh_btn = gr.Button("Refresh Chat", scale=1)
        # NOTE: this slider is displayed but not wired to any event handler.
        temperature_slider = gr.Slider(0, 1, value=0.0, label="Temperature")

    btn.upload(
        fn=purge_chat_and_render_first,
        inputs=[btn],
        outputs=[show_img, chatbot],
    )

    process_btn.click(
        fn=app.process_file,
        inputs=[btn],
        outputs=[show_img_processed],
    )

    build_vector_btn.click(
        fn=app.build_chain,
        inputs=[btn],
        outputs=[status_text],
    )

    # add_text -> stream the answer -> show the page the answer came from.
    submit_btn.click(
        fn=add_text,
        inputs=[chatbot, txt],
        outputs=[chatbot],
        queue=False,
    ).success(
        fn=get_response,
        inputs=[chatbot, txt, btn],
        outputs=[chatbot, txt, source_info],
    ).success(
        fn=render_file,
        inputs=[btn],
        outputs=[show_img_processed],
    )

    refresh_btn.click(
        fn=refresh_chat,
        inputs=[],
        outputs=[chatbot],
    )

demo.queue()
demo.launch()