from typing import List, Optional, Tuple
import numpy as np

from mlagents.trainers.buffer import AgentBuffer, AgentBufferKey
from mlagents.trainers.torch_entities.action_log_probs import LogProbsTuple
from mlagents.trainers.trajectory import AgentStatus, Trajectory, AgentExperience
from mlagents_envs.base_env import (
    DecisionSteps,
    TerminalSteps,
    ObservationSpec,
    BehaviorSpec,
    ActionSpec,
    ActionTuple,
)
from mlagents.trainers.tests.dummy_config import create_observation_specs_with_shapes


def create_mock_steps(
    num_agents: int,
    observation_specs: List[ObservationSpec],
    action_spec: ActionSpec,
    done: bool = False,
    grouped: bool = False,
    agent_ids: Optional[List[int]] = None,
) -> Tuple[DecisionSteps, TerminalSteps]:
    """
    Creates a mock Tuple[DecisionSteps, TerminalSteps] with observations.
    Imitates constant vector/visual observations, rewards, dones, and agents.

    :int num_agents: Number of "agents" to imitate.
    :List observation_specs: A List of the observation specs in your steps
    :int action_spec: ActionSpec for the agent
    :bool done: Whether all the agents in the batch are done
    """
    obs_list = []
    for obs_spec in observation_specs:
        obs_list.append(np.ones((num_agents,) + obs_spec.shape, dtype=np.float32))
    action_mask = None
    if action_spec.is_discrete():
        action_mask = [
            np.array(num_agents * [action_size * [False]])
            for action_size in action_spec.discrete_branches  # type: ignore
        ]  # type: ignore

    reward = np.array(num_agents * [1.0], dtype=np.float32)
    interrupted = np.array(num_agents * [False], dtype=bool)
    if agent_ids is not None:
        agent_id = np.array(agent_ids, dtype=np.int32)
    else:
        agent_id = np.arange(num_agents, dtype=np.int32)
    _gid = 1 if grouped else 0
    group_id = np.array(num_agents * [_gid], dtype=np.int32)
    group_reward = np.array(num_agents * [0.0], dtype=np.float32)
    behavior_spec = BehaviorSpec(observation_specs, action_spec)
    if done:
        return (
            DecisionSteps.empty(behavior_spec),
            TerminalSteps(
                obs_list, reward, interrupted, agent_id, group_id, group_reward
            ),
        )
    else:
        return (
            DecisionSteps(
                obs_list, reward, agent_id, action_mask, group_id, group_reward
            ),
            TerminalSteps.empty(behavior_spec),
        )
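
# Example usage (a hedged sketch; the shapes and agent counts are arbitrary).
# With done=False the agents land in DecisionSteps and TerminalSteps is empty;
# with done=True it is the other way around:
#
#     obs_specs = create_observation_specs_with_shapes([(8,)])
#     decision_steps, terminal_steps = create_mock_steps(
#         num_agents=3,
#         observation_specs=obs_specs,
#         action_spec=ActionSpec.create_continuous(2),
#     )
#     assert len(decision_steps) == 3 and len(terminal_steps) == 0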


def create_steps_from_behavior_spec(
    behavior_spec: BehaviorSpec, num_agents: int = 1
) -> Tuple[DecisionSteps, TerminalSteps]:
    return create_mock_steps(
        num_agents=num_agents,
        observation_specs=behavior_spec.observation_specs,
        action_spec=behavior_spec.action_spec,
    )


def make_fake_trajectory(
    length: int,
    observation_specs: List[ObservationSpec],
    action_spec: ActionSpec,
    max_step_complete: bool = False,
    memory_size: int = 10,
    num_other_agents_in_group: int = 0,
    group_reward: float = 0.0,
    is_terminal: bool = True,
    team_id: int = 0,
) -> Trajectory:
    """
    Makes a fake trajectory of length length. If max_step_complete,
    the trajectory is terminated by a max step rather than a done.
    """
    steps_list = []

    action_size = action_spec.discrete_size + action_spec.continuous_size
    for _i in range(length - 1):
        obs = []
        for obs_spec in observation_specs:
            obs.append(np.ones(obs_spec.shape, dtype=np.float32))
        reward = 1.0
        done = False
        action = ActionTuple(
            continuous=np.zeros(action_spec.continuous_size, dtype=np.float32),
            discrete=np.zeros(action_spec.discrete_size, dtype=np.int32),
        )
        action_probs = LogProbsTuple(
            continuous=np.ones(action_spec.continuous_size, dtype=np.float32),
            discrete=np.ones(action_spec.discrete_size, dtype=np.float32),
        )
        action_mask = (
            [
                [False for _ in range(branch)]
                for branch in action_spec.discrete_branches
            ]  # type: ignore
            if action_spec.is_discrete()
            else None
        )
        if action_spec.is_discrete():
            prev_action = np.ones(action_size, dtype=np.int32)
        else:
            prev_action = np.ones(action_size, dtype=np.float32)

        max_step = False
        memory = np.ones(memory_size, dtype=np.float32)
        agent_id = "test_agent"
        behavior_id = "test_brain?team=" + str(team_id)
        group_status = []
        for _ in range(num_other_agents_in_group):
            group_status.append(AgentStatus(obs, reward, action, done))
        experience = AgentExperience(
            obs=obs,
            reward=reward,
            done=done,
            action=action,
            action_probs=action_probs,
            action_mask=action_mask,
            prev_action=prev_action,
            interrupted=max_step,
            memory=memory,
            group_status=group_status,
            group_reward=group_reward,
        )
        steps_list.append(experience)
    obs = []
    for obs_spec in observation_specs:
        obs.append(np.ones(obs_spec.shape, dtype=np.float32))
    last_group_status = []
    for _ in range(num_other_agents_in_group):
        last_group_status.append(
            AgentStatus(obs, reward, action, not max_step_complete and is_terminal)
        )
    last_experience = AgentExperience(
        obs=obs,
        reward=reward,
        done=not max_step_complete and is_terminal,
        action=action,
        action_probs=action_probs,
        action_mask=action_mask,
        prev_action=prev_action,
        interrupted=max_step_complete,
        memory=memory,
        group_status=last_group_status,
        group_reward=group_reward,
    )
    steps_list.append(last_experience)
    return Trajectory(
        steps=steps_list,
        agent_id=agent_id,
        behavior_id=behavior_id,
        next_obs=obs,
        next_group_obs=[obs] * num_other_agents_in_group,
    )
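
# Example usage (a hedged sketch; the length and branch sizes are arbitrary).
# With the defaults (is_terminal=True, max_step_complete=False), only the
# final experience is marked done:
#
#     traj = make_fake_trajectory(
#         length=10,
#         observation_specs=create_observation_specs_with_shapes([(8,)]),
#         action_spec=ActionSpec.create_discrete((3, 2)),
#     )
#     assert len(traj.steps) == 10
#     assert traj.steps[-1].done and not traj.steps[0].done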


def copy_buffer_fields(
    buffer: AgentBuffer, src_key: AgentBufferKey, dst_keys: List[AgentBufferKey]
) -> None:
    for dst_key in dst_keys:
        buffer[dst_key] = buffer[src_key]
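
# Example usage (a hedged sketch; BufferKey comes from mlagents.trainers.buffer
# and the particular keys are illustrative). Useful when a test needs several
# buffer fields seeded with the same values:
#
#     from mlagents.trainers.buffer import BufferKey
#     copy_buffer_fields(
#         buffer,
#         src_key=BufferKey.ENVIRONMENT_REWARDS,
#         dst_keys=[BufferKey.ADVANTAGES],
#     )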


def simulate_rollout(
    length: int,
    behavior_spec: BehaviorSpec,
    memory_size: int = 10,
    exclude_key_list: Optional[List[AgentBufferKey]] = None,
    num_other_agents_in_group: int = 0,
) -> AgentBuffer:
    trajectory = make_fake_trajectory(
        length,
        behavior_spec.observation_specs,
        action_spec=behavior_spec.action_spec,
        memory_size=memory_size,
        num_other_agents_in_group=num_other_agents_in_group,
    )
    buffer = trajectory.to_agentbuffer()
    # If an exclude_key_list was given, remove those keys from the buffer
    if exclude_key_list:
        for key in exclude_key_list:
            if key in buffer:
                buffer.pop(key)
    return buffer
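
# Example usage (a hedged sketch; relies on create_mock_3dball_behavior_specs
# defined below). The returned AgentBuffer holds one entry per experience:
#
#     spec = create_mock_3dball_behavior_specs()
#     buffer = simulate_rollout(length=10, behavior_spec=spec)
#     assert buffer.num_experiences == 10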


def setup_test_behavior_specs(
    use_discrete=True, use_visual=False, vector_action_space=2, vector_obs_space=8
):
    if use_discrete:
        action_spec = ActionSpec.create_discrete(tuple(vector_action_space))
    else:
        action_spec = ActionSpec.create_continuous(vector_action_space)
    observation_shapes = [(84, 84, 3)] * int(use_visual) + [(vector_obs_space,)]
    obs_spec = create_observation_specs_with_shapes(observation_shapes)
    behavior_spec = BehaviorSpec(obs_spec, action_spec)
    return behavior_spec
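
# Example usage (a hedged sketch). With use_discrete=True, vector_action_space
# must be an iterable of branch sizes rather than a bare int, since it is
# passed through tuple():
#
#     spec = setup_test_behavior_specs(
#         use_discrete=True, vector_action_space=[3, 3], vector_obs_space=8
#     )
#     assert spec.action_spec.discrete_branches == (3, 3)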


def create_mock_3dball_behavior_specs():
    return setup_test_behavior_specs(
        False, False, vector_action_space=2, vector_obs_space=8
    )


def create_mock_pushblock_behavior_specs():
    return setup_test_behavior_specs(
        True, False, vector_action_space=[7], vector_obs_space=70
    )


def create_mock_banana_behavior_specs():
    return setup_test_behavior_specs(
        True, True, vector_action_space=[3, 3, 3, 2], vector_obs_space=0
    )