import random from typing import Dict, List, Any, Tuple import numpy as np from mlagents_envs.base_env import ( ActionSpec, ObservationSpec, ObservationType, ActionTuple, BaseEnv, BehaviorSpec, DecisionSteps, TerminalSteps, BehaviorMapping, ) from .test_rpc_utils import proto_from_steps_and_action from mlagents_envs.communicator_objects.agent_info_action_pair_pb2 import ( AgentInfoActionPairProto, ) from mlagents.trainers.tests.dummy_config import create_observation_specs_with_shapes OBS_SIZE = 1 VIS_OBS_SIZE = (20, 20, 3) VAR_LEN_SIZE = (10, 5) STEP_SIZE = 0.2 TIME_PENALTY = 0.01 MIN_STEPS = int(1.0 / STEP_SIZE) + 1 SUCCESS_REWARD = 1.0 + MIN_STEPS * TIME_PENALTY def clamp(x, min_val, max_val): return max(min_val, min(x, max_val)) class SimpleEnvironment(BaseEnv): """ Very simple "game" - the agent has a position on [-1, 1], gets a reward of 1 if it reaches 1, and a reward of -1 if it reaches -1. The position is incremented by the action amount (clamped to [-step_size, step_size]). """ def __init__( self, brain_names, step_size=STEP_SIZE, num_visual=0, num_vector=1, num_var_len=0, vis_obs_size=VIS_OBS_SIZE, vec_obs_size=OBS_SIZE, var_len_obs_size=VAR_LEN_SIZE, action_sizes=(1, 0), goal_indices=None, ): super().__init__() self.num_visual = num_visual self.num_vector = num_vector self.num_var_len = num_var_len self.vis_obs_size = vis_obs_size self.vec_obs_size = vec_obs_size self.var_len_obs_size = var_len_obs_size self.goal_indices = goal_indices continuous_action_size, discrete_action_size = action_sizes discrete_tuple = tuple(2 for _ in range(discrete_action_size)) action_spec = ActionSpec(continuous_action_size, discrete_tuple) self.total_action_size = ( continuous_action_size + discrete_action_size ) # to set the goals/positions self.action_spec = action_spec self.behavior_spec = BehaviorSpec(self._make_observation_specs(), action_spec) self.action_spec = action_spec self.names = brain_names self.positions: Dict[str, List[float]] = {} self.step_count: Dict[str, float] = {} # Concatenate the arguments for a consistent random seed seed = ( brain_names, step_size, num_visual, num_vector, num_var_len, vis_obs_size, vec_obs_size, var_len_obs_size, action_sizes, ) self.random = random.Random(str(seed)) self.goal: Dict[str, int] = {} self.action = {} self.rewards: Dict[str, float] = {} self.final_rewards: Dict[str, List[float]] = {} self.step_result: Dict[str, Tuple[DecisionSteps, TerminalSteps]] = {} self.agent_id: Dict[str, int] = {} self.step_size = step_size # defines the difficulty of the test # Allow to be used as a UnityEnvironment during tests self.academy_capabilities = None for name in self.names: self.agent_id[name] = 0 self.goal[name] = self.random.choice([-1, 1]) self.rewards[name] = 0 self.final_rewards[name] = [] self._reset_agent(name) self.action[name] = None self.step_result[name] = None def _make_observation_specs(self) -> List[ObservationSpec]: obs_shape: List[Any] = [] for _ in range(self.num_vector): obs_shape.append((self.vec_obs_size,)) for _ in range(self.num_visual): obs_shape.append(self.vis_obs_size) for _ in range(self.num_var_len): obs_shape.append(self.var_len_obs_size) obs_spec = create_observation_specs_with_shapes(obs_shape) if self.goal_indices is not None: for i in range(len(obs_spec)): if i in self.goal_indices: obs_spec[i] = ObservationSpec( shape=obs_spec[i].shape, dimension_property=obs_spec[i].dimension_property, observation_type=ObservationType.GOAL_SIGNAL, name=obs_spec[i].name, ) return obs_spec def _make_obs(self, value: float) -> List[np.ndarray]: obs = [] for _ in range(self.num_vector): obs.append(np.ones((1, self.vec_obs_size), dtype=np.float32) * value) for _ in range(self.num_visual): obs.append(np.ones((1,) + self.vis_obs_size, dtype=np.float32) * value) for _ in range(self.num_var_len): obs.append(np.ones((1,) + self.var_len_obs_size, dtype=np.float32) * value) return obs @property def behavior_specs(self): behavior_dict = {} for n in self.names: behavior_dict[n] = self.behavior_spec return BehaviorMapping(behavior_dict) def set_action_for_agent(self, behavior_name, agent_id, action): pass def set_actions(self, behavior_name, action): self.action[behavior_name] = action def get_steps(self, behavior_name): return self.step_result[behavior_name] def _take_action(self, name: str) -> bool: deltas = [] _act = self.action[name] if self.action_spec.continuous_size > 0: for _cont in _act.continuous[0]: deltas.append(_cont) if self.action_spec.discrete_size > 0: for _disc in _act.discrete[0]: deltas.append(1 if _disc else -1) for i, _delta in enumerate(deltas): _delta = clamp(_delta, -self.step_size, self.step_size) self.positions[name][i] += _delta self.positions[name][i] = clamp(self.positions[name][i], -1, 1) self.step_count[name] += 1 # Both must be in 1.0 to be done done = all(pos >= 1.0 or pos <= -1.0 for pos in self.positions[name]) return done def _generate_mask(self): action_mask = None if self.action_spec.discrete_size > 0: # LL-Python API will return an empty dim if there is only 1 agent. ndmask = np.array( 2 * self.action_spec.discrete_size * [False], dtype=np.bool ) ndmask = np.expand_dims(ndmask, axis=0) action_mask = [ndmask] return action_mask def _compute_reward(self, name: str, done: bool) -> float: if done: reward = 0.0 for _pos in self.positions[name]: reward += (SUCCESS_REWARD * _pos * self.goal[name]) / len( self.positions[name] ) else: reward = -TIME_PENALTY return reward def _reset_agent(self, name): self.goal[name] = self.random.choice([-1, 1]) self.positions[name] = [0.0 for _ in range(self.total_action_size)] self.step_count[name] = 0 self.rewards[name] = 0 self.agent_id[name] = self.agent_id[name] + 1 def _make_batched_step( self, name: str, done: bool, reward: float, group_reward: float ) -> Tuple[DecisionSteps, TerminalSteps]: m_vector_obs = self._make_obs(self.goal[name]) m_reward = np.array([reward], dtype=np.float32) m_agent_id = np.array([self.agent_id[name]], dtype=np.int32) m_group_id = np.array([0], dtype=np.int32) m_group_reward = np.array([group_reward], dtype=np.float32) action_mask = self._generate_mask() decision_step = DecisionSteps( m_vector_obs, m_reward, m_agent_id, action_mask, m_group_id, m_group_reward ) terminal_step = TerminalSteps.empty(self.behavior_spec) if done: self.final_rewards[name].append(self.rewards[name]) self._reset_agent(name) new_vector_obs = self._make_obs(self.goal[name]) ( new_reward, new_done, new_agent_id, new_action_mask, new_group_id, new_group_reward, ) = self._construct_reset_step(name) decision_step = DecisionSteps( new_vector_obs, new_reward, new_agent_id, new_action_mask, new_group_id, new_group_reward, ) terminal_step = TerminalSteps( m_vector_obs, m_reward, np.array([False], dtype=np.bool), m_agent_id, m_group_id, m_group_reward, ) return (decision_step, terminal_step) def _construct_reset_step( self, name: str ) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.ndarray]: new_reward = np.array([0.0], dtype=np.float32) new_done = np.array([False], dtype=np.bool) new_agent_id = np.array([self.agent_id[name]], dtype=np.int32) new_action_mask = self._generate_mask() new_group_id = np.array([0], dtype=np.int32) new_group_reward = np.array([0.0], dtype=np.float32) return ( new_reward, new_done, new_agent_id, new_action_mask, new_group_id, new_group_reward, ) def step(self) -> None: assert all(action is not None for action in self.action.values()) for name in self.names: done = self._take_action(name) reward = self._compute_reward(name, done) self.rewards[name] += reward self.step_result[name] = self._make_batched_step(name, done, reward, 0.0) def reset(self) -> None: # type: ignore for name in self.names: self._reset_agent(name) self.step_result[name] = self._make_batched_step(name, False, 0.0, 0.0) @property def reset_parameters(self) -> Dict[str, str]: return {} def close(self): pass class MemoryEnvironment(SimpleEnvironment): def __init__(self, brain_names, action_sizes=(1, 0), step_size=0.2): super().__init__(brain_names, action_sizes=action_sizes, step_size=step_size) # Number of steps to reveal the goal for. Lower is harder. Should be # less than 1/step_size to force agent to use memory self.num_show_steps = 2 def _make_batched_step( self, name: str, done: bool, reward: float, group_reward: float ) -> Tuple[DecisionSteps, TerminalSteps]: recurrent_obs_val = ( self.goal[name] if self.step_count[name] <= self.num_show_steps else 0 ) m_vector_obs = self._make_obs(recurrent_obs_val) m_reward = np.array([reward], dtype=np.float32) m_agent_id = np.array([self.agent_id[name]], dtype=np.int32) m_group_id = np.array([0], dtype=np.int32) m_group_reward = np.array([group_reward], dtype=np.float32) action_mask = self._generate_mask() decision_step = DecisionSteps( m_vector_obs, m_reward, m_agent_id, action_mask, m_group_id, m_group_reward ) terminal_step = TerminalSteps.empty(self.behavior_spec) if done: self.final_rewards[name].append(self.rewards[name]) self._reset_agent(name) recurrent_obs_val = ( self.goal[name] if self.step_count[name] <= self.num_show_steps else 0 ) new_vector_obs = self._make_obs(recurrent_obs_val) ( new_reward, new_done, new_agent_id, new_action_mask, new_group_id, new_group_reward, ) = self._construct_reset_step(name) decision_step = DecisionSteps( new_vector_obs, new_reward, new_agent_id, new_action_mask, new_group_id, new_group_reward, ) terminal_step = TerminalSteps( m_vector_obs, m_reward, np.array([False], dtype=np.bool), m_agent_id, m_group_id, m_group_reward, ) return (decision_step, terminal_step) class MultiAgentEnvironment(BaseEnv): """ The MultiAgentEnvironment maintains a list of SimpleEnvironment, one for each agent. When sending DecisionSteps and TerminalSteps to the trainers, it first batches the decision steps from the individual environments. When setting actions, it indexes the batched ActionTuple to obtain the ActionTuple for individual agents """ def __init__( self, brain_names, step_size=STEP_SIZE, num_visual=0, num_vector=1, num_var_len=0, vis_obs_size=VIS_OBS_SIZE, vec_obs_size=OBS_SIZE, var_len_obs_size=VAR_LEN_SIZE, action_sizes=(1, 0), num_agents=2, goal_indices=None, ): super().__init__() self.envs = {} self.dones = {} self.just_died = set() self.names = brain_names self.final_rewards: Dict[str, List[float]] = {} for name in brain_names: self.final_rewards[name] = [] for i in range(num_agents): name_and_num = name + str(i) self.envs[name_and_num] = SimpleEnvironment( [name], step_size, num_visual, num_vector, num_var_len, vis_obs_size, vec_obs_size, var_len_obs_size, action_sizes, goal_indices, ) self.dones[name_and_num] = False self.envs[name_and_num].reset() # All envs have the same behavior spec, so just get the last one. self.behavior_spec = self.envs[name_and_num].behavior_spec self.action_spec = self.envs[name_and_num].action_spec self.num_agents = num_agents @property def all_done(self): return all(self.dones.values()) @property def behavior_specs(self): behavior_dict = {} for n in self.names: behavior_dict[n] = self.behavior_spec return BehaviorMapping(behavior_dict) def set_action_for_agent(self, behavior_name, agent_id, action): pass def set_actions(self, behavior_name, action): # The ActionTuple contains the actions for all n_agents. This # slices the ActionTuple into an action tuple for each environment # and sets it. The index j is used to ignore agents that have already # reached done. j = 0 for i in range(self.num_agents): _act = ActionTuple() name_and_num = behavior_name + str(i) env = self.envs[name_and_num] if not self.dones[name_and_num]: if self.action_spec.continuous_size > 0: _act.add_continuous(action.continuous[j : j + 1]) if self.action_spec.discrete_size > 0: _disc_list = [action.discrete[j, :]] _act.add_discrete(np.array(_disc_list)) j += 1 env.action[behavior_name] = _act def get_steps(self, behavior_name): # This gets the individual DecisionSteps and TerminalSteps # from the envs and merges them into a batch to be sent # to the AgentProcessor. dec_vec_obs = [] dec_reward = [] dec_group_reward = [] dec_agent_id = [] dec_group_id = [] ter_vec_obs = [] ter_reward = [] ter_group_reward = [] ter_agent_id = [] ter_group_id = [] interrupted = [] action_mask = None terminal_step = TerminalSteps.empty(self.behavior_spec) decision_step = None for i in range(self.num_agents): name_and_num = behavior_name + str(i) env = self.envs[name_and_num] _dec, _term = env.step_result[behavior_name] if not self.dones[name_and_num]: dec_agent_id.append(i) dec_group_id.append(1) if len(dec_vec_obs) > 0: for j, obs in enumerate(_dec.obs): dec_vec_obs[j] = np.concatenate((dec_vec_obs[j], obs), axis=0) else: for obs in _dec.obs: dec_vec_obs.append(obs) dec_reward.append(_dec.reward[0]) dec_group_reward.append(_dec.group_reward[0]) if _dec.action_mask is not None: if action_mask is None: action_mask = [] if len(action_mask) > 0: action_mask[0] = np.concatenate( (action_mask[0], _dec.action_mask[0]), axis=0 ) else: action_mask.append(_dec.action_mask[0]) if len(_term.reward) > 0 and name_and_num in self.just_died: ter_agent_id.append(i) ter_group_id.append(1) if len(ter_vec_obs) > 0: for j, obs in enumerate(_term.obs): ter_vec_obs[j] = np.concatenate((ter_vec_obs[j], obs), axis=0) else: for obs in _term.obs: ter_vec_obs.append(obs) ter_reward.append(_term.reward[0]) ter_group_reward.append(_term.group_reward[0]) interrupted.append(False) self.just_died.remove(name_and_num) decision_step = DecisionSteps( dec_vec_obs, dec_reward, dec_agent_id, action_mask, dec_group_id, dec_group_reward, ) terminal_step = TerminalSteps( ter_vec_obs, ter_reward, interrupted, ter_agent_id, ter_group_id, ter_group_reward, ) return (decision_step, terminal_step) def step(self) -> None: # Steps all environments and calls reset if all agents are done. for name in self.names: for i in range(self.num_agents): name_and_num = name + str(i) # Does not step the env if done if not self.dones[name_and_num]: env = self.envs[name_and_num] # Reproducing part of env step to intercept Dones assert all(action is not None for action in env.action.values()) done = env._take_action(name) reward = env._compute_reward(name, done) self.dones[name_and_num] = done if done: self.just_died.add(name_and_num) if self.all_done: env.step_result[name] = env._make_batched_step( name, done, 0.0, reward ) self.final_rewards[name].append(reward) self.reset() elif done: # This agent has finished but others are still running. # This gives a reward of the time penalty if this agent # is successful and the negative env reward if it fails. ceil_reward = min(-TIME_PENALTY, reward) env.step_result[name] = env._make_batched_step( name, done, ceil_reward, 0.0 ) self.final_rewards[name].append(reward) else: env.step_result[name] = env._make_batched_step( name, done, reward, 0.0 ) def reset(self) -> None: # type: ignore for name in self.names: for i in range(self.num_agents): name_and_num = name + str(i) self.dones[name_and_num] = False @property def reset_parameters(self) -> Dict[str, str]: return {} def close(self): pass class RecordEnvironment(SimpleEnvironment): def __init__( self, brain_names, step_size=0.2, num_visual=0, num_vector=1, action_sizes=(1, 0), n_demos=30, ): super().__init__( brain_names, step_size=step_size, num_visual=num_visual, num_vector=num_vector, action_sizes=action_sizes, ) self.demonstration_protos: Dict[str, List[AgentInfoActionPairProto]] = {} self.n_demos = n_demos for name in self.names: self.demonstration_protos[name] = [] def step(self) -> None: super().step() for name in self.names: discrete_actions = ( self.action[name].discrete if self.action_spec.discrete_size > 0 else None ) continuous_actions = ( self.action[name].continuous if self.action_spec.continuous_size > 0 else None ) self.demonstration_protos[name] += proto_from_steps_and_action( self.step_result[name][0], self.step_result[name][1], continuous_actions, discrete_actions, ) self.demonstration_protos[name] = self.demonstration_protos[name][ -self.n_demos : ] def solve(self) -> None: self.reset() for _ in range(self.n_demos): for name in self.names: if self.action_spec.discrete_size > 0: self.action[name] = ActionTuple( np.array([], dtype=np.float32), np.array( [[1]] if self.goal[name] > 0 else [[0]], dtype=np.int32 ), ) else: self.action[name] = ActionTuple( np.array([[float(self.goal[name])]], dtype=np.float32), np.array([], dtype=np.int32), ) self.step() class UnexpectedExceptionEnvironment(SimpleEnvironment): def __init__(self, brain_names, use_discrete, to_raise): super().__init__(brain_names, use_discrete) self.to_raise = to_raise def step(self) -> None: raise self.to_raise()