""" Credit to Derek Thomas, derek@huggingface.co """ import os import logging from pathlib import Path from time import perf_counter import gradio as gr from jinja2 import Environment, FileSystemLoader from backend.query_llm import generate_hf, generate_openai, hf_models, openai_models from backend.semantic_search import retrieve import itertools from gradio_client import Client def run_llama(_, msg, *__): client = Client("Be-Bo/llama-3-chatbot_70b") yield client.predict( message=msg, api_name="/chat" ) inf_models = list(hf_models.keys()) + list(openai_models) emb_models = ["bge", "minilm"] splitters = ['ct', 'rct', 'nltk'] chunk_sizes = ["500", "2000"] sub_vectors = ["8", "16", "32"] # Create all combinations of the provided arrays combinations = itertools.product(emb_models, splitters, chunk_sizes, sub_vectors) TOP_K = int(os.getenv("TOP_K", 4)) proj_dir = Path(__file__).parent # Setting up the logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) # Set up the template environment with the templates directory env = Environment(loader=FileSystemLoader(proj_dir / 'templates')) # Load the templates directly from the environment template = env.get_template('template.j2') template_html = env.get_template('template_html.j2') def add_text(history, text): history = [] if history is None else history history = history + [(text, None)] return history, gr.Textbox(value="", interactive=False) def has_balanced_backticks(markdown_str): in_code_block = False lines = markdown_str.split('\n') for line in lines: stripped_line = line.strip() # Check if the line contains triple backticks if stripped_line.startswith("```"): # Toggle the in_code_block flag in_code_block = not in_code_block # If in_code_block is False at the end, all backticks are balanced return not in_code_block def bot(history, model_name, oepnai_api_key, reranker_enabled,reranker_kind,num_prerank_docs, num_docs, model_kind, sub_vector_size, chunk_size, splitter_type, all_at_once): query = history[-1][0] if not query: raise gr.Warning("Please submit a non-empty string as a prompt") logger.info('Retrieving documents...') # Retrieve documents relevant to query document_start = perf_counter() if reranker_enabled and not all_at_once: documents = retrieve(query, int(num_docs), model_kind, sub_vector_size, chunk_size, splitter_type,reranker_kind,num_prerank_docs) else: documents = retrieve(query, int(num_docs), model_kind, sub_vector_size, chunk_size, splitter_type) document_time = perf_counter() - document_start logger.info(f'Finished Retrieving documents in {round(document_time, 2)} seconds...') # Create Prompt prompt = template.render(documents=documents, query=query) prompt_html = template_html.render(documents=documents, query=query) if model_name == "llama 3": generate_fn = run_llama elif model_name in hf_models: generate_fn = generate_hf elif model_name in openai_models: generate_fn = generate_openai else: raise gr.Error(f"Model {model_name} is not supported") history[-1][1] = "" if all_at_once: for emb_model, doc, size, sub_vector in combinations: documents_i = retrieve(query, int(num_docs), emb_model, sub_vector, size, doc) prompt_i = template.render(documents=documents_i, query=query) prompt_html = template_html.render(documents=documents, query=query) hist_chunk = "" prev_hist = history[-1][1] if not has_balanced_backticks(prev_hist): prev_hist += "\n```\n" prev_hist += f"\n\n## model {emb_model}, splitter {doc}, size {size}, sub vector {sub_vector}\n\n" for character in generate_fn(model_name, prompt_i, history[:-1], oepnai_api_key): hist_chunk = character history[-1][1] = prev_hist + hist_chunk yield history, prompt_html else: for character in generate_fn(model_name, prompt, history[:-1], oepnai_api_key): history[-1][1] = character yield history, prompt_html with gr.Blocks() as demo: chatbot = gr.Chatbot( [], elem_id="chatbot", avatar_images=('https://aui.atlassian.com/aui/8.8/docs/images/avatar-person.svg', 'https://huggingface.co/datasets/huggingface/brand-assets/resolve/main/hf-logo.svg'), bubble_full_width=False, show_copy_button=True, show_share_button=True, ) with gr.Row(): txt = gr.Textbox( scale=3, show_label=False, placeholder="Enter text and press enter", container=False, ) txt_btn = gr.Button(value="Submit text", scale=1) with gr.Row(): emb_model_kind = gr.Radio(choices=emb_models, value="minilm", label="embedding model") sub_vector_size = gr.Radio(choices=sub_vectors, value="16", label="sub-vector size") chunk_size = gr.Radio(choices=chunk_sizes, value="2000", label="chunk size") splitter_type = gr.Radio(choices=splitters, value="ct", label="splitter") all_at_once = gr.Checkbox(value=False, label="Run all at once (no reranker)") with gr.Row(): reranker_enabled = gr.Checkbox(value=False, label="Reranker enabled") reranker_kind = gr.Radio(choices=emb_models, value="minilm", label="Reranker model") num_prerank_docs = gr.Slider(5, 80, label="Number of docs before reranker", step=1, value=20) with gr.Row(): num_docs = gr.Slider(1, 20, label="number of docs", step=1, value=4) model_name = gr.Radio(choices=inf_models, value=inf_models[0], label="Chat model") oepnai_api_key = gr.Textbox( show_label=False, placeholder="OpenAI API key", container=False, ) prompt_html = gr.HTML() # Turn off interactivity while generating if you click txt_msg = txt_btn.click(add_text, [chatbot, txt], [chatbot, txt], queue=False).then( bot, [chatbot, model_name, oepnai_api_key, reranker_enabled,reranker_kind,num_prerank_docs, num_docs, emb_model_kind, sub_vector_size, chunk_size, splitter_type, all_at_once ], [chatbot, prompt_html]) # Turn it back on txt_msg.then(lambda: gr.Textbox(interactive=True), None, [txt], queue=False) # Turn off interactivity while generating if you hit enter txt_msg = txt.submit(add_text, [chatbot, txt], [chatbot, txt], queue=False).then( bot, [chatbot, model_name, oepnai_api_key, reranker_enabled,reranker_kind,num_prerank_docs, num_docs, emb_model_kind, sub_vector_size, chunk_size, splitter_type ], [chatbot, prompt_html]) # Turn it back on txt_msg.then(lambda: gr.Textbox(interactive=True), None, [txt], queue=False) demo.queue() demo.launch(debug=True)