import math import torch import torch.nn as nn from torch.nn import functional as F from ..layers import deformable_conv, SE torch.manual_seed(0) # This is the simple CNN layer,that performs a 2-D convolution while maintaining the dimensions of the input(except for the features dimension) class CNN_layer(nn.Module): def __init__(self, in_ch, out_ch, kernel_size, dropout, bias=True): super(CNN_layer, self).__init__() self.kernel_size = kernel_size padding = ( (kernel_size[0] - 1) // 2, (kernel_size[1] - 1) // 2) # padding so that both dimensions are maintained assert kernel_size[0] % 2 == 1 and kernel_size[1] % 2 == 1 self.block1 = [nn.Conv2d(in_ch, out_ch, kernel_size=kernel_size, padding=padding, dilation=(1, 1)), nn.BatchNorm2d(out_ch), nn.Dropout(dropout, inplace=True), ] self.block1 = nn.Sequential(*self.block1) def forward(self, x): output = self.block1(x) return output class FPN(nn.Module): def __init__(self, in_ch, out_ch, kernel, # (3,1) dropout, reduction, ): super(FPN, self).__init__() kernel_size = kernel if isinstance(kernel, (tuple, list)) else (kernel, kernel) padding = ((kernel_size[0] - 1) // 2, (kernel_size[1] - 1) // 2) pad1 = (padding[0], padding[1]) pad2 = (padding[0] + pad1[0], padding[1] + pad1[1]) pad3 = (padding[0] + pad2[0], padding[1] + pad2[1]) dil1 = (1, 1) dil2 = (1 + pad1[0], 1 + pad1[1]) dil3 = (1 + pad2[0], 1 + pad2[1]) self.block1 = nn.Sequential(nn.Conv2d(in_ch, out_ch, kernel_size=kernel_size, padding=pad1, dilation=dil1), nn.BatchNorm2d(out_ch), nn.Dropout(dropout, inplace=True), nn.PReLU(), ) self.block2 = nn.Sequential(nn.Conv2d(in_ch, out_ch, kernel_size=kernel_size, padding=pad2, dilation=dil2), nn.BatchNorm2d(out_ch), nn.Dropout(dropout, inplace=True), nn.PReLU(), ) self.block3 = nn.Sequential(nn.Conv2d(in_ch, out_ch, kernel_size=kernel_size, padding=pad3, dilation=dil3), nn.BatchNorm2d(out_ch), nn.Dropout(dropout, inplace=True), nn.PReLU(), ) self.pooling = nn.AdaptiveAvgPool2d((1, 1)) # Action Context. self.compress = nn.Conv2d(out_ch * 3 + in_ch, out_ch, kernel_size=(1, 1)) # PRELU is outside the loop, check at the end of the code. def forward(self, x): b, dim, joints, seq = x.shape global_action = F.interpolate(self.pooling(x), (joints, seq)) out = torch.cat((self.block1(x), self.block2(x), self.block3(x), global_action), dim=1) out = self.compress(out) return out def mish(x): return (x * torch.tanh(F.softplus(x))) class ConvTemporalGraphical(nn.Module): # Source : https://github.com/yysijie/st-gcn/blob/master/net/st_gcn.py r"""The basic module for applying a graph convolution. Args: Shape: - Input: Input graph sequence in :math:`(N, in_ch, T_{in}, V)` format - Output: Outpu graph sequence in :math:`(N, out_ch, T_{out}, V)` format where :math:`N` is a batch size, :math:`K` is the spatial kernel size, as :math:`K == kernel_size[1]`, :math:`T_{in}/T_{out}` is a length of input/output sequence, :math:`V` is the number of graph nodes. """ def __init__(self, time_dim, joints_dim, domain, interpratable): super(ConvTemporalGraphical, self).__init__() if domain == "time": # learnable, graph-agnostic 3-d adjacency matrix(or edge importance matrix) size = joints_dim if not interpratable: self.A = nn.Parameter(torch.FloatTensor(time_dim, size, size)) self.domain = 'nctv,tvw->nctw' else: self.domain = 'nctv,ntvw->nctw' elif domain == "space": size = time_dim if not interpratable: self.A = nn.Parameter(torch.FloatTensor(joints_dim, size, size)) self.domain = 'nctv,vtq->ncqv' else: self.domain = 'nctv,nvtq->ncqv' if not interpratable: stdv = 1. / math.sqrt(self.A.size(1)) self.A.data.uniform_(-stdv, stdv) def forward(self, x): x = torch.einsum(self.domain, (x, self.A)) return x.contiguous() class Map2Adj(nn.Module): def __init__(self, in_ch, time_dim, joints_dim, domain, dropout, ): super(Map2Adj, self).__init__() self.domain = domain inter_ch = in_ch // 2 self.time_compress = nn.Sequential(nn.Conv2d(in_ch, inter_ch, kernel_size=1, bias=False), nn.BatchNorm2d(inter_ch), nn.PReLU(), nn.Conv2d(inter_ch, inter_ch, kernel_size=(time_dim, 1), bias=False), nn.BatchNorm2d(inter_ch), nn.Dropout(dropout, inplace=True), nn.Conv2d(inter_ch, time_dim, kernel_size=1, bias=False), ) self.joint_compress = nn.Sequential(nn.Conv2d(in_ch, inter_ch, kernel_size=1, bias=False), nn.BatchNorm2d(inter_ch), nn.PReLU(), nn.Conv2d(inter_ch, inter_ch, kernel_size=(1, joints_dim), bias=False), nn.BatchNorm2d(inter_ch), nn.Dropout(dropout, inplace=True), nn.Conv2d(inter_ch, joints_dim, kernel_size=1, bias=False), ) if self.domain == "space": ch = joints_dim self.perm1 = (0, 1, 2, 3) self.perm2 = (0, 3, 2, 1) if self.domain == "time": ch = time_dim self.perm1 = (0, 2, 1, 3) self.perm2 = (0, 1, 2, 3) inter_ch = ch # // 2 self.expansor = nn.Sequential(nn.Conv2d(ch, inter_ch, kernel_size=1, bias=False), nn.BatchNorm2d(inter_ch), nn.Dropout(dropout, inplace=True), nn.PReLU(), nn.Conv2d(inter_ch, ch, kernel_size=1, bias=False), ) self.time_compress.apply(self._init_weights) self.joint_compress.apply(self._init_weights) self.expansor.apply(self._init_weights) def _init_weights(self, m, gain=0.05): if isinstance(m, nn.Linear): torch.nn.init.xavier_uniform_(m.weight, gain=gain) if isinstance(m, (nn.Conv2d, nn.Conv1d)): torch.nn.init.xavier_normal_(m.weight, gain=gain) if isinstance(m, nn.PReLU): torch.nn.init.constant_(m.weight, 0.25) def forward(self, x): b, dims, seq, joints = x.shape dim_seq = self.time_compress(x) dim_space = self.joint_compress(x) o = torch.matmul(dim_space.permute(self.perm1), dim_seq.permute(self.perm2)) Adj = self.expansor(o) return Adj class Domain_GCNN_layer(nn.Module): """ Shape: - Input[0]: Input graph sequence in :math:`(N, in_ch, T_{in}, V)` format - Input[1]: Input graph adjacency matrix in :math:`(K, V, V)` format - Output[0]: Outpu graph sequence in :math:`(N, out_ch, T_{out}, V)` format where :math:`N` is a batch size, :math:`K` is the spatial kernel size, as :math:`K == kernel_size[1]`, :math:`T_{in}/T_{out}` is a length of input/output sequence, :math:`V` is the number of graph nodes. :in_ch= dimension of coordinates : out_ch=dimension of coordinates + """ def __init__(self, in_ch, out_ch, kernel_size, stride, time_dim, joints_dim, domain, interpratable, dropout, bias=True): super(Domain_GCNN_layer, self).__init__() self.kernel_size = kernel_size assert self.kernel_size[0] % 2 == 1 assert self.kernel_size[1] % 2 == 1 padding = ((self.kernel_size[0] - 1) // 2, (self.kernel_size[1] - 1) // 2) self.interpratable = interpratable self.domain = domain self.gcn = ConvTemporalGraphical(time_dim, joints_dim, domain, interpratable) self.tcn = nn.Sequential(nn.Conv2d(in_ch, out_ch, (self.kernel_size[0], self.kernel_size[1]), (stride, stride), padding, ), nn.BatchNorm2d(out_ch), nn.Dropout(dropout, inplace=True), ) if stride != 1 or in_ch != out_ch: self.residual = nn.Sequential(nn.Conv2d(in_ch, out_ch, kernel_size=1, stride=(1, 1)), nn.BatchNorm2d(out_ch), ) else: self.residual = nn.Identity() if self.interpratable: self.map_to_adj = Map2Adj(in_ch, time_dim, joints_dim, domain, dropout, ) else: self.map_to_adj = nn.Identity() self.prelu = nn.PReLU() def forward(self, x): # assert A.shape[0] == self.kernel_size[1], print(A.shape[0],self.kernel_size) res = self.residual(x) self.Adj = self.map_to_adj(x) if self.interpratable: self.gcn.A = self.Adj x1 = self.gcn(x) x2 = self.tcn(x1) x3 = x2 + res x4 = self.prelu(x3) return x4 # Dynamic SpatioTemporal Decompose Graph Convolutions (DSTD-GC) class DSTD_GC(nn.Module): """ Shape: - Input[0]: Input graph sequence in :math:`(N, in_ch, T_{in}, V)` format - Input[1]: Input graph adjacency matrix in :math:`(K, V, V)` format - Output[0]: Outpu graph sequence in :math:`(N, out_ch, T_{out}, V)` format where :math:`N` is a batch size, :math:`K` is the spatial kernel size, as :math:`K == kernel_size[1]`, :math:`T_{in}/T_{out}` is a length of input/output sequence, :math:`V` is the number of graph nodes. : in_ch= dimension of coordinates : out_ch=dimension of coordinates + """ def __init__(self, in_ch, out_ch, interpratable, kernel_size, stride, time_dim, joints_dim, reduction, dropout): super(DSTD_GC, self).__init__() self.dsgn = Domain_GCNN_layer(in_ch, out_ch, kernel_size, stride, time_dim, joints_dim, "space", interpratable, dropout) self.tsgn = Domain_GCNN_layer(in_ch, out_ch, kernel_size, stride, time_dim, joints_dim, "time", interpratable, dropout) self.compressor = nn.Sequential(nn.Conv2d(out_ch * 2, out_ch, 1, bias=False), nn.BatchNorm2d(out_ch), nn.PReLU(), SE.SELayer2d(out_ch, reduction=reduction), ) if stride != 1 or in_ch != out_ch: self.residual = nn.Sequential(nn.Conv2d(in_ch, out_ch, kernel_size=1, stride=(1, 1)), nn.BatchNorm2d(out_ch), ) else: self.residual = nn.Identity() # Weighting features out_ch_c = out_ch // 2 if out_ch // 2 > 1 else 1 self.global_norm = nn.BatchNorm2d(in_ch) self.conv_s = nn.Sequential(nn.Conv2d(in_ch, out_ch_c, (time_dim, 1), bias=False), nn.BatchNorm2d(out_ch_c), nn.Dropout(dropout, inplace=True), nn.PReLU(), nn.Conv2d(out_ch_c, out_ch, (1, joints_dim), bias=False), nn.BatchNorm2d(out_ch), nn.Dropout(dropout, inplace=True), nn.PReLU(), ) self.conv_t = nn.Sequential(nn.Conv2d(in_ch, out_ch_c, (time_dim, 1), bias=False), nn.BatchNorm2d(out_ch_c), nn.Dropout(dropout, inplace=True), nn.PReLU(), nn.Conv2d(out_ch_c, out_ch, (1, joints_dim), bias=False), nn.BatchNorm2d(out_ch), nn.Dropout(dropout, inplace=True), nn.PReLU(), ) self.map_s = nn.Sequential(nn.Linear(out_ch + 2 + time_dim * 2, out_ch, bias=False), nn.BatchNorm1d(out_ch), nn.Dropout(dropout, inplace=True), nn.PReLU(), nn.Linear(out_ch, out_ch, bias=False), ) self.map_t = nn.Sequential(nn.Linear(out_ch + 2 + time_dim * 2, out_ch, bias=False), nn.BatchNorm1d(out_ch), nn.Dropout(dropout, inplace=True), nn.PReLU(), nn.Linear(out_ch, out_ch, bias=False), ) self.prelu1 = nn.Sequential(nn.BatchNorm2d(out_ch), nn.PReLU(), ) self.prelu2 = nn.Sequential(nn.BatchNorm2d(out_ch), nn.PReLU(), ) def _get_stats_(self, x): global_avg_pool = x.mean((3, 2)).mean(1, keepdims=True) global_avg_pool_features = x.mean(3).mean(1) global_std_pool = x.std((3, 2)).std(1, keepdims=True) global_std_pool_features = x.std(3).std(1) return torch.cat(( global_avg_pool, global_avg_pool_features, global_std_pool, global_std_pool_features, ), dim=1) def forward(self, x): b, dim, seq, joints = x.shape # 64, 3, 10, 22 xn = self.global_norm(x) stats = self._get_stats_(xn) w1 = torch.cat((self.conv_s(xn).view(b, -1), stats), dim=1) stats = self._get_stats_(xn) w2 = torch.cat((self.conv_t(xn).view(b, -1), stats), dim=1) self.w1 = self.map_s(w1) self.w2 = self.map_t(w2) w1 = self.w1[..., None, None] w2 = self.w2[..., None, None] x1 = self.dsgn(xn) x2 = self.tsgn(xn) out = torch.cat((self.prelu1(w1 * x1), self.prelu2(w2 * x2)), dim=1) out = self.compressor(out) return torch.clip(out + self.residual(xn), -1e5, 1e5) class ContextLayer(nn.Module): def __init__(self, in_ch, hidden_ch, output_seq, input_seq, joints, dims=3, reduction=8, dropout=0.1, ): super(ContextLayer, self).__init__() self.n_output = output_seq self.n_joints = joints self.n_input = input_seq self.context_conv1 = nn.Sequential(nn.Conv2d(in_ch, hidden_ch, 1, bias=False), nn.BatchNorm2d(hidden_ch), nn.PReLU(), ) self.context_conv2 = nn.Sequential(nn.Conv2d(in_ch, hidden_ch, (input_seq, 1), bias=False), nn.BatchNorm2d(hidden_ch), nn.PReLU(), ) self.context_conv3 = nn.Sequential(nn.Conv2d(in_ch, hidden_ch, 1, bias=False), nn.BatchNorm2d(hidden_ch), nn.PReLU(), ) self.map1 = nn.Sequential(nn.Linear(hidden_ch, self.n_output, bias=False), nn.Dropout(dropout, inplace=True), nn.PReLU(), ) self.map2 = nn.Sequential(nn.Linear(hidden_ch, self.n_output, bias=False), nn.Dropout(dropout, inplace=True), nn.PReLU(), ) self.map3 = nn.Sequential(nn.Linear(hidden_ch, self.n_output, bias=False), nn.Dropout(dropout, inplace=True), nn.PReLU(), ) self.fmap_s = nn.Sequential(nn.Linear(self.n_output * 3, self.n_joints, bias=False), nn.BatchNorm1d(self.n_joints), nn.Dropout(dropout, inplace=True), ) self.fmap_t = nn.Sequential(nn.Linear(self.n_output * 3, self.n_output, bias=False), nn.BatchNorm1d(self.n_output), nn.Dropout(dropout, inplace=True), ) # inter_ch = self.n_joints # // 2 self.norm_map = nn.Sequential(nn.Conv1d(self.n_output, self.n_output, 1, bias=False), nn.BatchNorm1d(self.n_output), nn.Dropout(dropout, inplace=True), nn.PReLU(), SE.SELayer1d(self.n_output, reduction=reduction), nn.Conv1d(self.n_output, self.n_output, 1, bias=False), nn.BatchNorm1d(self.n_output), nn.Dropout(dropout, inplace=True), nn.PReLU(), ) self.fconv = nn.Sequential(nn.Conv2d(1, dims, 1, bias=False), nn.BatchNorm2d(dims), nn.PReLU(), nn.Conv2d(dims, dims, 1, bias=False), nn.BatchNorm2d(dims), nn.PReLU(), ) self.SE = SE.SELayer2d(self.n_output, reduction=reduction) def forward(self, x): b, _, seq, joint_dim = x.shape y1 = self.context_conv1(x).max(-1)[0].max(-1)[0] y2 = self.context_conv2(x).view(b, -1, joint_dim).max(-1)[0] ym = self.context_conv3(x).mean((2, 3)) y = torch.cat((self.map1(y1), self.map2(y2), self.map3(ym)), dim=1) self.joints = self.fmap_s(y) self.displacements = self.fmap_t(y) # .cumsum(1) self.seq_joints = torch.bmm(self.displacements.unsqueeze(2), self.joints.unsqueeze(1)) self.seq_joints_n = self.norm_map(self.seq_joints) self.seq_joints_dims = self.fconv(self.seq_joints_n.view(b, 1, self.n_output, self.n_joints)) o = self.SE(self.seq_joints_dims.permute(0, 2, 3, 1)) return o class CISTGCN(nn.Module): """ Shape: - Input[0]: Input sequence in :math:`(N, in_ch,T_in, V)` format - Output[0]: Output sequence in :math:`(N,T_out,in_ch, V)` format where :math:`N` is a batch size, :math:`T_{in}/T_{out}` is a length of input/output sequence, :math:`V` is the number of graph nodes. :in_ch=number of channels for the coordiantes(default=3) + """ def __init__(self, arch, learn): super(CISTGCN, self).__init__() self.clipping = arch.model_params.clipping self.n_input = arch.model_params.input_n self.n_output = arch.model_params.output_n self.n_joints = arch.model_params.joints self.n_txcnn_layers = arch.model_params.n_txcnn_layers self.txc_kernel_size = [arch.model_params.txc_kernel_size] * 2 self.input_gcn = arch.model_params.input_gcn self.output_gcn = arch.model_params.output_gcn self.reduction = arch.model_params.reduction self.hidden_dim = arch.model_params.hidden_dim self.st_gcnns = nn.ModuleList() self.txcnns = nn.ModuleList() self.se = nn.ModuleList() self.in_conv = nn.ModuleList() self.context_layer = nn.ModuleList() self.trans = nn.ModuleList() self.in_ch = 10 self.model_tx = self.input_gcn.model_complexity.copy() self.model_tx.insert(0, 1) # add 1 in the position 0. self.input_gcn.model_complexity.insert(0, self.in_ch) self.input_gcn.model_complexity.append(self.in_ch) # self.input_gcn.interpretable.insert(0, True) # self.input_gcn.interpretable.append(False) for i in range(len(self.input_gcn.model_complexity) - 1): self.st_gcnns.append(DSTD_GC(self.input_gcn.model_complexity[i], self.input_gcn.model_complexity[i + 1], self.input_gcn.interpretable[i], [1, 1], 1, self.n_input, self.n_joints, self.reduction, learn.dropout)) self.context_layer = ContextLayer(1, self.hidden_dim, self.n_output, self.n_output, self.n_joints, 3, self.reduction, learn.dropout ) # at this point, we must permute the dimensions of the gcn network, from (N,C,T,V) into (N,T,C,V) # with kernel_size[3,3] the dimensions of C,V will be maintained self.txcnns.append(FPN(self.n_input, self.n_output, self.txc_kernel_size, 0., self.reduction)) for i in range(1, self.n_txcnn_layers): self.txcnns.append(FPN(self.n_output, self.n_output, self.txc_kernel_size, 0., self.reduction)) self.prelus = nn.ModuleList() for j in range(self.n_txcnn_layers): self.prelus.append(nn.PReLU()) self.dim_conversor = nn.Sequential(nn.Conv2d(self.in_ch, 3, 1, bias=False), nn.BatchNorm2d(3), nn.PReLU(), nn.Conv2d(3, 3, 1, bias=False), nn.PReLU(3), ) self.st_gcnns_o = nn.ModuleList() self.output_gcn.model_complexity.insert(0, 3) for i in range(len(self.output_gcn.model_complexity) - 1): self.st_gcnns_o.append(DSTD_GC(self.output_gcn.model_complexity[i], self.output_gcn.model_complexity[i + 1], self.output_gcn.interpretable[i], [1, 1], 1, self.n_joints, self.n_output, self.reduction, learn.dropout)) self.st_gcnns_o.apply(self._init_weights) self.st_gcnns.apply(self._init_weights) self.txcnns.apply(self._init_weights) def _init_weights(self, m, gain=0.1): if isinstance(m, nn.Linear): torch.nn.init.xavier_uniform_(m.weight, gain=gain) # if isinstance(m, (nn.Conv2d, nn.Conv1d)): # torch.nn.init.xavier_normal_(m.weight, gain=gain) if isinstance(m, nn.PReLU): torch.nn.init.constant_(m.weight, 0.25) def forward(self, x): b, seq, joints, dim = x.shape vel = torch.zeros_like(x) vel[:, :-1] = torch.diff(x, dim=1) vel[:, -1] = x[:, -1] acc = torch.zeros_like(x) acc[:, :-1] = torch.diff(vel, dim=1) acc[:, -1] = vel[:, -1] x1 = torch.cat((x, acc, vel, torch.norm(vel, dim=-1, keepdim=True)), dim=-1) x2 = x1.permute((0, 3, 1, 2)) # (torch.Size([64, 10, 22, 7]) x3 = x2 for i in range(len(self.st_gcnns)): x3 = self.st_gcnns[i](x3) x5 = x3.permute(0, 2, 1, 3) # prepare the input for the Time-Extrapolator-CNN (NCTV->NTCV) x6 = self.prelus[0](self.txcnns[0](x5)) for i in range(1, self.n_txcnn_layers): x6 = self.prelus[i](self.txcnns[i](x6)) + x6 # residual connection x6 = self.dim_conversor(x6.permute(0, 2, 1, 3)).permute(0, 2, 3, 1) x7 = x6.cumsum(1) act = self.context_layer(x7.reshape(b, 1, self.n_output, joints * x7.shape[-1])) x8 = x7.permute(0, 3, 2, 1) for i in range(len(self.st_gcnns_o)): x8 = self.st_gcnns_o[i](x8) x9 = x8.permute(0, 3, 2, 1) + act return x[:, -1:] + x9,