Spaces:
Sleeping
Sleeping
from time import time, sleep | |
import datetime | |
import dotenv | |
import os | |
import openai | |
import json | |
import pinecone | |
from uuid import uuid4 | |
from helper import open_file, save_file | |
import re | |
from langchain.memory import VectorStoreRetrieverMemory | |
## Runtime configuration
# Settings come from a local ".env" file merged into the process environment.
dotenv.load_dotenv('.env')

# OpenAI credentials and the model names used for embeddings and chat.
openai.api_key = os.getenv('OPENAI_API_KEY')
embedding_model = os.getenv('EMBEDDING_ENGINE')
llm_model = os.getenv('LLM_MODEL')

# How many stored messages to pull back per similarity query.
# NOTE: int() raises if CONVO_LENGTH_TO_FETCH is unset or not numeric.
convo_length = int(os.getenv('CONVO_LENGTH_TO_FETCH'))

# Pinecone connection settings and the index handle shared by the functions
# defined in this file.
pinecone_api_key = os.getenv('PINECONE_API_KEY')
pinecone_env = os.getenv('PINECONE_REGION')
pinecone_index = os.getenv('PINECONE_INDEX')
pinecone.init(api_key=pinecone_api_key, environment=pinecone_env)
vector_db = pinecone.Index(pinecone_index)
def timestamp_to_datetime(unix_time):
    """Render a Unix timestamp as a human-readable local date/time string.

    Args:
        unix_time: Seconds since the epoch (as returned by ``time.time()``).

    Returns:
        A string such as ``"Sunday, September 09, 2001 at 01:46AM "``. The
        value is built from a naive local datetime, so the trailing ``%Z``
        field expands to an empty string and leaves a trailing space.
    """
    layout = "%A, %B %d, %Y at %I:%M%p %Z"
    moment = datetime.datetime.fromtimestamp(unix_time)
    return moment.strftime(layout)
def perform_embedding(content):
    """Return the embedding vector for ``content``.

    Args:
        content: Text to embed. Non-ASCII characters are dropped before the
            request is sent.

    Returns:
        The ``embedding`` entry of the first item in the OpenAI response.
    """
    # Strip anything that cannot be represented in ASCII.
    ascii_only = content.encode(encoding='ASCII', errors='ignore').decode()
    reply = openai.Embedding.create(model=embedding_model, input=ascii_only)
    return reply['data'][0]['embedding']
def load_conversation(results):
    """Join the texts of matched messages into one block, oldest first.

    Args:
        results: A query result exposing ``results['matches']``, where each
            match has ``metadata`` containing ``timestring`` and ``text``.

    Returns:
        The message texts joined with newlines and stripped of surrounding
        whitespace; an empty string when there are no matches.
    """
    # Formats tried when parsing a stored timestring (after stripping). The
    # first matches the "%Z expands to nothing" case, the second a named zone.
    layouts = ("%A, %B %d, %Y at %I:%M%p", "%A, %B %d, %Y at %I:%M%p %Z")

    def _chronological_key(entry):
        """Sort key: parsed datetimes first, unparseable values afterwards."""
        raw = entry['time1']
        if isinstance(raw, str):
            stamp = raw.strip()
            for layout in layouts:
                try:
                    return (0, datetime.datetime.strptime(stamp, layout), '')
                except ValueError:
                    continue
        # Fall back to ordering by the raw value among unparseable entries.
        return (1, datetime.datetime.min, raw)

    entries = [
        {'time1': m['metadata']['timestring'], 'text': m['metadata']['text']}
        for m in results['matches']
    ]
    # Sorting the formatted string directly would order by weekday name
    # ("Friday" < "Monday" < ...), not by time, so sort on the parsed value.
    # sorted() is stable, so messages within the same minute keep match order.
    ordered = sorted(entries, key=_chronological_key)
    return '\n'.join(item['text'] for item in ordered).strip()
def call_gpt(prompt):
    """Send ``prompt`` to the chat model, retrying on failure.

    The prompt is reduced to ASCII before sending. The reply text has runs of
    line breaks collapsed to a single newline and runs of tabs/spaces
    collapsed to a single space; the cleaned text is written back into the
    response object. Each successful exchange is also logged to a file under
    ``gpt3_logs/`` on a best-effort basis.

    Args:
        prompt: The full prompt text, sent as a single user message.

    Returns:
        The response object from ``openai.ChatCompletion.create`` with
        ``choices[0].message.content`` replaced by the cleaned text. After
        five failed attempts, the string ``"GPT3 error: <exception>"`` is
        returned instead, so callers must check the type.
    """
    max_retry = 5
    retry = 0
    prompt = prompt.encode(encoding='ASCII', errors='ignore').decode()
    while True:
        try:
            response = openai.ChatCompletion.create(
                model=llm_model,
                temperature=0.9,
                messages=[
                    {"role": "user", "content": prompt}
                ],
            )
            text = response.choices[0].message.content
            text = re.sub(r'[\r\n]+', '\n', text)
            text = re.sub(r'[\t ]+', ' ', text)
            response.choices[0].message.content = text
        except Exception as oops:
            retry += 1
            if retry >= max_retry:
                return "GPT3 error: %s" % oops
            print('Error communicating with OpenAI:', oops)
            sleep(1)
            continue

        # Logging sits outside the retry loop on purpose: a local disk error
        # must not trigger another API call or throw away a valid response.
        try:
            os.makedirs('gpt3_logs', exist_ok=True)
            filename = '%s_gpt3.txt' % time()
            save_file('gpt3_logs/%s' % filename, prompt + '\n\n==========\n\n' + text)
        except (OSError, UnicodeError) as log_error:
            print('Could not write GPT log file:', log_error)
        return response
def start_game(game_id, user_id, user_input):
    """Process one user turn and return the model's reply.

    The user message is embedded, prior messages for the same user and game
    are retrieved from the vector index, a prompt is assembled from two
    template files, and the reply is generated. The user message and the
    reply are then upserted to the index together.

    Args:
        game_id: Identifier of the game; used in the query filter, the
            metadata, and the per-game prompt filename.
        user_id: Identifier of the user; used the same way and substituted
            for ``<<USER_VAL>>`` in the prompt template.
        user_input: The text the user just entered.

    Returns:
        The reply text produced by the model.

    Raises:
        RuntimeError: If ``call_gpt`` gave up and returned its error string
            instead of a response object.
    """
    # Embed the user's message and stage it for upsert.
    timestring = timestamp_to_datetime(time())
    vector = perform_embedding(user_input)
    user_metadata = {
        'speaker': 'USER',
        'user_id': user_id,
        'game_id': game_id,
        'timestring': timestring,
        'text': user_input,
    }
    payload = [(str(uuid4()), vector, user_metadata)]

    # Retrieve the most similar stored messages for this user and game.
    results = vector_db.query(
        vector=vector,
        top_k=convo_length,
        include_metadata=True,
        filter={
            "$and": [{"user_id": {"$eq": user_id}}, {"game_id": {"$eq": game_id}}]
        },
    )
    conversation = load_conversation(results)

    # Fill in the response template. Replacements run in this fixed order.
    prompt_text = open_file(f"prompt_{game_id}_{user_id}.txt")
    prompt = (
        open_file('prompt_response.txt')
        .replace('<<PROMPT_VALUE>>', prompt_text)
        .replace('<<CONVERSATION>>', conversation)
        .replace('<<USER_MSG>>', user_input)
        .replace('<<USER_VAL>>', user_id)
    )

    # Generate the reply. call_gpt returns a plain string once its retries
    # are exhausted; surface that as a clear error rather than an
    # AttributeError on ".choices".
    llm_output_msg = call_gpt(prompt)
    if isinstance(llm_output_msg, str):
        raise RuntimeError(llm_output_msg)
    llm_output = llm_output_msg.choices[0].message.content

    # Stage the reply with its own timestamp and a freshly generated ID.
    # uuid4 must be *called*: str(uuid4) is the function's repr, which would
    # give every reply the same ID and overwrite the previous one on upsert.
    timestring_op = timestamp_to_datetime(time())
    vector_op = perform_embedding(llm_output)
    bot_metadata = {
        'speaker': 'BOT',
        'user_id': user_id,
        'game_id': game_id,
        'timestring': timestring_op,
        'text': llm_output,
    }
    payload.append((str(uuid4()), vector_op, bot_metadata))

    # Upsert both messages into the vector database.
    vector_db.upsert(payload)
    return llm_output
def populate_prompt(game_id, chunk_count=21):
    """Assemble a game's seed prompt from its stored chunks.

    Chunks are fetched from the vector index by the IDs ``"<game_id>-0"``
    through ``"<game_id>-<chunk_count - 1>"`` and concatenated in that order.

    Args:
        game_id: Identifier prefix of the stored prompt chunks.
        chunk_count: Number of consecutive chunks to fetch. Defaults to 21,
            the value previously hard-coded here.

    Returns:
        The chunk texts joined with single spaces and stripped of
        surrounding whitespace.

    Raises:
        KeyError: If any expected chunk ID is absent from the fetch result.
    """
    idlist = [f"{game_id}-{j}" for j in range(chunk_count)]
    results = vector_db.fetch(ids=idlist)
    fetched = results['vectors']
    # Iterate over idlist (not the result mapping) to keep chunk order.
    prompt_text = [fetched[chunk_id]["metadata"]["text"] for chunk_id in idlist]
    return ' '.join(prompt_text).strip()
def initialize_game(user_id, game_id):
    """Generate and persist the opening prompt for a user's game.

    The seed prompt for ``game_id`` is sent to the model, and the reply is
    saved to ``prompt_<game_id>_<user_id>.txt``.

    Args:
        user_id: Identifier of the user; part of the output filename.
        game_id: Identifier of the game whose seed prompt is loaded.

    Returns:
        The text generated by the model.

    Raises:
        RuntimeError: If ``call_gpt`` gave up and returned its error string
            instead of a response object.
    """
    whole_prompt = populate_prompt(game_id)
    llm_prompt_op = call_gpt(whole_prompt)
    # call_gpt returns a plain string once its retries are exhausted; fail
    # with that message instead of an AttributeError on ".choices".
    if isinstance(llm_prompt_op, str):
        raise RuntimeError(llm_prompt_op)
    opening = llm_prompt_op.choices[0]["message"]["content"]
    fname = "prompt_" + game_id + "_" + user_id + ".txt"
    save_file(fname, opening)
    return opening
if __name__ == '__main__':
    # Interactive console session for a fixed user and game.
    user_id = 'user_1'
    game_id = '536e6bc89df5'

    # Generate and show the opening text before entering the chat loop.
    output = initialize_game(user_id, game_id)
    print('\n\nGENESIS: %s' % output)

    while True:
        try:
            a = input('\n\n%s: ' % user_id)
        except (EOFError, KeyboardInterrupt):
            # stdin was closed or the user pressed Ctrl-C: leave the loop
            # cleanly instead of ending with a traceback.
            print()
            break
        output = start_game(game_id, user_id, user_input=a)
        print('\n\nGENESIS: %s' % output)