# Initialize Google API and model import torch device = torch.device("cpu") # Force CPU import base64 import os from huggingface_hub import login import PIL.Image from byaldi import RAGMultiModalModel import PIL.Image as PILImage import io import textwrap import google.generativeai as genai import gradio as gr # Add Gradio for UI from PIL import Image as PILImage import os GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY") genai.configure(api_key=GOOGLE_API_KEY) model = genai.GenerativeModel('models/gemini-1.5-flash-latest') # Load the RAG multi-modal model RAG = RAGMultiModalModel.from_pretrained("vidore/colpali-v1.2", verbose=1) RAG.to(device) # Specify the index path where the index was saved during the first run index_path = "/home/mohammadaqib/Desktop/project/research/Multi-Modal-RAG/Colpali/BCC" RAG = RAGMultiModalModel.from_index(index_path) # Initialize conversation history conversation_history = [] def get_user_input(query): """Process user input.""" return query def process_image_from_results(results): """Process images from the search results and merge them.""" image_list = [] for i in range(min(3, len(results))): try: # Ensure the result has a base64 attribute image_bytes = base64.b64decode(results[i].base64) image = PILImage.open(io.BytesIO(image_bytes)) # Open image directly from bytes image_list.append(image) except AttributeError: print(f"Result {i} does not contain a 'base64' attribute.") # Merge images if any if image_list: total_width = sum(img.width for img in image_list) max_height = max(img.height for img in image_list) merged_image = PILImage.new('RGB', (total_width, max_height)) x_offset = 0 for img in image_list: merged_image.paste(img, (x_offset, 0)) x_offset += img.width # Save the merged image merged_image.save('merged_image.jpg') return merged_image else: return None def generate_answer(query, image): """Generate an answer using the Gemini model and the merged image.""" response = model.generate_content([f'Answer to the question asked using the image. Also mention the reference from image to support your answer. Example, Table Number or Statement number or any metadata. Question: {query}', image], stream=True) response.resolve() return response.text def classify_system_question(query): """Check if the question is related to the system itself.""" response = model.generate_content([f"Determine if the question is about the system itself, like 'Who are you?' or 'What can you do?' or 'Introduce yourself' . Answer with 'yes' or 'no'. Question: {query}"], stream=True) response.resolve() return response.text.strip().lower() == "yes" def classify_question(query): """Classify whether the question is general or domain-specific using Gemini.""" response = model.generate_content([f"Classify this question as 'general' or 'domain-specific'. Give one word answer i.e general or domain-specific. General questions are greetings and questions involving general knowledge like the capital of France. General questions also involve politics, geography, history, economics, cosmology, information about famous personalities, etc. Question: {query}"], stream=True) response.resolve() return response.text.strip().lower() # Assuming the response is either 'general' or 'domain-specific' def chatbot(query, history): max_history_length = 50 # Number of recent exchanges to keep # Truncate the history to the last `max_history_length` exchanges truncated_history = history[-max_history_length:] # Add user input to the history truncated_history.append(("You: " + query, "Model:")) # Step 1: Check if the question is about the system if classify_system_question(query): text = "I am an AI assistant capable of answering queries related to the National Building Code of Canada and general questions. I was developed by the research group [SITE] at the University of Alberta. How can I assist you further?" else: # Step 2: Classify the question as general or domain-specific question_type = classify_question(query) # If the question is general, use Gemini to directly answer it if question_type == "general": text = model.generate_content([f"Answer this general question: {query}. If it is a greeting respond accordingly and if it is not greeting add a prefix saying that it is a general query."], stream=True) text.resolve() text = text.text else: # Step 3: Query the RAG model for domain-specific answers results = RAG.search(query, k=3) # Check if RAG found any results if not results: text = model.generate_content([f"Answer this question: {query}"], stream=True) text.resolve() text = text.text text = "It is a general query. ANSWER:" + text else: # Process images from the results image = process_image_from_results(results) # Generate the answer using the Gemini model if an image is found if image: text = generate_answer(query, image) text = "It is a query from NBCC. ANSWER:" + text # Check if the answer is a fallback message (indicating no relevant answer) if any(keyword in text.lower() for keyword in [ "does not provide", "cannot answer", "does not contain", "no relevant answer", "not found", "information unavailable", "not in the document", "unable to provide", "no data", "missing information", "no match", "provided text does not describe", "are not explicitly listed", "are not explicitly mentioned", "no results", "not available", "query not found" ]): # Fallback to Gemini for answering text = model.generate_content([f"Answer this general question in concise manner: {query}"], stream=True) text.resolve() text = text.text text = "It is a general query. ANSWER: " + text else: text = model.generate_content([f"Answer this question: {query}"], stream=True) text.resolve() text = text.text text = "It is a query from NBCC. ANSWER: " + text # Add the model's response to the truncated history truncated_history[-1] = (truncated_history[-1][0], "Model: " + text) # Update the most recent message with model's answer # Return the output text, updated state, and chat history (as tuple pairs) return text, truncated_history, truncated_history # Ensure all three outputs are returned in the correct order import gradio as gr # Define Gradio interface with gr.Blocks() as iface: # Set the conversation state as an empty list state = gr.State([]) # Custom CSS to beautify the interface iface.css = """ .gradio-container { background-color: #f9f9f9; border-radius: 15px; padding: 20px; box-shadow: 0 4px 8px rgba(0, 0, 0, 0.1); } .gr-chatbox { background-color: #f0f0f0; border-radius: 10px; padding: 10px; max-height: 1000px; overflow-y: scroll; margin-bottom: 10px; } .gr-textbox input { border-radius: 10px; padding: 12px; font-size: 16px; border: 1px solid #ccc; width: 100%; margin-top: 10px; box-sizing: border-box; } .gr-textbox input:focus { border-color: #4CAF50; outline: none; } .gr-button { background-color: #4CAF50; color: white; padding: 12px; border-radius: 10px; font-size: 16px; border: none; cursor: pointer; } .gr-button:hover { background-color: #45a049; } .gr-chatbot { font-family: "Arial", sans-serif; font-size: 14px; } .gr-chatbot .gr-chatbot-user { background-color: #e1f5fe; border-radius: 10px; padding: 8px; margin-bottom: 10px; max-width: 80%; } .gr-chatbot .gr-chatbot-model { background-color: #ffffff; border-radius: 10px; padding: 8px; margin-bottom: 10px; max-width: 80%; } .gr-chatbot .gr-chatbot-user p, .gr-chatbot .gr-chatbot-model p { margin: 0; } #input_box { position: fixed; bottom: 20px; width: 95%; padding: 10px; border-radius: 10px; box-shadow: 0 0 5px rgba(0, 0, 0, 0.2); } """ # Add an image at the top of the page with gr.Column(): gr.Image("/home/mohammadaqib/Pictures/Screenshots/site.png",height = 300) # Use the image URL gr.Markdown( "# Question Answering System Over National Building Code of Canada" ) # Chatbot UI with gr.Row(): chat_history = gr.Chatbot(label="Chat History", height=250) # Place input at the bottom with gr.Row(): query = gr.Textbox( label="Ask a Question", placeholder="Enter your question here...", lines=1, interactive=True, elem_id="input_box" # Custom ID for styling ) # Output for the response output_text = gr.Textbox(label="Answer", interactive=False, visible=False) # Optional to hide # Define the interaction behavior query.submit( chatbot, inputs=[query, state], outputs=[output_text, state, chat_history], show_progress=True ).then( lambda _: "", # Clear the input after submission inputs=None, outputs=query ) gr.Markdown("

Developed by Mohammad Aqib, MSc Student at the University of Alberta, supervised by Dr. Qipei (Gavin) Mei.

", elem_id="footer") # Launch the interface iface.launch(share=True)