|
import logging
import os
import re
from typing import Any, List, Tuple

import fitz
import gradio as gr
import openai
from langchain.chains import ConversationalRetrievalChain
from langchain_community.document_loaders import PyMuPDFLoader
from langchain_community.vectorstores import Chroma
from langchain_openai import ChatOpenAI
from langchain_openai import OpenAIEmbeddings
from PIL import Image
|
|
|
# SECURITY: never commit API keys to source control. The key is read from the
# OPENAI_API_KEY environment variable instead. Any key that was previously
# hard-coded on this line must be treated as compromised and revoked.
openai.api_key = os.environ.get("OPENAI_API_KEY")
|
|
|
def add_text(history: List[Tuple[str, str]], text: str) -> List[Tuple[str, str]]:
    """Append the user's message to the chat history with an empty answer slot.

    Args:
        history: Existing chatbot rows; mutated in place.
        text: The message typed by the user.

    Returns:
        The same ``history`` object, now ending with ``(text, "")``.

    Raises:
        gr.Error: If ``text`` is empty.
    """
    if text:
        # The empty string is a placeholder for the answer filled in later.
        pending_row = (text, "")
        history.append(pending_row)
        return history
    raise gr.Error("Enter text")
|
|
|
class MyApp:
    """Hold the PDF state and the LangChain retrieval chain for the UI.

    The attributes are filled in stages: ``process_file`` loads the PDF pages,
    ``build_chain`` embeds them into a Chroma collection and creates the
    conversational chain, and calling the instance returns that chain,
    building it on first use.
    """

    def __init__(self) -> None:
        # API key comes from the module-level ``openai`` configuration.
        self.OPENAI_API_KEY: str = openai.api_key
        # Conversational retrieval chain; None until build_chain() has run.
        self.chain = None
        # (question, answer) pairs handed back to the chain as context.
        self.chat_history: list = []
        # Pages loaded from the PDF by process_file().
        self.documents = None
        # Base name of the processed PDF; used as the Chroma collection name.
        self.file_name = None

    def __call__(self, file: str) -> "ConversationalRetrievalChain":
        """Return the retrieval chain, building it first if necessary.

        Args:
            file: The uploaded file, forwarded to ``build_chain``.

        Returns:
            The chain stored on ``self.chain``.
        """
        if self.chain is None:
            # build_chain() stores the chain on self and returns a status
            # string; assigning that return value to self.chain (as the old
            # code did) replaced the freshly built chain with the string.
            self.build_chain(file)
        return self.chain

    def process_file(self, file) -> "Image.Image":
        """Load the PDF's pages and return an image of its first page.

        Args:
            file: Uploaded file object exposing the PDF path as ``file.name``.

        Returns:
            The first page rendered at 150 dpi as an RGB PIL image.
        """
        self.documents = PyMuPDFLoader(file.name).load()
        # basename copes with any path layout; the previous regex only matched
        # "/" separators and its fallback passed the file object itself.
        self.file_name = os.path.basename(file.name)
        # The context manager closes the document once the page is rendered.
        with fitz.open(file.name) as doc:
            pix = doc[0].get_pixmap(dpi=150)
            return Image.frombytes("RGB", (pix.width, pix.height), pix.samples)

    def build_chain(self, file) -> str:
        """Embed the processed pages into Chroma and create the chat chain.

        Args:
            file: Unused here; accepted so the method can be wired directly to
                an event that passes the uploaded file.

        Returns:
            A status message for display in the UI.

        Raises:
            gr.Error: If no pages have been loaded by ``process_file`` yet.
        """
        if not self.documents:
            # Fail with an actionable message instead of an opaque error from
            # the vector store when there is nothing to embed.
            raise gr.Error("Process the PDF before building the vector database")
        embeddings = OpenAIEmbeddings(openai_api_key=self.OPENAI_API_KEY)
        pdfsearch = Chroma.from_documents(
            self.documents,
            embeddings,
            collection_name=self.file_name,
        )
        self.chain = ConversationalRetrievalChain.from_llm(
            ChatOpenAI(temperature=0.0, openai_api_key=self.OPENAI_API_KEY),
            # k=1: only the single best-matching chunk is retrieved per query.
            retriever=pdfsearch.as_retriever(search_kwargs={"k": 1}),
            return_source_documents=True,
        )
        return "Vector database built successfully!"
|
|
|
def get_response(history, query, file):
    """Answer ``query`` against the PDF and fill in the pending chat row.

    Args:
        history: Chatbot rows; the answer slot of the last row is extended
            in place with the model's answer.
        query: The user's question.
        file: The uploaded PDF; falsy when nothing has been uploaded.

    Returns:
        A ``(history, source_text)`` pair. ``source_text`` lists the retrieved
        passages with 1-based page numbers, or holds a fallback message when
        answering failed.

    Raises:
        gr.Error: If no file has been uploaded.
    """
    if not file:
        raise gr.Error(message="Upload a PDF")
    chain = app(file)
    fallback = "I have no information about it. Feed me knowledge, please!"
    try:
        result = chain.invoke(
            {"question": query, "chat_history": app.chat_history}
        )
        answer = result["answer"]
        # Loader page numbers are 0-based; show them 1-based to the user.
        source_texts_str = "\n\n".join(
            f"Page {doc.metadata['page'] + 1}: {doc.page_content}"
            for doc in result["source_documents"]
        )
        history[-1][-1] += answer
        # Record the turn only after everything above succeeded, so a failure
        # cannot leave both the answer and the fallback in chat_history.
        app.chat_history.append((query, answer))
        return history, source_texts_str
    except Exception:
        # Still best-effort for the user, but no longer silent for operators.
        logging.getLogger(__name__).exception("Failed to answer query")
        app.chat_history.append((query, fallback))
        return history, fallback
|
|
|
def render_file(file) -> Image.Image:
    """Render the first page of a PDF as an RGB PIL image.

    Args:
        file: Uploaded file object exposing the PDF path as ``file.name``.

    Returns:
        The first page rendered at 150 dpi.
    """
    # The context manager closes the document; it was previously left open.
    with fitz.open(file.name) as doc:
        pix = doc[0].get_pixmap(dpi=150)
        return Image.frombytes("RGB", (pix.width, pix.height), pix.samples)
|
|
|
def purge_chat_and_render_first(file) -> Tuple[Image.Image, list]:
    """Reset the stored conversation and preview the newly uploaded PDF.

    Args:
        file: Uploaded file object exposing the PDF path as ``file.name``.

    Returns:
        A ``(image, [])`` pair: the first page of the PDF and an empty
        chatbot value.
    """
    app.chat_history = []
    # Reuse the shared renderer instead of duplicating the fitz/PIL code.
    return render_file(file), []
|
|
|
def refresh_chat():
    """Forget the stored conversation and return an empty chatbot value."""
    # Two separate lists: one replaces the stored history, the other is the
    # value returned to the caller.
    app.chat_history = list()
    return list()
|
|
|
# One shared application object backs every callback wired up below.
app = MyApp()


def _process_and_report(file):
    """Run ``app.process_file`` and pair its image with a status message."""
    preview = app.process_file(file)
    return preview, "Processing complete!"


with gr.Blocks() as demo:
    # ---- Layout: one tab per step of the workflow ----
    with gr.Tab("Step 1: Upload PDF"):
        btn = gr.UploadButton("📁 Upload a PDF", file_types=[".pdf"])
        show_img = gr.Image(label="Uploaded PDF")

    with gr.Tab("Step 2: Process File"):
        process_btn = gr.Button("Process PDF")
        show_img_processed = gr.Image(label="Processed PDF")
        process_status = gr.Textbox(label="Processing Status", interactive=False)

    with gr.Tab("Step 3: Build Vector Database"):
        build_vector_btn = gr.Button("Build Vector Database")
        status_text = gr.Textbox(label="Status", value="", interactive=False)

    with gr.Tab("Step 4: Ask Questions"):
        chatbot = gr.Chatbot(elem_id="chatbot")
        txt = gr.Textbox(
            show_label=False,
            placeholder="Enter text and press submit",
            scale=2,
        )
        submit_btn = gr.Button("Submit", scale=1)
        refresh_btn = gr.Button("Refresh Chat", scale=1)
        source_texts_output = gr.Textbox(label="Source Texts", interactive=False)

    # ---- Event wiring ----
    # Uploading shows the first page and clears the chatbot.
    btn.upload(
        fn=purge_chat_and_render_first,
        inputs=[btn],
        outputs=[show_img, chatbot],
    )

    # Processing loads the pages and reports completion.
    process_btn.click(
        fn=_process_and_report,
        inputs=[btn],
        outputs=[show_img_processed, process_status],
    )

    # Building the vector database reports its status string.
    build_vector_btn.click(
        fn=app.build_chain,
        inputs=[btn],
        outputs=[status_text],
    )

    # Submitting first records the question, then (only on success) asks the
    # chain for an answer and the supporting source text.
    text_added = submit_btn.click(
        fn=add_text,
        inputs=[chatbot, txt],
        outputs=[chatbot],
        queue=False,
    )
    text_added.success(
        fn=get_response,
        inputs=[chatbot, txt, btn],
        outputs=[chatbot, source_texts_output],
    )

    # Refreshing clears both the stored history and the chatbot display.
    refresh_btn.click(
        fn=refresh_chat,
        inputs=[],
        outputs=[chatbot],
    )

demo.queue()
demo.launch()
|
|