import copy |
import logging |
import os |
import sys |
from typing import List |
import gymnasium as gym |
import imageio |
import matplotlib.font_manager as fm |
import matplotlib.pyplot as plt |
import numpy as np |
from PIL import Image, ImageDraw, ImageFont |
from ding.envs import BaseEnvTimestep |
from ding.torch_utils import to_ndarray |
from ding.utils import ENV_REGISTRY |
from easydict import EasyDict |
from gymnasium import spaces |
from gymnasium.utils import seeding |
@ENV_REGISTRY.register('game_2048') |
class Game2048Env(gym.Env): |
""" |
Overview: |
The Game2048Env is a gym environment implementation of the 2048 game. The goal of the game is to slide numbered tiles |
on a grid to combine them and create a tile with the number 2048 (or larger). The environment provides an interface to interact with |
the game and receive observations, rewards, and game status information. |
Interfaces: |
- reset(init_board=None, add_random_tile_flag=True): |
Resets the game board and starts a new episode. It returns the initial observation of the game. |
- step(action): |
Advances the game by one step based on the provided action. It returns the new observation, reward, game status, |
and additional information. |
- render(mode='human'): |
Renders the current state of the game for visualization purposes. |
MDP Definition: |
- Observation Space: |
The observation space is a 4x4 grid representing the game board. Each cell in the grid can contain a number from |
0 to 2048. The observation can be in different formats based on the 'obs_type' parameter in the environment configuration. |
- If 'obs_type' is set to 'encode_observation' (default): |
The observation is a 3D numpy array of shape (4, 4, 16). Each cell in the array is represented as a one-hot vector |
encoding the value of the tile in that cell. The one-hot vector has a length of 16, representing the possible tile |
values from 0 to 2048. The first element in the one-hot vector corresponds to an empty cell (0 value). |
- If 'obs_type' is set to 'dict_encoded_board': |
The observation is a dictionary with the following keys: |
- 'observation': A 3D numpy array representing the game board as described above. |
- 'action_mask': A binary mask representing the legal actions that can be taken in the current state. |
- 'to_play': A placeholder value (-1) indicating the current player (not applicable in this game). |
- 'chance': A placeholder value representing the chance outcome (not applicable in this game). |
- If 'obs_type' is set to 'raw_board': |
The observation is the raw game board as a 2D numpy array of shape (4, 4). |
- Action Space: |
The action space is a discrete space with 4 possible actions: |
- 0: Move Up |
- 1: Move Right |
- 2: Move Down |
- 3: Move Left |
- Reward: |
The reward depends on the 'reward_type' parameter in the environment configuration. |
- If 'reward_type' is set to 'raw': |
The reward is a floating-point number representing the immediate reward obtained from the last action. |
- If 'reward_type' is set to 'merged_tiles_plus_log_max_tile_num': |
The reward is a floating-point number representing the number of merged tiles in the current step. |
If the maximum tile number on the board after the step is greater than the previous maximum tile number, |
the reward is further adjusted by adding the logarithm of the new maximum tile number multiplied by 0.1. |
The reward is calculated as follows: reward = num_of_merged_tiles + (log2(new_max_tile_num) * 0.1) |
If the new maximum tile number is the same as the previous maximum tile number, the reward does not |
include the second term. Note: This reward type requires 'reward_normalize' to be set to False. |
- Done: |
The game ends when one of the following conditions is met: |
- The maximum tile number (configured by 'max_tile') is reached. |
- There are no legal moves left. |
- The number of steps in the episode exceeds the maximum episode steps (configured by 'max_episode_steps'). |
- Additional Information: |
The 'info' dictionary returned by the 'step' method contains additional information about the current state. |
The following keys are included in the dictionary: |
- 'raw_reward': The raw reward obtained from the last action. |
- 'current_max_tile_num': The current maximum tile number on the board. |
- Rendering: |
The render method provides a way to visually represent the current state of the game. It offers four distinct rendering modes: |
When set to None, the game state is not rendered. |
In 'state_realtime_mode', the game state is illustrated in a text-based format directly in the console. |
The 'image_realtime_mode' displays the game as an RGB image in real-time. |
With 'image_savefile_mode', the game is rendered as an RGB image but not displayed in real-time. Instead, the image is saved to a designated file. |
Please note that the default rendering mode is set to None. |
""" |
config = dict( |
env_name="game_2048", |
render_mode=None, |
replay_format='gif', |
replay_name_suffix='eval', |
replay_path=None, |
act_scale=True, |
channel_last=True, |
obs_type='dict_encoded_board', |
reward_normalize=False, |
reward_norm_scale=100, |
reward_type='raw', |
max_tile=int(2 ** 16), |
delay_reward_step=0, |
prob_random_agent=0., |
max_episode_steps=int(1e6), |
is_collect=True, |
ignore_legal_actions=True, |
need_flatten=False, |
num_of_possible_chance_tile=2, |
possible_tiles=np.array([2, 4]), |
tile_probabilities=np.array([0.9, 0.1]), |
) |
@classmethod |
def default_config(cls: type) -> EasyDict: |
cfg = EasyDict(copy.deepcopy(cls.config)) |
cfg.cfg_type = cls.__name__ + 'Dict' |
return cfg |
def __init__(self, cfg: dict) -> None: |
self._cfg = cfg |
self._init_flag = False |
self._env_name = cfg.env_name |
self.replay_format = cfg.replay_format |
self.replay_name_suffix = cfg.replay_name_suffix |
self.replay_path = cfg.replay_path |
self.render_mode = cfg.render_mode |
self.channel_last = cfg.channel_last |
self.obs_type = cfg.obs_type |
self.reward_type = cfg.reward_type |
self.reward_normalize = cfg.reward_normalize |
self.reward_norm_scale = cfg.reward_norm_scale |
assert self.reward_type in ['raw', 'merged_tiles_plus_log_max_tile_num'] |
assert self.reward_type == 'raw' or ( |
self.reward_type == 'merged_tiles_plus_log_max_tile_num' and self.reward_normalize is False) |
self.max_tile = cfg.max_tile |
assert self.max_tile is None or isinstance(self.max_tile, int) |
self.max_episode_steps = cfg.max_episode_steps |
self.is_collect = cfg.is_collect |
self.ignore_legal_actions = cfg.ignore_legal_actions |
self.need_flatten = cfg.need_flatten |
self.chance = 0 |
self.chance_space_size = 16 |
self.max_tile_num = 0 |
self.size = 4 |
self.w = self.size |
self.h = self.size |
self.squares = self.size * self.size |
self.episode_return = 0 |
self._action_space = spaces.Discrete(4) |
self._observation_space = spaces.Box(0, 1, (self.w, self.h, self.squares), dtype=int) |
self._reward_range = (0., self.max_tile) |
self.grid_size = 70 |
self.seed() |
self.frames = [] |
self.num_of_possible_chance_tile = cfg.num_of_possible_chance_tile |
self.possible_tiles = cfg.possible_tiles |
self.tile_probabilities = cfg.tile_probabilities |
if self.num_of_possible_chance_tile > 2: |
self.possible_tiles = np.array([2 ** (i + 1) for i in range(self.num_of_possible_chance_tile)]) |
self.tile_probabilities = np.array( |
[1 / self.num_of_possible_chance_tile for _ in range(self.num_of_possible_chance_tile)]) |
assert self.possible_tiles.shape[0] == self.tile_probabilities.shape[0] |
assert np.sum(self.tile_probabilities) == 1 |
def reset(self, init_board=None, add_random_tile_flag=True): |
"""Reset the game board-matrix and add 2 tiles.""" |
self.episode_length = 0 |
self.add_random_tile_flag = add_random_tile_flag |
if init_board is not None: |
self.board = copy.deepcopy(init_board) |
else: |
self.board = np.zeros((self.h, self.w), np.int32) |
for _ in range(2): |
if self.num_of_possible_chance_tile > 2: |
self.add_random_tile(self.possible_tiles, self.tile_probabilities) |
elif self.num_of_possible_chance_tile == 2: |
self.add_random_2_4_tile() |
self.episode_return = 0 |
self._final_eval_reward = 0.0 |
self.should_done = False |
action_mask = np.zeros(4, 'int8') |
action_mask[self.legal_actions] = 1 |
observation = encode_board(self.board).astype(np.float32) |
assert observation.shape == (4, 4, 16) |
if not self.channel_last: |
observation = np.transpose(observation, [2, 0, 1]) |
if self.need_flatten: |
observation = observation.reshape(-1) |
if self.obs_type == 'dict_encoded_board': |
observation = { |
'observation': observation, |
'action_mask': action_mask, |
'to_play': -1, |
'chance': self.chance |
} |
elif self.obs_type == 'raw_board': |
observation = self.board |
elif self.obs_type == 'raw_encoded_board': |
observation = observation |
else: |
raise NotImplementedError |
if self.render_mode is not None: |
self.render(self.render_mode) |
return observation |
def step(self, action): |
""" |
Overview: |
Perform one step of the game. This involves making a move, adding a new tile, and updating the game state. |
New tile could be added randomly or from the tile probabilities. |
The rewards are calculated based on the game configuration ('merged_tiles_plus_log_max_tile_num' or 'raw'). |
The observations are also returned based on the game configuration ('raw_board', 'raw_encoded_board' or 'dict_encoded_board'). |
Arguments: |
- action (:obj:`int`): The action to be performed. |
Returns: |
- BaseEnvTimestep: Contains the new state observation, reward, and other game information. |
""" |
self.episode_length += 1 |
if action not in self.legal_actions: |
logging.warning( |
f"Illegal action: {action}. Legal actions: {self.legal_actions}. " |
"Choosing a random action from legal actions." |
) |
action = np.random.choice(self.legal_actions) |
if self.reward_type == 'merged_tiles_plus_log_max_tile_num': |
empty_num1 = len(self.get_empty_location()) |
raw_reward = float(self.move(action)) |
if self.reward_type == 'merged_tiles_plus_log_max_tile_num': |
empty_num2 = len(self.get_empty_location()) |
num_of_merged_tiles = float(empty_num2 - empty_num1) |
reward_merged_tiles_plus_log_max_tile_num = num_of_merged_tiles |
max_tile_num = self.highest() |
if max_tile_num > self.max_tile_num: |
reward_merged_tiles_plus_log_max_tile_num += np.log2(max_tile_num) * 0.1 |
self.max_tile_num = max_tile_num |
self.episode_return += raw_reward |
assert raw_reward <= 2 ** (self.w * self.h) |
if self.add_random_tile_flag: |
if self.num_of_possible_chance_tile > 2: |
self.add_random_tile(self.possible_tiles, self.tile_probabilities) |
elif self.num_of_possible_chance_tile == 2: |
self.add_random_2_4_tile() |
done = self.is_done() |
if self.reward_type == 'merged_tiles_plus_log_max_tile_num': |
reward_merged_tiles_plus_log_max_tile_num = float(reward_merged_tiles_plus_log_max_tile_num) |
elif self.reward_type == 'raw': |
raw_reward = float(raw_reward) |
if self.episode_length >= self.max_episode_steps: |
done = True |
observation = encode_board(self.board) |
observation = observation.astype(np.float32) |
assert observation.shape == (4, 4, 16) |
if not self.channel_last: |
observation = np.transpose(observation, [2, 0, 1]) |
if self.need_flatten: |
observation = observation.reshape(-1) |
action_mask = np.zeros(4, 'int8') |
action_mask[self.legal_actions] = 1 |
if self.obs_type == 'dict_encoded_board': |
observation = {'observation': observation, 'action_mask': action_mask, 'to_play': -1, 'chance': self.chance} |
elif self.obs_type == 'raw_board': |
observation = self.board |
elif self.obs_type == 'raw_encoded_board': |
observation = observation |
else: |
raise NotImplementedError |
if self.reward_normalize: |
reward_normalize = raw_reward / self.reward_norm_scale |
reward = reward_normalize |
else: |
reward = raw_reward |
self._final_eval_reward += raw_reward |
if self.reward_type == 'merged_tiles_plus_log_max_tile_num': |
reward = to_ndarray([reward_merged_tiles_plus_log_max_tile_num]).astype(np.float32) |
elif self.reward_type == 'raw': |
reward = to_ndarray([reward]).astype(np.float32) |
info = {"raw_reward": raw_reward, "current_max_tile_num": self.highest()} |
if self.render_mode is not None: |
self.render(self.render_mode) |
if done: |
info['eval_episode_return'] = self._final_eval_reward |
if self.render_mode == 'image_savefile_mode': |
self.save_render_output(replay_name_suffix=self.replay_name_suffix, replay_path=self.replay_path, |
format=self.replay_format) |
return BaseEnvTimestep(observation, reward, done, info) |
def move(self, direction, trial=False): |
""" |
Overview: |
Perform one move in the game. The game board can be shifted in one of four directions: up (0), right (1), down (2), or left (3). |
This method manages the shifting process and combines similar adjacent elements. It also returns the reward generated from the move. |
Arguments: |
- direction (:obj:`int`): The direction of the move. |
- trial (:obj:`bool`): If true, this move is only simulated and does not change the actual game state. |
""" |
if not trial: |
logging.debug(["Up", "Right", "Down", "Left"][int(direction)]) |
move_reward = 0 |
merge_direction = 0 if direction in [0, 3] else 1 |
range_x = list(range(self.w)) |
range_y = list(range(self.h)) |
if direction in [0, 2]: |
for y in range(self.h): |
old_col = [self.board[x, y] for x in range_x] |
new_col, reward = self.shift(old_col, merge_direction) |
move_reward += reward |
if old_col != new_col and not trial: |
for x in range_x: |
self.board[x, y] = new_col[x] |
else: |
for x in range(self.w): |
old_row = [self.board[x, y] for y in range_y] |
new_row, reward = self.shift(old_row, merge_direction) |
move_reward += reward |
if old_row != new_row and not trial: |
for y in range_y: |
self.board[x, y] = new_row[y] |
return move_reward |
def shift(self, row, merge_direction): |
""" |
Overview: |
This method shifts the elements in a given row or column of the 2048 board in a specified direction. |
It performs three main operations: removal of zeroes, combination of similar elements, and filling up the |
remaining spaces with zeroes. The direction of shift can be either left (0) or right (1). |
Arguments: |
- row: A list of integers representing a row or a column in the 2048 board. |
- merge_direction: An integer that dictates the direction of merge. It can be either 0 or 1. |
- 0: The elements in the 'row' will be merged towards left/up. |
- 1: The elements in the 'row' will be merged towards right/down. |
Returns: |
- combined_row: A list of integers of the same length as 'row' after shifting and merging. |
- move_reward: The reward gained from combining similar elements in 'row'. It is the sum of all new |
combinations. |
Note: |
This method assumes that the input 'row' is a list of integers and 'merge_direction' is either 0 or 1. |
""" |
non_zero_row = [i for i in row if i != 0] |
start, stop, step = (0, len(non_zero_row), 1) if merge_direction == 0 else (len(non_zero_row) - 1, -1, -1) |
combined_row, move_reward = self.combine(non_zero_row, start, stop, step) |
if merge_direction == 1: |
combined_row = combined_row[::-1] |
if merge_direction == 0: |
combined_row += [0] * (len(row) - len(combined_row)) |
elif merge_direction == 1: |
combined_row = [0] * (len(row) - len(combined_row)) + combined_row |
return combined_row, move_reward |
def combine(self, row, start, stop, step): |
""" |
Overview: |
Combine similar adjacent elements in the row, starting from the specified start index, |
ending at the stop index, and moving in the direction indicated by the step. The function |
also calculates the reward as the sum of all combined elements. |
""" |
move_reward = 0 |
combined_row = [] |
skip_next = False |
for i in range(start, stop, step): |
if skip_next: |
skip_next = False |
continue |
if i + step != stop and row[i] == row[i + step]: |
combined_row.append(row[i] * 2) |
move_reward += row[i] * 2 |
skip_next = True |
else: |
combined_row.append(row[i]) |
return combined_row, move_reward |
@property |
def legal_actions(self): |
""" |
Overview: |
Return the legal actions for the current state. A move is considered legal if it changes the state of the board. |
""" |
if self.ignore_legal_actions: |
return [0, 1, 2, 3] |
legal_actions = [] |
for direction in range(4): |
merge_direction = 0 if direction in [0, 3] else 1 |
range_x = list(range(self.w)) |
range_y = list(range(self.h)) |
if direction % 2 == 0: |
for y in range(self.h): |
old_col = [self.board[x, y] for x in range_x] |
new_col, _ = self.shift(old_col, merge_direction) |
if old_col != new_col: |
legal_actions.append(direction) |
break |
else: |
for x in range(self.w): |
old_row = [self.board[x, y] for y in range_y] |
new_row, _ = self.shift(old_row, merge_direction) |
if old_row != new_row: |
legal_actions.append(direction) |
break |
return legal_actions |
def add_random_2_4_tile(self): |
"""Add a tile with value 2 or 4 with different probabilities.""" |
possible_tiles = np.array([2, 4]) |
tile_probabilities = np.array([0.9, 0.1]) |
tile_val = self.np_random.choice(possible_tiles, 1, p=tile_probabilities)[0] |
empty_location = self.get_empty_location() |
if empty_location.shape[0] == 0: |
self.should_done = True |
return |
empty_idx = self.np_random.choice(empty_location.shape[0]) |
empty = empty_location[empty_idx] |
logging.debug("Adding %s at %s", tile_val, (empty[0], empty[1])) |
if self.chance_space_size == 16: |
self.chance = 4 * empty[0] + empty[1] |
elif self.chance_space_size == 32: |
if tile_val == 2: |
self.chance = 4 * empty[0] + empty[1] |
elif tile_val == 4: |
self.chance = 16 + 4 * empty[0] + empty[1] |
self.board[empty[0], empty[1]] = tile_val |
def add_random_tile(self, possible_tiles: np.array = np.array([2, 4]), |
tile_probabilities: np.array = np.array([0.9, 0.1])): |
"""Add a tile with a value from possible_tiles array according to given probabilities.""" |
if len(possible_tiles) != len(tile_probabilities): |
raise ValueError("Length of possible_tiles and tile_probabilities must be the same") |
if np.sum(tile_probabilities) != 1: |
raise ValueError("Sum of tile_probabilities must be 1") |
tile_val = self.np_random.choice(possible_tiles, 1, p=tile_probabilities)[0] |
tile_idx = np.where(possible_tiles == tile_val)[0][0] |
empty_location = self.get_empty_location() |
if empty_location.shape[0] == 0: |
self.should_done = True |
return |
empty_idx = self.np_random.choice(empty_location.shape[0]) |
empty = empty_location[empty_idx] |
logging.debug("Adding %s at %s", tile_val, (empty[0], empty[1])) |
self.chance_space_size = len(possible_tiles) * 16 |
self.chance = tile_idx * 16 + 4 * empty[0] + empty[1] |
self.board[empty[0], empty[1]] = tile_val |
def get_empty_location(self): |
"""Return a 2d numpy array with the location of empty squares.""" |
return np.argwhere(self.board == 0) |
def highest(self): |
"""Report the highest tile on the board.""" |
return np.max(self.board) |
def is_done(self): |
"""Has the game ended. Game ends if there is a tile equal to the limit |
or there are no legal moves. If there are empty spaces then there |
must be legal moves.""" |
if self.max_tile is not None and self.highest() == self.max_tile: |
return True |
elif len(self.legal_actions) == 0: |
return True |
elif self.should_done: |
return True |
else: |
return False |
def get_board(self): |
"""Get the whole board-matrix, useful for testing.""" |
return self.board |
def set_board(self, new_board): |
"""Set the whole board-matrix, useful for testing.""" |
self.board = new_board |
def seed(self, seed=None, seed1=None): |
"""Set the random seed for the gym environment.""" |
self.np_random, seed = seeding.np_random(seed) |
return [seed] |
def random_action(self) -> np.ndarray: |
random_action = self.action_space.sample() |
if isinstance(random_action, np.ndarray): |
pass |
elif isinstance(random_action, int): |
random_action = to_ndarray([random_action], dtype=np.int64) |
return random_action |
def human_to_action(self): |
""" |
Overview: |
For multiplayer games, ask the user for a legal action |
and return the corresponding action number. |
Returns: |
An integer from the action space. |
""" |
while True: |
try: |
action = int( |
input( |
f"Enter the action (0(Up), 1(Right), 2(Down), or 3(Left)) to play: " |
) |
) |
if action in self.legal_actions: |
break |
else: |
print("Wrong input, try again") |
except KeyboardInterrupt: |
print("exit") |
sys.exit(0) |
return action |
def render(self, mode: str = None): |
""" |
Overview: |
Renders the 2048 game environment. |
Arguments: |
- mode (:obj:`str`): The rendering mode. Options are None, 'state_realtime_mode', 'image_realtime_mode' or 'image_savefile_mode'. |
When set to None, the game state is not rendered. |
In 'state_realtime_mode', the game state is illustrated in a text-based format directly in the console. |
The 'image_realtime_mode' displays the game as an RGB image in real-time. |
With 'image_savefile_mode', the game is rendered as an RGB image but not displayed in real-time. Instead, the image is saved to a designated file. |
Please note that the default rendering mode is set to None. |
""" |
if mode == 'state_realtime_mode': |
s = 'Current Return: {}, '.format(self.episode_return) |
s += 'Current Highest Tile number: {}\n'.format(self.highest()) |
npa = np.array(self.board) |
grid = npa.reshape((self.size, self.size)) |
s += "{}\n".format(grid) |
print(s) |
else: |
grey = (128, 128, 128) |
grid_size = self.grid_size |
pil_board = Image.new("RGB", (grid_size * 4, grid_size * 4)) |
draw = ImageDraw.Draw(pil_board) |
draw.rectangle([0, 0, 4 * grid_size, 4 * grid_size], grey) |
fnt_path = fm.findfont(fm.FontProperties(family='DejaVu Sans')) |
fnt = ImageFont.truetype(fnt_path, 30) |
for y in range(4): |
for x in range(4): |
o = self.board[y, x] |
if o: |
self.draw_tile(draw, x, y, o, fnt) |
if mode == 'image_realtime_mode': |
plt.imshow(np.asarray(pil_board)) |
plt.draw() |
elif mode == 'image_savefile_mode': |
self.frames.append(np.asarray(pil_board)) |
def draw_tile(self, draw, x, y, o, fnt): |
grid_size = self.grid_size |
white = (255, 255, 255) |
tile_colour_map = { |
0: (204, 192, 179), |
2: (238, 228, 218), |
4: (237, 224, 200), |
8: (242, 177, 121), |
16: (245, 149, 99), |
32: (246, 124, 95), |
64: (246, 94, 59), |
128: (237, 207, 114), |
256: (237, 204, 97), |
512: (237, 200, 80), |
1024: (237, 197, 63), |
2048: (237, 194, 46), |
4096: (237, 194, 46), |
8192: (237, 194, 46), |
16384: (237, 194, 46), |
} |
if o: |
draw.rectangle([x * grid_size, y * grid_size, (x + 1) * grid_size, (y + 1) * grid_size], |
tile_colour_map[o]) |
bbox = draw.textbbox((x, y), str(o), font=fnt) |
text_x_size, text_y_size = bbox[2] - bbox[0], bbox[3] - bbox[1] |
draw.text((x * grid_size + (grid_size - text_x_size) // 2, |
y * grid_size + (grid_size - text_y_size) // 2), str(o), font=fnt, fill=white) |
def save_render_output(self, replay_name_suffix: str = '', replay_path=None, format='gif'): |
if replay_path is None: |
filename = f'2048_{replay_name_suffix}.{format}' |
else: |
if not os.path.exists(replay_path): |
os.makedirs(replay_path) |
filename = replay_path + f'/2048_{replay_name_suffix}.{format}' |
if format == 'gif': |
imageio.mimsave(filename, self.frames, 'GIF') |
elif format == 'mp4': |
imageio.mimsave(filename, self.frames, fps=30, codec='mpeg4') |
else: |
raise ValueError("Unsupported format: {}".format(format)) |
logging.info("Saved output to {}".format(filename)) |
self.frames = [] |
@property |
def observation_space(self) -> gym.spaces.Space: |
return self._observation_space |
@property |
def action_space(self) -> gym.spaces.Space: |
return self._action_space |
@property |
def reward_space(self) -> gym.spaces.Space: |
return self._reward_range |
@staticmethod |
def create_collector_env_cfg(cfg: dict) -> List[dict]: |
collector_env_num = cfg.pop('collector_env_num') |
cfg = copy.deepcopy(cfg) |
cfg.is_collect = True |
return [cfg for _ in range(collector_env_num)] |
@staticmethod |
def create_evaluator_env_cfg(cfg: dict) -> List[dict]: |
evaluator_env_num = cfg.pop('evaluator_env_num') |
cfg = copy.deepcopy(cfg) |
cfg.reward_normalize = False |
cfg.is_collect = False |
return [cfg for _ in range(evaluator_env_num)] |
def __repr__(self) -> str: |
return "LightZero game 2048 Env." |
def encode_board(flat_board, num_of_template_tiles=16): |
""" |
Overview: |
This function converts a [4, 4] raw game board into a [4, 4, num_of_template_tiles] one-hot encoded board. |
Arguments: |
- flat_board (:obj:`np.ndarray`): The raw game board, expected to be a 2D numpy array. |
- num_of_template_tiles (:obj:`int`): The number of unique tiles to consider in the encoding, |
default value is 16. |
Returns: |
- one_hot_board (:obj:`np.ndarray`): The one-hot encoded game board. |
""" |
tile_values = 2 ** np.arange(num_of_template_tiles, dtype=int) |
tile_values[0] = 0 |
layered_board = np.repeat(flat_board[:, :, np.newaxis], num_of_template_tiles, axis=-1) |
one_hot_board = (layered_board == tile_values).astype(int) |
return one_hot_board |