File size: 7,223 Bytes
079c32c |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 |
from typing import Union, Optional, Tuple
import os
from functools import partial
from copy import deepcopy
import easydict
import torch
import numpy as np
from tensorboardX import SummaryWriter
from torch.utils.data import DataLoader, Dataset
from ding.envs import get_vec_env_setting, create_env_manager
from ding.worker import BaseLearner, InteractionSerialEvaluator
from ding.config import read_config, compile_config
from ding.policy import create_policy
from ding.utils import set_pkg_seed
from dizoo.maze.envs.maze_env import Maze
# BFS algorithm
def get_vi_sequence(env, observation):
"""Returns [L, W, W] optimal actions."""
xy = np.where(observation[Ellipsis, -1] == 1)
start_x, start_y = xy[0][0], xy[1][0]
target_location = env.target_location
nav_map = env.nav_map
current_points = [target_location]
chosen_actions = {target_location: 0}
visited_points = {target_location: True}
vi_sequence = []
vi_map = np.full((env.size, env.size), fill_value=env.n_action, dtype=np.int32)
found_start = False
while current_points and not found_start:
next_points = []
for point_x, point_y in current_points:
for (action, (next_point_x, next_point_y)) in [(0, (point_x - 1, point_y)), (1, (point_x, point_y - 1)),
(2, (point_x + 1, point_y)), (3, (point_x, point_y + 1))]:
if (next_point_x, next_point_y) in visited_points:
continue
if not (0 <= next_point_x < len(nav_map) and 0 <= next_point_y < len(nav_map[next_point_x])):
continue
if nav_map[next_point_x][next_point_y] == 'x':
continue
next_points.append((next_point_x, next_point_y))
visited_points[(next_point_x, next_point_y)] = True
chosen_actions[(next_point_x, next_point_y)] = action
vi_map[next_point_x, next_point_y] = action
if next_point_x == start_x and next_point_y == start_y:
found_start = True
vi_sequence.append(vi_map.copy())
current_points = next_points
track_back = []
if found_start:
cur_x, cur_y = start_x, start_y
while cur_x != target_location[0] or cur_y != target_location[1]:
act = vi_sequence[-1][cur_x, cur_y]
track_back.append((torch.FloatTensor(env.process_states([cur_x, cur_y], env.get_maze_map())), act))
if act == 0:
cur_x += 1
elif act == 1:
cur_y += 1
elif act == 2:
cur_x -= 1
elif act == 3:
cur_y -= 1
return np.array(vi_sequence), track_back
class BCDataset(Dataset):
def __init__(self, all_data):
self._data = all_data
def __getitem__(self, item):
return {'obs': self._data[item][0], 'action': self._data[item][1]}
def __len__(self):
return len(self._data)
def load_bc_dataset(train_seeds=1, test_seeds=1, batch_size=32):
def load_env(seed):
ccc = easydict.EasyDict({'size': 16})
e = Maze(ccc)
e.seed(seed)
e.reset()
return e
envs = [load_env(i) for i in range(train_seeds + test_seeds)]
data_train = []
data_test = []
for idx, env in enumerate(envs):
if idx < train_seeds:
data = data_train
else:
data = data_test
start_obs = env.process_states(env._get_obs(), env.get_maze_map())
_, track_back = get_vi_sequence(env, start_obs)
data += track_back
train_data = BCDataset(data_train)
test_data = BCDataset(data_test)
train_dataset = DataLoader(train_data, batch_size=batch_size, shuffle=True)
test_dataset = DataLoader(test_data, batch_size=batch_size, shuffle=True)
return train_dataset, test_dataset
def serial_pipeline_bc(
input_cfg: Union[str, Tuple[dict, dict]],
seed: int = 0,
model: Optional[torch.nn.Module] = None,
max_iter=int(1e6),
) -> Union['Policy', bool]: # noqa
r"""
Overview:
Serial pipeline entry of imitation learning.
Arguments:
- input_cfg (:obj:`Union[str, Tuple[dict, dict]]`): Config in dict type. \
``str`` type means config file path. \
``Tuple[dict, dict]`` type means [user_config, create_cfg].
- seed (:obj:`int`): Random seed.
- data_path (:obj:`str`): Path of training data.
- model (:obj:`Optional[torch.nn.Module]`): Instance of torch.nn.Module.
Returns:
- policy (:obj:`Policy`): Converged policy.
- convergence (:obj:`bool`): whether il training is converged
"""
if isinstance(input_cfg, str):
cfg, create_cfg = read_config(input_cfg)
else:
cfg, create_cfg = deepcopy(input_cfg)
cfg = compile_config(cfg, seed=seed, auto=True, create_cfg=create_cfg)
# Env, Policy
env_fn, _, evaluator_env_cfg = get_vec_env_setting(cfg.env)
evaluator_env = create_env_manager(cfg.env.manager, [partial(env_fn, cfg=c) for c in evaluator_env_cfg])
# Random seed
evaluator_env.seed(cfg.seed, dynamic_seed=False)
set_pkg_seed(cfg.seed, use_cuda=cfg.policy.cuda)
policy = create_policy(cfg.policy, model=model, enable_field=['learn', 'eval'])
# Main components
tb_logger = SummaryWriter(os.path.join('./{}/log/'.format(cfg.exp_name), 'serial'))
dataloader, test_dataloader = load_bc_dataset()
learner = BaseLearner(cfg.policy.learn.learner, policy.learn_mode, tb_logger, exp_name=cfg.exp_name)
evaluator = InteractionSerialEvaluator(
cfg.policy.eval.evaluator, evaluator_env, policy.eval_mode, tb_logger, exp_name=cfg.exp_name
)
# ==========
# Main loop
# ==========
learner.call_hook('before_run')
stop = False
iter_cnt = 0
for epoch in range(cfg.policy.learn.train_epoch):
# Evaluate policy performance
loss_list = []
for _, bat in enumerate(test_dataloader):
bat['action'] = bat['action'].long()
res = policy._forward_eval(bat['obs'])
res = torch.argmax(res['logit'], dim=1)
loss_list.append(torch.sum(res == bat['action'].squeeze(-1)).item() / bat['action'].shape[0])
label = 'validation_acc'
tb_logger.add_scalar(label, sum(loss_list) / len(loss_list), iter_cnt)
for i, train_data in enumerate(dataloader):
if evaluator.should_eval(learner.train_iter):
stop, reward = evaluator.eval(learner.save_checkpoint, learner.train_iter)
if stop:
break
train_data['action'] = train_data['action'].long()
learner.train(train_data)
iter_cnt += 1
if iter_cnt >= max_iter:
stop = True
break
if stop:
break
learner.call_hook('after_run')
print('final reward is: {}'.format(reward))
return policy, stop
if __name__ == '__main__':
from dizoo.maze.config.maze_bc_config import main_config, create_config
serial_pipeline_bc([main_config, create_config], seed=0)
|