File size: 2,713 Bytes
0e936e1 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 |
from typing import Optional, Tuple, Type
import numpy as np
import torch
import torch.nn as nn
from numpy.typing import NDArray
from rl_algo_impls.shared.actor import Actor, PiForward
from rl_algo_impls.shared.actor.categorical import MaskedCategorical
from rl_algo_impls.shared.actor.gridnet import GridnetDistribution
from rl_algo_impls.shared.encoder import EncoderOutDim
from rl_algo_impls.shared.module.module import layer_init
class Transpose(nn.Module):
    """Module wrapper around ``Tensor.permute``.

    Wrapping the permutation in an ``nn.Module`` lets a dimension reorder be
    used as a layer, e.g. as one step inside an ``nn.Sequential``.

    Args:
        permutation: The desired ordering of the input's dimensions, passed
            to ``Tensor.permute`` on every forward call.
    """

    def __init__(self, permutation: Tuple[int, ...]) -> None:
        super().__init__()
        # Stored as a plain attribute; this module has no parameters.
        self.permutation = permutation

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Return ``x`` with its dimensions reordered by ``self.permutation``."""
        reordered = x.permute(*self.permutation)
        return reordered
class GridnetDecoder(Actor):
    """Actor head that upsamples an encoded feature map into action logits.

    The decoder is a stack of four ``ConvTranspose2d`` layers (kernel 3,
    stride 2, padding 1, output_padding 1). With those settings each layer
    doubles the spatial height and width, so the stack upsamples by 16x.
    The last layer emits ``action_vec.sum()`` channels, and the result is
    permuted from channels-first to channels-last before being handed to
    ``GridnetDistribution``.

    Args:
        map_size: Forwarded to ``GridnetDistribution`` and reported as the
            first entry of ``action_shape``. Presumably the number of grid
            cells -- TODO confirm against the environment.
        action_vec: Integer array whose sum is the number of logit channels
            and whose length is the second entry of ``action_shape``.
            Presumably the sizes of each per-cell action component --
            TODO confirm.
        in_dim: Encoder output dimensions; must be a tuple, and its first
            entry is used as the number of input channels.
        activation: Module class instantiated after each of the first three
            transposed convolutions.
        init_layers_orthogonal: Passed through to ``layer_init`` for every
            transposed convolution.
    """

    def __init__(
        self,
        map_size: int,
        action_vec: NDArray[np.int64],
        in_dim: EncoderOutDim,
        activation: Type[nn.Module] = nn.ReLU,
        init_layers_orthogonal: bool = True,
    ) -> None:
        super().__init__()
        self.map_size = map_size
        self.action_vec = action_vec
        # Only tuple-shaped (convolutional) encoder outputs are supported.
        assert isinstance(in_dim, tuple)

        def upsample(in_channels, out_channels, **init_kwargs) -> nn.Module:
            # One initialized transposed convolution that doubles H and W.
            return layer_init(
                nn.ConvTranspose2d(
                    in_channels,
                    out_channels,
                    3,
                    stride=2,
                    padding=1,
                    output_padding=1,
                ),
                init_layers_orthogonal=init_layers_orthogonal,
                **init_kwargs,
            )

        # Layers are created in the same order they appear in the Sequential
        # (conv, then its activation) so module indices and the order of any
        # random initialization are unchanged.
        stages = []
        channels_in = in_dim[0]
        for channels_out in (128, 64, 32):
            stages.append(upsample(channels_in, channels_out))
            stages.append(activation())
            channels_in = channels_out
        # Output layer: one channel per logit, with its own ``std`` passed
        # to ``layer_init``.
        stages.append(upsample(channels_in, action_vec.sum(), std=0.01))
        # Move the channel dimension last: (0, 1, 2, 3) -> (0, 2, 3, 1).
        stages.append(Transpose((0, 2, 3, 1)))
        self.deconv = nn.Sequential(*stages)

    def forward(
        self,
        obs: torch.Tensor,
        actions: Optional[torch.Tensor] = None,
        action_masks: Optional[torch.Tensor] = None,
    ) -> PiForward:
        """Decode ``obs`` into a masked action distribution.

        Args:
            obs: Feature map fed to the transposed-convolution stack.
            actions: Optional actions, forwarded to ``self.pi_forward``.
            action_masks: Required mask forwarded to ``GridnetDistribution``.
                Despite the ``Optional`` annotation, ``None`` is rejected.

        Returns:
            Whatever ``self.pi_forward`` returns for the distribution.

        Raises:
            AssertionError: If ``action_masks`` is ``None``.
        """
        assert (
            action_masks is not None
        ), f"No mask case unhandled in {self.__class__.__name__}"
        distribution = GridnetDistribution(
            self.map_size, self.action_vec, self.deconv(obs), action_masks
        )
        return self.pi_forward(distribution, actions)

    @property
    def action_shape(self) -> Tuple[int, ...]:
        """Return ``(map_size, len(action_vec))``."""
        return self.map_size, len(self.action_vec)
|