"""Backend for a Gradio chatbot that answers with text, audio and video.

The pipeline implemented here is: rewrite the user question as a standalone
question (OpenAI chat), embed it (OpenAI embeddings), retrieve context from a
Pinecone index, answer with OpenAI chat, synthesize audio (ElevenLabs or an
external XTTS HTTP endpoint), generate a talking-head video through the D-ID
API, and upload the media to a Google Cloud Storage bucket.
"""

import asyncio
import os
import re
import time
import uuid
from csv import writer
from datetime import timedelta

import gradio as gr
import httpx
import pandas as pd
import requests
from dotenv import load_dotenv
from elevenlabs import play, save, Voice, stream
from elevenlabs.client import ElevenLabs, AsyncElevenLabs
from google.cloud import storage
from openai import OpenAI
from pinecone import Pinecone
from pymongo.mongo_client import MongoClient

from gcp import download_credentials
from utils import create_folders

load_dotenv()

OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
MODEL_OPENAI = os.getenv("MODEL_OPENAI")
PINECONE_API_TOKEN = os.getenv("PINECONE_API_TOKEN")
PINECONE_ENVIRONMENT = os.getenv("PINECONE_ENV")
PINECONE_HOST = os.getenv("PINECONE_HOST")
DB_USER_NAME = os.getenv("DB_USER_NAME")
DB_PASSWORD = os.getenv("DB_PASSWORD")
API_KEY_ELEVENLABS = os.getenv("API_KEY_ELEVENLABS")
D_ID_KEY = os.getenv("D_ID_KEY")
IMG_XAVY = os.getenv("IMG_XAVY")
CREDENTIALS_GCP = os.getenv("GOOGLE_APPLICATION_CREDENTIALS")
NAME_BUCKET = os.getenv("NAME_BUCKET")
URL_AUDIO = os.getenv("URL_AUDIO")

# Limits for the blocking HTTP calls made to the D-ID API.
_HTTP_TIMEOUT_S = 60
_DID_POLL_INTERVAL_S = 0.5
_DID_POLL_TIMEOUT_S = 300

# Chat
openai_client = OpenAI(api_key=OPENAI_API_KEY)

# Vector store
pc = Pinecone(api_key=PINECONE_API_TOKEN)
index = pc.Index(host=PINECONE_HOST)

# Database
# NOTE(review): credentials are interpolated as-is; they must already be
# percent-escaped in the environment if they contain reserved characters.
uri = f"mongodb+srv://{DB_USER_NAME}:{DB_PASSWORD}@cluster-rob01.3fpztfw.mongodb.net/?retryWrites=true&w=majority&appName=cluster-rob01"
client = MongoClient(uri)
db = client["ChatCrunchyroll"]
collection = db["history_msg"]


def _save_history_msg():
    """Placeholder; currently does nothing and returns None."""
    return None


def _add_question_vectorstore(question: str, response: str):
    """Embed *question* and upsert it into the Pinecone index.

    The vector is stored under a fresh UUID with the question and the
    response (under the ``text`` key) as metadata.
    """
    vector_id = str(uuid.uuid4())
    vector_embedding = _call_embedding(question)
    vector_metadata = {
        'question': question,
        'text': response
    }
    index.upsert([(vector_id, vector_embedding, vector_metadata)])


def _update_elements(question, chatbot, output, history_messages, url_audio, url_video, df_table_times):
    """Append the new turn to the chat state and build the media components.

    Mutates *chatbot* and *history_messages* in place.

    Returns:
        A tuple ``(chatbot, audio_component, video_component, df_table_times)``.
    """
    chatbot.append([question, output])
    new_comp_audio = gr.Audio(value=str(url_audio), autoplay=False, label="Audio")
    new_comp_video = gr.Video(value=str(url_video), autoplay=True, height=400, label="Video")

    history_messages.append({'role': 'user', 'content': question})
    history_messages.append({'role': 'assistant', 'content': output})

    return chatbot, new_comp_audio, new_comp_video, df_table_times


def _query_pinecone(embedding):
    """Return the ``text`` metadata of the 10 nearest vectors, one per line."""
    results = index.query(
        vector=embedding,
        top_k=10,
        include_metadata=True,
    )
    return "".join(f"{result['metadata']['text']}\n" for result in results['matches'])


def _general_prompt(context, option_prompt, general_prompt):
    """Build the system prompt by substituting ``CONTEXT`` in the template.

    Args:
        context: Retrieved text that replaces the ``CONTEXT`` placeholder.
        option_prompt: ``"Default"`` reads ``prompt_general.txt`` (newlines
            removed); ``"Custom"`` uses *general_prompt* as the template.
        general_prompt: Template used when *option_prompt* is ``"Custom"``.

    Raises:
        ValueError: If *option_prompt* is neither ``"Default"`` nor ``"Custom"``.
    """
    if option_prompt == "Default":
        with open("prompt_general.txt", "r", encoding="utf-8") as file:
            file_prompt = file.read().replace("\n", "")
    elif option_prompt == "Custom":
        file_prompt = general_prompt
    else:
        raise ValueError(f"Unknown prompt option: {option_prompt!r}")

    context_prompt = file_prompt.replace('CONTEXT', context)
    print(context_prompt)
    print("--------------------")
    return context_prompt


def _call_embedding(text: str):
    """Return the ``text-embedding-ada-002`` embedding of *text*."""
    response = openai_client.embeddings.create(
        input=text,
        model='text-embedding-ada-002'
    )
    return response.data[0].embedding


def _call_gpt(prompt: str, message: str):
    """Return the chat completion for a system *prompt* and a user *message*."""
    response = openai_client.chat.completions.create(
        model=MODEL_OPENAI,
        temperature=0.2,
        messages=[
            {'role': 'system', 'content': prompt},
            {'role': 'user', 'content': message}
        ]
    )
    return response.choices[0].message.content


def _call_gpt_standalone(prompt: str):
    """Return the chat completion for a single system *prompt*."""
    response = openai_client.chat.completions.create(
        model=MODEL_OPENAI,
        temperature=0.2,
        messages=[
            {'role': 'system', 'content': prompt},
        ]
    )
    return response.choices[0].message.content


def _get_standalone_question(question, history_messages, option_prompt, standalone_prompt):
    """Rewrite *question* as a standalone question using the chat history.

    Args:
        question: Latest user message; replaces ``QUESTION`` in the template.
        history_messages: Previous messages; rendered into ``HISTORY``.
        option_prompt: ``"Default"`` reads ``prompt_standalone_message.txt``
            (newlines removed); ``"Custom"`` uses *standalone_prompt*.
        standalone_prompt: Template used when *option_prompt* is ``"Custom"``.

    Raises:
        ValueError: If *option_prompt* is neither ``"Default"`` nor ``"Custom"``.
    """
    if option_prompt == "Default":
        with open("prompt_standalone_message.txt", "r", encoding="utf-8") as file:
            file_prompt_standalone = file.read().replace("\n", "")
    elif option_prompt == "Custom":
        file_prompt_standalone = standalone_prompt
    else:
        raise ValueError(f"Unknown prompt option: {option_prompt!r}")

    history_lines = []
    for i, msg in enumerate(history_messages):
        if i == 0:
            continue  # Omit the prompt
        # NOTE(review): speakers are labelled by position (odd -> assistant,
        # even -> user), not by msg["role"]; this assumes strictly
        # alternating turns after the first entry -- confirm with the caller.
        speaker = 'user' if i % 2 == 0 else 'assistant'
        try:
            history_lines.append(f'{speaker}: {msg["content"]}\n')
        except (KeyError, TypeError) as e:
            print(e)
    history = ''.join(history_lines)

    prompt_standalone = file_prompt_standalone.replace('HISTORY', history).replace('QUESTION', question)
    print(prompt_standalone)
    print("------------------")
    standalone_msg_q = _call_gpt_standalone(prompt_standalone)
    print(standalone_msg_q)
    print("------------------")
    return standalone_msg_q


def _create_clean_message(text: str):
    """Replace every http(s) URL in *text* with the phrase 'el siguiente link'."""
    clean_answer = re.sub(r'http[s]?://\S+', 'el siguiente link', text)
    return clean_answer


async def _create_audio(clean_text: str, option_audio: str):
    """Synthesize *clean_text* to audio and return a URL for it.

    Args:
        clean_text: Text to speak.
        option_audio: ``"Elevenlabs"`` generates the audio with ElevenLabs,
            uploads it to the GCS bucket and signs a 15-minute URL;
            ``"XTTS"`` asks the service at ``URL_AUDIO`` and uses the
            ``link_audio`` field of its JSON response.

    Returns:
        A tuple ``(url, unique_id)``. ``url`` is the string ``"None"`` when
        the option is unknown or any step failed.
    """
    download_credentials()
    create_folders()
    unique_id = str(uuid.uuid4())
    signed_url_audio = "None"

    if option_audio == "Elevenlabs":
        storage_client = storage.Client.from_service_account_json(CREDENTIALS_GCP)

        # Create audio file with elevenlabs
        client_elevenlabs = ElevenLabs(api_key=API_KEY_ELEVENLABS)
        voice_custom = Voice(voice_id="ZQe5CZNOzWyzPSCn5a3c")
        audio = client_elevenlabs.generate(
            text=clean_text,
            voice=voice_custom,
            model="eleven_multilingual_v2"
        )
        source_audio_file_name = f'./audios/file_audio_{unique_id}.wav'
        destination_blob_name_audio = unique_id + '.wav'
        bucket = storage_client.bucket(NAME_BUCKET)
        blob = bucket.blob(destination_blob_name_audio)

        # Each step depends on the previous one, so stop at the first
        # failure instead of signing a URL for an object that was never
        # uploaded.
        try:
            save(audio, source_audio_file_name)
            blob.upload_from_filename(source_audio_file_name)
            signed_url_audio = blob.generate_signed_url(expiration=timedelta(minutes=15))
        except Exception as e:  # best-effort: report and fall back to "None"
            print(e)

    elif option_audio == "XTTS":
        params = {'text': clean_text, 'language': 'es'}
        headers = {'accept': 'application/json'}

        # Makes a request to the instance with the audio api
        async with httpx.AsyncClient() as http_client:
            try:
                response = await http_client.get(URL_AUDIO, params=params, headers=headers, timeout=120)
            except Exception as e:  # best-effort: report and fall back to "None"
                print(f'There is a problem with the audio. Check that instance. ERROR: {e}')
                # No response object exists; nothing more to inspect.
                return signed_url_audio, unique_id

        # Check if everything was successful
        if response.status_code == 200:
            signed_url_audio = response.json().get('link_audio', signed_url_audio)
        else:
            print(f'There is a problem with the audio. Check that instance. ERROR: {response.status_code}')

    return signed_url_audio, unique_id


def _wait_for_talk(talk_id: str, headers: dict):
    """Poll D-ID until the talk is done; return its result URL or None."""
    url_get_talk_id = f"https://api.d-id.com/talks/{talk_id}"
    deadline = time.monotonic() + _DID_POLL_TIMEOUT_S
    while time.monotonic() < deadline:
        try:
            resp_video_url = requests.get(url_get_talk_id, headers=headers, timeout=_HTTP_TIMEOUT_S).json()
            status = resp_video_url.get('status')
        except (requests.RequestException, ValueError, AttributeError) as e:
            print(e)
            return None
        if status == 'done':
            return resp_video_url.get('result_url')
        if status is None or status in ('error', 'rejected'):
            # The talk will never complete; stop instead of polling forever.
            print(f'D-ID could not create the video: {resp_video_url}')
            return None
        # Sleep until the video is ready
        time.sleep(_DID_POLL_INTERVAL_S)
    print(f'Timed out waiting for D-ID talk {talk_id}')
    return None


def _create_video(link_audio: str, unique_id: str):
    """Create a D-ID talk from *link_audio* and upload the video to GCS.

    Args:
        link_audio: URL of the audio the avatar should speak.
        unique_id: Identifier used for the local file and the blob name.

    Returns:
        A signed URL (valid 15 minutes) for the uploaded video, or the
        string ``"None"`` if any step failed.
    """
    download_credentials()
    create_folders()
    storage_client = storage.Client.from_service_account_json(CREDENTIALS_GCP)
    bucket = storage_client.bucket(NAME_BUCKET)

    # Create video talk with file audio created by elevenlabs api
    url_did = "https://api.d-id.com/talks"

    payload = {
        "script": {
            "type": "audio",
            "provider": {
                "type": "microsoft",
                "voice_id": "en-US-JennyNeural"
            },
            "ssml": "false",
            "audio_url": link_audio
        },
        "config": {
            "fluent": "false",
            "pad_audio": "0.0",
            "stitch": True
        },
        "source_url": IMG_XAVY
    }
    headers = {
        "accept": "application/json",
        "content-type": "application/json",
        "authorization": f"Basic {D_ID_KEY}"
    }

    try:
        request_create_talk = requests.post(url_did, json=payload, headers=headers, timeout=_HTTP_TIMEOUT_S)
        talk_id = request_create_talk.json()['id']
    except (requests.RequestException, ValueError, KeyError, TypeError) as e:
        # Without a talk id there is nothing to poll for.
        print(e)
        return "None"

    # Get url of video file
    result_url_video = _wait_for_talk(talk_id, headers)
    if not result_url_video:
        return "None"

    # Saves the video into a file to later upload it to the GCP
    source_video_file_name = f'./videos/video_final_{unique_id}.mp4'
    try:
        request_video = requests.get(result_url_video, timeout=_HTTP_TIMEOUT_S)
    except requests.RequestException as e:
        print(e)
        return "None"
    if request_video.status_code != 200:
        print(f'Could not download the video. ERROR: {request_video.status_code}')
        return "None"
    with open(source_video_file_name, 'wb') as outfile:
        outfile.write(request_video.content)

    # Save video file to the GCP
    destination_blob_name_video = unique_id + '.mp4'
    blob = bucket.blob(destination_blob_name_video)

    signed_url_video = "None"
    try:
        blob.upload_from_filename(source_video_file_name)
        signed_url_video = blob.generate_signed_url(expiration=timedelta(minutes=15))
    except Exception as e:  # best-effort: report and fall back to "None"
        print(e)

    return signed_url_video


def _timed(func, *args):
    """Call ``func(*args)`` and return ``(result, elapsed_seconds)``."""
    start = time.perf_counter()
    result = func(*args)
    return result, time.perf_counter() - start


def get_answer(question: str, chatbot: list[tuple[str, str]], history_messages, comp_audio, comp_video, df_table,
               option_audio, option_prompt, general_prompt, standalone_prompt):
    """
    Gets the answer of the chatbot

    Runs the full pipeline (standalone question, embedding, retrieval,
    prompt, completion, audio, video), appends one row of per-step timings
    to *df_table* and updates the chat state.

    Args:
        question: Latest user message.
        chatbot: Chat history shown in the UI; mutated in place.
        history_messages: Role/content message list; mutated in place.
        comp_audio: Unused; kept for interface compatibility.
        comp_video: Unused; kept for interface compatibility.
        df_table: Timing table; must have 11 columns matching the row
            appended below.
        option_audio: Audio backend, ``"Elevenlabs"`` or ``"XTTS"``.
        option_prompt: ``"Default"`` or ``"Custom"`` prompt templates.
        general_prompt: Custom answer prompt template.
        standalone_prompt: Custom standalone-question prompt template.

    Returns:
        ``(chatbot, audio_component, video_component, dataframe_component)``.
    """
    # The farewell branch skips the LLM steps, so their timings stay at zero.
    time_get_standalone_question = 0.0
    time_call_embedding = 0.0
    time_query_pinecone = 0.0
    time_general_prompt = 0.0
    time_call_gpt = 0.0

    if len(chatbot) == 8:
        message_output = 'Un placer haberte ayudado, hasta luego!'
    else:
        # create standalone question or message
        standalone_msg_q, time_get_standalone_question = _timed(
            _get_standalone_question, question, history_messages, option_prompt, standalone_prompt)
        # create embedding of standalone question or message
        output_embedding, time_call_embedding = _timed(_call_embedding, standalone_msg_q)
        # get nearest embeddings
        best_results, time_query_pinecone = _timed(_query_pinecone, output_embedding)
        # create context/general prompt
        final_context_prompt, time_general_prompt = _timed(
            _general_prompt, best_results, option_prompt, general_prompt)
        # final response (to user)
        message_output, time_call_gpt = _timed(_call_gpt, final_context_prompt, question)

    if "Respuesta:" in message_output:
        # str.replace returns a new string; the result must be kept.
        message_output = message_output.replace("Respuesta:", "").strip()

    # clean message output
    processed_message, time_create_clean_message = _timed(_create_clean_message, message_output)

    # create audio
    (url_audio, unique_id), time_create_audio = _timed(
        lambda: asyncio.run(_create_audio(processed_message, option_audio)))

    # create video with d-id no streaming
    url_video, time_create_video = _timed(_create_video, url_audio, unique_id)

    final_time = (
        time_get_standalone_question + time_call_embedding + time_query_pinecone + time_general_prompt
        + time_call_gpt + time_create_clean_message + time_create_audio + time_create_video
    )

    df_table = pd.DataFrame(df_table)
    df_table.loc[len(df_table.index)] = [question, message_output, time_get_standalone_question,
                                         time_call_embedding, time_query_pinecone, time_general_prompt,
                                         time_call_gpt, time_create_clean_message,
                                         time_create_audio, time_create_video, final_time]

    new_df_table = gr.DataFrame(df_table, interactive=False, visible=True)

    print(history_messages)

    return _update_elements(question, chatbot, message_output, history_messages, url_audio, url_video, new_df_table)


def init_greeting(chatbot, history_messages):
    """Add the greeting to an empty chat; return ``(chatbot, history_messages)``."""
    if not chatbot:
        greeting = ('Hola 👋, soy tu asistente de recomendación de series y películas animadas en Crunchyroll. ¿En qué puedo ayudarte hoy?')
        history_messages.append({'role': 'assistant', 'content': greeting})
        chatbot.append([None, greeting])

    return chatbot, history_messages


def export_dataframe(df):
    """Write the timing table (minus its first row) to CSV and return a file component."""
    final_df = pd.DataFrame(df)
    final_df = final_df.iloc[1:]
    # Make sure the target directory exists before writing.
    os.makedirs("./csv_times", exist_ok=True)
    final_df.to_csv("./csv_times/csv_times.csv", index=False, encoding='utf-8')
    return gr.File(value="./csv_times/csv_times.csv", visible=True)