zjowowen's picture
init space
079c32c
import torch
import torch.nn as nn
from typing import Union, Optional, Dict
import numpy as np
from ding.model.common.head import DiscreteHead, RegressionHead, ReparameterizationHead
from ding.utils import SequenceType, squeeze
from ding.model.common.encoder import FCEncoder, ConvEncoder
from torch.distributions import Independent, Normal
class InverseDynamicsModel(nn.Module):
"""
InverseDynamicsModel: infering missing action information from state transition.
input and output: given pair of observation, return action (s0,s1 --> a0 if n=2)
"""
def __init__(
self,
obs_shape: Union[int, SequenceType],
action_shape: Union[int, SequenceType],
encoder_hidden_size_list: SequenceType = [60, 80, 100, 40],
action_space: str = "regression",
activation: Optional[nn.Module] = nn.LeakyReLU(),
norm_type: Optional[str] = None
) -> None:
r"""
Overview:
Init the Inverse Dynamics (encoder + head) Model according to input arguments.
Arguments:
- obs_shape (:obj:`Union[int, SequenceType]`): Observation space shape, such as 8 or [4, 84, 84].
- action_shape (:obj:`Union[int, SequenceType]`): Action space shape, such as 6 or [2, 3, 3].
- encoder_hidden_size_list (:obj:`SequenceType`): Collection of ``hidden_size`` to pass to ``Encoder``, \
the last element must match ``head_hidden_size``.
- action_space (:obj:`String`): Action space, such as 'regression', 'reparameterization', 'discrete'.
- activation (:obj:`Optional[nn.Module]`): The type of activation function in networks \
if ``None`` then default set it to ``nn.LeakyReLU()`` refer to https://arxiv.org/abs/1805.01954
- norm_type (:obj:`Optional[str]`): The type of normalization in networks, see \
``ding.torch_utils.fc_block`` for more details.
"""
super(InverseDynamicsModel, self).__init__()
# For compatibility: 1, (1, ), [4, 32, 32]
obs_shape, action_shape = squeeze(obs_shape), squeeze(action_shape)
# FC encoder: obs and obs[next] ,so input shape is obs_shape*2
if isinstance(obs_shape, int) or len(obs_shape) == 1:
self.encoder = FCEncoder(
obs_shape * 2, encoder_hidden_size_list, activation=activation, norm_type=norm_type
)
elif len(obs_shape) == 3:
# FC encoder: obs and obs[next] ,so first channel need multiply 2
obs_shape = (obs_shape[0] * 2, *obs_shape[1:])
self.encoder = ConvEncoder(obs_shape, encoder_hidden_size_list, activation=activation, norm_type=norm_type)
else:
raise RuntimeError(
"not support obs_shape for pre-defined encoder: {}, please customize your own Model".format(obs_shape)
)
self.action_space = action_space
assert self.action_space in ['regression', 'reparameterization',
'discrete'], "not supported action_space: {}".format(self.action_space)
if self.action_space == "regression":
self.header = RegressionHead(
encoder_hidden_size_list[-1],
action_shape,
final_tanh=False,
activation=activation,
norm_type=norm_type
)
elif self.action_space == "reparameterization":
self.header = ReparameterizationHead(
encoder_hidden_size_list[-1],
action_shape,
sigma_type='conditioned',
activation=activation,
norm_type=norm_type
)
elif self.action_space == "discrete":
self.header = DiscreteHead(
encoder_hidden_size_list[-1], action_shape, activation=activation, norm_type=norm_type
)
def forward(self, x: torch.Tensor) -> Dict:
if self.action_space == "regression":
x = self.encoder(x)
x = self.header(x)
return {'action': x['pred']}
elif self.action_space == "reparameterization":
x = self.encoder(x)
x = self.header(x)
mu, sigma = x['mu'], x['sigma']
dist = Independent(Normal(mu, sigma), 1)
pred = dist.rsample()
action = torch.tanh(pred)
return {'logit': [mu, sigma], 'action': action}
elif self.action_space == "discrete":
x = self.encoder(x)
x = self.header(x)
return x
def predict_action(self, x: torch.Tensor) -> Dict:
if self.action_space == "discrete":
res = nn.Softmax(dim=-1)
action = torch.argmax(res(self.forward(x)['logit']), -1)
return {'action': action}
else:
return self.forward(x)
def train(self, training_set: dict, n_epoch: int, learning_rate: float, weight_decay: float):
r"""
Overview:
Train idm model, given pair of states return action (s_t,s_t+1,a_t)
Arguments:
- training_set (:obj:`dict`):states transition
- n_epoch (:obj:`int`): number of epoches
- learning_rate (:obj:`float`): learning rate for optimizer
- weight_decay (:obj:`float`): weight decay for optimizer
"""
if self.action_space == "discrete":
criterion = nn.CrossEntropyLoss()
else:
# criterion = nn.MSELoss()
criterion = nn.L1Loss()
optimizer = torch.optim.AdamW(self.parameters(), lr=learning_rate, weight_decay=weight_decay)
loss_list = []
for itr in range(n_epoch):
data = training_set['obs']
y = training_set['action']
if self.action_space == "discrete":
y_pred = self.forward(data)['logit']
else:
y_pred = self.forward(data)['action']
loss = criterion(y_pred, y)
optimizer.zero_grad()
loss.backward()
optimizer.step()
loss_list.append(loss.item())
loss = np.mean(loss_list)
return loss