# File size: 6,607 Bytes
# 079c32c
"""
Adapt the Chess environment in PettingZoo (https://github.com/Farama-Foundation/PettingZoo) to the BaseEnv interface.
"""
import sys
import chess
import numpy as np
from ding.envs import BaseEnv, BaseEnvTimestep
from ding.utils import ENV_REGISTRY
from gymnasium import spaces
from pettingzoo.classic.chess import chess_utils
from pettingzoo.utils.agent_selector import agent_selector
@ENV_REGISTRY.register('Chess')
class ChessEnv(BaseEnv):
    """
    Overview:
        Two-player chess environment built on ``chess.Board`` and the PettingZoo ``chess_utils`` helpers,
        exposed through the ``BaseEnv`` interface. The two agents are named ``player_1`` and ``player_2``
        and alternate turns through an ``agent_selector``.
    """

    def __init__(self, cfg=None):
        """
        Overview:
            Create the board, the agent bookkeeping and the per-agent spaces. ``reset`` must still be
            called before ``step``, because ``rewards`` and ``dones`` are only placeholders here.
        Arguments:
            - cfg: Configuration object; it is only stored on ``self.cfg`` by this class.
        """
        self.cfg = cfg
        self.current_player_index = 0
        self.next_player_index = 1
        self.board = chess.Board()

        self.agents = [f"player_{i + 1}" for i in range(2)]
        self.possible_agents = self.agents[:]
        self._agent_selector = agent_selector(self.agents)

        self._action_spaces = {name: spaces.Discrete(8 * 8 * 73) for name in self.agents}
        self._observation_spaces = {
            name: spaces.Dict(
                {
                    'observation': spaces.Box(low=0, high=1, shape=(8, 8, 111), dtype=bool),
                    'action_mask': spaces.Box(low=0, high=1, shape=(4672, ), dtype=np.int8)
                }
            )
            for name in self.agents
        }
        # The ``reward_space`` property reads this attribute, so it has to exist.
        # Rewards are ``result_val * (+1 | -1)`` (see ``set_game_result``).
        # NOTE(review): assumes ``chess_utils.result_to_int`` yields values in [-1, 1] - confirm.
        self._reward_space = spaces.Box(low=-1, high=1, shape=(1, ), dtype=np.float32)

        self.rewards = None
        self.dones = None
        self.infos = {name: {} for name in self.agents}

        self.agent_selection = None

        # 104 history channels; ``observe`` stacks them behind 7 channels of the current observation.
        self.board_history = np.zeros((8, 8, 104), dtype=bool)

    @property
    def current_player(self):
        """
        Overview:
            Index (into ``self.agents``) stored in ``current_player_index``.
        """
        return self.current_player_index

    def to_play(self):
        """
        Overview:
            Return ``next_player_index``, which ``step`` sets to the index of the newly selected agent.
        """
        return self.next_player_index

    def reset(self):
        """
        Overview:
            Start a new game: fresh board, fresh agent selector, zeroed rewards, cleared done flags,
            infos and board history.
        Returns:
            - obs (:obj:`dict`): The ``observe`` result for the first selected agent.
        """
        self.has_reset = True
        self.agents = self.possible_agents[:]
        self.board = chess.Board()

        self._agent_selector = agent_selector(self.agents)
        self.agent_selection = self._agent_selector.reset()

        self.rewards = {name: 0 for name in self.agents}
        self._cumulative_rewards = {name: 0 for name in self.agents}
        self.dones = {name: False for name in self.agents}
        self.infos = {name: {} for name in self.agents}
        self.board_history = np.zeros((8, 8, 104), dtype=bool)

        agent = self.agent_selection
        self.current_player_index = self.agents.index(agent)
        # Restore the value assigned in ``__init__`` so a reset environment does not keep the
        # index left over from the previous episode.
        self.next_player_index = 1

        return self.observe(agent)

    def observe(self, agent):
        """
        Overview:
            Build the observation dict for ``agent``.
        Arguments:
            - agent (:obj:`str`): Agent name; it is converted to its index in ``possible_agents``.
        Returns:
            - obs (:obj:`dict`): ``'observation'`` is the first 7 channels returned by
              ``chess_utils.get_observation`` stacked with ``board_history`` along the last axis;
              ``'action_mask'`` is ``self.legal_actions``.
        """
        observation = chess_utils.get_observation(self.board, self.possible_agents.index(agent))
        observation = np.dstack((observation[:, :, :7], self.board_history))
        action_mask = self.legal_actions

        return {'observation': observation, 'action_mask': action_mask}

    def set_game_result(self, result_val):
        """
        Overview:
            Mark every agent as done and assign the final rewards: the first agent receives
            ``result_val`` and the second receives ``-result_val``.
        Arguments:
            - result_val: Numeric game result from the first agent's point of view.
        """
        for i, name in enumerate(self.agents):
            self.dones[name] = True
            result_coef = 1 if i == 0 else -1
            self.rewards[name] = result_val * result_coef
            self.infos[name] = {'legal_moves': []}

    def step(self, action):
        """
        Overview:
            Apply ``action`` for the currently selected agent, detect the end of the game, accumulate
            rewards and hand the turn to the next agent.
        Arguments:
            - action: Action index that ``chess_utils.action_to_move`` converts to a move.
        Returns:
            - timestep (:obj:`BaseEnvTimestep`): Observation, cumulative reward, done flag and info of
              the agent selected *after* the move.
        """
        if self.dones[self.agent_selection]:
            # NOTE(review): ``_was_done_step`` is not defined in this class; it has to be provided
            # by ``BaseEnv`` for this branch to work - confirm.
            return self._was_done_step(action)

        current_agent = self.agent_selection
        current_index = self.agents.index(current_agent)
        self.current_player_index = current_index

        # Pass the agent *index*, as ``observe`` does. The agent name used previously is a
        # non-empty string and therefore always truthy, whichever player is moving.
        next_board = chess_utils.get_observation(self.board, current_index)
        # Drop the last 13 history channels and put the channels after the first 7 in front.
        # NOTE(review): presumably ``next_board[:, :, 7:]`` has 13 channels, keeping 104 in total - confirm.
        self.board_history = np.dstack((next_board[:, :, 7:], self.board_history[:, :, :-13]))

        chosen_move = chess_utils.action_to_move(self.board, action, current_index)
        assert chosen_move in self.board.legal_moves
        self.board.push(chosen_move)  # NOTE: mutates ``self.board`` in place

        next_legal_moves = chess_utils.legal_moves(self.board)

        is_stale_or_checkmate = not any(next_legal_moves)

        # claim draw is set to be true to align with normal tournament rules
        is_repetition = self.board.is_repetition(3)
        is_50_move_rule = self.board.can_claim_fifty_moves()
        is_claimable_draw = is_repetition or is_50_move_rule
        game_over = is_claimable_draw or is_stale_or_checkmate

        if game_over:
            result = self.board.result(claim_draw=True)
            result_val = chess_utils.result_to_int(result)
            self.set_game_result(result_val)

        # Accumulate the per-step rewards (all zero until the game is over).
        for agent, reward in self.rewards.items():
            self._cumulative_rewards[agent] += reward

        self.agent_selection = self._agent_selector.next()
        agent = self.agent_selection
        self.next_player_index = self.agents.index(agent)

        observation = self.observe(agent)

        return BaseEnvTimestep(observation, self._cumulative_rewards[agent], self.dones[agent], self.infos[agent])

    @property
    def legal_actions(self):
        """
        Overview:
            Binary mask of length 4672 with ones at the indices returned by ``chess_utils.legal_moves``.
        """
        # NOTE(review): the declared ``action_mask`` space uses ``np.int8`` while this mask is ``uint8``.
        action_mask = np.zeros(4672, 'uint8')
        action_mask[chess_utils.legal_moves(self.board)] = 1
        return action_mask  # 4672 dim {0,1}

    def legal_moves(self):
        """
        Overview:
            Return ``chess_utils.legal_moves`` for the current board.
        """
        legal_moves = chess_utils.legal_moves(self.board)
        return legal_moves

    def random_action(self):
        """
        Overview:
            Pick one entry of ``legal_moves()`` with ``np.random.choice``.
        """
        action_list = self.legal_moves()
        return np.random.choice(action_list)

    def bot_action(self):
        """
        Overview:
            Placeholder; not implemented yet and returns ``None``.
        """
        # TODO
        pass

    def human_to_action(self):
        """
        Overview:
            For multiplayer games, ask the user for a legal action
            and return the corresponding action number. The prompt is repeated until the
            entered number is one of the currently legal moves.
        Returns:
            An integer from the action space.
        """
        while True:
            legal = self.legal_moves()
            try:
                print(f"Current available actions for the player {self.to_play()} are:{legal}")
                choice = int(input(f"Enter the index of next move for the player {self.to_play()}: "))
            except KeyboardInterrupt:
                sys.exit(0)
            except ValueError:
                # Only a non-integer entry is retried; other errors (e.g. EOF on stdin) propagate
                # instead of looping forever.
                print("Wrong input, try again")
                continue
            if choice in legal:
                return choice
            # An integer that is not a legal move is reported as well, instead of re-prompting silently.
            print("Wrong input, try again")

    def render(self, mode='human'):
        """
        Overview:
            Print the board to stdout. ``mode`` is accepted but not used.
        """
        print(self.board)

    @property
    def observation_space(self):
        """
        Overview:
            Dict mapping each agent name to its observation space.
        """
        return self._observation_spaces

    @property
    def action_space(self):
        """
        Overview:
            Dict mapping each agent name to its action space.
        """
        return self._action_spaces

    @property
    def reward_space(self):
        """
        Overview:
            Reward space created in ``__init__``.
        """
        return self._reward_space

    def seed(self, seed: int, dynamic_seed: bool = True) -> None:
        """
        Overview:
            Store the seed settings and seed NumPy's global random generator.
        Arguments:
            - seed (:obj:`int`): Seed passed to ``np.random.seed``.
            - dynamic_seed (:obj:`bool`): Stored on ``self._dynamic_seed``; not used elsewhere in this class.
        """
        self._seed = seed
        self._dynamic_seed = dynamic_seed
        np.random.seed(self._seed)

    def close(self) -> None:
        """
        Overview:
            Nothing to release; this is a no-op.
        """
        pass

    def __repr__(self) -> str:
        return "LightZero Chess Env"
|