import os
from typing import Callable

import google.generativeai as genai
import gradio as gr
import pinecone
from huggingface_hub import hf_hub_download
from openai import OpenAI


def download_prompt(name_prompt: str) -> str:
    """
    Downloads a prompt file from the HuggingFace Hub and returns its content.

    The file ``<name_prompt>.txt`` is fetched from the dataset repository named
    by the ``DATA`` environment variable (authenticated with ``HUB_TOKEN``) and
    stored in the local ``prompts`` directory.

    :param name_prompt: name of the file, without the ``.txt`` extension
    :return: text of the file
    """
    # hf_hub_download returns the local path of the downloaded file, so the
    # path does not have to be rebuilt by hand.
    local_path = hf_hub_download(
        repo_id=os.environ.get('DATA'),
        repo_type='dataset',
        filename=f"{name_prompt}.txt",
        token=os.environ.get('HUB_TOKEN'),
        local_dir="prompts"
    )
    with open(local_path, mode='r', encoding='utf-8') as infile:
        return infile.read()


def start_chat(model: str) -> tuple[dict, dict, dict, dict]:
    """
    Shows the chatbot interface and hides the selection of the model.

    Returns gradio updates (gr.update()).

    :param model: name of the model to use
    :return: visible=False, visible=True, visible=True, value="# <model>"
    """
    no_visible = gr.update(visible=False)
    visible = gr.update(visible=True)
    title = gr.update(value=f"# {model}")
    return no_visible, visible, visible, title


def restart_chat() -> tuple[dict, dict, dict, list, str]:
    """
    Shows the selection of the model, hides the chatbot interface and restarts
    the chatbot.

    Returns gradio updates (gr.update()) plus the reset values.

    :return: visible=True, visible=False, visible=False, empty list, empty string
    """
    no_visible = gr.update(visible=False)
    visible = gr.update(visible=True)
    return visible, no_visible, no_visible, [], ""


def get_answer(
        chatbot: list[tuple[str, str]],
        message: str,
        model: str
) -> tuple[list[tuple[str, str]], str]:
    """
    Calls the model and returns the updated history.

    The new (message, answer) pair is appended to ``chatbot`` in place.

    :param chatbot: message history as (user, bot) pairs
    :param message: user input
    :param model: display name of the model (a key of ``COMPANIES``)
    :return: the updated history and an empty string
    :raises KeyError: if ``model`` is not a key of ``COMPANIES``
    """
    # Setup which function will be called (depends on the model)
    if COMPANIES[model]['real name'] == 'Gemini':
        call_model = _call_google
    else:
        call_model = _call_openai

    # Get standalone question
    standalone_question = _get_standalone_question(chatbot, message, call_model)

    # Get context
    context = _get_context(standalone_question)

    # Get answer from the Chatbot
    prompt = PROMPT_GENERAL.replace('CONTEXT', context)
    answer = call_model(prompt, chatbot, message)

    # Add the new answer to the history
    chatbot.append((message, answer))
    return chatbot, ""


def _get_standalone_question(
        chat_history: list[tuple[str, str]],
        message: str,
        call_model: Callable[[str, list, str], str]
) -> str:
    """
    To get a better context a standalone question is obtained for each question.

    :param chat_history: message history as (user, bot) pairs
    :param message: user input
    :param call_model: function that calls the selected model; it receives
        (prompt, chat history, question) and returns the model answer
    :return: standalone phrase
    """
    # Format the message history like: Human: blablabla \nAssistant: blablabla
    # The user side of the first pair is skipped.
    # NOTE(review): presumably the first entry is a bot greeting with no user
    # message - confirm against the UI code.
    lines = []
    for i, (user, bot) in enumerate(chat_history):
        if i > 0:
            lines.append(f'Human: {user}\n')
        lines.append(f'Assistant: {bot}\n')
    history = ''.join(lines)

    # Add history and question to the prompt
    prompt = PROMPT_STANDALONE.replace('HISTORY', history)
    question = f'Follow-up message: {message}'

    # The history is already embedded in the prompt, so none is passed here
    return call_model(prompt, [], question)


def _get_embedding(text: str) -> list[float]:
    """
    Gets the embedding of a text with the 'text-embedding-ada-002' model.

    :param text: input text
    :return: embedding
    """
    response = OPENAI_CLIENT.embeddings.create(
        input=text,
        model='text-embedding-ada-002'
    )
    return response.data[0].embedding


def _get_context(question: str) -> str:
    """
    Get the 10 nearest vectors to the given input.

    The search is done in the ``<CLIENT>-context`` namespace of the index.

    :param question: standalone question
    :return: the 'Text' metadata of every match, each followed by a blank line
    """
    result = INDEX.query(
        vector=_get_embedding(question),
        top_k=10,
        include_metadata=True,
        namespace=f'{CLIENT}-context'
    )['matches']

    return ''.join(r['metadata']['Text'] + '\n\n' for r in result)


def _call_openai(prompt: str, chat_history: list[tuple[str, str]], question: str) -> str:
    """
    Calls ChatGPT 4.

    :param prompt: prompt with the context or the question (in the case of the
        standalone one); sent as the system message
    :param chat_history: history of the conversation as (user, bot) pairs
    :param question: user input
    :return: chatbot answer
    """
    # Format the message history to the one used by OpenAI
    msg_history = [{'role': 'system', 'content': prompt}]
    for user, bot in chat_history:
        msg_history.append({'role': 'user', 'content': user})
        msg_history.append({'role': 'assistant', 'content': bot})
    msg_history.append({'role': 'user', 'content': question})

    # Call ChatGPT 4
    response = OPENAI_CLIENT.chat.completions.create(
        model='gpt-4-turbo-preview',
        temperature=0.5,
        messages=msg_history
    )
    return response.choices[0].message.content


def _call_google(prompt: str, chat_history: list[tuple[str, str]], question: str) -> str:
    """
    Calls Gemini.

    :param prompt: prompt with the context or the question (in the case of the
        standalone one)
    :param chat_history: history of the conversation as (user, bot) pairs
    :param question: user input
    :return: chatbot answer
    """
    # Format the message history to the one used by Google. The prompt is sent
    # as the first user turn, followed by a fixed model acknowledgement.
    history = [
        {'role': 'user', 'parts': [prompt]},
        {'role': 'model', 'parts': ['Excelente! Estoy super lista para ayudarte en lo que necesites']}
    ]
    for user, bot in chat_history:
        history.append({'role': 'user', 'parts': [user]})
        history.append({'role': 'model', 'parts': [bot]})
    convo = GEMINI.start_chat(history=history)

    # Call Gemini
    convo.send_message(question)
    return convo.last.text


# ----------------------------------------- Setup constants and models ------------------------------------------------
OPENAI_CLIENT = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
genai.configure(api_key=os.getenv("GEMINI_API_KEY"))
pinecone.init(api_key=os.getenv('PINECONE_API_KEY'), environment=os.getenv("PINECONE_ENVIRONMENT"))
INDEX = pinecone.Index(os.getenv('PINECONE_INDEX'))
CLIENT = os.getenv('CLIENT')

# Setup Gemini
generation_config = {
    "temperature": 0.9,
    "top_p": 1,
    "top_k": 1,
    "max_output_tokens": 2048,
}

safety_settings = [
    {
        "category": "HARM_CATEGORY_HARASSMENT",
        "threshold": "BLOCK_MEDIUM_AND_ABOVE"
    },
    {
        "category": "HARM_CATEGORY_HATE_SPEECH",
        "threshold": "BLOCK_MEDIUM_AND_ABOVE"
    },
    {
        "category": "HARM_CATEGORY_SEXUALLY_EXPLICIT",
        "threshold": "BLOCK_ONLY_HIGH"
    },
    {
        "category": "HARM_CATEGORY_DANGEROUS_CONTENT",
        "threshold": "BLOCK_MEDIUM_AND_ABOVE"
    },
]

GEMINI = genai.GenerativeModel(
    model_name="gemini-1.0-pro",
    generation_config=generation_config,
    safety_settings=safety_settings
)

# Download and open prompts from HuggingFace Hub
os.makedirs('prompts', exist_ok=True)
PROMPT_STANDALONE = download_prompt('standalone')
PROMPT_GENERAL = download_prompt('general')

# Constants used in the app: display name shown in the UI -> real provider/model
COMPANIES = {
    'Model G': {'company': 'Google', 'real name': 'Gemini'},
    'Model C': {'company': 'OpenAI', 'real name': 'ChatGPT 4'},
}
MODELS = list(COMPANIES.keys())