from unittest import mock
import pytest
from typing import List
import mlagents.trainers.tests.mock_brain as mb
import numpy as np
from mlagents.trainers.agent_processor import (
    AgentProcessor,
    AgentManager,
    AgentManagerQueue,
)
from mlagents.trainers.action_info import ActionInfo
from mlagents.trainers.torch_entities.action_log_probs import LogProbsTuple
from mlagents.trainers.trajectory import Trajectory
from mlagents.trainers.stats import StatsReporter, StatsSummary
from mlagents.trainers.behavior_id_utils import get_global_agent_id
from mlagents_envs.side_channel.stats_side_channel import StatsAggregationMethod
from mlagents.trainers.tests.dummy_config import create_observation_specs_with_shapes
from mlagents_envs.base_env import ActionSpec, ActionTuple


def create_mock_policy():
    mock_policy = mock.Mock()
    mock_policy.reward_signals = {}
    mock_policy.retrieve_previous_memories.return_value = np.zeros(
        (1, 1), dtype=np.float32
    )
    mock_policy.retrieve_previous_action.return_value = np.zeros(
        (1, 1), dtype=np.int32
    )
    return mock_policy


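# Builds a fake ActionInfo (continuous actions, log-probs, and outputs) for the given agent ids.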
def _create_action_info(num_agents: int, agent_ids: List[str]) -> ActionInfo:
    fake_action_outputs = {
        "action": ActionTuple(
            continuous=np.array([[0.1]] * num_agents, dtype=np.float32)
        ),
        "entropy": np.array([1.0], dtype=np.float32),
        "learning_rate": 1.0,
        "log_probs": LogProbsTuple(
            continuous=np.array([[0.1]] * num_agents, dtype=np.float32)
        ),
    }
    fake_action_info = ActionInfo(
        action=ActionTuple(continuous=np.array([[0.1]] * num_agents, dtype=np.float32)),
        env_action=ActionTuple(
            continuous=np.array([[0.1]] * num_agents, dtype=np.float32)
        ),
        outputs=fake_action_outputs,
        agent_ids=agent_ids,
    )
    return fake_action_info


@pytest.mark.parametrize("num_vis_obs", [0, 1, 2], ids=["vec", "1 viz", "2 viz"])
def test_agentprocessor(num_vis_obs):
    policy = create_mock_policy()
    tqueue = mock.Mock()
    name_behavior_id = "test_brain_name"
    processor = AgentProcessor(
        policy,
        name_behavior_id,
        max_trajectory_length=5,
        stats_reporter=StatsReporter("testcat"),
    )

    mock_decision_steps, mock_terminal_steps = mb.create_mock_steps(
        num_agents=2,
        observation_specs=create_observation_specs_with_shapes(
            [(8,)] + num_vis_obs * [(84, 84, 3)]
        ),
        action_spec=ActionSpec.create_continuous(2),
    )
    fake_action_info = _create_action_info(2, mock_decision_steps.agent_id)
    processor.publish_trajectory_queue(tqueue)
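    # This is like the initial state after the env reset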
    processor.add_experiences(
        mock_decision_steps, mock_terminal_steps, 0, ActionInfo.empty()
    )
    for _ in range(5):
        processor.add_experiences(
            mock_decision_steps, mock_terminal_steps, 0, fake_action_info
        )

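    # Two trajectories (one per agent) should have been put on the queue for the trainer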
    assert len(tqueue.put.call_args_list) == 2

    trajectory = tqueue.put.call_args_list[0][0][0]
    assert len(trajectory.steps) == 5
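    # Make sure ungrouped agents don't carry any group status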
    for step in trajectory.steps:
        assert len(step.group_status) == 0

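    # The processor's experience buffer should now be empty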
    assert len(processor._experience_buffers[0]) == 0

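    # Test steps with no agents in them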
    mock_decision_steps, mock_terminal_steps = mb.create_mock_steps(
        num_agents=0,
        observation_specs=create_observation_specs_with_shapes(
            [(8,)] + num_vis_obs * [(84, 84, 3)]
        ),
        action_spec=ActionSpec.create_continuous(2),
    )
    processor.add_experiences(
        mock_decision_steps, mock_terminal_steps, 0, ActionInfo.empty()
    )
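    # The processor should still be empty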
    assert len(processor._experience_buffers[0]) == 0


def test_group_statuses():
    policy = create_mock_policy()
    tqueue = mock.Mock()
    name_behavior_id = "test_brain_name"
    processor = AgentProcessor(
        policy,
        name_behavior_id,
        max_trajectory_length=5,
        stats_reporter=StatsReporter("testcat"),
    )

    mock_decision_steps, mock_terminal_steps = mb.create_mock_steps(
        num_agents=4,
        observation_specs=create_observation_specs_with_shapes([(8,)]),
        action_spec=ActionSpec.create_continuous(2),
        grouped=True,
    )
    fake_action_info = _create_action_info(4, mock_decision_steps.agent_id)
    processor.publish_trajectory_queue(tqueue)
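    # The first add mimics the state right after an env reset (no previous action yet)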
    processor.add_experiences(
        mock_decision_steps, mock_terminal_steps, 0, ActionInfo.empty()
    )
    for _ in range(2):
        processor.add_experiences(
            mock_decision_steps, mock_terminal_steps, 0, fake_action_info
        )

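    # Make terminal steps for two of the agents, which are now done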
    _, mock_terminal_steps_2 = mb.create_mock_steps(
        num_agents=2,
        observation_specs=create_observation_specs_with_shapes([(8,)]),
        action_spec=ActionSpec.create_continuous(2),
        done=True,
        grouped=True,
        agent_ids=[2, 3],
    )

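    # The other two agents keep going with ordinary decision steps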
    mock_decision_steps_2, _ = mb.create_mock_steps(
        num_agents=2,
        observation_specs=create_observation_specs_with_shapes([(8,)]),
        action_spec=ActionSpec.create_continuous(2),
        done=False,
        grouped=True,
        agent_ids=[0, 1],
    )

    processor.add_experiences(
        mock_decision_steps_2, mock_terminal_steps_2, 0, fake_action_info
    )

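    # Keep adding experiences for the two agents that are still alive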
    fake_action_info = _create_action_info(4, mock_decision_steps_2.agent_id)
    for _ in range(3):
        processor.add_experiences(
            mock_decision_steps_2, mock_terminal_steps, 0, fake_action_info
        )

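    # Two trajectories come from the dead agents, and two from the live agents hitting the max length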
    assert len(tqueue.put.call_args_list) == 4

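    # The first trajectories put on the queue belong to the agents that terminated early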
    trajectory = tqueue.put.call_args_list[0][0][-1]
    assert len(trajectory.steps) == 3
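    # Every step was taken while all four agents were alive, so each carries three groupmates' statuses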
    for step in trajectory.steps:
        assert len(step.group_status) == 3

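    # The last trajectory put is for a surviving agent and reaches the max length of 5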
    trajectory = tqueue.put.call_args_list[-1][0][-1]
    assert len(trajectory.steps) == 5

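    # The first three steps were taken while all four agents were alive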
    for step in trajectory.steps[0:3]:
        assert len(step.group_status) == 3
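    # After the other two agents terminated, only one groupmate remains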
    for step in trajectory.steps[3:]:
        assert len(step.group_status) == 1


def test_agent_deletion():
    policy = create_mock_policy()
    tqueue = mock.Mock()
    name_behavior_id = "test_brain_name"
    processor = AgentProcessor(
        policy,
        name_behavior_id,
        max_trajectory_length=5,
        stats_reporter=StatsReporter("testcat"),
    )
    fake_action_outputs = {
        "action": ActionTuple(continuous=np.array([[0.1]], dtype=np.float32)),
        "entropy": np.array([1.0], dtype=np.float32),
        "learning_rate": 1.0,
        "log_probs": LogProbsTuple(continuous=np.array([[0.1]], dtype=np.float32)),
    }

    mock_decision_step, mock_terminal_step = mb.create_mock_steps(
        num_agents=1,
        observation_specs=create_observation_specs_with_shapes([(8,)]),
        action_spec=ActionSpec.create_continuous(2),
    )
    mock_done_decision_step, mock_done_terminal_step = mb.create_mock_steps(
        num_agents=1,
        observation_specs=create_observation_specs_with_shapes([(8,)]),
        action_spec=ActionSpec.create_continuous(2),
        done=True,
    )
    fake_action_info = ActionInfo(
        action=ActionTuple(continuous=np.array([[0.1]], dtype=np.float32)),
        env_action=ActionTuple(continuous=np.array([[0.1]], dtype=np.float32)),
        outputs=fake_action_outputs,
        agent_ids=mock_decision_step.agent_id,
    )

    processor.publish_trajectory_queue(tqueue)
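    # Prime the processor as if the env had just been reset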
    processor.add_experiences(
        mock_decision_step, mock_terminal_step, 0, ActionInfo.empty()
    )

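    # Run 3 episodes of 5 steps each, ending each episode with a done step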
    add_calls = []
    remove_calls = []
    for _ep in range(3):
        for _ in range(5):
            processor.add_experiences(
                mock_decision_step, mock_terminal_step, _ep, fake_action_info
            )
            add_calls.append(
                mock.call([get_global_agent_id(_ep, 0)], fake_action_outputs["action"])
            )
        processor.add_experiences(
            mock_done_decision_step, mock_done_terminal_step, _ep, fake_action_info
        )
        remove_calls.append(mock.call([get_global_agent_id(_ep, 0)]))

    policy.save_previous_action.assert_has_calls(add_calls)
    policy.remove_previous_action.assert_has_calls(remove_calls)

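    # Check that there are no experiences left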
    assert len(processor._experience_buffers.keys()) == 0
    assert len(processor._last_take_action_outputs.keys()) == 0
    assert len(processor._episode_steps.keys()) == 0
    assert len(processor._episode_rewards.keys()) == 0
    assert len(processor._last_step_result.keys()) == 0

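    # Steps that are immediately done should not add anything to the internal dicts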
    processor.add_experiences(
        mock_done_decision_step, mock_done_terminal_step, 0, ActionInfo.empty()
    )
    assert len(processor._experience_buffers.keys()) == 0
    assert len(processor._last_take_action_outputs.keys()) == 0
    assert len(processor._episode_steps.keys()) == 0
    assert len(processor._episode_rewards.keys()) == 0
    assert len(processor._last_step_result.keys()) == 0


def test_end_episode():
    policy = create_mock_policy()
    tqueue = mock.Mock()
    name_behavior_id = "test_brain_name"
    processor = AgentProcessor(
        policy,
        name_behavior_id,
        max_trajectory_length=5,
        stats_reporter=StatsReporter("testcat"),
    )
    fake_action_outputs = {
        "action": ActionTuple(continuous=np.array([[0.1]], dtype=np.float32)),
        "entropy": np.array([1.0], dtype=np.float32),
        "learning_rate": 1.0,
        "log_probs": LogProbsTuple(continuous=np.array([[0.1]], dtype=np.float32)),
    }

    mock_decision_step, mock_terminal_step = mb.create_mock_steps(
        num_agents=1,
        observation_specs=create_observation_specs_with_shapes([(8,)]),
        action_spec=ActionSpec.create_continuous(2),
    )
    fake_action_info = ActionInfo(
        action=ActionTuple(continuous=np.array([[0.1]], dtype=np.float32)),
        env_action=ActionTuple(continuous=np.array([[0.1]], dtype=np.float32)),
        outputs=fake_action_outputs,
        agent_ids=mock_decision_step.agent_id,
    )

    processor.publish_trajectory_queue(tqueue)
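    # Prime the processor with an empty ActionInfo, as after an env reset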
    processor.add_experiences(
        mock_decision_step, mock_terminal_step, 0, ActionInfo.empty()
    )

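    # Take 5 steps for each of 3 worker ids; none of these agents ever sees a terminal step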
    remove_calls = []
    for _ep in range(3):
        remove_calls.append(mock.call([get_global_agent_id(_ep, 0)]))
        for _ in range(5):
            processor.add_experiences(
                mock_decision_step, mock_terminal_step, _ep, fake_action_info
            )

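    # Ending the episode should clean up every agent the processor is still tracking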
    processor.end_episode()

    policy.remove_previous_action.assert_has_calls(remove_calls)

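    # All internal bookkeeping should be empty again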
    assert len(processor._experience_buffers.keys()) == 0
    assert len(processor._last_take_action_outputs.keys()) == 0
    assert len(processor._episode_steps.keys()) == 0
    assert len(processor._episode_rewards.keys()) == 0


def test_agent_manager():
    policy = create_mock_policy()
    name_behavior_id = "test_brain_name"
    manager = AgentManager(
        policy,
        name_behavior_id,
        max_trajectory_length=5,
        stats_reporter=StatsReporter("testcat"),
    )
    assert len(manager._trajectory_queues) == 1
    assert isinstance(manager._trajectory_queues[0], AgentManagerQueue)


def test_agent_manager_queue():
    queue = AgentManagerQueue(behavior_id="testbehavior")
    trajectory = mock.Mock(spec=Trajectory)
    assert queue.empty()
    queue.put(trajectory)
    assert not queue.empty()
    queue_traj = queue.get_nowait()
    assert isinstance(queue_traj, Trajectory)
    assert queue.empty()


def test_agent_manager_stats():
    policy = mock.Mock()
    stats_reporter = StatsReporter("FakeCategory")
    writer = mock.Mock()
    stats_reporter.add_writer(writer)
    manager = AgentManager(policy, "MyBehavior", stats_reporter)

    all_env_stats = [
        {
            "averaged": [(1.0, StatsAggregationMethod.AVERAGE)],
            "most_recent": [(2.0, StatsAggregationMethod.MOST_RECENT)],
            "summed": [(3.1, StatsAggregationMethod.SUM)],
        },
        {
            "averaged": [(3.0, StatsAggregationMethod.AVERAGE)],
            "most_recent": [(4.0, StatsAggregationMethod.MOST_RECENT)],
            "summed": [(1.1, StatsAggregationMethod.SUM)],
        },
    ]
    for env_stats in all_env_stats:
        manager.record_environment_stats(env_stats, worker_id=0)

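    # The writer should receive both environments' values, tagged with each stat's aggregation method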
    expected_stats = {
        "averaged": StatsSummary(
            full_dist=[1.0, 3.0], aggregation_method=StatsAggregationMethod.AVERAGE
        ),
        "most_recent": StatsSummary(
            full_dist=[4.0], aggregation_method=StatsAggregationMethod.MOST_RECENT
        ),
        "summed": StatsSummary(
            full_dist=[3.1, 1.1], aggregation_method=StatsAggregationMethod.SUM
        ),
    }
    stats_reporter.write_stats(123)
    writer.write_stats.assert_any_call("FakeCategory", expected_stats, 123)

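    # Clean up our Mock from the global list of writers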
    StatsReporter.writers.remove(writer)