import gradio as gr
import random
import os
from openai import OpenAI
from pinecone import Pinecone
import uuid
from pymongo.mongo_client import MongoClient
from dotenv import load_dotenv

load_dotenv()

OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
MODEL_OPENAI = os.getenv("MODEL_OPENAI")
PINECONE_API_TOKEN = os.getenv("PINECONE_API_TOKEN")
PINECONE_ENVIRONMENT = os.getenv("PINECONE_ENV")
PINECONE_HOST = os.getenv("PINECONE_HOST")
DB_USER_NAME = os.getenv("DB_USER_NAME")
DB_PASSWORD = os.getenv("DB_PASSWORD")

# Chat
openai_client = OpenAI(api_key=OPENAI_API_KEY)

# Vector store
pc = Pinecone(api_key=PINECONE_API_TOKEN)
index = pc.Index(host=PINECONE_HOST)

# Database
uri = f"mongodb+srv://{DB_USER_NAME}:{DB_PASSWORD}@cluster-rob01.3fpztfw.mongodb.net/?retryWrites=true&w=majority&appName=cluster-rob01"
client = MongoClient(uri)
db = client["ChatCrunchyroll"]
collection = db["history_msg"]
def _save_history_msg():
    # Placeholder: persisting messages to MongoDB is not implemented yet.
    return None
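
# A hypothetical sketch (this helper is not in the original file) of what
# persisting one chat turn to the history_msg collection might look like:
def _save_history_msg_sketch(question: str, response: str):
    collection.insert_one({
        '_id': str(uuid.uuid4()),
        'question': question,
        'response': response,
    })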
def _add_question_vectorstore(question: str, response: str):
    # Embed the question and upsert it together with its answer so future
    # queries can retrieve this exchange as context.
    vector_id = str(uuid.uuid4())
    vector_embedding = _call_embedding(question)
    vector_metadata = {
        'question': question,
        'text': response
    }
    index.upsert([(vector_id, vector_embedding, vector_metadata)])
def _update_elements(question, chatbot, output, history_messages):
    # Mutates the shared history in place and returns the updated chat pairs.
    chatbot.append([question, output])
    history_messages.append({'role': 'user', 'content': question})
    history_messages.append({'role': 'assistant', 'content': output})
    return chatbot
def _query_pinecone(embedding):
    # Retrieve the 10 nearest stored answers and concatenate their text.
    results = index.query(
        vector=embedding,
        top_k=10,
        include_metadata=True,
    )
    final_results = ""
    for result in results['matches']:
        final_results += f"{result['metadata']['text']}\n"
    return final_results
def _general_prompt(context):
    # Load the base prompt and inject the retrieved context. Note that
    # replace("\n", "") collapses the prompt file into a single line.
    with open("prompt_general.txt", "r") as file:
        file_prompt = file.read().replace("\n", "")
    context_prompt = file_prompt.replace('CONTEXT', context)
    print(context_prompt)
    print("--------------------")
    return context_prompt
def _call_embedding(text: str):
    response = openai_client.embeddings.create(
        input=text,
        model='text-embedding-ada-002'
    )
    return response.data[0].embedding
def _call_gpt(prompt: str, message: str):
    response = openai_client.chat.completions.create(
        model=MODEL_OPENAI,
        temperature=0.2,
        messages=[
            {'role': 'system', 'content': prompt},
            {'role': 'user', 'content': message}
        ]
    )
    return response.choices[0].message.content

def _call_gpt_standalone(prompt: str):
    response = openai_client.chat.completions.create(
        model=MODEL_OPENAI,
        temperature=0.2,
        messages=[
            {'role': 'system', 'content': prompt},
        ]
    )
    return response.choices[0].message.content
def _get_standalone_question(question, history_messages):
    # Rewrite the user's question as a standalone query using the chat
    # history, so retrieval does not depend on conversational context.
    with open("prompt_standalone_message.txt", "r") as file:
        file_prompt_standalone = file.read().replace("\n", "")
    history = ''
    for i, msg in enumerate(history_messages):
        if i == 0:
            continue  # Skip the initial system prompt
        history += f'{msg["role"]}: {msg["content"]}\n'
    prompt_standalone = file_prompt_standalone.replace('HISTORY', history).replace('QUESTION', question)
    standalone_msg_q = _call_gpt_standalone(prompt_standalone)
    print(standalone_msg_q)
    print("------------------")
    return standalone_msg_q
def get_answer_text(question: str, chatbot: list[tuple[str, str]], history_messages):
    """
    Gets the chatbot's answer to the user's question, retrieving related
    context from Pinecone. After 8 exchanges the conversation is closed.
    """
    if len(chatbot) == 8:
        message_output = 'Un placer haberte ayudado, hasta luego!'  # "A pleasure to have helped you, goodbye!"
    else:
        standalone_msg_q = _get_standalone_question(question, history_messages)  # create standalone question or message
        output_embedding = _call_embedding(standalone_msg_q)  # embed the standalone question
        best_results = _query_pinecone(output_embedding)  # get nearest stored answers
        final_context_prompt = _general_prompt(best_results)  # build the context prompt
        message_output = _call_gpt(final_context_prompt, question)
        if "Respuesta:" in message_output:
            message_output = message_output.replace("Respuesta:", "")
    print(history_messages)
    return _update_elements(question, chatbot, message_output, history_messages)
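
# The file imports gradio but no UI wiring appears in this dump. A minimal
# sketch of how get_answer_text could be hooked up with gr.Blocks; the
# layout, placeholder text, and the empty seed system prompt are
# assumptions, not the original Space's code.
with gr.Blocks() as demo:
    chatbot = gr.Chatbot()
    question = gr.Textbox(placeholder="Escribe tu pregunta...")
    # _get_standalone_question skips history_messages[0] as the system
    # prompt, so the state is seeded with one placeholder entry.
    history_messages = gr.State([{'role': 'system', 'content': ''}])

    question.submit(
        get_answer_text,
        inputs=[question, chatbot, history_messages],
        outputs=[chatbot],
    )

demo.launch()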