import torch import transformers import gradio as gr from ragatouille import RAGPretrainedModel from huggingface_hub import InferenceClient import re from datetime import datetime import json retrieve_results = 10 show_examples = False generate_kwargs = dict( temperature = None, max_new_tokens = 512, top_p = None, do_sample = False, ) RAG = RAGPretrainedModel.from_index("colbert/indexes/arxiv_colbert") try: gr.Info("Setting up retriever, please wait...") rag_initial_output = RAG.search("what is Mistral?", k = 1) gr.Info("Retriever working successfully!") except: gr.Warning("Retriever not working!") mark_text = '# 🔍 Search Results\n' header_text = "# ArXivCS RAG \n" try: with open("README.md", "r") as f: mdfile = f.read() date_pattern = r'Index Last Updated : \d{4}-\d{2}-\d{2}' match = re.search(date_pattern, mdfile) date = match.group().split(': ')[1] formatted_date = datetime.strptime(date, '%Y-%m-%d').strftime('%d %b %Y') header_text += f'Index Last Updated: {formatted_date}\n' except: pass if show_examples: with open("sample_outputs.json", "r") as f: sample_outputs = json.load(f) output_placeholder = sample_outputs['output_placeholder'] md_text_initial = sample_outputs['search_placeholder'] else: output_placeholder = None md_text_initial = '' def rag_cleaner(inp): rank = inp['rank'] title = inp['document_metadata']['title'] content = inp['content'] return f"{rank}. {title} \n Abstract: {content}" def get_prompt_text(question, context, formatted = True): if formatted: sys_instruction = f"Context:\n {context} \n Given the following scientific paper abstracts, take a deep breath and lets think step by step to answer what the question. Cite the titles of your sources when answering." message = f"Question: {question}" return f"" + f"[INST] {sys_instruction} " + f" {message} [/INST] " return f"Context:\n {context} \n Given the following info, take a deep breath and lets think step by step to answer the question: {question}. Cite the titles of your sources when answering.\n\n" def get_references(question, retriever, k = retrieve_results): rag_out = retriever.search(query=question, k=k) return rag_out def get_rag(message): return get_references(message, RAG) with gr.Blocks(theme = gr.themes.Soft()) as demo: header = gr.Markdown(header_text) with gr.Group(): msg = gr.Textbox(label = 'Search', placeholder = 'What is Mistral?') with gr.Accordion("Advanced Settings", open=False): with gr.Row(equal_height = True): llm_model = gr.Dropdown(choices = ['mistralai/Mixtral-8x7B-Instruct-v0.1','mistralai/Mistral-7B-Instruct-v0.2', 'None'], value = 'mistralai/Mistral-7B-Instruct-v0.2', label = 'LLM Model') llm_results = gr.Slider(minimum=4, maximum=10, value=5, step=1, interactive=True, label="Top n results to sent as context") output_text = gr.Textbox(show_label = True, container = True, label = 'LLM Answer', visible = True, placeholder = output_placeholder) input = gr.Textbox(show_label = False, visible = False) gr_md = gr.Markdown(mark_text + md_text_initial) def update_with_rag_md(message, llm_results_use = 5): rag_out = get_rag(message) md_text_updated = mark_text for i in range(retrieve_results): rag_answer = rag_out[i] title = rag_answer['document_metadata']['title'].replace('\n','') #score = round(rag_answer['score'], 2) date = rag_answer['document_metadata']['_time'] paper_title = f'''### {date} | [{title}](https://arxiv.org/abs/{rag_answer['document_id']}) | [⬇️](https://arxiv.org/pdf/{rag_answer['document_id']})\n''' paper_abs = rag_answer['content'] authors = rag_answer['document_metadata']['authors'].replace('\n','') authors_formatted = f'*{authors}*' + ' \n\n' md_text_updated += paper_title + authors_formatted + paper_abs + '\n---------------\n'+ '\n' prompt = get_prompt_text(message, '\n\n'.join(rag_cleaner(out) for out in rag_out[:llm_results_use])) return md_text_updated, prompt def ask_llm(prompt, llm_model_picked = 'mistralai/Mistral-7B-Instruct-v0.2'): model_disabled_text = "LLM Model is disabled" output = "" if llm_model_picked == 'None': for out in model_disabled_text: output += out yield output return output client = InferenceClient(llm_model_picked) #output = client.text_generation(prompt, **generate_kwargs, stream=False, details=False, return_full_text=False) try: stream = client.text_generation(prompt, **generate_kwargs, stream=True, details=True, return_full_text=False) except: gr.Warning("LLM Inference rate limit reached, try again later!") return "" #output = output.lstrip(' \n') if output.lstrip().startswith('\n') else output for response in stream: output += response.token.text yield output return output #return gr.Textbox(output, visible = True) msg.submit(update_with_rag_md, [msg, llm_results], [gr_md, input]).success(ask_llm, [input, llm_model], output_text) demo.launch(debug = True)