import copy
from typing import List, Optional

import gym
import gym_soccer  # noqa: F401  (imported for its side effect of registering the Soccer-* envs with gym)
import numpy as np

from ding.envs import BaseEnv, BaseEnvInfo, BaseEnvTimestep
from ding.envs.common.common_function import affine_transform
from ding.envs.common.env_element import EnvElementInfo
from ding.torch_utils import to_ndarray
from ding.utils import ENV_REGISTRY

@ENV_REGISTRY.register('gym_soccer')
class GymSoccerEnv(BaseEnv):
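    """
    Overview:
        DI-engine wrapper of the gym-soccer (HFO) environments listed in ``default_env_id``.
        The action space is parameterized: a discrete action type plus five continuous action
        arguments (power in [0, 100], direction in [-180, 180]); see ``get_random_action`` and
        ``step`` for how the two parts are combined.
    """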
default_env_id = ['Soccer-v0', 'SoccerEmptyGoal-v0', 'SoccerAgainstKeeper-v0']
def __init__(self, cfg: dict = {}) -> None:
self._cfg = cfg
self._act_scale = cfg.act_scale
self._env_id = cfg.env_id
assert self._env_id in self.default_env_id
self._init_flag = False
self._replay_path = './game_log'
    def reset(self) -> np.ndarray:
if not self._init_flag:
self._env = gym.make(self._env_id, replay_path=self._replay_path, port=self._cfg.port) # TODO
self._init_flag = True
self._eval_episode_return = 0
obs = self._env.reset()
obs = to_ndarray(obs).astype(np.float32)
return obs
def step(self, action: List) -> BaseEnvTimestep:
if self._act_scale:
            # Each continuous action argument is a Tensor of size (1, );
            # we index at [0] to fetch it as a scalar value before rescaling it to its real range.
action[1][0] = affine_transform(action[1][0], min_val=0, max_val=100)
action[2][0] = affine_transform(action[2][0], min_val=-180, max_val=180)
action[3][0] = affine_transform(action[3][0], min_val=-180, max_val=180)
action[4][0] = affine_transform(action[4][0], min_val=0, max_val=100)
action[5][0] = affine_transform(action[5][0], min_val=-180, max_val=180)
obs, rew, done, info = self._env.step(action)
self._eval_episode_return += rew
if done:
info['eval_episode_return'] = self._eval_episode_return
obs = to_ndarray(obs).astype(np.float32)
        # Wrap the reward into a numpy array with shape (1, ) before returning it.
rew = to_ndarray([rew])
        # In each row of the mask, '1' marks the continuous parameters associated with that discrete action.
info['action_args_mask'] = np.array([[1, 1, 0, 0, 0], [0, 0, 1, 0, 0], [0, 0, 0, 1, 1]])
return BaseEnvTimestep(obs, rew, done, info)
def seed(self, seed: int, dynamic_seed: bool = True) -> None:
self._seed = seed
self._dynamic_seed = dynamic_seed
np.random.seed(self._seed)
    def close(self) -> None:
        if self._init_flag:
            self._env.close()
        self._init_flag = False
def get_random_action(self):
        # discrete action type: 0, 1, 2
        # continuous action args:
        #   - power: [0, 100]
        #   - direction: [-180, 180]
        # The full action has shape (6, ): the first element is the discrete action type and
        # the remaining five are the continuous action arguments.
        # Discrete action 0 is associated with the first and second continuous parameters,
        # discrete action 1 with the third, and discrete action 2 with the fourth and fifth.
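        # Illustrative example (not part of the original code): a sampled hybrid action is
        # expected to look roughly like
        #   (0, array([55.3]), array([-12.7]), array([91.4]), array([30.0]), array([170.2]))
        # i.e. a discrete action type followed by five single-element continuous argument arrays,
        # of which only the ones flagged in ``action_args_mask`` are meaningful for that type.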
return self._env.action_space.sample()
def info(self) -> BaseEnvInfo:
T = EnvElementInfo
return BaseEnvInfo(
agent_num=1,
obs_space=T(
(59, ),
{
# [min, max]
'min': -1,
'max': 1,
'dtype': np.float32,
},
),
act_space=T(
                # The discrete action shape is declared as (3, ); the continuous action
                # arguments (shape (5, )) are not reflected in this info.
(
3,
),
{
# [min, max)
'min': 0,
'max': 3,
'dtype': int,
},
),
rew_space=T(
(1, ),
{
# [min, max)
'min': 0,
'max': 2.0,
                    'dtype': np.float32,
},
),
use_wrappers=None,
)
def render(self, close=False):
self._env.render(close)
def __repr__(self) -> str:
return "DI-engine gym soccer Env"
def replay_log(self, log_path):
self._env.replay_log(log_path)
def enable_save_replay(self, replay_path: Optional[str] = None) -> None:
if replay_path is None:
replay_path = './game_log'
self._replay_path = replay_path
    @staticmethod
    def create_collector_env_cfg(cfg: dict) -> List[dict]:
        """
        Overview:
            Return a list of env configs, one for each collector environment, derived from the input config.
            Each config is assigned a distinct HFO server port.
        Arguments:
            - cfg (:obj:`dict`): Env config, the same config that ``self.__init__()`` takes its arguments from.
        Returns:
            - List of ``cfg`` copies, one for each collector env.
        """
cfg_list = []
collector_env_num = cfg.pop('collector_env_num')
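        # Draw a distinct port for each collector env (sampled without replacement) so that the
        # underlying HFO games, one per env, do not collide on the same port.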
port_pool = list(range(6000, 9999))
port_candidates = np.random.choice(port_pool, size=collector_env_num, replace=False)
for i in range(collector_env_num):
cfg_copy = copy.deepcopy(cfg)
cfg_copy.port = port_candidates[i]
cfg_list.append(cfg_copy)
return cfg_list
    @staticmethod
    def create_evaluator_env_cfg(cfg: dict) -> List[dict]:
        """
        Overview:
            Return a list of env configs, one for each evaluator environment, derived from the input config.
            Each config is assigned a distinct HFO server port.
        Arguments:
            - cfg (:obj:`dict`): Env config, the same config that ``self.__init__()`` takes its arguments from.
        Returns:
            - List of ``cfg`` copies, one for each evaluator env.
        """
cfg_list = []
evaluator_env_num = cfg.pop('evaluator_env_num')
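        # As for the collector envs, give each evaluator env its own randomly drawn HFO port so the
        # parallel environments do not collide.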
port_pool = list(range(6000, 9999))
port_candidates = np.random.choice(port_pool, size=evaluator_env_num, replace=False)
for i in range(evaluator_env_num):
cfg_copy = copy.deepcopy(cfg)
cfg_copy.port = port_candidates[i]
cfg_list.append(cfg_copy)
return cfg_list
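

# Minimal usage sketch (illustrative only, not part of the original module). It assumes that
# ``easydict`` is installed, that the HFO binaries required by gym_soccer are available so a
# server can be started on the chosen port, and that the config keys mirror those read in
# ``__init__`` / ``reset`` above.
if __name__ == '__main__':
    from easydict import EasyDict

    cfg = EasyDict(dict(env_id='Soccer-v0', act_scale=True, port=6000))
    env = GymSoccerEnv(cfg)
    env.seed(0)
    obs = env.reset()
    obs, rew, done, info = env.step(env.get_random_action())
    print(rew, done, info['action_args_mask'])
    env.close()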