|
import random |
|
from typing import Dict, List, Any, Tuple |
|
import numpy as np |
|
|
|
from mlagents_envs.base_env import ( |
|
ActionSpec, |
|
ObservationSpec, |
|
ObservationType, |
|
ActionTuple, |
|
BaseEnv, |
|
BehaviorSpec, |
|
DecisionSteps, |
|
TerminalSteps, |
|
BehaviorMapping, |
|
) |
|
from .test_rpc_utils import proto_from_steps_and_action |
|
from mlagents_envs.communicator_objects.agent_info_action_pair_pb2 import ( |
|
AgentInfoActionPairProto, |
|
) |
|
from mlagents.trainers.tests.dummy_config import create_observation_specs_with_shapes |
|
|
|
OBS_SIZE = 1 |
|
VIS_OBS_SIZE = (20, 20, 3) |
|
VAR_LEN_SIZE = (10, 5) |
|
STEP_SIZE = 0.2 |
|
|
|
TIME_PENALTY = 0.01 |
|
MIN_STEPS = int(1.0 / STEP_SIZE) + 1 |
|
SUCCESS_REWARD = 1.0 + MIN_STEPS * TIME_PENALTY |
|
|
|
|
|
def clamp(x, min_val, max_val): |
|
return max(min_val, min(x, max_val)) |
|
|
|
|
|
class SimpleEnvironment(BaseEnv): |
|
""" |
|
Very simple "game" - the agent has a position on [-1, 1], gets a reward of 1 if it reaches 1, and a reward of -1 if |
|
it reaches -1. The position is incremented by the action amount (clamped to [-step_size, step_size]). |
|
""" |
|
|
|
def __init__( |
|
self, |
|
brain_names, |
|
step_size=STEP_SIZE, |
|
num_visual=0, |
|
num_vector=1, |
|
num_var_len=0, |
|
vis_obs_size=VIS_OBS_SIZE, |
|
vec_obs_size=OBS_SIZE, |
|
var_len_obs_size=VAR_LEN_SIZE, |
|
action_sizes=(1, 0), |
|
goal_indices=None, |
|
): |
|
super().__init__() |
|
self.num_visual = num_visual |
|
self.num_vector = num_vector |
|
self.num_var_len = num_var_len |
|
self.vis_obs_size = vis_obs_size |
|
self.vec_obs_size = vec_obs_size |
|
self.var_len_obs_size = var_len_obs_size |
|
self.goal_indices = goal_indices |
|
continuous_action_size, discrete_action_size = action_sizes |
|
discrete_tuple = tuple(2 for _ in range(discrete_action_size)) |
|
action_spec = ActionSpec(continuous_action_size, discrete_tuple) |
|
self.total_action_size = ( |
|
continuous_action_size + discrete_action_size |
|
) |
|
self.action_spec = action_spec |
|
self.behavior_spec = BehaviorSpec(self._make_observation_specs(), action_spec) |
|
self.action_spec = action_spec |
|
self.names = brain_names |
|
self.positions: Dict[str, List[float]] = {} |
|
self.step_count: Dict[str, float] = {} |
|
|
|
|
|
seed = ( |
|
brain_names, |
|
step_size, |
|
num_visual, |
|
num_vector, |
|
num_var_len, |
|
vis_obs_size, |
|
vec_obs_size, |
|
var_len_obs_size, |
|
action_sizes, |
|
) |
|
self.random = random.Random(str(seed)) |
|
|
|
self.goal: Dict[str, int] = {} |
|
self.action = {} |
|
self.rewards: Dict[str, float] = {} |
|
self.final_rewards: Dict[str, List[float]] = {} |
|
self.step_result: Dict[str, Tuple[DecisionSteps, TerminalSteps]] = {} |
|
self.agent_id: Dict[str, int] = {} |
|
self.step_size = step_size |
|
|
|
self.academy_capabilities = None |
|
|
|
for name in self.names: |
|
self.agent_id[name] = 0 |
|
self.goal[name] = self.random.choice([-1, 1]) |
|
self.rewards[name] = 0 |
|
self.final_rewards[name] = [] |
|
self._reset_agent(name) |
|
self.action[name] = None |
|
self.step_result[name] = None |
|
|
|
def _make_observation_specs(self) -> List[ObservationSpec]: |
|
obs_shape: List[Any] = [] |
|
for _ in range(self.num_vector): |
|
obs_shape.append((self.vec_obs_size,)) |
|
for _ in range(self.num_visual): |
|
obs_shape.append(self.vis_obs_size) |
|
for _ in range(self.num_var_len): |
|
obs_shape.append(self.var_len_obs_size) |
|
obs_spec = create_observation_specs_with_shapes(obs_shape) |
|
if self.goal_indices is not None: |
|
for i in range(len(obs_spec)): |
|
if i in self.goal_indices: |
|
obs_spec[i] = ObservationSpec( |
|
shape=obs_spec[i].shape, |
|
dimension_property=obs_spec[i].dimension_property, |
|
observation_type=ObservationType.GOAL_SIGNAL, |
|
name=obs_spec[i].name, |
|
) |
|
return obs_spec |
|
|
|
def _make_obs(self, value: float) -> List[np.ndarray]: |
|
obs = [] |
|
for _ in range(self.num_vector): |
|
obs.append(np.ones((1, self.vec_obs_size), dtype=np.float32) * value) |
|
for _ in range(self.num_visual): |
|
obs.append(np.ones((1,) + self.vis_obs_size, dtype=np.float32) * value) |
|
for _ in range(self.num_var_len): |
|
obs.append(np.ones((1,) + self.var_len_obs_size, dtype=np.float32) * value) |
|
return obs |
|
|
|
@property |
|
def behavior_specs(self): |
|
behavior_dict = {} |
|
for n in self.names: |
|
behavior_dict[n] = self.behavior_spec |
|
return BehaviorMapping(behavior_dict) |
|
|
|
def set_action_for_agent(self, behavior_name, agent_id, action): |
|
pass |
|
|
|
def set_actions(self, behavior_name, action): |
|
self.action[behavior_name] = action |
|
|
|
def get_steps(self, behavior_name): |
|
return self.step_result[behavior_name] |
|
|
|
def _take_action(self, name: str) -> bool: |
|
deltas = [] |
|
_act = self.action[name] |
|
if self.action_spec.continuous_size > 0: |
|
for _cont in _act.continuous[0]: |
|
deltas.append(_cont) |
|
if self.action_spec.discrete_size > 0: |
|
for _disc in _act.discrete[0]: |
|
deltas.append(1 if _disc else -1) |
|
for i, _delta in enumerate(deltas): |
|
_delta = clamp(_delta, -self.step_size, self.step_size) |
|
self.positions[name][i] += _delta |
|
self.positions[name][i] = clamp(self.positions[name][i], -1, 1) |
|
self.step_count[name] += 1 |
|
|
|
done = all(pos >= 1.0 or pos <= -1.0 for pos in self.positions[name]) |
|
return done |
|
|
|
def _generate_mask(self): |
|
action_mask = None |
|
if self.action_spec.discrete_size > 0: |
|
|
|
ndmask = np.array( |
|
2 * self.action_spec.discrete_size * [False], dtype=np.bool |
|
) |
|
ndmask = np.expand_dims(ndmask, axis=0) |
|
action_mask = [ndmask] |
|
return action_mask |
|
|
|
def _compute_reward(self, name: str, done: bool) -> float: |
|
if done: |
|
reward = 0.0 |
|
for _pos in self.positions[name]: |
|
reward += (SUCCESS_REWARD * _pos * self.goal[name]) / len( |
|
self.positions[name] |
|
) |
|
else: |
|
reward = -TIME_PENALTY |
|
return reward |
|
|
|
def _reset_agent(self, name): |
|
self.goal[name] = self.random.choice([-1, 1]) |
|
self.positions[name] = [0.0 for _ in range(self.total_action_size)] |
|
self.step_count[name] = 0 |
|
self.rewards[name] = 0 |
|
self.agent_id[name] = self.agent_id[name] + 1 |
|
|
|
def _make_batched_step( |
|
self, name: str, done: bool, reward: float, group_reward: float |
|
) -> Tuple[DecisionSteps, TerminalSteps]: |
|
m_vector_obs = self._make_obs(self.goal[name]) |
|
m_reward = np.array([reward], dtype=np.float32) |
|
m_agent_id = np.array([self.agent_id[name]], dtype=np.int32) |
|
m_group_id = np.array([0], dtype=np.int32) |
|
m_group_reward = np.array([group_reward], dtype=np.float32) |
|
action_mask = self._generate_mask() |
|
decision_step = DecisionSteps( |
|
m_vector_obs, m_reward, m_agent_id, action_mask, m_group_id, m_group_reward |
|
) |
|
terminal_step = TerminalSteps.empty(self.behavior_spec) |
|
if done: |
|
self.final_rewards[name].append(self.rewards[name]) |
|
self._reset_agent(name) |
|
new_vector_obs = self._make_obs(self.goal[name]) |
|
( |
|
new_reward, |
|
new_done, |
|
new_agent_id, |
|
new_action_mask, |
|
new_group_id, |
|
new_group_reward, |
|
) = self._construct_reset_step(name) |
|
|
|
decision_step = DecisionSteps( |
|
new_vector_obs, |
|
new_reward, |
|
new_agent_id, |
|
new_action_mask, |
|
new_group_id, |
|
new_group_reward, |
|
) |
|
terminal_step = TerminalSteps( |
|
m_vector_obs, |
|
m_reward, |
|
np.array([False], dtype=np.bool), |
|
m_agent_id, |
|
m_group_id, |
|
m_group_reward, |
|
) |
|
return (decision_step, terminal_step) |
|
|
|
def _construct_reset_step( |
|
self, name: str |
|
) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.ndarray]: |
|
new_reward = np.array([0.0], dtype=np.float32) |
|
new_done = np.array([False], dtype=np.bool) |
|
new_agent_id = np.array([self.agent_id[name]], dtype=np.int32) |
|
new_action_mask = self._generate_mask() |
|
new_group_id = np.array([0], dtype=np.int32) |
|
new_group_reward = np.array([0.0], dtype=np.float32) |
|
return ( |
|
new_reward, |
|
new_done, |
|
new_agent_id, |
|
new_action_mask, |
|
new_group_id, |
|
new_group_reward, |
|
) |
|
|
|
def step(self) -> None: |
|
assert all(action is not None for action in self.action.values()) |
|
for name in self.names: |
|
|
|
done = self._take_action(name) |
|
reward = self._compute_reward(name, done) |
|
self.rewards[name] += reward |
|
self.step_result[name] = self._make_batched_step(name, done, reward, 0.0) |
|
|
|
def reset(self) -> None: |
|
for name in self.names: |
|
self._reset_agent(name) |
|
self.step_result[name] = self._make_batched_step(name, False, 0.0, 0.0) |
|
|
|
@property |
|
def reset_parameters(self) -> Dict[str, str]: |
|
return {} |
|
|
|
def close(self): |
|
pass |
|
|
|
|
|
class MemoryEnvironment(SimpleEnvironment): |
|
def __init__(self, brain_names, action_sizes=(1, 0), step_size=0.2): |
|
super().__init__(brain_names, action_sizes=action_sizes, step_size=step_size) |
|
|
|
|
|
self.num_show_steps = 2 |
|
|
|
def _make_batched_step( |
|
self, name: str, done: bool, reward: float, group_reward: float |
|
) -> Tuple[DecisionSteps, TerminalSteps]: |
|
recurrent_obs_val = ( |
|
self.goal[name] if self.step_count[name] <= self.num_show_steps else 0 |
|
) |
|
m_vector_obs = self._make_obs(recurrent_obs_val) |
|
m_reward = np.array([reward], dtype=np.float32) |
|
m_agent_id = np.array([self.agent_id[name]], dtype=np.int32) |
|
m_group_id = np.array([0], dtype=np.int32) |
|
m_group_reward = np.array([group_reward], dtype=np.float32) |
|
action_mask = self._generate_mask() |
|
decision_step = DecisionSteps( |
|
m_vector_obs, m_reward, m_agent_id, action_mask, m_group_id, m_group_reward |
|
) |
|
terminal_step = TerminalSteps.empty(self.behavior_spec) |
|
if done: |
|
self.final_rewards[name].append(self.rewards[name]) |
|
self._reset_agent(name) |
|
recurrent_obs_val = ( |
|
self.goal[name] if self.step_count[name] <= self.num_show_steps else 0 |
|
) |
|
new_vector_obs = self._make_obs(recurrent_obs_val) |
|
( |
|
new_reward, |
|
new_done, |
|
new_agent_id, |
|
new_action_mask, |
|
new_group_id, |
|
new_group_reward, |
|
) = self._construct_reset_step(name) |
|
decision_step = DecisionSteps( |
|
new_vector_obs, |
|
new_reward, |
|
new_agent_id, |
|
new_action_mask, |
|
new_group_id, |
|
new_group_reward, |
|
) |
|
terminal_step = TerminalSteps( |
|
m_vector_obs, |
|
m_reward, |
|
np.array([False], dtype=np.bool), |
|
m_agent_id, |
|
m_group_id, |
|
m_group_reward, |
|
) |
|
return (decision_step, terminal_step) |
|
|
|
|
|
class MultiAgentEnvironment(BaseEnv): |
|
""" |
|
The MultiAgentEnvironment maintains a list of SimpleEnvironment, one for each agent. |
|
When sending DecisionSteps and TerminalSteps to the trainers, it first batches the |
|
decision steps from the individual environments. When setting actions, it indexes the |
|
batched ActionTuple to obtain the ActionTuple for individual agents |
|
""" |
|
|
|
def __init__( |
|
self, |
|
brain_names, |
|
step_size=STEP_SIZE, |
|
num_visual=0, |
|
num_vector=1, |
|
num_var_len=0, |
|
vis_obs_size=VIS_OBS_SIZE, |
|
vec_obs_size=OBS_SIZE, |
|
var_len_obs_size=VAR_LEN_SIZE, |
|
action_sizes=(1, 0), |
|
num_agents=2, |
|
goal_indices=None, |
|
): |
|
super().__init__() |
|
self.envs = {} |
|
self.dones = {} |
|
self.just_died = set() |
|
self.names = brain_names |
|
self.final_rewards: Dict[str, List[float]] = {} |
|
for name in brain_names: |
|
self.final_rewards[name] = [] |
|
for i in range(num_agents): |
|
name_and_num = name + str(i) |
|
self.envs[name_and_num] = SimpleEnvironment( |
|
[name], |
|
step_size, |
|
num_visual, |
|
num_vector, |
|
num_var_len, |
|
vis_obs_size, |
|
vec_obs_size, |
|
var_len_obs_size, |
|
action_sizes, |
|
goal_indices, |
|
) |
|
self.dones[name_and_num] = False |
|
self.envs[name_and_num].reset() |
|
|
|
self.behavior_spec = self.envs[name_and_num].behavior_spec |
|
self.action_spec = self.envs[name_and_num].action_spec |
|
self.num_agents = num_agents |
|
|
|
@property |
|
def all_done(self): |
|
return all(self.dones.values()) |
|
|
|
@property |
|
def behavior_specs(self): |
|
behavior_dict = {} |
|
for n in self.names: |
|
behavior_dict[n] = self.behavior_spec |
|
return BehaviorMapping(behavior_dict) |
|
|
|
def set_action_for_agent(self, behavior_name, agent_id, action): |
|
pass |
|
|
|
def set_actions(self, behavior_name, action): |
|
|
|
|
|
|
|
|
|
j = 0 |
|
for i in range(self.num_agents): |
|
_act = ActionTuple() |
|
name_and_num = behavior_name + str(i) |
|
env = self.envs[name_and_num] |
|
if not self.dones[name_and_num]: |
|
if self.action_spec.continuous_size > 0: |
|
_act.add_continuous(action.continuous[j : j + 1]) |
|
if self.action_spec.discrete_size > 0: |
|
_disc_list = [action.discrete[j, :]] |
|
_act.add_discrete(np.array(_disc_list)) |
|
j += 1 |
|
env.action[behavior_name] = _act |
|
|
|
def get_steps(self, behavior_name): |
|
|
|
|
|
|
|
dec_vec_obs = [] |
|
dec_reward = [] |
|
dec_group_reward = [] |
|
dec_agent_id = [] |
|
dec_group_id = [] |
|
ter_vec_obs = [] |
|
ter_reward = [] |
|
ter_group_reward = [] |
|
ter_agent_id = [] |
|
ter_group_id = [] |
|
interrupted = [] |
|
|
|
action_mask = None |
|
terminal_step = TerminalSteps.empty(self.behavior_spec) |
|
decision_step = None |
|
for i in range(self.num_agents): |
|
name_and_num = behavior_name + str(i) |
|
env = self.envs[name_and_num] |
|
_dec, _term = env.step_result[behavior_name] |
|
if not self.dones[name_and_num]: |
|
dec_agent_id.append(i) |
|
dec_group_id.append(1) |
|
if len(dec_vec_obs) > 0: |
|
for j, obs in enumerate(_dec.obs): |
|
dec_vec_obs[j] = np.concatenate((dec_vec_obs[j], obs), axis=0) |
|
else: |
|
for obs in _dec.obs: |
|
dec_vec_obs.append(obs) |
|
dec_reward.append(_dec.reward[0]) |
|
dec_group_reward.append(_dec.group_reward[0]) |
|
if _dec.action_mask is not None: |
|
if action_mask is None: |
|
action_mask = [] |
|
if len(action_mask) > 0: |
|
action_mask[0] = np.concatenate( |
|
(action_mask[0], _dec.action_mask[0]), axis=0 |
|
) |
|
else: |
|
action_mask.append(_dec.action_mask[0]) |
|
if len(_term.reward) > 0 and name_and_num in self.just_died: |
|
ter_agent_id.append(i) |
|
ter_group_id.append(1) |
|
if len(ter_vec_obs) > 0: |
|
for j, obs in enumerate(_term.obs): |
|
ter_vec_obs[j] = np.concatenate((ter_vec_obs[j], obs), axis=0) |
|
else: |
|
for obs in _term.obs: |
|
ter_vec_obs.append(obs) |
|
ter_reward.append(_term.reward[0]) |
|
ter_group_reward.append(_term.group_reward[0]) |
|
interrupted.append(False) |
|
self.just_died.remove(name_and_num) |
|
decision_step = DecisionSteps( |
|
dec_vec_obs, |
|
dec_reward, |
|
dec_agent_id, |
|
action_mask, |
|
dec_group_id, |
|
dec_group_reward, |
|
) |
|
terminal_step = TerminalSteps( |
|
ter_vec_obs, |
|
ter_reward, |
|
interrupted, |
|
ter_agent_id, |
|
ter_group_id, |
|
ter_group_reward, |
|
) |
|
return (decision_step, terminal_step) |
|
|
|
def step(self) -> None: |
|
|
|
for name in self.names: |
|
for i in range(self.num_agents): |
|
name_and_num = name + str(i) |
|
|
|
if not self.dones[name_and_num]: |
|
env = self.envs[name_and_num] |
|
|
|
assert all(action is not None for action in env.action.values()) |
|
done = env._take_action(name) |
|
reward = env._compute_reward(name, done) |
|
self.dones[name_and_num] = done |
|
if done: |
|
self.just_died.add(name_and_num) |
|
if self.all_done: |
|
env.step_result[name] = env._make_batched_step( |
|
name, done, 0.0, reward |
|
) |
|
self.final_rewards[name].append(reward) |
|
self.reset() |
|
elif done: |
|
|
|
|
|
|
|
ceil_reward = min(-TIME_PENALTY, reward) |
|
env.step_result[name] = env._make_batched_step( |
|
name, done, ceil_reward, 0.0 |
|
) |
|
self.final_rewards[name].append(reward) |
|
|
|
else: |
|
env.step_result[name] = env._make_batched_step( |
|
name, done, reward, 0.0 |
|
) |
|
|
|
def reset(self) -> None: |
|
for name in self.names: |
|
for i in range(self.num_agents): |
|
name_and_num = name + str(i) |
|
self.dones[name_and_num] = False |
|
|
|
@property |
|
def reset_parameters(self) -> Dict[str, str]: |
|
return {} |
|
|
|
def close(self): |
|
pass |
|
|
|
|
|
class RecordEnvironment(SimpleEnvironment): |
|
def __init__( |
|
self, |
|
brain_names, |
|
step_size=0.2, |
|
num_visual=0, |
|
num_vector=1, |
|
action_sizes=(1, 0), |
|
n_demos=30, |
|
): |
|
super().__init__( |
|
brain_names, |
|
step_size=step_size, |
|
num_visual=num_visual, |
|
num_vector=num_vector, |
|
action_sizes=action_sizes, |
|
) |
|
self.demonstration_protos: Dict[str, List[AgentInfoActionPairProto]] = {} |
|
self.n_demos = n_demos |
|
for name in self.names: |
|
self.demonstration_protos[name] = [] |
|
|
|
def step(self) -> None: |
|
super().step() |
|
for name in self.names: |
|
discrete_actions = ( |
|
self.action[name].discrete |
|
if self.action_spec.discrete_size > 0 |
|
else None |
|
) |
|
continuous_actions = ( |
|
self.action[name].continuous |
|
if self.action_spec.continuous_size > 0 |
|
else None |
|
) |
|
self.demonstration_protos[name] += proto_from_steps_and_action( |
|
self.step_result[name][0], |
|
self.step_result[name][1], |
|
continuous_actions, |
|
discrete_actions, |
|
) |
|
self.demonstration_protos[name] = self.demonstration_protos[name][ |
|
-self.n_demos : |
|
] |
|
|
|
def solve(self) -> None: |
|
self.reset() |
|
for _ in range(self.n_demos): |
|
for name in self.names: |
|
if self.action_spec.discrete_size > 0: |
|
self.action[name] = ActionTuple( |
|
np.array([], dtype=np.float32), |
|
np.array( |
|
[[1]] if self.goal[name] > 0 else [[0]], dtype=np.int32 |
|
), |
|
) |
|
else: |
|
self.action[name] = ActionTuple( |
|
np.array([[float(self.goal[name])]], dtype=np.float32), |
|
np.array([], dtype=np.int32), |
|
) |
|
self.step() |
|
|
|
|
|
class UnexpectedExceptionEnvironment(SimpleEnvironment): |
|
def __init__(self, brain_names, use_discrete, to_raise): |
|
super().__init__(brain_names, use_discrete) |
|
self.to_raise = to_raise |
|
|
|
def step(self) -> None: |
|
raise self.to_raise() |
|
|