AnnaMats's picture
Second Push
05c9ac2
import math
import tempfile
import numpy as np
from typing import Dict
from mlagents.trainers.trainer_controller import TrainerController
from mlagents.trainers.trainer import TrainerFactory
from mlagents.trainers.simple_env_manager import SimpleEnvManager
from mlagents.trainers.stats import StatsReporter, StatsWriter, StatsSummary
from mlagents.trainers.environment_parameter_manager import EnvironmentParameterManager
from mlagents_envs.side_channel.environment_parameters_channel import (
EnvironmentParametersChannel,
)
class DebugWriter(StatsWriter):
"""
Print to stdout so stats can be viewed in pytest
"""
def __init__(self):
self._last_reward_summary: Dict[str, float] = {}
def get_last_rewards(self):
return self._last_reward_summary
def write_stats(
self, category: str, values: Dict[str, StatsSummary], step: int
) -> None:
for val, stats_summary in values.items():
if (
val == "Environment/Cumulative Reward"
or val == "Environment/Group Cumulative Reward"
):
print(step, val, stats_summary.aggregated_value)
self._last_reward_summary[category] = stats_summary.aggregated_value
# The reward processor is passed as an argument to _check_environment_trains.
# It is applied to the list of all final rewards for each brain individually.
# This is so that we can process all final rewards in different ways for different algorithms.
# Custom reward processors should be built within the test function and passed to _check_environment_trains
# Default is average over the last 5 final rewards
def default_reward_processor(rewards, last_n_rewards=5):
rewards_to_use = rewards[-last_n_rewards:]
# For debugging tests
print(f"Last {last_n_rewards} rewards:", rewards_to_use)
return np.array(rewards[-last_n_rewards:], dtype=np.float32).mean()
def check_environment_trains(
env,
trainer_config,
reward_processor=default_reward_processor,
env_parameter_manager=None,
success_threshold=0.9,
env_manager=None,
training_seed=None,
):
if env_parameter_manager is None:
env_parameter_manager = EnvironmentParameterManager()
# Create controller and begin training.
with tempfile.TemporaryDirectory() as dir:
run_id = "id"
seed = 1337 if training_seed is None else training_seed
StatsReporter.writers.clear() # Clear StatsReporters so we don't write to file
debug_writer = DebugWriter()
StatsReporter.add_writer(debug_writer)
if env_manager is None:
env_manager = SimpleEnvManager(env, EnvironmentParametersChannel())
trainer_factory = TrainerFactory(
trainer_config=trainer_config,
output_path=dir,
train_model=True,
load_model=False,
seed=seed,
param_manager=env_parameter_manager,
multi_gpu=False,
)
tc = TrainerController(
trainer_factory=trainer_factory,
output_path=dir,
run_id=run_id,
param_manager=env_parameter_manager,
train=True,
training_seed=seed,
)
# Begin training
tc.start_learning(env_manager)
if (
success_threshold is not None
): # For tests where we are just checking setup and not reward
processed_rewards = [
reward_processor(rewards) for rewards in env.final_rewards.values()
]
assert all(not math.isnan(reward) for reward in processed_rewards)
assert all(reward > success_threshold for reward in processed_rewards)