File size: 2,713 Bytes
0e936e1
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
from typing import Optional, Tuple, Type

import numpy as np
import torch
import torch.nn as nn
from numpy.typing import NDArray

from rl_algo_impls.shared.actor import Actor, PiForward
from rl_algo_impls.shared.actor.categorical import MaskedCategorical
from rl_algo_impls.shared.actor.gridnet import GridnetDistribution
from rl_algo_impls.shared.encoder import EncoderOutDim
from rl_algo_impls.shared.module.module import layer_init


class Transpose(nn.Module):
    """Module form of ``Tensor.permute`` so it can be used as a layer.

    The dimension order is fixed at construction time and applied to every
    tensor passed through ``forward``.
    """

    def __init__(self, permutation: Tuple[int, ...]) -> None:
        """Remember the dimension order to apply.

        Args:
            permutation: Desired ordering of the input tensor's dimensions.
        """
        super().__init__()
        self.permutation = permutation

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Return ``x`` with its dimensions reordered by ``self.permutation``."""
        dims = self.permutation
        return x.permute(*dims)


class GridnetDecoder(Actor):
    """Actor head that upsamples an encoded feature map into action logits.

    The decoder stacks four transposed convolutions, each with kernel 3,
    stride 2, padding 1 and output_padding 1 (so each one doubles height and
    width). The channel axis is then moved to the last position before the
    logits are handed to ``GridnetDistribution``.
    """

    def __init__(
        self,
        map_size: int,
        action_vec: NDArray[np.int64],
        in_dim: EncoderOutDim,
        activation: Type[nn.Module] = nn.ReLU,
        init_layers_orthogonal: bool = True,
    ) -> None:
        """Build the transposed-convolution stack.

        Args:
            map_size: Stored on the instance, forwarded to
                ``GridnetDistribution`` and used as the first entry of
                ``action_shape``.
            action_vec: Array whose sum gives the number of output channels
                of the final layer and whose length is the second entry of
                ``action_shape``.
            in_dim: Encoder output dimensions. Must be a tuple; only its first
                entry is read here, as the input channel count.
            activation: Module class instantiated after each of the first
                three transposed convolutions.
            init_layers_orthogonal: Forwarded unchanged to ``layer_init`` for
                every transposed convolution.
        """
        super().__init__()
        self.map_size = map_size
        self.action_vec = action_vec
        assert isinstance(in_dim, tuple)

        def upsample(c_in, c_out, **init_kwargs) -> nn.Module:
            # One upsampling stage: create the layer, then initialise it, in
            # that order, so parameter creation order is unchanged.
            conv = nn.ConvTranspose2d(
                c_in, c_out, 3, stride=2, padding=1, output_padding=1
            )
            return layer_init(
                conv, init_layers_orthogonal=init_layers_orthogonal, **init_kwargs
            )

        # Channel widths of the three hidden stages, starting from the encoder's.
        widths = (in_dim[0], 128, 64, 32)
        stages = []
        for c_in, c_out in zip(widths, widths[1:]):
            stages.append(upsample(c_in, c_out))
            stages.append(activation())
        # Final stage has no activation and passes std=0.01 to layer_init
        # (presumably a smaller init scale for the logits -- confirm in layer_init).
        stages.append(upsample(widths[-1], action_vec.sum(), std=0.01))
        # Move channels last: (N, C, H, W) -> (N, H, W, C).
        stages.append(Transpose((0, 2, 3, 1)))
        self.deconv = nn.Sequential(*stages)

    def forward(
        self,
        obs: torch.Tensor,
        actions: Optional[torch.Tensor] = None,
        action_masks: Optional[torch.Tensor] = None,
    ) -> PiForward:
        """Decode ``obs`` into a masked action distribution.

        Args:
            obs: Tensor fed to the transposed-convolution stack.
            actions: Optional actions passed through to ``pi_forward``.
            action_masks: Required mask tensor given to ``GridnetDistribution``.

        Returns:
            Whatever ``self.pi_forward`` produces for the built distribution.

        Raises:
            AssertionError: If ``action_masks`` is ``None``.
        """
        assert (
            action_masks is not None
        ), f"No mask case unhandled in {self.__class__.__name__}"
        cell_logits = self.deconv(obs)
        distribution = GridnetDistribution(
            self.map_size, self.action_vec, cell_logits, action_masks
        )
        return self.pi_forward(distribution, actions)

    @property
    def action_shape(self) -> Tuple[int, ...]:
        """Return ``(map_size, len(action_vec))``."""
        num_components = len(self.action_vec)
        return (self.map_size, num_components)