from time import time, sleep
import datetime
import json
import os
import re
from uuid import uuid4

import dotenv
import openai
import pinecone
from langchain.memory import VectorStoreRetrieverMemory

from helper import open_file, save_file

## Read the environment variables
dotenv.load_dotenv('.env')
openai.api_key = os.getenv('OPENAI_API_KEY')
embedding_model = os.getenv('EMBEDDING_ENGINE')
# int() raises TypeError here if CONVO_LENGTH_TO_FETCH is not set.
convo_length = int(os.getenv('CONVO_LENGTH_TO_FETCH'))
llm_model = os.getenv('LLM_MODEL')
# Debug output is enabled only by the exact string 'True'.
debug = os.getenv('DEBUG') == 'True'

pinecone_api_key = os.getenv('PINECONE_API_KEY')
pinecone_env = os.getenv('PINECONE_REGION')
pinecone_index = os.getenv('PINECONE_INDEX')
pinecone.init(
    api_key=pinecone_api_key,
    environment=pinecone_env,
)
vector_db = pinecone.Index(pinecone_index)

file_path = os.getenv('GAME_DOCS_FOLDER')
file_name = os.getenv('GAME_DOCS_FILE')
game_index = os.getenv('GAME_ID_INDEX')


def timestamp_to_datetime(unix_time):
    """Format a Unix timestamp as a human-readable local date/time string."""
    return datetime.datetime.fromtimestamp(unix_time).strftime("%A, %B %d, %Y at %I:%M%p %Z")


def perform_embedding(content):
    """Return the embedding vector for ``content``.

    Non-ASCII characters are dropped before the text is sent to the
    embedding model configured in ``embedding_model``.
    """
    content = content.encode(encoding='ASCII', errors='ignore').decode()
    response = openai.Embedding.create(model=embedding_model, input=content)
    return response['data'][0]['embedding']


def load_conversation(results):
    """Join the texts of the matches in a query result into one block.

    ``results['matches']`` entries must carry ``metadata`` with ``timestring``
    and ``text`` keys. Texts are ordered by ``timestring`` and joined with
    newlines.
    """
    entries = [
        {'time1': m['metadata']['timestring'], 'text': m['metadata']['text']}
        for m in results['matches']
    ]
    # NOTE(review): 'timestring' is a formatted string that starts with the
    # weekday name, so this sort is lexicographic rather than chronological.
    ordered = sorted(entries, key=lambda d: d['time1'])
    return '\n'.join(entry['text'] for entry in ordered).strip()


def call_gpt(prompt):
    """Send ``prompt`` to the chat model and return the response object.

    The reply text is whitespace-normalized, written back onto
    ``response.choices[0].message.content`` and logged together with the
    prompt under ``gpt3_logs/``. Any exception triggers a retry after a
    one-second pause, up to five attempts in total.

    Returns:
        The response object on success, or the plain string
        ``"GPT3 error: <exception>"`` once all attempts have failed.
        Callers must check for the string case.
    """
    max_retry = 5
    prompt = prompt.encode(encoding='ASCII', errors='ignore').decode()
    for attempt in range(1, max_retry + 1):
        try:
            response = openai.ChatCompletion.create(
                model=llm_model,
                temperature=0.9,
                messages=[
                    {"role": "user", "content": prompt}
                ],
            )
            text = response.choices[0].message.content
            # Collapse runs of line breaks and runs of tabs/spaces.
            text = re.sub(r'[\r\n]+', '\n', text)
            text = re.sub(r'[\t ]+', ' ', text)
            filename = '%s_gpt3.txt' % time()
            os.makedirs('gpt3_logs', exist_ok=True)
            save_file('gpt3_logs/%s' % filename, prompt + '\n\n==========\n\n' + text)
            response.choices[0].message.content = text
            return response
        except Exception as oops:
            if attempt >= max_retry:
                return "GPT3 error: %s" % oops
            print('Error communicating with OpenAI:', oops)
            sleep(1)


def start_game(game_id, user_id, user_input):
    """Handle one user turn and return the model's reply text.

    The user input is embedded and used to query the vector index for
    earlier messages of the same user and game. A prompt is assembled from
    the stored per-game prompt file, the retrieved conversation and the
    input, then sent to the model. Both the user message and the reply are
    upserted into the vector index.

    Raises:
        RuntimeError: if ``call_gpt`` gave up and returned its error string.
    """
    payload = []

    # Get user input, save it, vectorize it and save to pinecone
    timestamp = time()
    timestring = timestamp_to_datetime(timestamp)
    unique_id = str(uuid4())
    vector = perform_embedding(user_input)
    metadata = {'speaker': 'USER', 'user_id': user_id, 'game_id': game_id,
                'timestring': timestring, 'text': user_input}
    payload.append((unique_id, vector, metadata))

    # Search for relevant messages and return a response
    results = vector_db.query(
        vector=vector,
        top_k=convo_length,
        include_metadata=True,
        filter={
            "$and": [{"user_id": {"$eq": user_id}}, {"game_id": {"$eq": game_id}}]
        },
    )
    conversation = load_conversation(results)

    # Populate prompt
    prompt_text = open_file(f"prompt_{game_id}_{user_id}.txt")
    # NOTE(review): all four replace() calls target the same '<>' token, so
    # only the first one can match; the distinct placeholder names appear to
    # have been lost. Confirm against prompt_response.txt.
    prompt = open_file('prompt_response.txt').replace('<>', prompt_text).replace('<>', conversation).replace('<>', user_input).replace('<>', user_id)

    # Generate response, vectorize
    llm_output_msg = call_gpt(prompt)
    if isinstance(llm_output_msg, str):
        # call_gpt returns a bare error string after exhausting its retries;
        # surface it instead of failing on the attribute access below.
        raise RuntimeError(llm_output_msg)
    llm_output = llm_output_msg.choices[0].message.content
    timestamp_op = time()
    # Use the reply's own timestamp rather than the user's.
    timestring_op = timestamp_to_datetime(timestamp_op)
    vector_op = perform_embedding(llm_output)
    # uuid4 must be called: str(uuid4) is the function's repr, which gives
    # every reply in the process the same id.
    unique_id_op = str(uuid4())
    metadata_op = {'speaker': 'BOT', 'user_id': user_id, 'game_id': game_id,
                   'timestring': timestring_op, 'text': llm_output}
    payload.append((unique_id_op, vector_op, metadata_op))

    # Upsert into the vector database
    vector_db.upsert(payload)

    return llm_output


def get_game_details(game_id):
    """Look up ``game_id`` in the game index JSON file.

    Reads ``{file_path}/{game_index}`` and scans its ``"game_details"`` list.

    Returns:
        The matching entry, or the string ``"Not Found"`` if none matches.
    """
    index_data = json.loads(open_file(f"{file_path}/{game_index}"))
    for entry in index_data["game_details"]:
        if entry["game_id"] == game_id:
            return entry
    return "Not Found"

def populate_prompt(game_id, splits):
    """Rebuild a game's full prompt from its stored chunks.

    Chunk ids have the form ``"<game_id>-<n>"`` for ``n`` in
    ``range(int(splits))``. The chunks are fetched from the vector index and
    their ``metadata["text"]`` values are joined with single spaces.

    Raises:
        KeyError: if a requested chunk id is missing from the fetch result.
    """
    chunk_ids = [f"{game_id}-{part}" for part in range(int(splits))]
    results = vector_db.fetch(ids=chunk_ids)
    chunks = [results['vectors'][chunk_id]["metadata"]["text"] for chunk_id in chunk_ids]
    return ' '.join(chunks).strip()


def initialize_game(game_id, user_id, user_input):
    """Create and store the per-user prompt for a game; return its text.

    The game's prompt template is rebuilt from the vector index, ``'<>'`` is
    replaced with ``user_input``, and the result is sent to the model. The
    reply is saved as ``prompt_<game_id>_<user_id>.txt`` and returned.

    Raises:
        ValueError: if ``game_id`` is not present in the game index.
        RuntimeError: if ``call_gpt`` gave up and returned its error string.
    """
    game_details = get_game_details(game_id)
    if game_details == "Not Found":
        # get_game_details signals a miss with this sentinel string; indexing
        # it with ["splits"] would fail with an unrelated TypeError.
        raise ValueError(f"Unknown game_id: {game_id!r}")

    whole_prompt = populate_prompt(game_id, game_details["splits"])
    if debug:
        print(whole_prompt[:1000])
    whole_prompt = whole_prompt.replace("<>", user_input)
    if debug:
        print(whole_prompt[:1000])

    llm_prompt_op = call_gpt(whole_prompt)
    if isinstance(llm_prompt_op, str):
        # call_gpt returns a bare error string after exhausting its retries;
        # fail before anything is written to the prompt file.
        raise RuntimeError(llm_prompt_op)

    generated_prompt = llm_prompt_op.choices[0]["message"]["content"]
    save_file(f"prompt_{game_id}_{user_id}.txt", generated_prompt)
    return generated_prompt


if __name__ == '__main__':
    print("main")