File size: 6,228 Bytes
079c32c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
import torch
import torch.nn as nn
from typing import Union, Optional, Dict
import numpy as np

from ding.model.common.head import DiscreteHead, RegressionHead, ReparameterizationHead
from ding.utils import SequenceType, squeeze
from ding.model.common.encoder import FCEncoder, ConvEncoder
from torch.distributions import Independent, Normal


class InverseDynamicsModel(nn.Module):
    """
    InverseDynamicsModel: inferring missing action information from state transition.
    input and output: given pair of observation, return action (s0,s1 --> a0 if n=2)

    The model is an encoder (``FCEncoder`` or ``ConvEncoder``) followed by a head that is selected
    by ``action_space`` (``RegressionHead``, ``ReparameterizationHead`` or ``DiscreteHead``).
    """

    def __init__(
            self,
            obs_shape: Union[int, SequenceType],
            action_shape: Union[int, SequenceType],
            encoder_hidden_size_list: SequenceType = [60, 80, 100, 40],
            action_space: str = "regression",
            activation: Optional[nn.Module] = nn.LeakyReLU(),
            norm_type: Optional[str] = None
    ) -> None:
        """
        Overview:
            Init the Inverse Dynamics (encoder + head) Model according to input arguments.
        Arguments:
            - obs_shape (:obj:`Union[int, SequenceType]`): Observation space shape, such as 8 or [4, 84, 84].
            - action_shape (:obj:`Union[int, SequenceType]`): Action space shape, such as 6 or [2, 3, 3].
            - encoder_hidden_size_list (:obj:`SequenceType`): Collection of ``hidden_size`` to pass to the
                encoder; its last element is used as the input size of the head.
            - action_space (:obj:`str`): Action space, one of 'regression', 'reparameterization', 'discrete'.
            - activation (:obj:`Optional[nn.Module]`): Activation handed to both the encoder and the head,
                defaults to ``nn.LeakyReLU()`` (refer to https://arxiv.org/abs/1805.01954). The value is
                forwarded unchanged, so the meaning of ``None`` is decided by the encoder/head.
            - norm_type (:obj:`Optional[str]`): The type of normalization in networks, see
                ``ding.torch_utils.fc_block`` for more details.
        Raises:
            - RuntimeError: If ``obs_shape`` is neither a scalar / 1-element shape nor a 3-element shape.
            - AssertionError: If ``action_space`` is not one of the supported values.
        """
        super(InverseDynamicsModel, self).__init__()
        # For compatibility: 1, (1, ), [4, 32, 32]
        obs_shape, action_shape = squeeze(obs_shape), squeeze(action_shape)
        if isinstance(obs_shape, int) or len(obs_shape) == 1:
            # FC encoder: the input holds obs and next obs, so its size is twice the obs dimension.
            # A 1-element shape is unpacked first, because ``[n] * 2`` would repeat the sequence
            # (giving ``[n, n]``) instead of doubling the dimension.
            obs_dim = obs_shape if isinstance(obs_shape, int) else obs_shape[0]
            self.encoder = FCEncoder(
                obs_dim * 2, encoder_hidden_size_list, activation=activation, norm_type=norm_type
            )
        elif len(obs_shape) == 3:
            # Conv encoder: the input holds obs and next obs, so the first (channel) entry is doubled.
            obs_shape = (obs_shape[0] * 2, *obs_shape[1:])
            self.encoder = ConvEncoder(obs_shape, encoder_hidden_size_list, activation=activation, norm_type=norm_type)
        else:
            raise RuntimeError(
                "not support obs_shape for pre-defined encoder: {}, please customize your own Model".format(obs_shape)
            )
        self.action_space = action_space
        assert self.action_space in ['regression', 'reparameterization',
                                     'discrete'], "not supported action_space: {}".format(self.action_space)
        # The head always consumes the last hidden size produced by the encoder.
        head_input_size = encoder_hidden_size_list[-1]
        if self.action_space == "regression":
            self.header = RegressionHead(
                head_input_size,
                action_shape,
                final_tanh=False,
                activation=activation,
                norm_type=norm_type
            )
        elif self.action_space == "reparameterization":
            self.header = ReparameterizationHead(
                head_input_size,
                action_shape,
                sigma_type='conditioned',
                activation=activation,
                norm_type=norm_type
            )
        elif self.action_space == "discrete":
            self.header = DiscreteHead(head_input_size, action_shape, activation=activation, norm_type=norm_type)

    def forward(self, x: torch.Tensor) -> Dict:
        """
        Overview:
            Run the encoder and the head on ``x`` and pack the result according to ``action_space``.
        Arguments:
            - x (:obj:`torch.Tensor`): Encoder input. Presumably the observation and next observation
                already joined into one tensor by the caller (TODO confirm the exact layout).
        Returns:
            - output (:obj:`Dict`):
                - 'regression': ``{'action': pred}`` where ``pred`` is the head's ``'pred'`` entry.
                - 'reparameterization': ``{'logit': [mu, sigma], 'action': action}`` where ``action`` is
                  the tanh of a reparameterized sample from ``Normal(mu, sigma)``.
                - 'discrete': the head output, returned unchanged.
        """
        head_output = self.header(self.encoder(x))
        if self.action_space == "regression":
            return {'action': head_output['pred']}
        elif self.action_space == "reparameterization":
            mu, sigma = head_output['mu'], head_output['sigma']
            # rsample keeps the sample differentiable with respect to mu and sigma.
            dist = Independent(Normal(mu, sigma), 1)
            action = torch.tanh(dist.rsample())
            return {'logit': [mu, sigma], 'action': action}
        elif self.action_space == "discrete":
            return head_output

    def predict_action(self, x: torch.Tensor) -> Dict:
        """
        Overview:
            Predict an action for ``x``. For the discrete action space the index of the largest logit
            is returned, otherwise the result of ``forward`` is returned as is.
        Arguments:
            - x (:obj:`torch.Tensor`): Same input as ``forward``.
        Returns:
            - output (:obj:`Dict`): ``{'action': index}`` for 'discrete', else the ``forward`` output.
        """
        if self.action_space == "discrete":
            # Softmax is monotonic, so the argmax over the raw logits selects the same index
            # without building the probabilities.
            logit = self.forward(x)['logit']
            return {'action': torch.argmax(logit, -1)}
        return self.forward(x)

    def train(
            self,
            training_set: Union[dict, bool] = True,
            n_epoch: Optional[int] = None,
            learning_rate: Optional[float] = None,
            weight_decay: Optional[float] = None,
            mode: Optional[bool] = None
    ):
        """
        Overview:
            Train idm model, given pair of states return action (s_t,s_t+1,a_t).

            This method shadows ``nn.Module.train``, which ``nn.Module.eval`` and any parent module
            call as ``train(mode)``. To keep those calls working, a ``bool`` first argument (or the
            ``mode`` keyword) switches the training mode exactly like ``nn.Module.train``; any other
            first argument is treated as the training set.
        Arguments:
            - training_set (:obj:`dict`): States transition, read through the keys ``'obs'`` (model input)
                and ``'action'`` (target). A ``bool`` is interpreted as ``nn.Module.train``'s ``mode``.
            - n_epoch (:obj:`int`): Number of epochs; each epoch is one optimizer step on the whole set.
            - learning_rate (:obj:`float`): Learning rate for the ``AdamW`` optimizer.
            - weight_decay (:obj:`float`): Weight decay for the ``AdamW`` optimizer.
            - mode (:obj:`Optional[bool]`): Keyword form of ``nn.Module.train``'s ``mode``.
        Returns:
            - The module itself when used as a mode switch, otherwise the mean loss over all epochs.
        Raises:
            - TypeError: If a training set is given without ``n_epoch``, ``learning_rate`` and
                ``weight_decay``.
        """
        # Mode switch: ``model.train()``, ``model.train(False)``, ``model.train(mode=False)``, ``model.eval()``.
        if mode is not None:
            return super(InverseDynamicsModel, self).train(mode)
        if isinstance(training_set, bool):
            return super(InverseDynamicsModel, self).train(training_set)

        if n_epoch is None or learning_rate is None or weight_decay is None:
            raise TypeError("train() requires n_epoch, learning_rate and weight_decay together with training_set")

        is_discrete = self.action_space == "discrete"
        # NOTE(review): nn.CrossEntropyLoss usually expects class-index targets of dtype long;
        # confirm that callers provide 'action' in that form for the discrete case.
        criterion = nn.CrossEntropyLoss() if is_discrete else nn.L1Loss()
        optimizer = torch.optim.AdamW(self.parameters(), lr=learning_rate, weight_decay=weight_decay)
        # The batch does not change between epochs, so it is looked up once.
        data, target = training_set['obs'], training_set['action']
        output_key = 'logit' if is_discrete else 'action'
        loss_list = []
        for _ in range(n_epoch):
            prediction = self.forward(data)[output_key]
            loss = criterion(prediction, target)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            loss_list.append(loss.item())
        return np.mean(loss_list)