File size: 3,071 Bytes
079c32c |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 |
import pytest
from itertools import product
import torch
from ding.model.template import BCQ
from ding.torch_utils import is_differentiable
B = 4
obs_shape = [4, (8, )]
act_shape = [3, (6, )]
args = list(product(*[obs_shape, act_shape]))
@pytest.mark.unittest
class TestBCQ:
def output_check(self, model, outputs):
if isinstance(outputs, torch.Tensor):
loss = outputs.sum()
elif isinstance(outputs, dict):
loss = sum([v.sum() for v in outputs.values()])
is_differentiable(loss, model)
@pytest.mark.parametrize('obs_shape, act_shape', args)
def test_BCQ(self, obs_shape, act_shape):
if isinstance(obs_shape, int):
inputs_obs = torch.randn(B, obs_shape)
else:
inputs_obs = torch.randn(B, *obs_shape)
if isinstance(act_shape, int):
inputs_act = torch.randn(B, act_shape)
else:
inputs_act = torch.randn(B, *act_shape)
inputs = {'obs': inputs_obs, 'action': inputs_act}
model = BCQ(obs_shape, act_shape)
outputs_c = model(inputs, mode='compute_critic')
assert isinstance(outputs_c, dict)
if isinstance(act_shape, int):
assert torch.stack(outputs_c['q_value']).shape == (2, B)
else:
assert torch.stack(outputs_c['q_value']).shape == (2, B)
self.output_check(model.critic, torch.stack(outputs_c['q_value']))
outputs_a = model(inputs, mode='compute_actor')
assert isinstance(outputs_a, dict)
if isinstance(act_shape, int):
assert outputs_a['action'].shape == (B, act_shape)
elif len(act_shape) == 1:
assert outputs_a['action'].shape == (B, *act_shape)
self.output_check(model.actor, outputs_a)
outputs_vae = model(inputs, mode='compute_vae')
assert isinstance(outputs_vae, dict)
if isinstance(act_shape, int):
assert outputs_vae['recons_action'].shape == (B, act_shape)
assert outputs_vae['mu'].shape == (B, act_shape * 2)
assert outputs_vae['log_var'].shape == (B, act_shape * 2)
assert outputs_vae['z'].shape == (B, act_shape * 2)
elif len(act_shape) == 1:
assert outputs_vae['recons_action'].shape == (B, *act_shape)
assert outputs_vae['mu'].shape == (B, act_shape[0] * 2)
assert outputs_vae['log_var'].shape == (B, act_shape[0] * 2)
assert outputs_vae['z'].shape == (B, act_shape[0] * 2)
if isinstance(obs_shape, int):
assert outputs_vae['prediction_residual'].shape == (B, obs_shape)
else:
assert outputs_vae['prediction_residual'].shape == (B, *obs_shape)
outputs_eval = model(inputs, mode='compute_eval')
assert isinstance(outputs_eval, dict)
assert isinstance(outputs_eval, dict)
if isinstance(act_shape, int):
assert outputs_eval['action'].shape == (B, act_shape)
elif len(act_shape) == 1:
assert outputs_eval['action'].shape == (B, *act_shape)
|