File size: 5,614 Bytes
079c32c |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 |
from typing import Union, Optional, List, Any, Tuple
import os
import torch
import logging
from functools import partial
from tensorboardX import SummaryWriter
from copy import deepcopy
from ding.envs import get_vec_env_setting, create_env_manager
from ding.worker import BaseLearner, InteractionSerialEvaluator, BaseSerialCommander, create_buffer, \
create_serial_collector
from ding.config import read_config, compile_config
from ding.policy import create_policy, PolicyFactory
from ding.reward_model import create_reward_model
from ding.utils import set_pkg_seed
from ding.data.level_replay.level_sampler import LevelSampler
from ding.policy.common_utils import default_preprocess_learn
def generate_seeds(num_seeds=500, base_seed=0):
    """
    Overview:
        Build a list of consecutive seeds starting at ``base_seed``.
    Arguments:
        - num_seeds (:obj:`int`): How many seeds to produce.
        - base_seed (:obj:`int`): Value of the first seed in the list.
    Returns:
        - seeds (:obj:`list`): ``[base_seed, base_seed + 1, ..., base_seed + num_seeds - 1]``; \
            empty when ``num_seeds`` is not positive.
    """
    seeds = []
    for offset in range(num_seeds):
        seeds.append(base_seed + offset)
    return seeds
def _resample_levels(level_sampler, collector_env, env_num, *sample_args):
    """
    Overview:
        Draw one level seed per collector environment from ``level_sampler``, apply the seeds to \
        ``collector_env`` and reset it.
    Arguments:
        - level_sampler: Object whose ``sample`` method returns a value convertible with ``int``.
        - collector_env: Env manager exposing ``seed`` and ``reset``.
        - env_num (:obj:`int`): Number of seeds to draw.
        - sample_args: Positional arguments forwarded unchanged to ``level_sampler.sample``.
    Returns:
        - level_seeds (:obj:`torch.Tensor`): The drawn seeds wrapped with ``torch.Tensor``.
    """
    seeds = [int(level_sampler.sample(*sample_args)) for _ in range(env_num)]
    # default_preprocess_learn function can only deal with the Tensor data
    level_seeds = torch.Tensor(seeds)
    collector_env.seed(seeds)
    collector_env.reset()
    return level_seeds


def serial_pipeline_plr(
        input_cfg: Union[str, Tuple[dict, dict]],
        seed: int = 0,
        env_setting: Optional[List[Any]] = None,
        model: Optional[torch.nn.Module] = None,
        max_train_iter: Optional[int] = int(1e10),
        max_env_step: Optional[int] = int(1e10),
) -> 'Policy':  # noqa
    """
    Overview:
        Serial pipeline entry for Priority Level Replay.
    Arguments:
        - input_cfg (:obj:`Union[str, Tuple[dict, dict]]`): Config in dict type. \
            ``str`` type means config file path. \
            ``Tuple[dict, dict]`` type means [user_config, create_cfg].
        - seed (:obj:`int`): Random seed.
        - env_setting (:obj:`Optional[List[Any]]`): A list with 3 elements: \
            ``BaseEnv`` subclass, collector env config, and evaluator env config.
        - model (:obj:`Optional[torch.nn.Module]`): Instance of torch.nn.Module.
        - max_train_iter (:obj:`Optional[int]`): Maximum policy update iterations in training. \
            ``None`` disables this limit.
        - max_env_step (:obj:`Optional[int]`): Maximum collected environment interaction steps. \
            ``None`` disables this limit.
    Returns:
        - policy (:obj:`Policy`): The policy instance after the training loop has stopped.
    """
    # The signature allows ``None`` for both limits; comparing an int with ``None`` raises
    # ``TypeError``, so map ``None`` to "never reached".
    if max_train_iter is None:
        max_train_iter = float('inf')
    if max_env_step is None:
        max_env_step = float('inf')

    if isinstance(input_cfg, str):
        cfg, create_cfg = read_config(input_cfg)
    else:
        # Copy so that the caller's config objects are not mutated below.
        cfg, create_cfg = deepcopy(input_cfg)
    # The policy is created with the 'command' field enabled, hence the '_command' type suffix.
    create_cfg.policy.type = create_cfg.policy.type + '_command'
    env_fn = None if env_setting is None else env_setting[0]
    cfg = compile_config(cfg, seed=seed, env=env_fn, auto=True, create_cfg=create_cfg, save_cfg=True)
    collector_env_num = cfg.env.collector_env_num

    # Create main components: env, policy
    if env_setting is None:
        env_fn, collector_env_cfg, evaluator_env_cfg = get_vec_env_setting(cfg.env)
    else:
        env_fn, collector_env_cfg, evaluator_env_cfg = env_setting
    collector_env = create_env_manager(cfg.env.manager, [partial(env_fn, cfg=c) for c in collector_env_cfg])
    evaluator_env = create_env_manager(cfg.env.manager, [partial(env_fn, cfg=c) for c in evaluator_env_cfg])
    collector_env.seed(cfg.seed, dynamic_seed=False)
    evaluator_env.seed(cfg.seed, dynamic_seed=True)

    # Candidate training levels and the sampler that picks among them.
    train_seeds = generate_seeds()
    level_sampler = LevelSampler(
        train_seeds, cfg.policy.model.obs_shape, cfg.policy.model.action_shape, collector_env_num, cfg.level_replay
    )

    set_pkg_seed(cfg.seed, use_cuda=cfg.policy.cuda)
    policy = create_policy(cfg.policy, model=model, enable_field=['learn', 'collect', 'eval', 'command'])

    # Create worker components: learner, collector, evaluator, commander.
    tb_logger = SummaryWriter(os.path.join('./{}/log/'.format(cfg.exp_name), 'serial'))
    learner = BaseLearner(cfg.policy.learn.learner, policy.learn_mode, tb_logger, exp_name=cfg.exp_name)
    collector = create_serial_collector(
        cfg.policy.collect.collector,
        env=collector_env,
        policy=policy.collect_mode,
        tb_logger=tb_logger,
        exp_name=cfg.exp_name
    )
    # NOTE(review): the evaluator is driven by ``policy.collect_mode`` although the 'eval' field is
    # enabled above -- confirm this is intended rather than ``policy.eval_mode``.
    evaluator = InteractionSerialEvaluator(
        cfg.policy.eval.evaluator, evaluator_env, policy.collect_mode, tb_logger, exp_name=cfg.exp_name
    )
    # No replay buffer is used in this pipeline, so ``None`` is passed in its place.
    commander = BaseSerialCommander(
        cfg.policy.other.commander, learner, collector, evaluator, None, policy.command_mode
    )

    # ==========
    # Main loop
    # ==========
    # Learner's before_run hook.
    learner.call_hook('before_run')

    # Initial levels are drawn with the 'sequential' strategy; later draws use the sampler default.
    level_seeds = _resample_levels(level_sampler, collector_env, collector_env_num, 'sequential')

    while True:
        collect_kwargs = commander.step()
        # Evaluate policy performance
        if evaluator.should_eval(learner.train_iter):
            stop, _ = evaluator.eval(learner.save_checkpoint, learner.train_iter, collector.envstep)
            if stop:
                break
        # Collect data by default config n_sample/n_episode
        new_data = collector.collect(
            train_iter=learner.train_iter, level_seeds=level_seeds, policy_kwargs=collect_kwargs
        )
        # Learn policy from collected data
        learner.train(new_data, collector.envstep)

        # Feed the rollouts back to the level sampler, then pick the levels for the next round.
        stacked_data = default_preprocess_learn(new_data, ignore_done=cfg.policy.learn.ignore_done, use_nstep=False)
        level_sampler.update_with_rollouts(stacked_data, collector_env_num)
        level_seeds = _resample_levels(level_sampler, collector_env, collector_env_num)

        if collector.envstep >= max_env_step or learner.train_iter >= max_train_iter:
            break

    # Learner's after_run hook.
    learner.call_hook('after_run')
    return policy
|