# Source: apisforgenesis / chat.py
# Last commit: "Update chat.py" (ff9674c) by laxsvips; file size 5.84 kB
from time import time, sleep
import datetime
import dotenv
import os
import openai
import json
import pinecone
from uuid import uuid4
from helper import open_file, save_file
import re
from langchain.memory import VectorStoreRetrieverMemory
## Runtime configuration
# Everything below runs once at import time. Values come from the process
# environment after the local `.env` file has been loaded.
dotenv.load_dotenv('.env')

# OpenAI credentials and model names.
openai.api_key = os.getenv('OPENAI_API_KEY')
embedding_model = os.getenv('EMBEDDING_ENGINE')
# Passed as `top_k` when querying the vector index for past messages.
# NOTE: int() raises TypeError here when CONVO_LENGTH_TO_FETCH is not set.
convo_length = int(os.getenv('CONVO_LENGTH_TO_FETCH'))
llm_model = os.getenv('LLM_MODEL')

# Debug printing is switched on only by the exact string 'True'.
debug = os.getenv('DEBUG') == 'True'

# Pinecone connection; `vector_db` is the index handle shared by the
# functions in this module.
pinecone_api_key = os.getenv('PINECONE_API_KEY')
pinecone_env = os.getenv('PINECONE_REGION')
pinecone_index = os.getenv('PINECONE_INDEX')
pinecone.init(api_key=pinecone_api_key, environment=pinecone_env)
vector_db = pinecone.Index(pinecone_index)

# Location of the game documents and of the game-id index file.
file_path = os.getenv('GAME_DOCS_FOLDER')
file_name = os.getenv('GAME_DOCS_FILE')
game_index = os.getenv('GAME_ID_INDEX')
def timestamp_to_datetime(unix_time):
    """Format a Unix timestamp as a human-readable local date and time.

    Args:
        unix_time: Seconds since the epoch, as returned by ``time.time()``.

    Returns:
        A string such as ``"Sunday, September 09, 2001 at 01:46AM "``.
        The datetime is naive, so the trailing ``%Z`` field renders as an
        empty string and the result ends with a space.
    """
    layout = "%A, %B %d, %Y at %I:%M%p %Z"
    moment = datetime.datetime.fromtimestamp(unix_time)
    return moment.strftime(layout)
def perform_embedding(content):
    """Return the embedding vector for *content*.

    The text is reduced to ASCII first (non-ASCII characters are dropped),
    then sent to the OpenAI embedding endpoint using the module-level
    ``embedding_model``.

    Args:
        content: Text to embed.

    Returns:
        The ``'embedding'`` entry of the first item in the response data.
    """
    ascii_only = content.encode(encoding='ASCII', errors='ignore').decode()
    reply = openai.Embedding.create(model=embedding_model, input=ascii_only)
    return reply['data'][0]['embedding']
def load_conversation(results):
    """Rebuild a chronological transcript from a vector-store query result.

    Args:
        results: Mapping with a ``'matches'`` list. Each match carries a
            ``'metadata'`` mapping with ``'timestring'`` and ``'text'``.

    Returns:
        The message texts joined by newlines, oldest first, with leading and
        trailing whitespace stripped. Empty string when there are no matches.
    """
    # Formats tried when parsing a stored timestring. The first matches the
    # output of timestamp_to_datetime for naive datetimes (where %Z is
    # empty); the second covers strings that do carry a timezone name.
    layouts = (
        "%A, %B %d, %Y at %I:%M%p",
        "%A, %B %d, %Y at %I:%M%p %Z",
    )

    def chronological_key(entry):
        """Sort key that orders by real time, not by the timestring text."""
        stamp = entry[0]
        for layout in layouts:
            try:
                return (0, datetime.datetime.strptime(stamp.strip(), layout))
            except (AttributeError, TypeError, ValueError):
                continue
        # Unparseable stamps go after the parsed ones, ordered as text. The
        # leading 0/1 keeps datetimes and strings from being compared.
        return (1, str(stamp))

    entries = [
        (match['metadata']['timestring'], match['metadata']['text'])
        for match in results['matches']
    ]
    # sorted() is stable, so messages with the same minute-resolution
    # timestamp keep the order in which the matches were returned.
    ordered = sorted(entries, key=chronological_key)
    return '\n'.join(text for _, text in ordered).strip()
def call_gpt(prompt):
    """Send *prompt* to the chat model, log the exchange and return the reply.

    The prompt is reduced to ASCII before sending. The reply text has runs
    of line breaks collapsed to one newline and runs of tabs/spaces collapsed
    to one space. The cleaned text is written back into the response object
    and also saved, with the prompt, to a file under ``gpt3_logs``.

    Args:
        prompt: The full prompt, sent as a single ``user`` message.

    Returns:
        The response object from ``openai.ChatCompletion.create`` on success.
        After five failed attempts, the string ``"GPT3 error: <exception>"``
        instead, so callers must check the type before reading ``.choices``.
    """
    max_retry = 5
    ascii_prompt = prompt.encode(encoding='ASCII', errors='ignore').decode()
    chat_messages = [
        {"role": "user", "content": ascii_prompt}
    ]
    for attempt in range(1, max_retry + 1):
        try:
            response = openai.ChatCompletion.create(
                model=llm_model,
                temperature=0.9,
                messages=chat_messages,
            )
            cleaned = response.choices[0].message.content
            cleaned = re.sub('[\r\n]+', '\n', cleaned)
            cleaned = re.sub('[\t ]+', ' ', cleaned)
            # One log file per call, named after the current epoch time.
            filename = '%s_gpt3.txt' % time()
            if not os.path.exists('gpt3_logs'):
                os.makedirs('gpt3_logs')
            save_file('gpt3_logs/%s' % filename, ascii_prompt + '\n\n==========\n\n' + cleaned)
            response.choices[0].message.content = cleaned
            return response
        except Exception as oops:
            # Any failure, including a logging failure, counts as an attempt.
            if attempt >= max_retry:
                return "GPT3 error: %s" % oops
            print('Error communicating with OpenAI:', oops)
            sleep(1)
def start_game(game_id, user_id, user_input):
    """Handle one player turn and return the model's reply text.

    Steps: embed the player's message, query the vector index for earlier
    messages of the same user and game, build the prompt from the saved
    per-game prompt file and the ``prompt_response.txt`` template, ask the
    model, then upsert both the player's message and the reply.

    Args:
        game_id: Game identifier; used in the metadata filter and file name.
        user_id: Player identifier; used in the metadata filter, the file
            name and as the ``<<USER_VAL>>`` substitution.
        user_input: The player's message text.

    Returns:
        The reply text produced by the model.

    Raises:
        RuntimeError: If ``call_gpt`` gave up and returned its error string
            instead of a response object.
    """
    # Embed and stage the player's message.
    timestamp = time()
    timestring = timestamp_to_datetime(timestamp)
    unique_id = str(uuid4())
    vector = perform_embedding(user_input)
    metadata = {'speaker': 'USER', 'user_id': user_id, 'game_id': game_id, 'timestring': timestring, 'text': user_input}
    payload = [(unique_id, vector, metadata)]

    # Fetch earlier messages for this user and game. This runs before the
    # upsert, so the current message is not among the matches.
    results = vector_db.query(
        vector=vector,
        top_k=convo_length,
        include_metadata=True,
        filter={
            "$and": [{"user_id": {"$eq": user_id}}, {"game_id": {"$eq": game_id}}]
        },
    )
    conversation = load_conversation(results)

    # Populate the prompt template.
    prompt_text = open_file(f"prompt_{game_id}_{user_id}.txt")
    prompt = (
        open_file('prompt_response.txt')
        .replace('<<PROMPT_VALUE>>', prompt_text)
        .replace('<<CONVERSATION>>', conversation)
        .replace('<<USER_MSG>>', user_input)
        .replace('<<USER_VAL>>', user_id)
    )

    # Generate the reply. call_gpt signals failure by returning a string,
    # which would otherwise fail below with an opaque AttributeError.
    llm_output_msg = call_gpt(prompt)
    if isinstance(llm_output_msg, str):
        raise RuntimeError(llm_output_msg)
    llm_output = llm_output_msg.choices[0].message.content

    # Stage the reply with its own timestamp and a fresh ID. uuid4 must be
    # called: str(uuid4) is the function's repr, so every reply would share
    # one ID and each upsert would overwrite the previous reply.
    timestamp_op = time()
    timestring_op = timestamp_to_datetime(timestamp_op)
    vector_op = perform_embedding(llm_output)
    unique_id_op = str(uuid4())
    metadata_op = {'speaker': 'BOT', 'user_id': user_id, 'game_id': game_id, 'timestring': timestring_op, 'text': llm_output}
    payload.append((unique_id_op, vector_op, metadata_op))

    # Upsert both messages into the vector database.
    vector_db.upsert(payload)
    return llm_output
def get_game_details(game_id):
    """Look up the index entry for *game_id*.

    Reads the JSON file ``{file_path}/{game_index}`` and scans its
    ``"game_details"`` list.

    Args:
        game_id: Value compared against each entry's ``"game_id"`` field.

    Returns:
        The first matching entry, or the string ``"Not Found"`` when no
        entry matches. Callers must check for that string before indexing.
    """
    catalogue = json.loads(open_file(f"{file_path}/{game_index}"))
    candidates = (
        entry for entry in catalogue["game_details"]
        if entry["game_id"] == game_id
    )
    return next(candidates, "Not Found")
def populate_prompt(game_id, splits):
    """Reassemble a game's prompt from its stored chunks.

    The chunks are fetched from the vector index under the ids
    ``"<game_id>-0"`` to ``"<game_id>-<splits - 1>"``.

    Args:
        game_id: Prefix of the chunk ids (must be a string).
        splits: Number of chunks; converted with ``int()``.

    Returns:
        The chunks' ``"text"`` metadata joined by single spaces, in id
        order, with surrounding whitespace stripped.
    """
    chunk_ids = [game_id + "-" + str(part) for part in range(int(splits))]
    fetched = vector_db.fetch(ids=chunk_ids)
    pieces = [fetched['vectors'][cid]["metadata"]["text"] for cid in chunk_ids]
    return ' '.join(pieces).strip()
def initialize_game(game_id, user_id, user_input):
    """Create and save the opening prompt for a user's game.

    Loads the game's stored prompt, substitutes *user_input* for the
    ``<<USER_INPUT_MSG>>`` placeholder and sends it to the model. The reply
    is saved to ``prompt_<game_id>_<user_id>.txt``.

    Args:
        game_id: Game identifier used to look up the details and prompt.
        user_id: Player identifier; part of the saved file name.
        user_input: Text substituted into the prompt template.

    Returns:
        The content of the model's reply.
    """
    details = get_game_details(game_id)
    template = populate_prompt(game_id, details["splits"])
    if debug:
        print(template[:1000])
    filled = template.replace("<<USER_INPUT_MSG>>", user_input)
    if debug:
        print(filled[:1000])
    reply = call_gpt(filled)
    fname = "prompt_" + game_id + "_" + user_id + ".txt"
    opening = reply.choices[0]["message"]["content"]
    save_file(fname, opening)
    return opening
# Script entry point: running the module directly only prints a marker.
if __name__ == "__main__":
    print("main")