File size: 6,228 Bytes
079c32c |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 |
import torch
import torch.nn as nn
from typing import Union, Optional, Dict
import numpy as np
from ding.model.common.head import DiscreteHead, RegressionHead, ReparameterizationHead
from ding.utils import SequenceType, squeeze
from ding.model.common.encoder import FCEncoder, ConvEncoder
from torch.distributions import Independent, Normal
class InverseDynamicsModel(nn.Module):
    """
    Overview:
        Inverse dynamics model (encoder + head): infers the missing action from a state transition.
        Given a pair of observations it returns the action in between, i.e. (s0, s1) --> a0.
    Interfaces:
        ``__init__``, ``forward``, ``predict_action``, ``train``
    """

    def __init__(
            self,
            obs_shape: Union[int, SequenceType],
            action_shape: Union[int, SequenceType],
            encoder_hidden_size_list: Optional[SequenceType] = None,
            action_space: str = "regression",
            activation: Optional[nn.Module] = nn.LeakyReLU(),
            norm_type: Optional[str] = None
    ) -> None:
        """
        Overview:
            Init the Inverse Dynamics (encoder + head) Model according to input arguments.
        Arguments:
            - obs_shape (:obj:`Union[int, SequenceType]`): Observation space shape, such as 8 or [4, 84, 84].
            - action_shape (:obj:`Union[int, SequenceType]`): Action space shape, such as 6 or [2, 3, 3].
            - encoder_hidden_size_list (:obj:`Optional[SequenceType]`): Collection of ``hidden_size`` to pass to
                the encoder; its last element is used as the input size of the head. ``None`` (the default)
                means ``[60, 80, 100, 40]``.
            - action_space (:obj:`str`): One of ``'regression'``, ``'reparameterization'``, ``'discrete'``.
            - activation (:obj:`Optional[nn.Module]`): Activation module, ``nn.LeakyReLU()`` by default
                (refer to https://arxiv.org/abs/1805.01954). The value is forwarded unchanged to the encoder
                and the head, so the meaning of ``None`` is decided by those modules.
            - norm_type (:obj:`Optional[str]`): The type of normalization in networks, see
                ``ding.torch_utils.fc_block`` for more details.
        Raises:
            - RuntimeError: ``obs_shape`` is neither an int / 1-dim shape nor a 3-dim shape.
            - AssertionError: ``action_space`` is not one of the supported values.
        """
        super(InverseDynamicsModel, self).__init__()
        # A ``None`` sentinel avoids sharing one mutable default list between all instances.
        if encoder_hidden_size_list is None:
            encoder_hidden_size_list = [60, 80, 100, 40]
        # For compatibility: 1, (1, ), [4, 32, 32]
        obs_shape, action_shape = squeeze(obs_shape), squeeze(action_shape)
        if isinstance(obs_shape, int) or len(obs_shape) == 1:
            # FC encoder: the input holds obs and next obs, so the input size is obs_shape * 2.
            # For a 1-element sequence, take the element first: ``seq * 2`` would repeat the
            # sequence ([8] -> [8, 8]) instead of doubling the dimension.
            flat_obs_dim = obs_shape if isinstance(obs_shape, int) else obs_shape[0]
            self.encoder = FCEncoder(
                flat_obs_dim * 2, encoder_hidden_size_list, activation=activation, norm_type=norm_type
            )
        elif len(obs_shape) == 3:
            # Conv encoder: the input holds obs and next obs, so the first (channel) dim is doubled.
            obs_shape = (obs_shape[0] * 2, *obs_shape[1:])
            self.encoder = ConvEncoder(obs_shape, encoder_hidden_size_list, activation=activation, norm_type=norm_type)
        else:
            raise RuntimeError(
                "not support obs_shape for pre-defined encoder: {}, please customize your own Model".format(obs_shape)
            )

        self.action_space = action_space
        assert self.action_space in ['regression', 'reparameterization',
                                     'discrete'], "not supported action_space: {}".format(self.action_space)
        head_input_size = encoder_hidden_size_list[-1]
        if self.action_space == "regression":
            self.header = RegressionHead(
                head_input_size, action_shape, final_tanh=False, activation=activation, norm_type=norm_type
            )
        elif self.action_space == "reparameterization":
            self.header = ReparameterizationHead(
                head_input_size, action_shape, sigma_type='conditioned', activation=activation, norm_type=norm_type
            )
        elif self.action_space == "discrete":
            self.header = DiscreteHead(head_input_size, action_shape, activation=activation, norm_type=norm_type)

    def forward(self, x: torch.Tensor) -> Dict:
        """
        Overview:
            Run the encoder and the head on ``x`` and package the result according to ``action_space``.
        Arguments:
            - x (:obj:`torch.Tensor`): Encoder input. The encoder is built for a doubled observation shape,
                so this is presumably obs and next obs concatenated (TODO confirm the axis with the caller).
        Returns:
            - output (:obj:`Dict`): ``{'action': pred}`` for ``'regression'``;
                ``{'logit': [mu, sigma], 'action': tanh(sample)}`` for ``'reparameterization'``;
                the raw head output for ``'discrete'`` (``train`` and ``predict_action`` read its
                ``'logit'`` entry).
        """
        head_output = self.header(self.encoder(x))
        if self.action_space == "regression":
            return {'action': head_output['pred']}
        if self.action_space == "reparameterization":
            mu, sigma = head_output['mu'], head_output['sigma']
            dist = Independent(Normal(mu, sigma), 1)
            # rsample keeps the sample differentiable w.r.t. mu and sigma.
            pred = dist.rsample()
            action = torch.tanh(pred)
            return {'logit': [mu, sigma], 'action': action}
        # "discrete": hand back the head output untouched.
        return head_output

    def predict_action(self, x: torch.Tensor) -> Dict:
        """
        Overview:
            Predict the action for ``x``. For ``'discrete'`` the arg-max index over the last dim of the
            logit is returned; otherwise this is the same as ``forward``.
        Arguments:
            - x (:obj:`torch.Tensor`): Same input as ``forward``.
        Returns:
            - output (:obj:`Dict`): Dict holding the predicted ``'action'``.
        """
        output = self.forward(x)
        if self.action_space == "discrete":
            # Softmax is monotonic, so arg-max over the logit equals arg-max over the probabilities.
            return {'action': torch.argmax(output['logit'], -1)}
        return output

    def train(
            self,
            training_set: Union[dict, bool] = True,
            n_epoch: Optional[int] = None,
            learning_rate: Optional[float] = None,
            weight_decay: Optional[float] = None
    ):
        """
        Overview:
            Train idm model, given pair of states return action (s_t, s_t+1, a_t).
            This method shadows ``nn.Module.train(mode)``, which ``nn.Module.eval()`` and parent modules
            call with a single bool. To keep those working, a bool first argument is forwarded to
            ``nn.Module.train`` instead of starting a fit.
        Arguments:
            - training_set (:obj:`Union[dict, bool]`): States transition with keys ``'obs'`` (model input)
                and ``'action'`` (target); or a bool, handled like ``nn.Module.train(mode)``.
            - n_epoch (:obj:`int`): Number of optimisation steps; each step uses the whole ``training_set``.
            - learning_rate (:obj:`float`): Learning rate for the AdamW optimizer.
            - weight_decay (:obj:`float`): Weight decay for the AdamW optimizer.
        Returns:
            - loss: Mean of the per-step losses when fitting, or ``self`` when called with a bool.
        Raises:
            - TypeError: Fitting is requested but one of the three hyper-parameters is missing.
        """
        if isinstance(training_set, bool):
            # nn.Module protocol: model.train(), model.train(False), model.eval(), parent.train(mode).
            return super().train(training_set)
        if n_epoch is None or learning_rate is None or weight_decay is None:
            raise TypeError("train() requires 'n_epoch', 'learning_rate' and 'weight_decay' when fitting")

        discrete = self.action_space == "discrete"
        # Cross-entropy on logits for discrete actions, L1 on the predicted action otherwise.
        criterion = nn.CrossEntropyLoss() if discrete else nn.L1Loss()
        output_key = 'logit' if discrete else 'action'
        optimizer = torch.optim.AdamW(self.parameters(), lr=learning_rate, weight_decay=weight_decay)
        loss_list = []
        for _ in range(n_epoch):
            data = training_set['obs']
            y = training_set['action']
            y_pred = self.forward(data)[output_key]
            loss = criterion(y_pred, y)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            loss_list.append(loss.item())
        loss = np.mean(loss_list)
        return loss
|