from typing import List
import copy
import numpy as np
import gym
from gym import spaces
from gym.utils import seeding
from ding.envs import BaseEnvTimestep
from ding.utils import ENV_REGISTRY
class Maze(gym.Env):
Environment with random maze layouts. The ASCII representation of a maze includes the following objects:
- `<SPACE>`: empty
- `x`: wall
- `S`: the start location (optional)
- `T`: the target location.
KEY_WALL: 'x',
def __init__(self, cfg):
    """Record the maze size from ``cfg`` and initialise bookkeeping fields.

    Args:
        cfg: configuration object; only its ``size`` attribute is read here.
    """
    # NOTE(review): the original parameter list was cut off after the opening
    # parenthesis. ``self`` and ``cfg`` are the only names the body uses;
    # confirm no further parameters or annotations were lost.
    self._size = cfg.size
    self._init_flag = False
    self._random_start = True
    self._seed = None
    # Number of steps taken in the current episode.
    self._step = 0
def reset(self):
    """Zero the step counter and return the processed current observation.

    Returns:
        Whatever ``process_states`` produces for the current observation and
        the stacked maze map.
    """
    position = self._get_obs()
    self._step = 0
    layout = self.get_maze_map()
    return self.process_states(position, layout)
def seed(self, seed: int, dynamic_seed: bool = True) -> None:
    """Remember the seed and the dynamic-seed flag on the instance.

    The two values are only stored; no random generator is created here.
    """
    self._seed, self._dynamic_seed = seed, dynamic_seed
def active_init(
        self,
        tabular_obs=False,
        reward_fn=lambda x, y, tx, ty: 1 if (x == tx and y == ty) else 0,
        done_fn=lambda x, y, tx, ty: x == tx and y == ty
):
    """Generate the maze and derive every per-maze attribute from it.

    Args:
        tabular_obs: when truthy, the observation space is a single discrete
            state index; otherwise it is a ``size x size x 3`` box.
        reward_fn: callable ``(x, y, target_x, target_y) -> reward``. ``None``
            selects a 0/1 float reward for standing on the target.
        done_fn: callable ``(x, y, target_x, target_y) -> bool``. ``None``
            selects a function that never terminates.

    Raises:
        ValueError: if the ASCII map is empty or its rows differ in length.
    """
    # NOTE(review): the head of the original signature was lost. ``self`` and
    # ``tabular_obs`` are restored because the body uses them; the default
    # ``False`` for ``tabular_obs`` is an assumption -- confirm.
    self._maze = self.generate_maze(self.size, self._seed, 'tunnel')
    self._num_maze_keys = len(Maze.ASCII_MAP)
    self._map = self.maze_to_ascii(self._maze)
    self._tabular_obs = tabular_obs

    self._reward_fn = reward_fn
    self._done_fn = done_fn
    if self._reward_fn is None:
        self._reward_fn = lambda x, y, tx, ty: float(x == tx and y == ty)
    if self._done_fn is None:
        self._done_fn = lambda x, y, tx, ty: False

    # The map must be a non-empty rectangle.
    self._max_x = len(self._map)
    if not self._max_x:
        raise ValueError('Invalid map.')
    self._max_y = len(self._map[0])
    if not all(len(m) == self._max_y for m in self._map):
        raise ValueError('Invalid map.')

    self._start_x, self._start_y = self._find_initial_point()
    self._target_x, self._target_y = self._find_target_point()
    # The agent begins on the start cell.
    self._x, self._y = self._start_x, self._start_y

    self._n_state = self._max_x * self._max_y
    self._n_action = 4

    if self._tabular_obs:
        self.observation_space = spaces.Discrete(self._n_state)
    else:
        # Without this ``else`` the discrete space above was always
        # overwritten. The shape follows the maze size, matching what
        # ``process_states`` emits (identical to the former fixed
        # (16, 16, 3) when size is 16).
        self.observation_space = spaces.Box(low=0.0, high=np.inf, shape=(self._size, self._size, 3))

    self.action_space = spaces.Discrete(self._n_action)
    self.reward_space = spaces.Box(low=0, high=1, shape=(1, ), dtype=np.float32)
def random_start(self):
    """Return the processed observation for a randomly drawn non-wall cell.

    The agent is moved temporarily: its previous coordinates are restored
    before returning, and the result is a deep copy of the processed state.
    """
    saved_position = (self._x, self._y)
    # Rejection-sample until the drawn cell is not a wall.
    while True:
        self._x = self.np_random.integers(self._max_x)
        self._y = self.np_random.integers(self._max_y)
        if self._map[self._x][self._y] != 'x':
            break
    observation = self.process_states(self._get_obs(), self.get_maze_map())
    snapshot = copy.deepcopy(observation)
    self._x, self._y = saved_position
    return snapshot
def close(self) -> None:
    """Clear the initialisation flag when it is currently set."""
    if not self._init_flag:
        return
    self._init_flag = False
def num_maze_keys(self):
    """Return the stored number of maze keys (``self._num_maze_keys``)."""
    key_count = self._num_maze_keys
    return key_count
@property
def size(self):
    """Return the stored maze size (``self._size``).

    Exposed as a property because the rest of this file reads it as a plain
    attribute (``self.size`` / ``env.size``) rather than calling it.
    """
    return self._size
def process_states(self, observations, maze_maps):
    """Return a [W, W, 3] int64 array for a single agent position.

    Channels are the leading channels of ``maze_maps`` (wall; goal) followed
    by a one-hot channel marking the agent location.

    Args:
        observations: indexable pair ``(x, y)`` of the agent position.
        maze_maps: array whose leading two axes are ``[W, W]``; it is cast to
            int64 before concatenation.

    Returns:
        ``maze_maps`` with the one-hot location channel appended on the last
        axis.
    """
    side = self._size
    # Mark the agent cell directly instead of materialising a (W*W, W*W)
    # identity matrix on every call just to read one of its rows.
    flat_index = observations[0] * side + observations[1]
    loc = np.zeros(side * side, dtype=np.int64)
    loc[flat_index] = 1
    loc = loc.reshape(side, side)
    maze_maps = maze_maps.astype(np.int64)
    # Both operands are already int64, so no explicit dtype is needed.
    return np.concatenate([maze_maps, loc[..., None]], axis=-1)
def get_maze_map(self, stacked=True):
    """Return a copy of the maze, optionally split into wall/target channels.

    Args:
        stacked: when falsy, the raw key grid is returned as a copy. Otherwise
            the target and start cells are zeroed in the grid copy and a
            separate target channel is stacked on the last axis.
    """
    layout = self._maze.copy()
    if not stacked:
        return layout

    goal_x, goal_y = self.target_location
    assert layout[goal_x][goal_y] == Maze.KEY_TARGET
    layout[goal_x][goal_y] = 0

    goal_channel = np.zeros((self._size, self._size))
    goal_channel[goal_x][goal_y] = 1

    begin_x, begin_y = self._start_x, self._start_y
    assert layout[begin_x][begin_y] == Maze.KEY_START
    layout[begin_x][begin_y] = 0

    return np.stack([layout, goal_channel], axis=-1)
def generate_maze(self, size, seed, wall_type):
    """Create a ``size x size`` key grid with walls, a target and a start.

    Args:
        size: side length of the square grid.
        seed: seed handed to ``seeding.np_random``.
        wall_type: ``'none'`` (border walls only), ``'tunnel'`` (carved by
            ``sample_wall``) or ``'blocks:<n>'`` (border walls plus ``n``
            single-cell obstacles).

    Returns:
        The integer grid. The sampled start cell is also stored on
        ``self._start_x`` / ``self._start_y``.

    Raises:
        ValueError: if ``wall_type`` is not one of the forms above.
    """
    rng, _ = seeding.np_random(seed)
    maze = np.full((size, size), fill_value=Maze.KEY_EMPTY, dtype=int)

    if wall_type == 'none':
        maze[[0, -1], :] = Maze.KEY_WALL
        maze[:, [0, -1]] = Maze.KEY_WALL
    elif wall_type == 'tunnel':
        self.sample_wall(maze, rng)
    elif wall_type.startswith('blocks:'):
        maze[[0, -1], :] = Maze.KEY_WALL
        maze[:, [0, -1]] = Maze.KEY_WALL
        self.sample_blocks(maze, rng, int(wall_type.split(':')[-1]))
    else:
        # Only unrecognised wall types are an error. Without this ``else``
        # the raise ran unconditionally and no maze was ever returned.
        raise ValueError('Unknown wall type: %s' % wall_type)

    # The target is placed first so the start is drawn from the remaining
    # empty cells.
    loc_target = self.sample_location(maze, rng)
    maze[loc_target] = Maze.KEY_TARGET
    loc_start = self.sample_location(maze, rng)
    maze[loc_start] = Maze.KEY_START
    self._start_x, self._start_y = loc_start

    return maze
def sample_blocks(self, maze, rng, num_blocks):
    """Turn ``num_blocks`` sampled empty cells of ``maze`` into walls, in place."""
    for _unused in range(num_blocks):
        cell = self.sample_location(maze, rng)
        maze[cell] = Maze.KEY_WALL
def sample_wall(
        self, maze, rng, shortcut_prob=0.1, inner_wall_thickness=1, outer_wall_thickness=1, corridor_thickness=2
):
    """Carve corridors into ``maze`` in place with a randomised depth-first walk.

    Args:
        maze: 2-D key grid; it is first filled with walls, then corridors are
            set to ``Maze.KEY_EMPTY``.
        rng: generator providing ``permutation`` and ``random``.
        shortcut_prob: probability of also opening a passage towards an
            already carved neighbour.
        inner_wall_thickness: wall cells left between neighbouring corridors.
        outer_wall_thickness: wall cells kept along the border.
        corridor_thickness: width of a carved corridor in cells.
    """
    room = maze

    # step 1: fill everything as wall
    room[:] = Maze.KEY_WALL

    # step 2: prepare
    # each move advances by one corridor plus one inner wall, because the
    # walls are also occupying pixels
    delta = inner_wall_thickness + corridor_thickness
    dx = [delta, -delta, 0, 0]
    dy = [0, 0, delta, -delta]

    def get_loc_type(y, x):
        # remember there is an outside wall surrounding the room
        if (y < outer_wall_thickness or y + corridor_thickness - 1 >= room.shape[0] - outer_wall_thickness):
            return 'invalid'
        if (x < outer_wall_thickness or x + corridor_thickness - 1 >= room.shape[1] - outer_wall_thickness):
            return 'invalid'
        # already visited
        if room[y, x] == Maze.KEY_EMPTY:
            return 'occupied'
        return 'valid'

    def connect_pixel(y, x, ny, nx):
        pixel = Maze.KEY_EMPTY
        if ny == y:
            # same row: open the horizontal span between the two cells
            room[y:y + corridor_thickness, min(x, nx):max(x, nx) + corridor_thickness] = pixel
        else:
            # same column: open the vertical span between the two cells
            room[min(y, ny):max(y, ny) + corridor_thickness, x:x + corridor_thickness] = pixel

    def carve_passage_from(y, x):
        room[y, x] = Maze.KEY_EMPTY
        for direction in rng.permutation(len(dx)):
            ny = y + dy[direction]
            nx = x + dx[direction]

            loc_type = get_loc_type(ny, nx)
            if loc_type == 'invalid':
                # NOTE(review): the body of this branch was missing in the
                # original; skipping the direction is the assumed intent.
                continue
            elif loc_type == 'valid':
                connect_pixel(y, x, ny, nx)
                # recursion
                carve_passage_from(ny, nx)
            else:
                # occupied
                # we create shortcut with some probability, this is because
                # we do not want to restrict to only one feasible path.
                if rng.random() < shortcut_prob:
                    connect_pixel(y, x, ny, nx)

    carve_passage_from(outer_wall_thickness, outer_wall_thickness)
def sample_location(self, maze, rng):
    """Draw a random empty cell of ``maze`` by rejection sampling.

    Up to 1000 coordinate pairs are drawn from ``[1, self._size)``.

    Returns:
        The ``(x, y)`` pair of the first drawn cell holding ``Maze.KEY_EMPTY``.

    Raises:
        ValueError: if none of the 1000 draws lands on an empty cell.
    """
    attempts_left = 1000
    while attempts_left > 0:
        attempts_left -= 1
        row, col = rng.integers(low=1, high=self._size, size=2)
        if maze[row, col] == Maze.KEY_EMPTY:
            return row, col
    raise ValueError('Cannot sample empty location, make maze bigger?')
@staticmethod
def key_to_ascii(key):
    """Translate one integer maze key into its ASCII character.

    Declared static because it takes no ``self`` and is invoked through the
    class (``Maze.key_to_ascii``); the decorator keeps it callable from
    instances as well.

    Args:
        key: a key of ``Maze.ASCII_MAP``, or an object key in
            ``[Maze.KEY_OBJ, Maze.KEY_OBJ + Maze.MAX_OBJ_TYPES)``.

    Returns:
        The mapped character, or for object keys the character counted up
        from ``'1'``.
    """
    if key in Maze.ASCII_MAP:
        return Maze.ASCII_MAP[key]
    assert (Maze.KEY_OBJ <= key < Maze.KEY_OBJ + Maze.MAX_OBJ_TYPES)
    return chr(ord('1') + key - Maze.KEY_OBJ)
def maze_to_ascii(self, maze):
    """Convert a grid of maze keys into a list of rows of ASCII characters."""
    ascii_rows = []
    for row in maze:
        ascii_rows.append([Maze.key_to_ascii(cell) for cell in row])
    return ascii_rows
def tabular_obs_action(self, status_obs, action, include_maze_layout=False):
    """Fold a status observation and an action into one flat index.

    The tabular state index is scaled by the number of actions (plus the
    number of maze keys when ``include_maze_layout`` is set) and the action
    is added.
    """
    state_index = self.get_tabular_obs(status_obs)
    if include_maze_layout:
        stride = self._n_action + self._num_maze_keys
    else:
        stride = self._n_action
    return stride * state_index + action
@staticmethod
def create_collector_env_cfg(cfg: dict) -> List[dict]:
    """Build the list of configs for the collector environments.

    The input is deep-copied before anything is removed, so the caller's
    ``cfg`` keeps its ``collector_env_num`` entry and can be reused.

    Args:
        cfg: config supporting ``pop`` and attribute assignment; must contain
            ``collector_env_num``.

    Returns:
        A list of length ``collector_env_num`` whose entries all refer to the
        same copied config, with ``is_train`` set to ``True``.
    """
    cfg = copy.deepcopy(cfg)
    collector_env_num = cfg.pop('collector_env_num')
    cfg.is_train = True
    return [cfg for _ in range(collector_env_num)]
@staticmethod
def create_evaluator_env_cfg(cfg: dict) -> List[dict]:
    """Build the list of configs for the evaluator environments.

    The input is deep-copied before anything is removed, so the caller's
    ``cfg`` keeps its ``evaluator_env_num`` entry and can be reused.

    Args:
        cfg: config supporting ``pop`` and attribute assignment; must contain
            ``evaluator_env_num``.

    Returns:
        A list of length ``evaluator_env_num`` whose entries all refer to the
        same copied config, with ``is_train`` set to ``False``.
    """
    cfg = copy.deepcopy(cfg)
    evaluator_env_num = cfg.pop('evaluator_env_num')
    cfg.is_train = False
    return [cfg for _ in range(evaluator_env_num)]
@property
def nav_map(self):
    """Return the stored ASCII map (``self._map``).

    Exposed as a property because ``get_value_map`` in this file indexes
    ``env.nav_map`` directly instead of calling it.
    """
    return self._map
def n_state(self):
    """Return the stored number of states (``self._n_state``)."""
    state_count = self._n_state
    return state_count
@property
def n_action(self):
    """Return the stored number of actions (``self._n_action``).

    Exposed as a property because ``get_value_map`` in this file passes
    ``env.n_action`` straight into an array shape.
    """
    return self._n_action
@property
def target_location(self):
    """Return the target cell as an ``(x, y)`` tuple.

    Exposed as a property because this file unpacks ``self.target_location``
    and ``env.target_location`` directly instead of calling them.
    """
    return self._target_x, self._target_y
def tabular_obs(self):
    """Return the stored tabular-observation flag (``self._tabular_obs``)."""
    flag = self._tabular_obs
    return flag
def _find_initial_point(self):
    """Locate the start cell ``'S'`` in the ASCII map.

    Cells are scanned row by row over ``self._max_x`` x ``self._max_y``.

    Returns:
        ``(x, y)`` of the first ``'S'`` cell, or ``(None, None)`` when the map
        has no start marker.
    """
    for x in range(self._max_x):
        for y in range(self._max_y):
            if self._map[x][y] == 'S':
                # Previously this branch returned (None, None), so the start
                # coordinates could never be reported.
                return x, y
    return None, None
def _find_target_point(self):
    """Locate the target cell ``'T'`` in the ASCII map.

    Cells are scanned row by row over ``self._max_x`` x ``self._max_y``.

    Returns:
        ``(x, y)`` of the first ``'T'`` cell.

    Raises:
        ValueError: if the map contains no ``'T'`` cell.
    """
    for x in range(self._max_x):
        for y in range(self._max_y):
            if self._map[x][y] == 'T':
                # Previously the error was raised exactly when the target
                # was found.
                return x, y
    raise ValueError('Target point not found in map.')
def _get_obs(self):
    """Return the agent position in the configured observation format.

    Returns:
        The flat index ``x * max_y + y`` when tabular observations are
        enabled, otherwise a NumPy array ``[x, y]``.
    """
    if not self._tabular_obs:
        return np.array([self._x, self._y])
    return self._x * self._max_y + self._y
def get_tabular_obs(self, status_obs):
    """Map ``(x, y)`` pairs on the last axis of ``status_obs`` to flat indices.

    Each pair is encoded as ``max_y * x + y``.
    """
    rows = status_obs[..., 0]
    cols = status_obs[..., 1]
    return self._max_y * rows + cols
def get_xy(self, state):
    """Split a flat state index back into ``(x, y)`` grid coordinates.

    This inverts the ``x * max_y + y`` encoding used for tabular
    observations.

    Args:
        state: flat state index (an integer or an integer array).

    Returns:
        ``(x, y)`` with ``x = state // max_y`` and ``y = state % max_y``.
    """
    # Floor division keeps the row an integer index; true division returned
    # a fractional row (e.g. 7 / 5 == 1.4).
    x = state // self._max_y
    y = state % self._max_y
    return x, y
def step(self, action):
    """Apply one move and return the resulting timestep.

    Actions 0/1/2/3 move +x/+y/-x/-y respectively. A move that would leave
    the grid or land on a wall leaves the position unchanged. The episode is
    also ended once the step counter exceeds 100.

    Returns:
        ``BaseEnvTimestep(obs, reward, done, info)``; when ``done`` is truthy,
        ``info`` carries the reward under ``final_eval_reward`` and
        ``eval_episode_return``.
    """
    # (action code, x delta, y delta)
    moves = ((0, 1, 0), (1, 0, 1), (2, -1, 0), (3, 0, -1))
    next_x, next_y = self._x, self._y
    for code, delta_x, delta_y in moves:
        if action == code:
            next_x, next_y = self._x + delta_x, self._y + delta_y
            break

    inside = 0 <= next_x < self._max_x and 0 <= next_y < self._max_y
    if inside and self._map[next_x][next_y] != 'x':
        self._x, self._y = next_x, next_y

    self._step += 1
    goal = (self._target_x, self._target_y)
    reward = self._reward_fn(self._x, self._y, *goal)
    done = self._done_fn(self._x, self._y, *goal)

    info = {}
    if self._step > 100:
        done = True
    if done:
        info['final_eval_reward'] = reward
        info['eval_episode_return'] = reward

    obs = self.process_states(self._get_obs(), self.get_maze_map())
    return BaseEnvTimestep(obs, reward, done, info)
def get_value_map(env):
    """Returns [W, W, A] one-hot actions found by breadth-first search.

    The search starts at ``env.target_location`` and expands over non-wall
    cells of ``env.nav_map``. For every reached cell it records the action
    index attached to the neighbour offset through which the cell was first
    reached; the target cell itself is assigned action 0.

    Args:
        env: object exposing ``target_location``, ``nav_map``, ``size`` and
            ``n_action`` as attributes.

    Returns:
        Array of shape ``[env.size, env.size, env.n_action]`` with a 1 at
        ``[x, y, action]`` for every reached cell and 0 elsewhere.
    """
    target_location = env.target_location
    nav_map = env.nav_map
    # Doubles as the visited set: a cell is visited once it has an action.
    chosen_actions = {target_location: 0}
    current_points = [target_location]

    while current_points:
        next_points = []
        for point_x, point_y in current_points:
            candidates = (
                (0, (point_x - 1, point_y)),
                (1, (point_x, point_y - 1)),
                (2, (point_x + 1, point_y)),
                (3, (point_x, point_y + 1)),
            )
            for action, (next_x, next_y) in candidates:
                # Each guard skips the candidate. Without the ``continue``
                # statements the checks nested into one another and no cell
                # was ever expanded.
                if (next_x, next_y) in chosen_actions:
                    continue
                if not (0 <= next_x < len(nav_map) and 0 <= next_y < len(nav_map[next_x])):
                    continue
                if nav_map[next_x][next_y] == 'x':
                    continue
                chosen_actions[(next_x, next_y)] = action
                next_points.append((next_x, next_y))
        current_points = next_points

    value_map = np.zeros([env.size, env.size, env.n_action])
    for (x, y), action in chosen_actions.items():
        value_map[x][y][action] = 1
    return value_map