File size: 2,004 Bytes
079c32c |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 |
import pytest
import torch
from ding.torch_utils import is_differentiable
from ding.model.template.wqmix import MixerStar, WQMix
args = [True, False]
@pytest.mark.unittest
def test_mixer_star():
agent_num, bs, embedding_dim = 4, 3, 32
agent_q = torch.randn(bs, agent_num)
state_embedding = torch.randn(bs, embedding_dim)
mixer_star = MixerStar(agent_num, embedding_dim, 64)
total_q = mixer_star(agent_q, state_embedding)
assert total_q.shape == (bs, )
loss = total_q.mean()
is_differentiable(loss, mixer_star)
@pytest.mark.unittest
@pytest.mark.parametrize('is_q_star', args)
def test_wqmix(is_q_star):
agent_num, bs, T = 4, 3, 8
obs_dim, global_obs_dim, action_dim = 32, 32 * 4, 9
embedding_dim = 64
wqmix_model = WQMix(agent_num, obs_dim, global_obs_dim, action_dim, [128, embedding_dim], 'gru')
data = {
'obs': {
'agent_state': torch.randn(T, bs, agent_num, obs_dim),
'global_state': torch.randn(T, bs, global_obs_dim),
'action_mask': torch.randint(0, 2, size=(T, bs, agent_num, action_dim))
},
'prev_state': [[None for _ in range(agent_num)] for _ in range(bs)],
'action': torch.randint(0, action_dim, size=(T, bs, agent_num))
}
output = wqmix_model(data, single_step=False, q_star=is_q_star)
assert set(output.keys()) == set(['total_q', 'logit', 'next_state', 'action_mask'])
assert output['total_q'].shape == (T, bs)
assert output['logit'].shape == (T, bs, agent_num, action_dim)
assert len(output['next_state']) == bs and all([len(n) == agent_num for n in output['next_state']])
print(output['next_state'][0][0]['h'].shape)
loss = output['total_q'].sum()
if is_q_star:
is_differentiable(loss, [wqmix_model._q_network_star, wqmix_model._mixer_star])
else:
is_differentiable(loss, [wqmix_model._q_network, wqmix_model._mixer])
data.pop('action')
output = wqmix_model(data, single_step=False, q_star=is_q_star)
|