import pytest from itertools import product import torch from ding.model.template import BCQ from ding.torch_utils import is_differentiable B = 4 obs_shape = [4, (8, )] act_shape = [3, (6, )] args = list(product(*[obs_shape, act_shape])) @pytest.mark.unittest class TestBCQ: def output_check(self, model, outputs): if isinstance(outputs, torch.Tensor): loss = outputs.sum() elif isinstance(outputs, dict): loss = sum([v.sum() for v in outputs.values()]) is_differentiable(loss, model) @pytest.mark.parametrize('obs_shape, act_shape', args) def test_BCQ(self, obs_shape, act_shape): if isinstance(obs_shape, int): inputs_obs = torch.randn(B, obs_shape) else: inputs_obs = torch.randn(B, *obs_shape) if isinstance(act_shape, int): inputs_act = torch.randn(B, act_shape) else: inputs_act = torch.randn(B, *act_shape) inputs = {'obs': inputs_obs, 'action': inputs_act} model = BCQ(obs_shape, act_shape) outputs_c = model(inputs, mode='compute_critic') assert isinstance(outputs_c, dict) if isinstance(act_shape, int): assert torch.stack(outputs_c['q_value']).shape == (2, B) else: assert torch.stack(outputs_c['q_value']).shape == (2, B) self.output_check(model.critic, torch.stack(outputs_c['q_value'])) outputs_a = model(inputs, mode='compute_actor') assert isinstance(outputs_a, dict) if isinstance(act_shape, int): assert outputs_a['action'].shape == (B, act_shape) elif len(act_shape) == 1: assert outputs_a['action'].shape == (B, *act_shape) self.output_check(model.actor, outputs_a) outputs_vae = model(inputs, mode='compute_vae') assert isinstance(outputs_vae, dict) if isinstance(act_shape, int): assert outputs_vae['recons_action'].shape == (B, act_shape) assert outputs_vae['mu'].shape == (B, act_shape * 2) assert outputs_vae['log_var'].shape == (B, act_shape * 2) assert outputs_vae['z'].shape == (B, act_shape * 2) elif len(act_shape) == 1: assert outputs_vae['recons_action'].shape == (B, *act_shape) assert outputs_vae['mu'].shape == (B, act_shape[0] * 2) assert outputs_vae['log_var'].shape == (B, act_shape[0] * 2) assert outputs_vae['z'].shape == (B, act_shape[0] * 2) if isinstance(obs_shape, int): assert outputs_vae['prediction_residual'].shape == (B, obs_shape) else: assert outputs_vae['prediction_residual'].shape == (B, *obs_shape) outputs_eval = model(inputs, mode='compute_eval') assert isinstance(outputs_eval, dict) assert isinstance(outputs_eval, dict) if isinstance(act_shape, int): assert outputs_eval['action'].shape == (B, act_shape) elif len(act_shape) == 1: assert outputs_eval['action'].shape == (B, *act_shape)