import copy
from collections import namedtuple
from typing import Any, List, Union
import gfootball
import gfootball.env as football_env
import numpy as np
from ding.envs import BaseEnv, BaseEnvTimestep, BaseEnvInfo
from ding.envs.common.env_element import EnvElement, EnvElementInfo
from ding.torch_utils import to_ndarray
from ding.utils import ENV_REGISTRY
from dizoo.gfootball.envs.obs.encoder import FeatureEncoder
from dizoo.gfootball.envs.obs.gfootball_obs import FullObs
from dizoo.gfootball.envs.action.gfootball_action import GfootballSpAction


@ENV_REGISTRY.register('gfootball_sp')
class GfootballEnv(BaseEnv):

    timestep = namedtuple('GfootballTimestep', ['obs', 'reward', 'done', 'info'])
    info_template = namedtuple('GFootballEnvInfo', ['obs_space', 'act_space', 'rew_space'])

    def __init__(self, cfg: dict) -> None:
        self._cfg = cfg
        self.save_replay = self._cfg.save_replay
        # self.env_name = cfg.get("env_name", "11_vs_11_kaggle")
        self.gui = self._cfg.render
        self._obs_helper = FullObs(cfg)
        self._action_helper = GfootballSpAction(cfg)
        self._launch_env_flag = False
        self._encoder = FeatureEncoder()
        self.is_evaluator = self._cfg.get("is_evaluator", False)
        if self.is_evaluator:
            # Evaluator: play against the built-in hard AI, controlling only the left team.
            self.env_name = "11_vs_11_hard_stochastic"
            self.right_role_num = 0
        else:
            # Collector: self-play scenario, additionally controlling one player on the right team.
            self.env_name = "11_vs_11_kaggle"
            self.right_role_num = 1
    def _make_env(self):
        self._env = football_env.create_environment(
            env_name=self.env_name,
            representation='raw',
            stacked=False,
            logdir='/tmp/football',
            write_goal_dumps=False,
            write_full_episode_dumps=self.save_replay,
            write_video=self.save_replay,
            render=self.gui,
            number_of_right_players_agent_controls=self.right_role_num
        )
        if hasattr(self, '_seed') and hasattr(self, '_dynamic_seed') and self._dynamic_seed:
            np_seed = 100 * np.random.randint(1, 1000)
            self._env.seed(self._seed + np_seed)
        elif hasattr(self, '_seed'):
            self._env.seed(self._seed)
        self._launch_env_flag = True
        # Cumulative raw reward of the two controlled agents, reported at episode end.
        self._eval_episode_return = [0, 0]
    def reset(self) -> np.ndarray:
        if not self._launch_env_flag:
            self._make_env()
            self._init_flag = True
        self._env.reset()
        obs = self._env.observation()
        if self.is_evaluator:
            self._prev_obs = obs[0]
            obs = self._encoder.encode(obs[0])
            return [obs, obs]
        else:
            self._prev_obs, self.prev_obs_opponent = obs
            obs_ = self._encoder.encode(obs[0])
            obs_opponent = self._encoder.encode(obs[1])
            return [obs_, obs_opponent]
    def close(self) -> None:
        if self._launch_env_flag:
            self._env.close()
        self._launch_env_flag = False

    def seed(self, seed: int, dynamic_seed: bool = None) -> None:
        self._seed = seed
        if dynamic_seed:
            self._dynamic_seed = dynamic_seed
    def step(self, action) -> 'GfootballEnv.timestep':
        action = to_ndarray(action)
        # action = self.process_action(action)  # process
        raw_obs, raw_rew, done, info = self._env.step(action)
        if self.is_evaluator:
            raw_obs = raw_obs[0]
            rew = GfootballEnv.calc_reward(raw_rew, self._prev_obs, raw_obs)
            obs = to_ndarray(self._encoder.encode(raw_obs))
            # Update the previous observation so the yellow-card delta in calc_reward is per-step.
            self._prev_obs = raw_obs
            rew = [rew, rew]
            obs = [obs, obs]
            self._eval_episode_return[0] += raw_rew
            self._eval_episode_return[1] += raw_rew
        else:
            rew = GfootballEnv.calc_reward(raw_rew[0], self._prev_obs, raw_obs[0])
            rew_oppo = GfootballEnv.calc_reward(raw_rew[1], self.prev_obs_opponent, raw_obs[1])
            # Update the previous observations so the yellow-card delta in calc_reward is per-step.
            self._prev_obs, self.prev_obs_opponent = raw_obs[0], raw_obs[1]
            rew = [rew, rew_oppo]
            obs = [to_ndarray(self._encoder.encode(raw_obs[0])), to_ndarray(self._encoder.encode(raw_obs[1]))]
            self._eval_episode_return[0] += raw_rew[0]
            self._eval_episode_return[1] += raw_rew[1]
        if done:
            if self.is_evaluator:
                info['eval_episode_return'] = self._eval_episode_return
            else:
                info[0]['eval_episode_return'] = self._eval_episode_return[0]
                info[1]['eval_episode_return'] = self._eval_episode_return[1]
        return BaseEnvTimestep(obs, rew, done, info)
    def info(self) -> BaseEnvInfo:
        info_data = {
            'obs_space': self._obs_helper.info,
            'act_space': self._action_helper.info,
            'rew_space': EnvElementInfo(
                shape=1,
                value={
                    'min': np.float64("-inf"),
                    'max': np.float64("inf"),
                    'dtype': np.float32
                },
            ),
        }
        return GfootballEnv.info_template(**info_data)

    def __repr__(self) -> str:
        return "DI-engine Gfootball Env({})".format(self.env_name)
    @staticmethod
    def calc_reward(rew, prev_obs, obs):
        """
        Reward design referred to [football-paris](https://github.com/seungeunrho/football-paris/blob/main/rewarders/rewarder_basic.py)
        """
        # Shaping term 1: ball position on the pitch (x in [-1, 1] from own goal to opponent goal,
        # y in [-0.42, 0.42]); keeping the ball deep in the opponent half is rewarded.
        ball_x, ball_y, ball_z = obs['ball']
        MIDDLE_X, PENALTY_X, END_X = 0.2, 0.64, 1.0
        PENALTY_Y, END_Y = 0.27, 0.42
        ball_position_r = 0.0
        if (-END_X <= ball_x < -PENALTY_X) and (-PENALTY_Y < ball_y < PENALTY_Y):
            ball_position_r = -2.0
        elif (-END_X <= ball_x < -MIDDLE_X) and (-END_Y < ball_y < END_Y):
            ball_position_r = -1.0
        elif (-MIDDLE_X <= ball_x <= MIDDLE_X) and (-END_Y < ball_y < END_Y):
            ball_position_r = 0.0
        elif (PENALTY_X < ball_x <= END_X) and (-PENALTY_Y < ball_y < PENALTY_Y):
            ball_position_r = 2.0
        elif (MIDDLE_X < ball_x <= END_X) and (-END_Y < ball_y < END_Y):
            ball_position_r = 1.0
        else:
            ball_position_r = 0.0
        # Shaping term 2: yellow cards picked up in this step (opponent cards are good, own cards are bad).
        left_yellow = np.sum(obs["left_team_yellow_card"]) - np.sum(prev_obs["left_team_yellow_card"])
        right_yellow = np.sum(obs["right_team_yellow_card"]) - np.sum(prev_obs["right_team_yellow_card"])
        yellow_r = right_yellow - left_yellow
        # Shaping term 3: win bonus granted at the final step of the match.
        win_reward = 0.0
        if obs['steps_left'] == 0:
            [my_score, opponent_score] = obs['score']
            if my_score > opponent_score:
                win_reward = 1.0
        reward = 5.0 * win_reward + 5.0 * rew + 0.003 * ball_position_r + yellow_r
        return reward
    @staticmethod
    def create_collector_env_cfg(cfg: dict) -> List[dict]:
        collector_cfg = copy.deepcopy(cfg)
        collector_env_num = collector_cfg.pop('collector_env_num', 1)
        collector_cfg.is_evaluator = False
        return [collector_cfg for _ in range(collector_env_num)]

    @staticmethod
    def create_evaluator_env_cfg(cfg: dict) -> List[dict]:
        evaluator_cfg = copy.deepcopy(cfg)
        evaluator_env_num = evaluator_cfg.pop('evaluator_env_num', 1)
        evaluator_cfg.is_evaluator = True
        return [evaluator_cfg for _ in range(evaluator_env_num)]
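

if __name__ == '__main__':
    # Minimal usage sketch, not part of the original module: it shows how the env is
    # configured and stepped in self-play mode. The config keys below (save_replay,
    # render, is_evaluator) are assumptions inferred from how __init__ reads self._cfg;
    # FullObs / GfootballSpAction may require additional keys from the real dizoo config.
    from easydict import EasyDict

    cfg = EasyDict(dict(
        save_replay=False,
        render=False,
        is_evaluator=False,
    ))
    env = GfootballEnv(cfg)
    env.seed(0)
    obs = env.reset()  # [own_obs, opponent_obs] in self-play mode
    # Step with the idle action (0) for both controlled agents.
    timestep = env.step(np.array([0, 0]))
    print(timestep.reward, timestep.done)
    env.close()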