import json
import os
import random

import gradio as gr
import openai
from langdetect import detect

from audio_utils import text_to_speech_polly
from openai_utils import get_embedding, whisper_transcription
from vector_db import LanceVectorDb, QnA

db = LanceVectorDb("qna_db")

OPENAI_KEY = os.environ["OPENAI_KEY"]
openai.api_key = OPENAI_KEY

if not db.table or len(db.table.to_pandas()) == 0:
    print("Empty db, trying to load QnAs from json file")
    try:
        db.init_from_qna_json("all_questions_audio.json")
        print("Initialized db from json file")
    except Exception as exception:
        raise Exception("Failed to initialize db from json file") from exception


def ensure_dir(directory):
    if not os.path.exists(directory):
        os.makedirs(directory)


ensure_dir("audio_temp")


def red(text):
    return f'\x1b[31m"{text}"\x1b[0m'


def query_database(prompt: str, filters: dict | None = None):
    print("Querying database for question:", prompt)
    embedding = get_embedding(prompt)
    qnas = db.get_qna(embedding, filters=filters or {}, limit=3)
    print("Total_qnas:", len(qnas), [qna.score for qna in qnas])
    # keep only reasonably close matches (lower score = closer embedding)
    qnas = [qna for qna in qnas if qna.score < 0.49]
    print("Filtered_qnas:", len(qnas))
    return qnas


available_functions = {
    "query_database": query_database,
}

conversation_folder = f"conversations/{random.randint(0, 10000)}"
ensure_dir(conversation_folder)
print("Conversation", conversation_folder)

SYSTEM_PROMPT = (
    "You are a question answering assistant.\n"
    "You answer questions from users delimited by triple dashes --- based on information in our database provided as context.\n"
    "The context information is delimited by triple backticks ```\n"
    "You try to be concise and offer the most relevant information.\n"
    "You answer in the language that the question was asked in.\n"
    "You speak German and English.\n"
)

step = 0


def context_format(qnas):
    context = "Context:\n\n```"
    for qna in qnas:
        context += f"For question: {qna.question}\nThe answer is: {qna.answer}\n"
    context += "```"
    return context


def bot_respond(user_query, history: dict):
    global step
    chat_messages = history["chat_messages"]
    qnas = query_database(user_query)

    # Try to match an already existing question: if a stored QnA is very close
    # (score < 0.15), reuse its cached answer and audio instead of calling the model
    if any(qna.score < 0.15 for qna in qnas):
        min_score = min(qna.score for qna in qnas)
        qna_minscore = [qna for qna in qnas if qna.score == min_score][0]
        uid: str = qna_minscore.uid
        mp3_path = os.path.join("audio", f"{uid}.mp3")
        if not os.path.exists(mp3_path):
            text_to_speech_polly(qna_minscore.answer, qna_minscore.language, mp3_path)
        chat_messages.append({"role": "user", "content": user_query})
        chat_messages.append({"role": "assistant", "content": qna_minscore.answer})
        return {
            "type": "cached_response",
            "mp3_path": mp3_path,
            "bot_response": qna_minscore.answer,
            "prompt": "No chatbot response, cached response from database",
        }

    # Search only the base QnA entries (source == "base")
    qnas = query_database(user_query, filters={"source": "base"})

    # Use ChatGPT to answer the question, passing the retrieved QnAs as context
    prompt = f"The user said: ---{user_query}---\n\n"
    context = context_format(qnas)
    prompt += context

    chat_messages.append({"role": "user", "content": prompt})
    completion = openai.ChatCompletion.create(
        model="gpt-4", messages=chat_messages, temperature=0
    )
    response_message = completion["choices"][0]["message"]
    bot_response = response_message.content

    path = os.path.join(conversation_folder, f"step_{step}_qna.json")

    # remove the context-augmented prompt and store only the plain exchange in history
    chat_messages.pop(-1)
    chat_messages.append({"role": "user", "content": user_query})
    chat_messages.append({"role": "assistant", "content": bot_response})
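
    # Persist the running conversation and the raw model response to a per-step
    # JSON file so each exchange can be inspected later.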
    with open(path, "w") as f:
        json.dump(
            {
                "chat_messages": chat_messages,
                "response": response_message.content,
            },
            f,
            indent=4,
        )
    step += 1

    data = {
        "type": "openai",
        "bot_response": bot_response,
        "prompt": prompt,
    }
    return data


def add_question(question):
    if os.path.exists("runtime_questions.json"):
        with open("runtime_questions.json") as f:
            questions = json.load(f)
    else:
        questions = []
    questions.append(question)
    with open("runtime_questions.json", "w") as f:
        json.dump(questions, f, indent=4, ensure_ascii=False)


def display_history(conversation):
    conversation_string = ""
    for message in conversation:
        conversation_string += (
            f"<<{message['role']}>>:\n{message['content']}\n<<{message['role']}>>\n\n"
        )
    return conversation_string


if not os.path.exists("runtime_questions.json"):
    with open("runtime_questions.json", "w") as f:
        json.dump([], f)


def handle_audiofile(audio_filepath: str, history: dict):
    user_question = whisper_transcription(audio_filepath)
    print("Transcription", user_question)

    res = bot_respond(user_question, history)

    if res["type"] == "cached_response":
        return (
            user_question,
            res["bot_response"],
            history,
            res["prompt"],
            display_history(history["chat_messages"]),
            res["mp3_path"],
            "runtime_questions.json",
        )
    else:
        bot_response_text = res["bot_response"]
        prompt = res["prompt"]

        if bot_response_text:
            lang = detect(bot_response_text)
            print("Detected language:", lang, "for text:", bot_response_text)
        else:
            lang = "en"

        add_question(
            {"question": user_question, "answer": bot_response_text, "language": lang}
        )

        if lang not in ["en", "de"]:
            lang = "en"

        output_filepath = os.path.join(
            "audio_temp", f"output_{random.randint(0, 1000)}.mp3"
        )
        text_to_speech_polly(bot_response_text, lang, output_filepath)

        context_prompt = prompt
        context_prompt += f"<<detected_language>> : {lang}\n"
        context_prompt += f"<<bot_response>> : {bot_response_text}\n"

        return (
            user_question,
            bot_response_text,
            history,
            context_prompt,
            display_history(history["chat_messages"]),
            output_filepath,
            "runtime_questions.json",
        )


with gr.Blocks() as demo:
    # initialize the state that will be used to store the chat messages
    chat_messages = gr.State(
        {
            "chat_messages": [{"role": "system", "content": SYSTEM_PROMPT}],
        }
    )

    with gr.Row():
        audio_input = gr.Audio(source="microphone", type="filepath", format="mp3")
        # autoplay=True => play the output audio file automatically
        output_audio = gr.Audio(label="PhoneBot Answer TTS", autoplay=True)

    with gr.Row():
        user_query_textbox = gr.Textbox(label="User Query")
        assistant_answer = gr.Textbox(label="PhoneBot Answer")

    with gr.Row():
        context_info = gr.Textbox(
            label="Context provided to the bot + additional infos for debugging"
        )
        conversation_history = gr.Textbox(label="Conversation history")

    with gr.Row():
        file_output = gr.File(label="Download questions file", download=True)

    # when the audio recording stops, transcribe it and generate the bot response
    audio_input.stop_recording(
        handle_audiofile,
        inputs=[audio_input, chat_messages],
        outputs=[
            user_query_textbox,
            assistant_answer,
            chat_messages,
            context_info,
            conversation_history,
            output_audio,
            file_output,
        ],
    )

username = os.environ["GRADIO_USERNAME"]
password = os.environ["GRADIO_PASSWORD"]

# launch the app with basic authentication
demo.launch(auth=(username, password))
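
# Running this script requires OPENAI_KEY, GRADIO_USERNAME and GRADIO_PASSWORD in the
# environment, plus AWS credentials for Amazon Polly (used by the text_to_speech_polly
# helper). Gradio serves the interface on http://localhost:7860 by default.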