import torch import torch.nn as nn import torch.nn.functional as F from openrec.modeling.common import Activation NET_CONFIG_det = { 'blocks2': # k, in_c, out_c, s, use_se [[3, 16, 32, 1, False]], 'blocks3': [[3, 32, 64, 2, False], [3, 64, 64, 1, False]], 'blocks4': [[3, 64, 128, 2, False], [3, 128, 128, 1, False]], 'blocks5': [ [3, 128, 256, 2, False], [5, 256, 256, 1, False], [5, 256, 256, 1, False], [5, 256, 256, 1, False], [5, 256, 256, 1, False], ], 'blocks6': [ [5, 256, 512, 2, True], [5, 512, 512, 1, True], [5, 512, 512, 1, False], [5, 512, 512, 1, False], ], } NET_CONFIG_rec = { 'blocks2': # k, in_c, out_c, s, use_se [[3, 16, 32, 1, False]], 'blocks3': [[3, 32, 64, 1, False], [3, 64, 64, 1, False]], 'blocks4': [[3, 64, 128, (2, 1), False], [3, 128, 128, 1, False]], 'blocks5': [ [3, 128, 256, (1, 2), False], [5, 256, 256, 1, False], [5, 256, 256, 1, False], [5, 256, 256, 1, False], [5, 256, 256, 1, False], ], 'blocks6': [ [5, 256, 512, (2, 1), True], [5, 512, 512, 1, True], [5, 512, 512, (2, 1), False], [5, 512, 512, 1, False], ], } def make_divisible(v, divisor=16, min_value=None): if min_value is None: min_value = divisor new_v = max(min_value, int(v + divisor / 2) // divisor * divisor) if new_v < 0.9 * v: new_v += divisor return new_v class LearnableAffineBlock(nn.Module): def __init__(self, scale_value=1.0, bias_value=0.0, lr_mult=1.0, lab_lr=0.1): super().__init__() self.scale = nn.Parameter(torch.Tensor([scale_value])) self.bias = nn.Parameter(torch.Tensor([bias_value])) def forward(self, x): return self.scale * x + self.bias class ConvBNLayer(nn.Module): def __init__(self, in_channels, out_channels, kernel_size, stride, groups=1, lr_mult=1.0): super().__init__() self.conv = nn.Conv2d( in_channels=in_channels, out_channels=out_channels, kernel_size=kernel_size, stride=stride, padding=(kernel_size - 1) // 2, groups=groups, bias=False, ) self.bn = nn.BatchNorm2d(out_channels) def forward(self, x): x = self.conv(x) x = self.bn(x) return x class Act(nn.Module): def __init__(self, act='hard_swish', lr_mult=1.0, lab_lr=0.1): super().__init__() assert act in ['hard_swish', 'relu'] self.act = Activation(act) self.lab = LearnableAffineBlock(lr_mult=lr_mult, lab_lr=lab_lr) def forward(self, x): return self.lab(self.act(x)) class LearnableRepLayer(nn.Module): def __init__( self, in_channels, out_channels, kernel_size, stride=1, groups=1, num_conv_branches=1, lr_mult=1.0, lab_lr=0.1, ): super().__init__() self.is_repped = False self.groups = groups self.stride = stride self.kernel_size = kernel_size self.in_channels = in_channels self.out_channels = out_channels self.num_conv_branches = num_conv_branches self.padding = (kernel_size - 1) // 2 self.identity = (nn.BatchNorm2d(in_channels) if out_channels == in_channels and stride == 1 else None) self.conv_kxk = nn.ModuleList([ ConvBNLayer( in_channels, out_channels, kernel_size, stride, groups=groups, lr_mult=lr_mult, ) for _ in range(self.num_conv_branches) ]) self.conv_1x1 = (ConvBNLayer(in_channels, out_channels, 1, stride, groups=groups, lr_mult=lr_mult) if kernel_size > 1 else None) self.lab = LearnableAffineBlock(lr_mult=lr_mult, lab_lr=lab_lr) self.act = Act(lr_mult=lr_mult, lab_lr=lab_lr) def forward(self, x): # for export if self.is_repped: out = self.lab(self.reparam_conv(x)) if self.stride != 2: out = self.act(out) return out out = 0 if self.identity is not None: out += self.identity(x) if self.conv_1x1 is not None: out += self.conv_1x1(x) for conv in self.conv_kxk: out += conv(x) out = self.lab(out) if self.stride != 2: out = self.act(out) return out def rep(self): if self.is_repped: return kernel, bias = self._get_kernel_bias() self.reparam_conv = nn.Conv2d( in_channels=self.in_channels, out_channels=self.out_channels, kernel_size=self.kernel_size, stride=self.stride, padding=self.padding, groups=self.groups, ) self.reparam_conv.weight.data = kernel self.reparam_conv.bias.data = bias self.is_repped = True def _pad_kernel_1x1_to_kxk(self, kernel1x1, pad): if not isinstance(kernel1x1, torch.Tensor): return 0 else: return nn.functional.pad(kernel1x1, [pad, pad, pad, pad]) def _get_kernel_bias(self): kernel_conv_1x1, bias_conv_1x1 = self._fuse_bn_tensor(self.conv_1x1) kernel_conv_1x1 = self._pad_kernel_1x1_to_kxk(kernel_conv_1x1, self.kernel_size // 2) kernel_identity, bias_identity = self._fuse_bn_tensor(self.identity) kernel_conv_kxk = 0 bias_conv_kxk = 0 for conv in self.conv_kxk: kernel, bias = self._fuse_bn_tensor(conv) kernel_conv_kxk += kernel bias_conv_kxk += bias kernel_reparam = kernel_conv_kxk + kernel_conv_1x1 + kernel_identity bias_reparam = bias_conv_kxk + bias_conv_1x1 + bias_identity return kernel_reparam, bias_reparam def _fuse_bn_tensor(self, branch): if not branch: return 0, 0 elif isinstance(branch, ConvBNLayer): kernel = branch.conv.weight running_mean = branch.bn.running_mean running_var = branch.bn.running_var gamma = branch.bn.weight beta = branch.bn.bias eps = branch.bn.eps else: assert isinstance(branch, nn.BatchNorm2d) if not hasattr(self, 'id_tensor'): input_dim = self.in_channels // self.groups kernel_value = torch.zeros( (self.in_channels, input_dim, self.kernel_size, self.kernel_size), dtype=branch.weight.dtype, ) for i in range(self.in_channels): kernel_value[i, i % input_dim, self.kernel_size // 2, self.kernel_size // 2] = 1 self.id_tensor = kernel_value kernel = self.id_tensor running_mean = branch.running_mean running_var = branch.running_var gamma = branch.weight beta = branch.bias eps = branch.eps std = (running_var + eps).sqrt() t = (gamma / std).reshape((-1, 1, 1, 1)) return kernel * t, beta - running_mean * gamma / std class SELayer(nn.Module): def __init__(self, channel, reduction=4, lr_mult=1.0): super().__init__() self.avg_pool = nn.AdaptiveAvgPool2d(1) self.conv1 = nn.Conv2d( in_channels=channel, out_channels=channel // reduction, kernel_size=1, stride=1, padding=0, ) self.relu = nn.ReLU() self.conv2 = nn.Conv2d( in_channels=channel // reduction, out_channels=channel, kernel_size=1, stride=1, padding=0, ) self.hardsigmoid = Activation('hard_sigmoid') def forward(self, x): identity = x x = self.avg_pool(x) x = self.conv1(x) x = self.relu(x) x = self.conv2(x) x = self.hardsigmoid(x) x = x * identity return x class LCNetV3Block(nn.Module): def __init__( self, in_channels, out_channels, stride, dw_size, use_se=False, conv_kxk_num=4, lr_mult=1.0, lab_lr=0.1, ): super().__init__() self.use_se = use_se self.dw_conv = LearnableRepLayer( in_channels=in_channels, out_channels=in_channels, kernel_size=dw_size, stride=stride, groups=in_channels, num_conv_branches=conv_kxk_num, lr_mult=lr_mult, lab_lr=lab_lr, ) if use_se: self.se = SELayer(in_channels, lr_mult=lr_mult) self.pw_conv = LearnableRepLayer( in_channels=in_channels, out_channels=out_channels, kernel_size=1, stride=1, num_conv_branches=conv_kxk_num, lr_mult=lr_mult, lab_lr=lab_lr, ) def forward(self, x): x = self.dw_conv(x) if self.use_se: x = self.se(x) x = self.pw_conv(x) return x class PPLCNetV3(nn.Module): def __init__(self, scale=1.0, conv_kxk_num=4, lr_mult_list=[1.0, 1.0, 1.0, 1.0, 1.0, 1.0], lab_lr=0.1, det=False, **kwargs): super().__init__() self.scale = scale self.lr_mult_list = lr_mult_list self.det = det self.net_config = NET_CONFIG_det if self.det else NET_CONFIG_rec assert isinstance( self.lr_mult_list, (list, tuple )), 'lr_mult_list should be in (list, tuple) but got {}'.format( type(self.lr_mult_list)) assert len(self.lr_mult_list ) == 6, 'lr_mult_list length should be 6 but got {}'.format( len(self.lr_mult_list)) self.conv1 = ConvBNLayer( in_channels=3, out_channels=make_divisible(16 * scale), kernel_size=3, stride=2, lr_mult=self.lr_mult_list[0], ) self.blocks2 = nn.Sequential(*[ LCNetV3Block( in_channels=make_divisible(in_c * scale), out_channels=make_divisible(out_c * scale), dw_size=k, stride=s, use_se=se, conv_kxk_num=conv_kxk_num, lr_mult=self.lr_mult_list[1], lab_lr=lab_lr, ) for i, (k, in_c, out_c, s, se) in enumerate(self.net_config['blocks2']) ]) self.blocks3 = nn.Sequential(*[ LCNetV3Block( in_channels=make_divisible(in_c * scale), out_channels=make_divisible(out_c * scale), dw_size=k, stride=s, use_se=se, conv_kxk_num=conv_kxk_num, lr_mult=self.lr_mult_list[2], lab_lr=lab_lr, ) for i, (k, in_c, out_c, s, se) in enumerate(self.net_config['blocks3']) ]) self.blocks4 = nn.Sequential(*[ LCNetV3Block( in_channels=make_divisible(in_c * scale), out_channels=make_divisible(out_c * scale), dw_size=k, stride=s, use_se=se, conv_kxk_num=conv_kxk_num, lr_mult=self.lr_mult_list[3], lab_lr=lab_lr, ) for i, (k, in_c, out_c, s, se) in enumerate(self.net_config['blocks4']) ]) self.blocks5 = nn.Sequential(*[ LCNetV3Block( in_channels=make_divisible(in_c * scale), out_channels=make_divisible(out_c * scale), dw_size=k, stride=s, use_se=se, conv_kxk_num=conv_kxk_num, lr_mult=self.lr_mult_list[4], lab_lr=lab_lr, ) for i, (k, in_c, out_c, s, se) in enumerate(self.net_config['blocks5']) ]) self.blocks6 = nn.Sequential(*[ LCNetV3Block( in_channels=make_divisible(in_c * scale), out_channels=make_divisible(out_c * scale), dw_size=k, stride=s, use_se=se, conv_kxk_num=conv_kxk_num, lr_mult=self.lr_mult_list[5], lab_lr=lab_lr, ) for i, (k, in_c, out_c, s, se) in enumerate(self.net_config['blocks6']) ]) self.out_channels = make_divisible(512 * scale) if self.det: mv_c = [16, 24, 56, 480] self.out_channels = [ make_divisible(self.net_config['blocks3'][-1][2] * scale), make_divisible(self.net_config['blocks4'][-1][2] * scale), make_divisible(self.net_config['blocks5'][-1][2] * scale), make_divisible(self.net_config['blocks6'][-1][2] * scale), ] self.layer_list = nn.ModuleList([ nn.Conv2d(self.out_channels[0], int(mv_c[0] * scale), 1, 1, 0), nn.Conv2d(self.out_channels[1], int(mv_c[1] * scale), 1, 1, 0), nn.Conv2d(self.out_channels[2], int(mv_c[2] * scale), 1, 1, 0), nn.Conv2d(self.out_channels[3], int(mv_c[3] * scale), 1, 1, 0), ]) self.out_channels = [ int(mv_c[0] * scale), int(mv_c[1] * scale), int(mv_c[2] * scale), int(mv_c[3] * scale), ] def forward(self, x): out_list = [] x = self.conv1(x) x = self.blocks2(x) x = self.blocks3(x) out_list.append(x) x = self.blocks4(x) out_list.append(x) x = self.blocks5(x) out_list.append(x) x = self.blocks6(x) out_list.append(x) if self.det: out_list[0] = self.layer_list[0](out_list[0]) out_list[1] = self.layer_list[1](out_list[1]) out_list[2] = self.layer_list[2](out_list[2]) out_list[3] = self.layer_list[3](out_list[3]) return out_list if self.training: x = F.adaptive_avg_pool2d(x, [1, 40]) else: x = F.avg_pool2d(x, [3, 2]) return x