|
import threading |
|
import openai |
|
import os |
|
import time |
|
import random |
|
|
|
# The OpenAI key is read from the OPENAI_KEY environment variable; a missing
# variable raises KeyError as soon as this module is imported.
openai.api_key = os.environ["OPENAI_KEY"]

# Registry of every Game, keyed by room name. Game.__init__ inserts into it and
# new_game() consults it while holding all_games_lock.
games = {}
all_games_lock = threading.Lock()

# Stage durations in seconds (compared against time.time() deltas by the loop).
TIME_TO_SUBMIT = 30
TIME_TO_VOTE = 30
TIME_TO_REVEAL = 10

# Scoring: points for a player who picks the AI answer, and points for a player
# whose own answer was picked by someone else as the AI answer.
GUESS_AI_PTS = 2
GUESSED_AS_AI_PTS = 1

# Prompt template; ai_response() fills it with str.format(question).
# The encoding is explicit so the file decodes the same way on every platform
# instead of depending on the locale's default encoding.
with open("prompt.txt", "r", encoding="utf-8") as f:
    PROMPT = f.read()
|
|
|
|
|
def new_minigame():
    """Return the prompt text for a new minigame (currently one fixed string)."""
    minigame_prompt = "Write a poem about mice!"
    return minigame_prompt
|
|
|
|
|
def ai_response(question):
    """Return the model's answer to ``question``.

    ``question`` is substituted into the module-level ``PROMPT`` template with
    ``str.format`` and the result is sent as a single user message.

    ``gpt-3.5-turbo`` is a chat model: it is served by the chat completions
    endpoint, not by the plain completions endpoint, so the request goes
    through ``openai.ChatCompletion`` (legacy pre-1.0 SDK interface, matching
    the ``openai.api_key`` style used by this module) and the reply text is
    read from ``message.content``.

    Any exception raised by the OpenAI client propagates to the caller.
    """
    prompt = PROMPT.format(question)
    completion = openai.ChatCompletion.create(
        model="gpt-3.5-turbo",
        messages=[{"role": "user", "content": prompt}],
        max_tokens=200,
        n=1,
        temperature=0.5,
    )
    return completion["choices"][0]["message"]["content"]
|
|
|
|
|
class Game:
    """State of one game room, shared by player-facing calls and the game loop.

    A game moves through the stages ``"START"`` -> ``"COLLECTING_RESPONSES"``
    -> ``"COLLECTING_VOTES"`` -> ``"REVEALING"`` once per prompt; the stage
    transitions themselves are driven by the module's background loop.
    Methods that mutate per-round state take ``self.lock``.
    """

    def __init__(self, room_name):
        """Create an empty, not-yet-started game and register it in ``games``."""
        self.room_name = room_name
        self.command = None  # banner text placed first in every chat view
        self.players = []  # player names, in join order
        self.prompts = []  # prompts[i] was supplied by players[i]
        self.prompts_completed = 0
        self.responses = {}  # player name -> answer submitted this round
        self.response_list = []  # shuffled (player name, or None for the AI; text)
        self.ai_response_index = None
        self.lock = threading.Lock()
        self.started = False
        self.stage = None
        self.stage_start_time = None
        self.stage_time_remaining = None
        self.total_stage_time = None
        self.complete = False
        self.last_update_index = 0  # bumped by update() on every state change
        self.input_allowed = False
        self.last_update_time = time.time()
        self.chat = []
        self.player_status = {}  # player name -> None / "Submitted" / "Voted"
        self.player_scores = {}
        self.player_guess = {}  # player name -> guessed author (None means the AI)
        self.ai_answer = None

        games[room_name] = self

    def update(self):
        """Record that the game state changed (bump the counter and timestamp)."""
        self.last_update_index += 1
        self.last_update_time = time.time()

    def start(self):
        """Mark the game as started and hand it to the game loop at ``"START"``."""
        self.started = True
        self.stage = "START"
        self.update()

    def get_player_state(self):
        """Return ``[name, status, score]`` for every player, in join order."""
        return [
            [player, self.player_status[player], self.player_scores[player]]
            for player in self.players
        ]

    def format_chat(self, player_name):
        """Build the chat transcript shown to ``player_name`` for the current stage.

        Returns a list of two-item lists. ``[text, None]`` is a message
        authored by the viewer, ``[None, text]`` is anything else; the first
        entry is always ``[None, self.command]``.
        """
        formatted_chat = [[None, self.command]]
        # The stage name must match the one the game loop sets; the previous
        # misspelling ("COLLECTING_RESPONSSES") made this branch unreachable,
        # so players never saw their own submission echoed back.
        if self.stage == "COLLECTING_RESPONSES":
            if player_name in self.responses:
                formatted_chat.append([self.responses[player_name], None])
        elif self.stage == "COLLECTING_VOTES":
            for msg_player, msg in self.response_list:
                if msg_player == player_name:
                    formatted_chat.append([msg, None])
                else:
                    formatted_chat.append([None, msg])
        elif self.stage == "REVEALING":
            for msg_player, msg in self.response_list:
                # Only count players who actually voted: the default guess is
                # None, which is also the marker for the AI's response, so
                # without this check every non-voter is listed under GPT.
                guessers = ", ".join(
                    player
                    for player, guess in self.player_guess.items()
                    if guess == msg_player
                    and self.player_status.get(player) == "Voted"
                )
                output_msg = (
                    f"**{msg_player or 'GPT'}**: {msg}\nGuessed by: {guessers}"
                )
                if msg_player == player_name:
                    formatted_chat.append([output_msg, None])
                else:
                    formatted_chat.append([None, output_msg])
        return formatted_chat

    def add_player(self, player_name, player_prompt):
        """Add a player and their prompt, and announce who is in the room."""
        with self.lock:
            self.players.append(player_name)
            self.prompts.append(player_prompt)
            self.player_status[player_name] = None
            self.player_scores[player_name] = 0
            if len(self.players) == 1:
                self.command = "You have entered the game."
            else:
                self.command = (
                    f"{', '.join(self.players[:-1])} and {self.players[-1]}"
                    " are in this room."
                )
            self.update()

    def add_chat(self, player, message):
        """Record ``player``'s answer for the current round.

        Submissions are accepted only during ``"COLLECTING_RESPONSES"``.
        In any other stage they could not reach ``response_list`` anyway, and
        overwriting the status with "Submitted" would erase a "Voted" status.
        """
        with self.lock:
            if self.stage != "COLLECTING_RESPONSES":
                return
            self.player_status[player] = "Submitted"
            self.responses[player] = message

    def select_chat(self, player, index):
        """Record ``player``'s vote for the response at 1-based ``index``.

        Ignored outside ``"COLLECTING_VOTES"`` and when ``index`` does not
        name an entry of ``response_list``. In particular ``0`` or a negative
        number no longer wraps around to a response counted from the end.
        """
        with self.lock:
            if self.stage != "COLLECTING_VOTES":
                return
            if not 1 <= index <= len(self.response_list):
                return
            selected_player = self.response_list[index - 1][0]
            self.player_guess[player] = selected_player
            self.player_status[player] = "Voted"
|
|
|
|
|
def new_game(room_name):
    """Return the game registered under ``room_name``, creating it if absent.

    The lookup and the creation both happen while ``all_games_lock`` is held;
    a newly constructed ``Game`` adds itself to ``games``.
    """
    with all_games_lock:
        try:
            return games[room_name]
        except KeyError:
            return Game(room_name)
|
|
|
|
|
def _snapshot_games():
    """Return a list copy of the registered games, taken under the registry lock."""
    with all_games_lock:
        return list(games.values())


def _begin_round(game):
    """Show the next prompt, fetch the AI answer and open the submission window."""
    prompt = game.prompts[game.prompts_completed]
    game.command = f"**{game.players[game.prompts_completed]}**: {prompt}"
    game.input_allowed = True
    game.player_status = {player: None for player in game.players}
    game.player_guess = {player: None for player in game.players}
    # NOTE(review): this blocks while game.lock is held, and an exception
    # raised here ends the whole background loop - confirm that is acceptable.
    game.ai_answer = ai_response(prompt)
    game.stage = "COLLECTING_RESPONSES"
    game.responses = {}
    # Start the clock only after the AI call has returned, so its latency is
    # not deducted from the players' submission time.
    game.stage_start_time = time.time()
    game.total_stage_time = TIME_TO_SUBMIT
    game.stage_time_remaining = TIME_TO_SUBMIT
    game.update()


def _begin_voting(game, now):
    """Freeze submissions, mix in the AI answer and open the voting window."""
    game.stage = "COLLECTING_VOTES"
    game.stage_start_time = now
    game.total_stage_time = TIME_TO_VOTE
    game.command = "Select the AI response!"
    game.player_status = {player: None for player in game.players}
    game.response_list = list(game.responses.items())
    game.input_allowed = False
    ai_entry = (None, game.ai_answer)  # author None marks the AI's response
    game.response_list.append(ai_entry)
    random.shuffle(game.response_list)
    game.ai_response_index = game.response_list.index(ai_entry)
    game.update()


def _begin_reveal(game, now):
    """Score the votes and open the reveal window."""
    game.stage = "REVEALING"
    game.command = "Here's the results!"
    game.stage_start_time = now
    game.total_stage_time = TIME_TO_REVEAL
    for player, guess in game.player_guess.items():
        # player_guess defaults to None, which is also the AI marker, so a
        # player who never voted must not be scored as having found the AI.
        if game.player_status.get(player) != "Voted":
            continue
        if guess is None:
            game.player_scores[player] += GUESS_AI_PTS
        else:
            game.player_scores[guess] += GUESSED_AS_AI_PTS
    game.update()


def _finish_round(game):
    """Move on to the next prompt, or end the game after the last one."""
    game.stage = "START"
    game.prompts_completed += 1
    if game.prompts_completed >= len(game.players):
        game.complete = True
        top_score = max(game.player_scores.values())
        winners = ", ".join(
            player
            for player, score in game.player_scores.items()
            if score == top_score
        )
        game.command = f"Game complete! Winners: {winners}"
    game.update()


def _advance_game(game):
    """Run one tick for ``game``; the caller must hold ``game.lock``."""
    if game.stage == "START":
        _begin_round(game)

    now = time.time()
    stage_time_remaining = int(
        game.total_stage_time - (now - game.stage_start_time)
    )
    if stage_time_remaining != game.stage_time_remaining:
        game.stage_time_remaining = stage_time_remaining
        game.update()
    out_of_time = stage_time_remaining <= 0

    # At most one transition per tick. `out_of_time` describes the stage that
    # was active when it was computed; with independent `if`s a submission
    # timeout fell straight through voting and reveal in the same tick.
    if game.stage == "COLLECTING_RESPONSES":
        if out_of_time or all(
            status == "Submitted" for status in game.player_status.values()
        ):
            _begin_voting(game, now)
    elif game.stage == "COLLECTING_VOTES":
        if out_of_time or all(
            status == "Voted" for status in game.player_status.values()
        ):
            _begin_reveal(game, now)
    elif game.stage == "REVEALING":
        if out_of_time:
            _finish_round(game)


def game_thread():
    """Drive every started, unfinished game through its stages, forever.

    Roughly every 0.1 s each game is advanced by one tick while its lock is
    held. The registry is copied before iterating so that a room created by
    another thread cannot raise "dictionary changed size during iteration".
    This function never returns; it is meant to run in a background thread.
    """
    while True:
        for game in _snapshot_games():
            if not game.started or game.complete:
                continue
            with game.lock:
                _advance_game(game)
        time.sleep(0.1)
|
|
|
|
|
# Start the game loop in the background at import time. It is a daemon thread,
# so it does not keep the interpreter alive on shutdown.
thread = threading.Thread(target=game_thread, daemon=True)
thread.start()
|
|