Spaces:
Runtime error
Runtime error
import gradio as gr | |
import requests | |
import os | |
import time | |
from datetime import timedelta | |
from openai import OpenAI | |
from pinecone import Pinecone | |
import uuid | |
import re | |
import pandas as pd | |
import tensorflow as tf | |
from utils import create_folders | |
from google.cloud import storage | |
from elevenlabs.client import ElevenLabs, AsyncElevenLabs | |
from elevenlabs import play, save, Voice, stream | |
from pymongo.mongo_client import MongoClient | |
from gcp import download_credentials | |
from dotenv import load_dotenv | |
load_dotenv() | |
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY") | |
MODEL_OPENAI = os.getenv("MODEL_OPENAI") | |
PINECONE_API_TOKEN = os.getenv("PINECONE_API_TOKEN") | |
PINECONE_ENVIRONMENT = os.getenv("PINECONE_ENV") | |
PINECONE_HOST = os.getenv("PINECONE_HOST") | |
DB_USER_NAME = os.getenv("DB_USER_NAME") | |
DB_PASSWORD = os.getenv("DB_PASSWORD") | |
API_KEY_ELEVENLABS = os.getenv("API_KEY_ELEVENLABS") | |
D_ID_KEY = os.getenv("D_ID_KEY") | |
IMG_XAVY = os.getenv("IMG_XAVY") | |
CREDENTIALS_GCP = os.getenv("GOOGLE_APPLICATION_CREDENTIALS") | |
NAME_BUCKET = os.getenv("NAME_BUCKET") | |
# Chat | |
openai_client = OpenAI(api_key=OPENAI_API_KEY) | |
# Vector store | |
pc = Pinecone(api_key=PINECONE_API_TOKEN) | |
index = pc.Index(host=PINECONE_HOST) | |
# Database | |
uri = f"mongodb+srv://{DB_USER_NAME}:{DB_PASSWORD}@cluster-rob01.3fpztfw.mongodb.net/?retryWrites=true&w=majority&appName=cluster-rob01" | |
client = MongoClient(uri) | |
db = client["ChatCrunchyroll"] | |
collection = db["history_msg"] | |
def _save_history_msg(): | |
return None | |
def _add_question_vectorstore(question: str, response: str): | |
vector_id = str(uuid.uuid4()) | |
vector_embedding = _call_embedding(question) | |
vector_metadata = { | |
'question': question, | |
'text': response | |
} | |
index.upsert([(vector_id, vector_embedding, vector_metadata)]) | |
def _update_elements(question, chatbot, output, history_messages, url_audio, url_video, df_table_times): | |
if tf.test.is_gpu_available(): | |
chatbot.append([question, output]) | |
new_comp_audio = gr.Audio(value=str(url_audio), autoplay=False, label="Audio") | |
new_comp_video = gr.Video(value=str(url_video), autoplay=True, height=400, label="Video") | |
history_messages.append({'role': 'user', 'content': question}) | |
history_messages.append({'role': 'assistant', 'content': output}) | |
else: | |
chatbot.append([question, output]) | |
new_comp_audio = gr.Audio(value=str(url_audio), autoplay=False, label="Audio") | |
new_comp_video = gr.Video(value=str(url_video), autoplay=True, height=400, label="Video") | |
history_messages.append({'role': 'user', 'content': question}) | |
history_messages.append({'role': 'assistant', 'content': output}) | |
return chatbot, new_comp_audio, new_comp_video, df_table_times | |
def _query_pinecone(embedding): | |
if tf.test.is_gpu_available(): | |
results = index.query( | |
vector=embedding, | |
top_k=10, | |
include_metadata=True, | |
) | |
final_results = """""" | |
for result in results['matches']: | |
final_results += f"{result['metadata']['text']}\n" | |
else: | |
results = index.query( | |
vector=embedding, | |
top_k=10, | |
include_metadata=True, | |
) | |
final_results = """""" | |
for result in results['matches']: | |
final_results += f"{result['metadata']['text']}\n" | |
return final_results | |
def _general_prompt(context): | |
if tf.test.is_gpu_available(): | |
with open("prompt_general.txt", "r") as file: | |
file_prompt = file.read().replace("\n", "") | |
context_prompt = file_prompt.replace('CONTEXT', context) | |
print(context_prompt) | |
print("--------------------") | |
else: | |
with open("prompt_general.txt", "r") as file: | |
file_prompt = file.read().replace("\n", "") | |
context_prompt = file_prompt.replace('CONTEXT', context) | |
print(context_prompt) | |
print("--------------------") | |
return context_prompt | |
def _call_embedding(text: str): | |
if tf.test.is_gpu_available(): | |
response = openai_client.embeddings.create( | |
input=text, | |
model='text-embedding-ada-002' | |
) | |
else: | |
response = openai_client.embeddings.create( | |
input=text, | |
model='text-embedding-ada-002' | |
) | |
return response.data[0].embedding | |
def _call_gpt(prompt: str, message: str): | |
if tf.test.is_gpu_available(): | |
response = openai_client.chat.completions.create( | |
model=MODEL_OPENAI, | |
temperature=0.2, | |
messages=[ | |
{'role': 'system', 'content': prompt}, | |
{'role': 'user', 'content': message} | |
] | |
) | |
else: | |
response = openai_client.chat.completions.create( | |
model=MODEL_OPENAI, | |
temperature=0.2, | |
messages=[ | |
{'role': 'system', 'content': prompt}, | |
{'role': 'user', 'content': message} | |
] | |
) | |
return response.choices[0].message.content | |
def _call_gpt_standalone(prompt: str): | |
if tf.test.is_gpu_available(): | |
response = openai_client.chat.completions.create( | |
model=MODEL_OPENAI, | |
temperature=0.2, | |
messages=[ | |
{'role': 'system', 'content': prompt}, | |
] | |
) | |
else: | |
response = openai_client.chat.completions.create( | |
model=MODEL_OPENAI, | |
temperature=0.2, | |
messages=[ | |
{'role': 'system', 'content': prompt}, | |
] | |
) | |
return response.choices[0].message.content | |
def _get_standalone_question(question, history_messages): | |
if tf.test.is_gpu_available(): | |
with open("prompt_standalone_message.txt", "r") as file: | |
file_prompt_standalone = file.read().replace("\n", "") | |
history = '' | |
for i, msg in enumerate(history_messages): | |
try: | |
if i == 0: | |
continue # Omit the prompt | |
if i % 2 == 0: | |
history += f'user: {msg["content"]}\n' | |
else: | |
history += f'assistant: {msg["content"]}\n' | |
except Exception as e: | |
print(e) | |
prompt_standalone = file_prompt_standalone.replace('HISTORY', history).replace('QUESTION', question) | |
standalone_msg_q = _call_gpt_standalone(prompt_standalone) | |
print(standalone_msg_q) | |
print("------------------") | |
else: | |
with open("prompt_standalone_message.txt", "r") as file: | |
file_prompt_standalone = file.read().replace("\n", "") | |
history = '' | |
for i, msg in enumerate(history_messages): | |
try: | |
if i == 0: | |
continue # Omit the prompt | |
if i % 2 == 0: | |
history += f'user: {msg["content"]}\n' | |
else: | |
history += f'assistant: {msg["content"]}\n' | |
except Exception as e: | |
print(e) | |
prompt_standalone = file_prompt_standalone.replace('HISTORY', history).replace('QUESTION', question) | |
standalone_msg_q = _call_gpt_standalone(prompt_standalone) | |
print(standalone_msg_q) | |
print("------------------") | |
return standalone_msg_q | |
def _create_clean_message(text: str): | |
clean_answer = re.sub(r'http[s]?://\S+', 'el siguiente link', text) | |
return clean_answer | |
def _create_audio(clean_text: str): | |
if tf.test.is_gpu_available(): | |
download_credentials() | |
create_folders() | |
STORAGE_CLIENT = storage.Client.from_service_account_json(CREDENTIALS_GCP) | |
unique_id = str(uuid.uuid4()) | |
# Create audio file | |
client_elevenlabs = ElevenLabs(api_key=API_KEY_ELEVENLABS) | |
voice_custom = Voice(voice_id = "ZQe5CZNOzWyzPSCn5a3c") | |
audio = client_elevenlabs.generate( | |
text=clean_text, | |
voice=voice_custom, | |
model="eleven_multilingual_v2" | |
) | |
source_audio_file_name = f'./audios/file_audio_{unique_id}.wav' | |
try: | |
save(audio, source_audio_file_name) | |
except Exception as e: | |
print(e) | |
# Save audio and get url of gcp | |
destination_blob_name_audio = unique_id + '.wav' | |
bucket = STORAGE_CLIENT.bucket(NAME_BUCKET) | |
blob = bucket.blob(destination_blob_name_audio) | |
try: | |
blob.upload_from_filename(source_audio_file_name) | |
except Exception as e: | |
print(e) | |
signed_url_audio = "None" | |
try: | |
url_expiration = timedelta(minutes=15) | |
signed_url_audio = blob.generate_signed_url(expiration=url_expiration) | |
except Exception as e: | |
print(e) | |
else: | |
download_credentials() | |
create_folders() | |
STORAGE_CLIENT = storage.Client.from_service_account_json(CREDENTIALS_GCP) | |
unique_id = str(uuid.uuid4()) | |
# Create audio file | |
client_elevenlabs = ElevenLabs(api_key=API_KEY_ELEVENLABS) | |
voice_custom = Voice(voice_id = "ZQe5CZNOzWyzPSCn5a3c") | |
audio = client_elevenlabs.generate( | |
text=clean_text, | |
voice=voice_custom, | |
model="eleven_multilingual_v2" | |
) | |
source_audio_file_name = f'./audios/file_audio_{unique_id}.wav' | |
try: | |
save(audio, source_audio_file_name) | |
except Exception as e: | |
print(e) | |
# Save audio and get url of gcp | |
destination_blob_name_audio = unique_id + '.wav' | |
bucket = STORAGE_CLIENT.bucket(NAME_BUCKET) | |
blob = bucket.blob(destination_blob_name_audio) | |
try: | |
blob.upload_from_filename(source_audio_file_name) | |
except Exception as e: | |
print(e) | |
signed_url_audio = "None" | |
try: | |
url_expiration = timedelta(minutes=15) | |
signed_url_audio = blob.generate_signed_url(expiration=url_expiration) | |
except Exception as e: | |
print(e) | |
return signed_url_audio, unique_id | |
def _create_video(link_audio: str, unique_id: str): | |
if tf.test.is_gpu_available(): | |
STORAGE_CLIENT = storage.Client.from_service_account_json(CREDENTIALS_GCP) | |
bucket = STORAGE_CLIENT.bucket(NAME_BUCKET) | |
# Create video talk with file audio created by elevenlabs api | |
url_did = "https://api.d-id.com/talks" | |
payload = { | |
"script": { | |
"type": "audio", | |
"provider": { | |
"type": "microsoft", | |
"voice_id": "en-US-JennyNeural" | |
}, | |
"ssml": "false", | |
"audio_url": link_audio | |
}, | |
"config": { | |
"fluent": "false", | |
"pad_audio": "0.0", | |
"stitch": True | |
}, | |
"source_url": IMG_XAVY | |
} | |
headers = { | |
"accept": "application/json", | |
"content-type": "application/json", | |
"authorization": f"Basic {D_ID_KEY}" | |
} | |
request_create_talk = requests.post(url_did, json=payload, headers=headers) | |
resp_create_talk = request_create_talk.json() | |
talk_id = "None" | |
try: | |
talk_id = resp_create_talk['id'] | |
except Exception as e: | |
print(e) | |
# Get url of video file | |
url_get_talk_id = f"https://api.d-id.com/talks/{talk_id}" | |
while True: | |
request_video_url = requests.get(url_get_talk_id, headers=headers) | |
resp_video_url = request_video_url.json() | |
if resp_video_url['status'] == 'done': | |
break | |
# Sleep until the video is ready | |
time.sleep(0.5) | |
result_url_video = resp_video_url['result_url'] | |
# Saves the video into a file to later upload it to the GCP | |
source_video_file_name = f'./videos/video_final_{unique_id}.mp4' | |
request_video = requests.get(result_url_video) | |
if request_video.status_code == 200: | |
with open(source_video_file_name, 'wb') as outfile: | |
outfile.write(request_video.content) | |
# Save video file to the GCP | |
destination_blob_name_video = unique_id + '.mp4' | |
# Configure bucket | |
blob = bucket.blob(destination_blob_name_video) | |
try: | |
blob.upload_from_filename(source_video_file_name) | |
except Exception as e: | |
print(e) | |
signed_url_video = "None" | |
try: | |
url_expiration_video = timedelta(minutes=15) | |
signed_url_video = blob.generate_signed_url(expiration=url_expiration_video) | |
except Exception as e: | |
print(e) | |
else: | |
STORAGE_CLIENT = storage.Client.from_service_account_json(CREDENTIALS_GCP) | |
bucket = STORAGE_CLIENT.bucket(NAME_BUCKET) | |
# Create video talk with file audio created by elevenlabs api | |
url_did = "https://api.d-id.com/talks" | |
payload = { | |
"script": { | |
"type": "audio", | |
"provider": { | |
"type": "microsoft", | |
"voice_id": "en-US-JennyNeural" | |
}, | |
"ssml": "false", | |
"audio_url": link_audio | |
}, | |
"config": { | |
"fluent": "false", | |
"pad_audio": "0.0", | |
"stitch": True | |
}, | |
"source_url": IMG_XAVY | |
} | |
headers = { | |
"accept": "application/json", | |
"content-type": "application/json", | |
"authorization": f"Basic {D_ID_KEY}" | |
} | |
request_create_talk = requests.post(url_did, json=payload, headers=headers) | |
resp_create_talk = request_create_talk.json() | |
talk_id = "None" | |
try: | |
talk_id = resp_create_talk['id'] | |
except Exception as e: | |
print(e) | |
# Get url of video file | |
url_get_talk_id = f"https://api.d-id.com/talks/{talk_id}" | |
while True: | |
request_video_url = requests.get(url_get_talk_id, headers=headers) | |
resp_video_url = request_video_url.json() | |
if resp_video_url['status'] == 'done': | |
break | |
# Sleep until the video is ready | |
time.sleep(0.5) | |
result_url_video = resp_video_url['result_url'] | |
# Saves the video into a file to later upload it to the GCP | |
source_video_file_name = f'./videos/video_final_{unique_id}.mp4' | |
request_video = requests.get(result_url_video) | |
if request_video.status_code == 200: | |
with open(source_video_file_name, 'wb') as outfile: | |
outfile.write(request_video.content) | |
# Save video file to the GCP | |
destination_blob_name_video = unique_id + '.mp4' | |
# Configure bucket | |
blob = bucket.blob(destination_blob_name_video) | |
try: | |
blob.upload_from_filename(source_video_file_name) | |
except Exception as e: | |
print(e) | |
signed_url_video = "None" | |
try: | |
url_expiration_video = timedelta(minutes=15) | |
signed_url_video = blob.generate_signed_url(expiration=url_expiration_video) | |
except Exception as e: | |
print(e) | |
return signed_url_video | |
def get_answer(question: str, chatbot: list[tuple[str, str]], history_messages, comp_audio, comp_video, df_table): | |
""" | |
Gets the answer of the chatbot | |
""" | |
if len(chatbot) == 8: | |
message_output = 'Un placer haberte ayudado, hasta luego!' | |
else: | |
start_get_standalone_question = time.time() | |
standalone_msg_q = _get_standalone_question(question, history_messages) # create standalone question or message | |
end_get_standalone_question = time.time() | |
time_get_standalone_question = end_get_standalone_question - start_get_standalone_question | |
start_call_embedding = time.time() | |
output_embedding = _call_embedding(standalone_msg_q) # create embedding of standalone question or message | |
end_call_embedding = time.time() | |
time_call_embedding = end_call_embedding - start_call_embedding | |
start_query_pinecone = time.time() | |
best_results = _query_pinecone(output_embedding) # get nearest embeddings | |
end_query_pinecone = time.time() | |
time_query_pinecone = end_query_pinecone - start_query_pinecone | |
start_general_prompt = time.time() | |
final_context_prompt = _general_prompt(best_results) # create context/general prompt | |
end_general_prompt = time.time() | |
time_general_prompt = end_general_prompt - start_general_prompt | |
start_call_gpt = time.time() | |
message_output = _call_gpt(final_context_prompt, question) # final response (to user) | |
end_call_gpt = time.time() | |
time_call_gpt = end_call_gpt - start_call_gpt | |
if "Respuesta:" in message_output: | |
message_output.replace("Respuesta:", "") | |
start_create_clean_message = time.time() | |
processed_message = _create_clean_message(message_output) # clean message output | |
end_create_clean_message = time.time() | |
time_create_clean_message = end_create_clean_message - start_create_clean_message | |
start_create_audio = time.time() | |
url_audio, unique_id = _create_audio(processed_message) # create audio with elevenlabs | |
end_create_audio = time.time() | |
time_create_audio = end_create_audio - start_create_audio | |
start_create_video = time.time() | |
url_video = _create_video(url_audio, unique_id) # create video with d-id no streaming | |
end_create_video = time.time() | |
time_create_video = end_create_video - start_create_video | |
final_time = time_get_standalone_question + time_call_embedding + time_query_pinecone + time_general_prompt | |
final_time += (time_call_gpt + time_create_clean_message + time_create_audio + time_create_video) | |
df_table = pd.DataFrame(df_table) | |
df_table.loc[len(df_table.index)] = [question, | |
message_output, | |
time_get_standalone_question, | |
time_call_embedding, | |
time_query_pinecone, | |
time_general_prompt, | |
time_call_gpt, | |
time_create_clean_message, | |
time_create_audio, | |
time_create_video, | |
final_time] | |
new_df_table = gr.DataFrame(df_table, interactive=False, visible=True) | |
print(history_messages) | |
return _update_elements(question, chatbot, message_output, history_messages, url_audio, url_video, new_df_table) | |
def init_greeting(chatbot, history_messages): | |
if len(chatbot) == 0: | |
greeting = ('Hola 👋, soy Roll, tu asistente de recomendación de series y películas animadas en Crunchyroll. ¿En qué puedo ayudarte hoy?') | |
history_messages.append({'role': 'assistant', 'content': greeting}) | |
chatbot.append([None, greeting]) | |
return chatbot, history_messages | |
def export_dataframe(df): | |
final_df = pd.DataFrame(df) | |
final_df = final_df.iloc[1:] | |
final_df.to_csv("./csv_times/csv_times.csv", index=False, encoding='utf-8') |