import sys
from typing import Any, List, Optional, Union
import gym
import gym_soccer
import numpy as np
from ding.envs import BaseEnv, BaseEnvInfo, BaseEnvTimestep
from ding.envs.common.common_function import affine_transform
from ding.envs.common.env_element import EnvElementInfo
from ding.torch_utils import to_list, to_ndarray, to_tensor
from ding.utils import ENV_REGISTRY
from gym.utils import seeding
import copy


@ENV_REGISTRY.register('gym_soccer')
class GymSoccerEnv(BaseEnv):

    default_env_id = ['Soccer-v0', 'SoccerEmptyGoal-v0', 'SoccerAgainstKeeper-v0']

    def __init__(self, cfg: dict = {}) -> None:
        self._cfg = cfg
        self._act_scale = cfg.act_scale
        self._env_id = cfg.env_id
        assert self._env_id in self.default_env_id
        self._init_flag = False
        self._replay_path = './game_log'

    def reset(self) -> np.ndarray:
        if not self._init_flag:
            self._env = gym.make(self._env_id, replay_path=self._replay_path, port=self._cfg.port)  # TODO
            self._init_flag = True
        self._eval_episode_return = 0
        obs = self._env.reset()
        obs = to_ndarray(obs).astype(np.float32)
        return obs

    def step(self, action: List) -> BaseEnvTimestep:
        if self._act_scale:
            # Each continuous action parameter is a Tensor of size (1, );
            # we index at [0] to fetch it as a scalar value.
            action[1][0] = affine_transform(action[1][0], min_val=0, max_val=100)
            action[2][0] = affine_transform(action[2][0], min_val=-180, max_val=180)
            action[3][0] = affine_transform(action[3][0], min_val=-180, max_val=180)
            action[4][0] = affine_transform(action[4][0], min_val=0, max_val=100)
            action[5][0] = affine_transform(action[5][0], min_val=-180, max_val=180)
        obs, rew, done, info = self._env.step(action)
        self._eval_episode_return += rew
        if done:
            info['eval_episode_return'] = self._eval_episode_return
        obs = to_ndarray(obs).astype(np.float32)
        # The reward is wrapped into a numpy array with shape (1, ) before being returned.
        rew = to_ndarray([rew])
        # In each row, '1' marks the continuous parameters associated with that discrete action.
        info['action_args_mask'] = np.array([[1, 1, 0, 0, 0], [0, 0, 1, 0, 0], [0, 0, 0, 1, 1]])
        return BaseEnvTimestep(obs, rew, done, info)

    def seed(self, seed: int, dynamic_seed: bool = True) -> None:
        self._seed = seed
        self._dynamic_seed = dynamic_seed
        np.random.seed(self._seed)

    def close(self) -> None:
        self._init_flag = False

    def get_random_action(self):
        # Discrete action type: 0, 1, 2.
        # Continuous action_args:
        #   - power: [0, 100]
        #   - direction: [-180, 180]
        # The action space has shape (6, ): the first element is the discrete action type and
        # the remaining elements are the continuous parameters.
        # Discrete action 0 is associated with the first and second continuous parameters.
        # Discrete action 1 is associated with the third continuous parameter.
        # Discrete action 2 is associated with the fourth and fifth continuous parameters.
        # A concrete example of this layout is sketched right after this method.
        return self._env.action_space.sample()
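
    # Illustrative sketch of the hybrid action layout described above (the concrete values
    # are hypothetical): for discrete action 0, a full action could look like
    #   [0, np.array([80.0]), np.array([30.0]), np.array([0.0]), np.array([0.0]), np.array([0.0])]
    # i.e. a length-6 sequence whose first entry is the discrete action type and whose remaining
    # entries are (1, )-shaped arrays; ``info['action_args_mask']`` returned by ``step`` marks
    # which of the five parameters each discrete action actually uses.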

    def info(self) -> BaseEnvInfo:
        T = EnvElementInfo
        return BaseEnvInfo(
            agent_num=1,
            obs_space=T(
                (59, ),
                {
                    # [min, max]
                    'min': -1,
                    'max': 1,
                    'dtype': np.float32,
                },
            ),
            act_space=T(
                # The discrete action shape is (3, ); however, the continuous action shape
                # is (5, ), which is not revealed in this info.
                (3, ),
                {
                    # [min, max)
                    'min': 0,
                    'max': 3,
                    'dtype': int,
                },
            ),
            rew_space=T(
                (1, ),
                {
                    # [min, max)
                    'min': 0,
                    'max': 2.0,
                    'dtype': int,
                },
            ),
            use_wrappers=None,
        )

    def render(self, close=False):
        self._env.render(close)

    def __repr__(self) -> str:
        return "DI-engine gym soccer Env"

    def replay_log(self, log_path):
        self._env.replay_log(log_path)

    def enable_save_replay(self, replay_path: Optional[str] = None) -> None:
        if replay_path is None:
            replay_path = './game_log'
        self._replay_path = replay_path

    @staticmethod
    def create_collector_env_cfg(cfg: dict) -> List[dict]:
        """
        Overview:
            Return a list of env configs, one per collector environment, derived from the input config.
        Arguments:
            - cfg (:obj:`Dict`): Env config, the same config from which ``self.__init__()`` takes its arguments.
        Returns:
            - List of ``cfg`` including all of the collector envs' configs.
        """
        cfg_list = []
        collector_env_num = cfg.pop('collector_env_num')
        # Assign a distinct port to each collector env config so that parallel envs do not collide.
        port_pool = list(range(6000, 9999))
        port_candidates = np.random.choice(port_pool, size=collector_env_num, replace=False)
        for i in range(collector_env_num):
            cfg_copy = copy.deepcopy(cfg)
            cfg_copy.port = port_candidates[i]
            cfg_list.append(cfg_copy)
        return cfg_list

    @staticmethod
    def create_evaluator_env_cfg(cfg: dict) -> List[dict]:
        """
        Overview:
            Return a list of env configs, one per evaluator environment, derived from the input config.
        Arguments:
            - cfg (:obj:`Dict`): Env config, the same config from which ``self.__init__()`` takes its arguments.
        Returns:
            - List of ``cfg`` including all of the evaluator envs' configs.
        """
        cfg_list = []
        evaluator_env_num = cfg.pop('evaluator_env_num')
        # Assign a distinct port to each evaluator env config so that parallel envs do not collide.
        port_pool = list(range(6000, 9999))
        port_candidates = np.random.choice(port_pool, size=evaluator_env_num, replace=False)
        for i in range(evaluator_env_num):
            cfg_copy = copy.deepcopy(cfg)
            cfg_copy.port = port_candidates[i]
            cfg_list.append(cfg_copy)
        return cfg_list
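

# Minimal usage sketch, assuming an EasyDict-style config, a working gym_soccer
# installation and a reachable HFO server on the chosen port (6000 here is only an
# example). ``act_scale`` is disabled because ``get_random_action`` already samples
# values in the env's native ranges, so no affine rescaling is needed.
if __name__ == '__main__':
    from easydict import EasyDict

    cfg = EasyDict(env_id='SoccerEmptyGoal-v0', act_scale=False, port=6000)
    env = GymSoccerEnv(cfg)
    env.seed(0)
    obs = env.reset()  # ndarray of shape (59, ), dtype float32
    done = False
    while not done:
        timestep = env.step(env.get_random_action())
        obs, done = timestep.obs, timestep.done
    print('episode return:', timestep.info['eval_episode_return'])
    env.close()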