File size: 5,912 Bytes
079c32c |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 |
from typing import Optional
import copy
import os
import gym
import numpy as np
from easydict import EasyDict
from ding.envs import BaseEnv, BaseEnvTimestep
from ding.envs import ObsPlusPrevActRewWrapper
from ding.envs.common import affine_transform, save_frames_as_gif
from ding.torch_utils import to_ndarray
from ding.utils import ENV_REGISTRY
@ENV_REGISTRY.register('carracing')
class CarRacingEnv(BaseEnv):
config = dict(
replay_path=None,
save_replay_gif=False,
replay_path_gif=None,
action_clip=False,
)
@classmethod
def default_config(cls: type) -> EasyDict:
cfg = EasyDict(copy.deepcopy(cls.config))
cfg.cfg_type = cls.__name__ + 'Dict'
return cfg
def __init__(self, cfg: dict) -> None:
self._cfg = cfg
self._init_flag = False
# env_id:CarRacing-v2
self._env_id = cfg.env_id
self._replay_path = None
self._replay_path_gif = cfg.replay_path_gif
self._save_replay_gif = cfg.save_replay_gif
self._save_replay_count = 0
if cfg.continuous:
self._act_scale = cfg.act_scale # act_scale only works in continuous env
self._action_clip = cfg.action_clip
else:
self._act_scale = False
def reset(self) -> np.ndarray:
if not self._init_flag:
self._env = gym.make(self._cfg.env_id, continuous=self._cfg.continuous)
if self._replay_path is not None:
self._env = gym.wrappers.RecordVideo(
self._env,
video_folder=self._replay_path,
episode_trigger=lambda episode_id: True,
name_prefix='rl-video-{}'.format(id(self))
)
self._observation_space = gym.spaces.Box(
low=np.min(self._env.observation_space.low.astype(np.float32) / 255),
high=np.max(self._env.observation_space.high.astype(np.float32) / 255),
shape=(
self._env.observation_space.shape[2], self._env.observation_space.shape[0],
self._env.observation_space.shape[1]
),
dtype=np.float32
)
self._action_space = self._env.action_space
self._reward_space = gym.spaces.Box(
low=self._env.reward_range[0], high=self._env.reward_range[1], shape=(1, ), dtype=np.float32
)
self._init_flag = True
if hasattr(self, '_seed') and hasattr(self, '_dynamic_seed') and self._dynamic_seed:
np_seed = 100 * np.random.randint(1, 1000)
self._env.seed(self._seed + np_seed)
elif hasattr(self, '_seed'):
self._env.seed(self._seed)
self._eval_episode_return = 0
obs = self._env.reset()
obs = obs.astype(np.float32) / 255
obs = obs.transpose(2, 0, 1)
obs = to_ndarray(obs)
if self._save_replay_gif:
self._frames = []
return obs
def close(self) -> None:
if self._init_flag:
self._env.close()
self._init_flag = False
def render(self) -> None:
self._env.render()
def seed(self, seed: int, dynamic_seed: bool = True) -> None:
self._seed = seed
self._dynamic_seed = dynamic_seed
np.random.seed(self._seed)
def step(self, action: np.ndarray) -> BaseEnvTimestep:
assert isinstance(action, np.ndarray), type(action)
if action.shape == (1, ):
action = action.item() # 0-dim array
if self._act_scale:
action = affine_transform(action, action_clip=self._action_clip, min_val=-1, max_val=1)
if self._save_replay_gif:
self._frames.append(self._env.render(mode='rgb_array'))
obs, rew, done, info = self._env.step(action)
obs = obs.astype(np.float32) / 255
obs = obs.transpose(2, 0, 1)
self._eval_episode_return += rew
if done:
info['eval_episode_return'] = self._eval_episode_return
if self._save_replay_gif:
if not os.path.exists(self._replay_path_gif):
os.makedirs(self._replay_path_gif)
path = os.path.join(
self._replay_path_gif, '{}_episode_{}.gif'.format(self._env_id, self._save_replay_count)
)
save_frames_as_gif(self._frames, path)
self._save_replay_count += 1
obs = to_ndarray(obs)
rew = to_ndarray([rew]).astype(np.float32) # wrapped to be transferred to a array with shape (1,)
return BaseEnvTimestep(obs, rew, done, info)
def enable_save_replay(self, replay_path: Optional[str] = None) -> None:
if replay_path is None:
replay_path = './video'
self._replay_path = replay_path
self._save_replay_gif = True
self._save_replay_count = 0
# this function can lead to the meaningless result
self._env = gym.wrappers.RecordVideo(
self._env,
video_folder=self._replay_path,
episode_trigger=lambda episode_id: True,
name_prefix='rl-video-{}'.format(id(self))
)
def random_action(self) -> np.ndarray:
random_action = self.action_space.sample()
if isinstance(random_action, np.ndarray):
pass
elif isinstance(random_action, int):
random_action = to_ndarray([random_action], dtype=np.int64)
return random_action
@property
def observation_space(self) -> gym.spaces.Space:
return self._observation_space
@property
def action_space(self) -> gym.spaces.Space:
return self._action_space
@property
def reward_space(self) -> gym.spaces.Space:
return self._reward_space
def __repr__(self) -> str:
return "DI-engine CarRacing Env"
|