# apisforgenesis / chat.py
# laxsvips's picture
# Update chat.py
# a385a57
# raw
# history blame
# 6.72 kB
from time import time, sleep
import datetime
import dotenv
import os
import openai
import json
import pinecone
from uuid import uuid4
from helper import open_file, save_file, read_word_document
import re
from langchain.memory import VectorStoreRetrieverMemory
## Load the .env file, then read every setting from the environment
dotenv.load_dotenv('.env')

# OpenAI credentials and model names
openai.api_key = os.getenv('OPENAI_API_KEY')
embedding_model = os.getenv('EMBEDDING_ENGINE')
# Passed as top_k when querying the vector index for earlier messages
convo_length = int(os.getenv('CONVO_LENGTH_TO_FETCH'))
llm_model = os.getenv('LLM_MODEL')

# Debug printing is on only when DEBUG is exactly the string 'True'
debug = os.getenv('DEBUG') == 'True'

# Pinecone connection and index handle
pinecone_api_key = os.getenv('PINECONE_API_KEY')
pinecone_env = os.getenv('PINECONE_REGION')
pinecone_index = os.getenv('PINECONE_INDEX')
pinecone.init(api_key=pinecone_api_key, environment=pinecone_env)
vector_db = pinecone.Index(pinecone_index)

# Locations of the game documents and the game index file
file_path = os.getenv('GAME_DOCS_FOLDER')
file_name = os.getenv('GAME_DOCS_FILE')
game_index = os.getenv('GAME_ID_INDEX')
def timestamp_to_datetime(unix_time):
    """Format a Unix timestamp as a human-readable local date/time string.

    Args:
        unix_time: Seconds since the epoch, as accepted by
            datetime.datetime.fromtimestamp.

    Returns:
        A string such as "Monday, January 01, 2024 at 10:00AM " built from
        the naive local datetime (the trailing %Z field is part of the format).
    """
    moment = datetime.datetime.fromtimestamp(unix_time)
    return moment.strftime("%A, %B %d, %Y at %I:%M%p %Z")
def perform_embedding(content):
    """Return the embedding vector for a piece of text.

    Non-ASCII characters are dropped from the text before it is sent to the
    embedding endpoint configured in ``embedding_model``.

    Args:
        content: Text to embed.

    Returns:
        The 'embedding' entry of the first item in the response's 'data' list.
    """
    ascii_only = content.encode(encoding='ASCII', errors='ignore').decode()
    reply = openai.Embedding.create(model=embedding_model, input=ascii_only)
    return reply['data'][0]['embedding']
def _timestring_sort_key(timestring):
    """Build a sort key that orders stored time strings chronologically.

    The strings look like "Monday, January 01, 2024 at 10:00AM " and may end
    with an extra token for the %Z field. Parsed values sort before
    unparseable ones; unparseable strings keep plain string ordering.
    """
    layout = "%A, %B %d, %Y at %I:%M%p"
    trimmed = timestring.strip()
    candidates = [trimmed]
    # If a timezone name was appended, retry without the last token
    # (the zone itself is ignored for ordering purposes).
    without_last_token = trimmed.rpartition(' ')[0]
    if without_last_token:
        candidates.append(without_last_token)
    for candidate in candidates:
        try:
            return (0, datetime.datetime.strptime(candidate, layout))
        except ValueError:
            continue
    return (1, timestring)


def load_conversation(results):
    """Join the texts of the matched messages, oldest first.

    Sorting on the raw 'timestring' would be lexicographic (weekday name
    first, 12-hour clock), so the string is parsed into a datetime for the
    sort. Messages with equal times keep the order they had in the matches.

    Args:
        results: Query result whose 'matches' entries each carry a 'metadata'
            mapping with 'timestring' and 'text' keys.

    Returns:
        The message texts joined by newlines and stripped of surrounding
        whitespace; an empty string when there are no matches.
    """
    entries = [
        (match['metadata']['timestring'], match['metadata']['text'])
        for match in results['matches']
    ]
    entries.sort(key=lambda entry: _timestring_sort_key(entry[0]))
    return '\n'.join(text for _, text in entries).strip()
def call_gpt(prompt):
    """Send a prompt to the chat model and return the cleaned-up response.

    The prompt is reduced to ASCII (other characters are dropped) and sent as
    a single user message to the model named in ``llm_model`` at temperature
    0.9. In the reply, runs of CR/LF collapse to one newline and runs of
    tabs/spaces collapse to one space. The prompt and reply are written via
    save_file to a time-stamped file under 'gpt3_logs'.

    Any exception raised inside an attempt (including while logging) counts as
    a failure; up to five attempts are made with a one-second pause between
    them.

    Args:
        prompt: The prompt text.

    Returns:
        The response object with its first choice's content replaced by the
        cleaned text, or the string "GPT3 error: <exception>" once the fifth
        attempt has failed.
    """
    max_attempts = 5
    ascii_prompt = prompt.encode(encoding='ASCII', errors='ignore').decode()
    for attempt in range(1, max_attempts + 1):
        try:
            response = openai.ChatCompletion.create(
                model=llm_model,
                temperature=0.9,
                messages=[{"role": "user", "content": ascii_prompt}],
            )
            reply = response.choices[0].message.content
            reply = re.sub('[\r\n]+', '\n', reply)
            reply = re.sub('[\t ]+', ' ', reply)
            log_name = '%s_gpt3.txt' % time()
            if not os.path.exists('gpt3_logs'):
                os.makedirs('gpt3_logs')
            log_body = ascii_prompt + '\n\n==========\n\n' + reply
            save_file('gpt3_logs/%s' % log_name, log_body)
            response.choices[0].message.content = reply
            return response
        except Exception as oops:
            # The last failed attempt reports the error instead of retrying.
            if attempt >= max_attempts:
                return "GPT3 error: %s" % oops
            print('Error communicating with OpenAI:', oops)
            sleep(1)
def start_game(game_id, user_id, user_input):
    """Handle one user turn: build the prompt, get the reply, store both.

    The user's message is embedded and used to query the vector index for
    earlier messages carrying the same user_id and game_id. Those messages,
    the saved prompt file for this game/user and the new message are
    substituted into the 'prompt_response.txt' template, which is sent to the
    chat model. The user message and the model reply are then upserted
    together.

    Args:
        game_id: Game identifier; used in the metadata filter and in the
            prompt file name.
        user_id: User identifier; used the same way and substituted for
            <<USER_VAL>> in the template.
        user_input: The user's message text.

    Returns:
        The model's reply text.

    Raises:
        RuntimeError: If call_gpt gave up and returned its error string
            instead of a response object.
    """
    payload = []

    # Embed the user's message and stage it for storage.
    timestamp = time()
    timestring = timestamp_to_datetime(timestamp)
    unique_id = str(uuid4())
    vector = perform_embedding(user_input)
    metadata = {
        'speaker': 'USER',
        'user_id': user_id,
        'game_id': game_id,
        'timestring': timestring,
        'text': user_input,
    }
    payload.append((unique_id, vector, metadata))

    # Look up earlier messages for this user and game. The new message is not
    # upserted yet, so it cannot appear among the matches.
    results = vector_db.query(
        vector=vector,
        top_k=convo_length,
        include_metadata=True,
        filter={
            "$and": [{"user_id": {"$eq": user_id}}, {"game_id": {"$eq": game_id}}]
        },
    )
    conversation = load_conversation(results)

    # Populate the prompt template.
    prompt_text = open_file(f"prompt_{game_id}_{user_id}.txt")
    prompt = (
        open_file('prompt_response.txt')
        .replace('<<PROMPT_VALUE>>', prompt_text)
        .replace('<<CONVERSATION>>', conversation)
        .replace('<<USER_MSG>>', user_input)
        .replace('<<USER_VAL>>', user_id)
    )

    # Generate the reply; call_gpt signals failure by returning a string.
    llm_output_msg = call_gpt(prompt)
    if isinstance(llm_output_msg, str):
        raise RuntimeError(llm_output_msg)
    llm_output = llm_output_msg.choices[0].message.content

    # Stage the reply with its own timestamp and a freshly generated id.
    # (uuid4 must be called: str(uuid4) is the function's repr, which would
    # give every reply the same id and make each upsert overwrite the last.)
    timestamp_op = time()
    timestring_op = timestamp_to_datetime(timestamp_op)
    vector_op = perform_embedding(llm_output)
    unique_id_op = str(uuid4())
    metadata_op = {
        'speaker': 'BOT',
        'user_id': user_id,
        'game_id': game_id,
        'timestring': timestring_op,
        'text': llm_output,
    }
    payload.append((unique_id_op, vector_op, metadata_op))

    # Upsert both messages into the vector database.
    vector_db.upsert(payload)
    return llm_output
def get_game_details(game_id):
    """Look up one game's entry in the game index file.

    The index file at ``{file_path}/{game_index}`` is read with open_file and
    parsed as JSON; its "game_details" list is searched for the first entry
    whose "game_id" equals the argument.

    Args:
        game_id: Identifier to search for.

    Returns:
        The matching entry, or the string "Not Found" when none matches.
    """
    index_data = json.loads(open_file(f"{file_path}/{game_index}"))
    matching = (
        entry for entry in index_data["game_details"]
        if entry["game_id"] == game_id
    )
    return next(matching, "Not Found")
def populate_prompt(game_id, splits):
    """Reassemble a game's prompt from its stored pieces.

    Pieces are fetched from the vector index by the ids "<game_id>-0" up to
    "<game_id>-<splits - 1>" and their metadata "text" values are joined in
    that order.

    Args:
        game_id: Game identifier used as the id prefix.
        splits: Number of pieces; converted with int().

    Returns:
        The piece texts joined by single spaces, stripped of surrounding
        whitespace.
    """
    piece_ids = [game_id + "-" + str(part) for part in range(int(splits))]
    fetched = vector_db.fetch(ids=piece_ids)
    pieces = [
        fetched['vectors'][piece_id]["metadata"]["text"]
        for piece_id in piece_ids
    ]
    return ' '.join(pieces).strip()
def initialize_game(game_id, user_id, user_input):
    """Generate and save the opening prompt for a user's game.

    The game's stored prompt is reassembled with populate_prompt, the
    <<USER_INPUT_MSG>> placeholder is replaced by the user's input, and the
    result is sent to the chat model. The model's reply is saved to
    "prompt_<game_id>_<user_id>.txt" (the file start_game reads) and returned.

    Args:
        game_id: Game identifier to look up in the game index.
        user_id: User identifier; used in the saved file name.
        user_input: Text substituted for <<USER_INPUT_MSG>>.

    Returns:
        The model's reply text.

    Raises:
        ValueError: If get_game_details has no entry for game_id.
        RuntimeError: If call_gpt gave up and returned its error string
            instead of a response object.
    """
    game_details = get_game_details(game_id)
    # get_game_details reports a miss with the string "Not Found"; indexing
    # that string would fail with an unhelpful TypeError.
    if game_details == "Not Found":
        raise ValueError(f"Unknown game_id: {game_id!r}")

    whole_prompt = populate_prompt(game_id, game_details["splits"])
    if debug:
        print(whole_prompt[:1000])
    whole_prompt = whole_prompt.replace("<<USER_INPUT_MSG>>", user_input)
    if debug:
        print(whole_prompt[:1000])

    llm_prompt_op = call_gpt(whole_prompt)
    # call_gpt signals failure by returning a string.
    if isinstance(llm_prompt_op, str):
        raise RuntimeError(llm_prompt_op)
    generated_prompt = llm_prompt_op.choices[0]["message"]["content"]

    save_file(f"prompt_{game_id}_{user_id}.txt", generated_prompt)
    return generated_prompt
def generate_image_prompt(game_id, user_id, user_input):
    """Ask the chat model to turn a piece of game text into an image prompt.

    A fixed introductory sentence is removed from the input if present, the
    remainder is substituted for <<PROMPT_FOR_IMG>> in the template file
    "image_prompt_leo.txt" under ``file_path``, and the result is sent to the
    chat model.

    Args:
        game_id: Game identifier recorded in the metadata.
        user_id: User identifier recorded in the metadata.
        user_input: Text to derive the image prompt from.

    Returns:
        The model's reply text.

    Raises:
        RuntimeError: If call_gpt gave up and returned its error string
            instead of a response object.
    """
    # str.replace leaves the text unchanged when the sentence is absent, so no
    # membership check is needed first.
    user_input = user_input.replace(
        'You have a manual of this newly created simulation in your mind. Now what is the first thing you will do in this world?',
        '',
    )
    payload = []
    file_data = open_file(f"{file_path}/image_prompt_leo.txt").replace("<<PROMPT_FOR_IMG>>", user_input)

    leo_input_msg = call_gpt(file_data)
    # call_gpt signals failure by returning a string.
    if isinstance(leo_input_msg, str):
        raise RuntimeError(leo_input_msg)
    leo_input = leo_input_msg.choices[0].message.content

    timestamp_op = time()
    timestring_op = timestamp_to_datetime(timestamp_op)
    vector_op = perform_embedding(leo_input)
    # uuid4 must be called; str(uuid4) is the function's repr, not an id.
    unique_id_op = str(uuid4())
    metadata_op = {
        'speaker': 'BOT4LEO',
        'user_id': user_id,
        'game_id': game_id,
        'timestring': timestring_op,
        'text': leo_input,
    }
    payload.append((unique_id_op, vector_op, metadata_op))
    # NOTE(review): payload is assembled but never upserted here, so the
    # embedding above is computed and discarded — confirm whether a
    # vector_db.upsert(payload) call is missing or this staging can be removed.
    return leo_input
# Running this file directly only prints a marker line; the functions above
# are meant to be imported.
if __name__ == "__main__":
    print("main")