import threading import openai import os import time import random openai.api_key = os.environ["OPENAI_KEY"] games = {} all_games_lock = threading.Lock() TIME_TO_SUBMIT = 30 TIME_TO_VOTE = 30 TIME_TO_REVEAL = 18 GUESS_AI_PTS = 2 GUESSED_AS_AI_PTS = 1 with open("prompt.txt", "r") as f: PROMPT = f.read() def new_minigame(): return "Write a poem about mice!" def ai_response(question): prompt = PROMPT.format( question, ) completions = None while completions is None: try: completions = openai.Completion.create( engine="text-davinci-003", prompt=prompt, max_tokens=200, n=1, temperature=0.5 ) except: print("OpenAI connection error") time.sleep(1) response = completions["choices"][0]["text"].strip() return response class Game: def __init__(self, room_name): self.room_name = room_name self.command = None self.players = [] self.prompts = [] self.prompts_completed = 0 self.responses = {} self.response_list = [] self.ai_response_index = None self.lock = threading.Lock() self.started = False self.stage = None self.stage_start_time = None self.stage_time_remaining = None self.total_stage_time = None self.complete = False self.last_update_index = 0 self.input_allowed = False self.last_update_time = time.time() self.chat = [] self.player_status = {} self.player_scores = {} self.player_guess = {} self.ai_answer = None games[room_name] = self def update(self): self.last_update_index += 1 self.last_update_time = time.time() def start(self): self.started = True self.stage = "START" self.update() def get_player_state(self): return [ [player, self.player_status[player], self.player_scores[player]] for player in self.players ] def format_chat(self, player_name): formatted_chat = [[None, self.command]] if self.stage == "COLLECTING_RESPONSSES": if player_name in self.responses: formatted_chat.append([self.responses[player_name], None]) elif self.stage == "COLLECTING_VOTES": for msg_player, msg in self.response_list: if msg_player == player_name: formatted_chat.append([msg, None]) else: formatted_chat.append([None, msg]) elif self.stage == "REVEALING": for msg_player, msg in self.response_list: output_msg = f"**{msg_player or 'GPT'}**: {msg}\nGuessed by: {', '.join(player for player, guess in self.player_guess.items() if guess == msg_player)}" if msg_player == player_name: formatted_chat.append([output_msg, None]) else: formatted_chat.append([None, output_msg]) return formatted_chat def add_player(self, player_name, player_prompt): with self.lock: self.players.append(player_name) self.prompts.append(player_prompt) self.player_status[player_name] = None self.player_scores[player_name] = 0 if len(self.players) == 1: self.command = f"You have entered the game." else: self.command = f"{', '.join(self.players[:-1])} and {self.players[-1]} are in this room." self.update() def add_chat(self, player, message): with self.lock: self.player_status[player] = "Submitted" self.responses[player] = message def select_chat(self, player, index): with self.lock: if self.stage != "COLLECTING_VOTES": return index = index - 1 selected_player = self.response_list[index][0] self.player_guess[player] = selected_player self.player_status[player] = "Voted" def new_game(room_name): with all_games_lock: if room_name in games: game = games[room_name] return game return Game(room_name) def game_thread(): while True: now = time.time() for room_name, game in games.items(): if not game.started or game.complete: continue with game.lock: if game.stage == "START": prompt = game.prompts[game.prompts_completed] game.command = ( f"**{game.players[game.prompts_completed]}**: {prompt}" ) game.input_allowed = True game.player_status = {player: None for player in game.players} game.player_guess = {player: None for player in game.players} game.ai_answer = ai_response(prompt) game.stage = "COLLECTING_RESPONSES" game.responses = {} game.stage_start_time = now game.total_stage_time = TIME_TO_SUBMIT game.stage_time_remaining = TIME_TO_SUBMIT game.update() stage_time_remaining = int( game.total_stage_time - (now - game.stage_start_time) ) if stage_time_remaining != game.stage_time_remaining: game.stage_time_remaining = stage_time_remaining game.update() out_of_time = stage_time_remaining <= 0 if game.stage == "COLLECTING_RESPONSES": if out_of_time or all( val == "Submitted" for val in game.player_status.values() ): game.stage = "COLLECTING_VOTES" game.stage_start_time = now game.total_stage_time = TIME_TO_VOTE game.command = "Select the AI response!" game.player_status = {player: None for player in game.players} game.response_list = list(game.responses.items()) game.input_allowed = False ai_answer = (None, game.ai_answer) game.response_list.append(ai_answer) random.shuffle(game.response_list) game.ai_response_index = game.response_list.index(ai_answer) game.update() elif game.stage == "COLLECTING_VOTES": if out_of_time or all( val == "Voted" for val in game.player_status.values() ): game.stage = "REVEALING" game.command = "Here's the results!" game.stage_start_time = now game.total_stage_time = TIME_TO_REVEAL for player, guess in game.player_guess.items(): if guess == None: game.player_scores[player] += GUESS_AI_PTS else: game.player_scores[guess] += GUESSED_AS_AI_PTS game.update() elif game.stage == "REVEALING": if out_of_time: game.stage = "START" game.prompts_completed += 1 if game.prompts_completed == len(game.players): game.complete = True game.command = f"Game complete! Winners: {', '.join(player for player, score in game.player_scores.items() if score == max(game.player_scores.values()))}" game.update() time.sleep(0.1) thread = threading.Thread(target=game_thread) thread.daemon = True thread.start()