Metric3D_GPU / mono /model /decode_heads /RAFTDepthNormalDPTDecoder5.py
JUGGHM's picture
Update mono/model/decode_heads/RAFTDepthNormalDPTDecoder5.py
6b09c9a
raw
history blame
44.5 kB
import torch
import torch.nn as nn
import numpy as np
import math
import torch.nn.functional as F
# LORA finetuning originally by edwardjhu
class LoRALayer():
def __init__(
self,
r: int,
lora_alpha: int,
lora_dropout: float,
merge_weights: bool,
):
self.r = r
self.lora_alpha = lora_alpha
# Optional dropout
if lora_dropout > 0.:
self.lora_dropout = nn.Dropout(p=lora_dropout)
else:
self.lora_dropout = lambda x: x
# Mark the weight as unmerged
self.merged = False
self.merge_weights = merge_weights
class LoRALinear(nn.Linear, LoRALayer):
# LoRA implemented in a dense layer
def __init__(
self,
in_features: int,
out_features: int,
r: int = 0,
lora_alpha: int = 1,
lora_dropout: float = 0.,
fan_in_fan_out: bool = False, # Set this to True if the layer to replace stores weight like (fan_in, fan_out)
merge_weights: bool = True,
**kwargs
):
nn.Linear.__init__(self, in_features, out_features, **kwargs)
LoRALayer.__init__(self, r=r, lora_alpha=lora_alpha, lora_dropout=lora_dropout,
merge_weights=merge_weights)
self.fan_in_fan_out = fan_in_fan_out
# Actual trainable parameters
if r > 0:
self.lora_A = nn.Parameter(self.weight.new_zeros((r, in_features)))
self.lora_B = nn.Parameter(self.weight.new_zeros((out_features, r)))
self.scaling = self.lora_alpha / self.r
# Freezing the pre-trained weight matrix
self.weight.requires_grad = False
self.reset_parameters()
if fan_in_fan_out:
self.weight.data = self.weight.data.transpose(0, 1)
def reset_parameters(self):
#nn.Linear.reset_parameters(self)
if hasattr(self, 'lora_A'):
# initialize B the same way as the default for nn.Linear and A to zero
# this is different than what is described in the paper but should not affect performance
nn.init.kaiming_uniform_(self.lora_A, a=math.sqrt(5))
nn.init.zeros_(self.lora_B)
# def train(self, mode: bool = True):
# def T(w):
# return w.transpose(0, 1) if self.fan_in_fan_out else w
# nn.Linear.train(self, mode)
# if mode:
# if self.merge_weights and self.merged:
# # Make sure that the weights are not merged
# if self.r > 0:
# self.weight.data -= T(self.lora_B @ self.lora_A) * self.scaling
# self.merged = False
# else:
# if self.merge_weights and not self.merged:
# # Merge the weights and mark it
# if self.r > 0:
# self.weight.data += T(self.lora_B @ self.lora_A) * self.scaling
# self.merged = True
def forward(self, x: torch.Tensor):
def T(w):
return w.transpose(0, 1) if self.fan_in_fan_out else w
if self.r > 0 and not self.merged:
result = F.linear(x, T(self.weight), bias=self.bias)
result += (self.lora_dropout(x) @ self.lora_A.transpose(0, 1) @ self.lora_B.transpose(0, 1)) * self.scaling
return result
else:
return F.linear(x, T(self.weight), bias=self.bias)
class ConvLoRA(nn.Conv2d, LoRALayer):
def __init__(self, in_channels, out_channels, kernel_size, r=0, lora_alpha=1, lora_dropout=0., merge_weights=True, **kwargs):
#self.conv = conv_module(in_channels, out_channels, kernel_size, **kwargs)
nn.Conv2d.__init__(self, in_channels, out_channels, kernel_size, **kwargs)
LoRALayer.__init__(self, r=r, lora_alpha=lora_alpha, lora_dropout=lora_dropout, merge_weights=merge_weights)
assert isinstance(kernel_size, int)
# Actual trainable parameters
if r > 0:
self.lora_A = nn.Parameter(
self.weight.new_zeros((r * kernel_size, in_channels * kernel_size))
)
self.lora_B = nn.Parameter(
self.weight.new_zeros((out_channels//self.groups*kernel_size, r*kernel_size))
)
self.scaling = self.lora_alpha / self.r
# Freezing the pre-trained weight matrix
self.weight.requires_grad = False
self.reset_parameters()
self.merged = False
def reset_parameters(self):
#self.conv.reset_parameters()
if hasattr(self, 'lora_A'):
# initialize A the same way as the default for nn.Linear and B to zero
nn.init.kaiming_uniform_(self.lora_A, a=math.sqrt(5))
nn.init.zeros_(self.lora_B)
# def train(self, mode=True):
# super(ConvLoRA, self).train(mode)
# if mode:
# if self.merge_weights and self.merged:
# if self.r > 0:
# # Make sure that the weights are not merged
# self.conv.weight.data -= (self.lora_B @ self.lora_A).view(self.conv.weight.shape) * self.scaling
# self.merged = False
# else:
# if self.merge_weights and not self.merged:
# if self.r > 0:
# # Merge the weights and mark it
# self.conv.weight.data += (self.lora_B @ self.lora_A).view(self.conv.weight.shape) * self.scaling
# self.merged = True
def forward(self, x):
if self.r > 0 and not self.merged:
# return self.conv._conv_forward(
# x,
# self.conv.weight + (self.lora_B @ self.lora_A).view(self.conv.weight.shape) * self.scaling,
# self.conv.bias
# )
weight = self.weight + (self.lora_B @ self.lora_A).view(self.weight.shape) * self.scaling
bias = self.bias
return F.conv2d(x, weight, bias=bias, stride=self.stride, padding=self.padding, dilation=self.dilation, groups=self.groups)
else:
return F.conv2d(x, self.weight, bias=self.bias, stride=self.stride, padding=self.padding, dilation=self.dilation, groups=self.groups)
class ConvTransposeLoRA(nn.ConvTranspose2d, LoRALayer):
def __init__(self, in_channels, out_channels, kernel_size, r=0, lora_alpha=1, lora_dropout=0., merge_weights=True, **kwargs):
#self.conv = conv_module(in_channels, out_channels, kernel_size, **kwargs)
nn.ConvTranspose2d.__init__(self, in_channels, out_channels, kernel_size, **kwargs)
LoRALayer.__init__(self, r=r, lora_alpha=lora_alpha, lora_dropout=lora_dropout, merge_weights=merge_weights)
assert isinstance(kernel_size, int)
# Actual trainable parameters
if r > 0:
self.lora_A = nn.Parameter(
self.weight.new_zeros((r * kernel_size, in_channels * kernel_size))
)
self.lora_B = nn.Parameter(
self.weight.new_zeros((out_channels//self.groups*kernel_size, r*kernel_size))
)
self.scaling = self.lora_alpha / self.r
# Freezing the pre-trained weight matrix
self.weight.requires_grad = False
self.reset_parameters()
self.merged = False
def reset_parameters(self):
#self.conv.reset_parameters()
if hasattr(self, 'lora_A'):
# initialize A the same way as the default for nn.Linear and B to zero
nn.init.kaiming_uniform_(self.lora_A, a=math.sqrt(5))
nn.init.zeros_(self.lora_B)
# def train(self, mode=True):
# super(ConvTransposeLoRA, self).train(mode)
# if mode:
# if self.merge_weights and self.merged:
# if self.r > 0:
# # Make sure that the weights are not merged
# self.conv.weight.data -= (self.lora_B @ self.lora_A).view(self.conv.weight.shape) * self.scaling
# self.merged = False
# else:
# if self.merge_weights and not self.merged:
# if self.r > 0:
# # Merge the weights and mark it
# self.conv.weight.data += (self.lora_B @ self.lora_A).view(self.conv.weight.shape) * self.scaling
# self.merged = True
def forward(self, x):
if self.r > 0 and not self.merged:
weight = self.weight + (self.lora_B @ self.lora_A).view(self.weight.shape) * self.scaling
bias = self.bias
return F.conv_transpose2d(x, weight,
bias=bias, stride=self.stride, padding=self.padding, output_padding=self.output_padding,
groups=self.groups, dilation=self.dilation)
else:
return F.conv_transpose2d(x, self.weight,
bias=self.bias, stride=self.stride, padding=self.padding, output_padding=self.output_padding,
groups=self.groups, dilation=self.dilation)
#return self.conv(x)
class Conv2dLoRA(ConvLoRA):
def __init__(self, *args, **kwargs):
super(Conv2dLoRA, self).__init__(*args, **kwargs)
class ConvTranspose2dLoRA(ConvTransposeLoRA):
def __init__(self, *args, **kwargs):
super(ConvTranspose2dLoRA, self).__init__(*args, **kwargs)
def compute_depth_expectation(prob, depth_values):
depth_values = depth_values.view(*depth_values.shape, 1, 1)
depth = torch.sum(prob * depth_values, 1)
return depth
def interpolate_float32(x, size=None, scale_factor=None, mode='nearest', align_corners=None):
#with torch.autocast(device_type='cuda', dtype=torch.bfloat16, enabled=False):
return F.interpolate(x.float(), size=size, scale_factor=scale_factor, mode=mode, align_corners=align_corners)
# def upflow8(flow, mode='bilinear'):
# new_size = (8 * flow.shape[2], 8 * flow.shape[3])
# return 8 * F.interpolate(flow, size=new_size, mode=mode, align_corners=True)
def upflow4(flow, mode='bilinear'):
new_size = (4 * flow.shape[2], 4 * flow.shape[3])
#with torch.autocast(device_type='cuda', dtype=torch.bfloat16, enabled=False):
return F.interpolate(flow, size=new_size, mode=mode, align_corners=True)
def coords_grid(batch, ht, wd):
# coords = torch.meshgrid(torch.arange(ht), torch.arange(wd))
coords = (torch.zeros((ht, wd)), torch.zeros((ht, wd)), torch.zeros((ht, wd)), torch.zeros((ht, wd)), torch.zeros((ht, wd)), torch.zeros((ht, wd)))
coords = torch.stack(coords[::-1], dim=0).float()
return coords[None].repeat(batch, 1, 1, 1)
def norm_normalize(norm_out):
min_kappa = 0.01
norm_x, norm_y, norm_z, kappa = torch.split(norm_out, 1, dim=1)
norm = torch.sqrt(norm_x ** 2.0 + norm_y ** 2.0 + norm_z ** 2.0) + 1e-10
kappa = F.elu(kappa) + 1.0 + min_kappa
final_out = torch.cat([norm_x / norm, norm_y / norm, norm_z / norm, kappa], dim=1)
return final_out
# uncertainty-guided sampling (only used during training)
@torch.no_grad()
def sample_points(init_normal, gt_norm_mask, sampling_ratio, beta):
device = init_normal.device
B, _, H, W = init_normal.shape
N = int(sampling_ratio * H * W)
beta = beta
# uncertainty map
uncertainty_map = -1 * init_normal[:, -1, :, :] # B, H, W
# gt_invalid_mask (B, H, W)
if gt_norm_mask is not None:
gt_invalid_mask = F.interpolate(gt_norm_mask.float(), size=[H, W], mode='nearest')
gt_invalid_mask = gt_invalid_mask[:, 0, :, :] < 0.5
uncertainty_map[gt_invalid_mask] = -1e4
# (B, H*W)
_, idx = uncertainty_map.view(B, -1).sort(1, descending=True)
# importance sampling
if int(beta * N) > 0:
importance = idx[:, :int(beta * N)] # B, beta*N
# remaining
remaining = idx[:, int(beta * N):] # B, H*W - beta*N
# coverage
num_coverage = N - int(beta * N)
if num_coverage <= 0:
samples = importance
else:
coverage_list = []
for i in range(B):
idx_c = torch.randperm(remaining.size()[1]) # shuffles "H*W - beta*N"
coverage_list.append(remaining[i, :][idx_c[:num_coverage]].view(1, -1)) # 1, N-beta*N
coverage = torch.cat(coverage_list, dim=0) # B, N-beta*N
samples = torch.cat((importance, coverage), dim=1) # B, N
else:
# remaining
remaining = idx[:, :] # B, H*W
# coverage
num_coverage = N
coverage_list = []
for i in range(B):
idx_c = torch.randperm(remaining.size()[1]) # shuffles "H*W - beta*N"
coverage_list.append(remaining[i, :][idx_c[:num_coverage]].view(1, -1)) # 1, N-beta*N
coverage = torch.cat(coverage_list, dim=0) # B, N-beta*N
samples = coverage
# point coordinates
rows_int = samples // W # 0 for first row, H-1 for last row
rows_float = rows_int / float(H-1) # 0 to 1.0
rows_float = (rows_float * 2.0) - 1.0 # -1.0 to 1.0
cols_int = samples % W # 0 for first column, W-1 for last column
cols_float = cols_int / float(W-1) # 0 to 1.0
cols_float = (cols_float * 2.0) - 1.0 # -1.0 to 1.0
point_coords = torch.zeros(B, 1, N, 2)
point_coords[:, 0, :, 0] = cols_float # x coord
point_coords[:, 0, :, 1] = rows_float # y coord
point_coords = point_coords.to(device)
return point_coords, rows_int, cols_int
class FlowHead(nn.Module):
def __init__(self, input_dim=128, hidden_dim=256, output_dim_depth=2, output_dim_norm=4, tuning_mode=None):
super(FlowHead, self).__init__()
self.conv1d = Conv2dLoRA(input_dim, hidden_dim // 2, 3, padding=1, r = 8 if tuning_mode == 'lora' else 0)
self.conv2d = Conv2dLoRA(hidden_dim // 2, output_dim_depth, 3, padding=1, r = 8 if tuning_mode == 'lora' else 0)
self.conv1n = Conv2dLoRA(input_dim, hidden_dim // 2, 3, padding=1, r = 8 if tuning_mode == 'lora' else 0)
self.conv2n = Conv2dLoRA(hidden_dim // 2, output_dim_norm, 3, padding=1, r = 8 if tuning_mode == 'lora' else 0)
self.relu = nn.ReLU(inplace=True)
def forward(self, x):
depth = self.conv2d(self.relu(self.conv1d(x)))
normal = self.conv2n(self.relu(self.conv1n(x)))
return torch.cat((depth, normal), dim=1)
class ConvGRU(nn.Module):
def __init__(self, hidden_dim, input_dim, kernel_size=3, tuning_mode=None):
super(ConvGRU, self).__init__()
self.convz = Conv2dLoRA(hidden_dim+input_dim, hidden_dim, kernel_size, padding=kernel_size//2, r = 8 if tuning_mode == 'lora' else 0)
self.convr = Conv2dLoRA(hidden_dim+input_dim, hidden_dim, kernel_size, padding=kernel_size//2, r = 8 if tuning_mode == 'lora' else 0)
self.convq = Conv2dLoRA(hidden_dim+input_dim, hidden_dim, kernel_size, padding=kernel_size//2, r = 8 if tuning_mode == 'lora' else 0)
def forward(self, h, cz, cr, cq, *x_list):
x = torch.cat(x_list, dim=1)
hx = torch.cat([h, x], dim=1)
z = torch.sigmoid((self.convz(hx) + cz))
r = torch.sigmoid((self.convr(hx) + cr))
q = torch.tanh((self.convq(torch.cat([r*h, x], dim=1)) + cq))
# z = torch.sigmoid((self.convz(hx) + cz).float())
# r = torch.sigmoid((self.convr(hx) + cr).float())
# q = torch.tanh((self.convq(torch.cat([r*h, x], dim=1)) + cq).float())
h = (1-z) * h + z * q
return h
def pool2x(x):
return F.avg_pool2d(x, 3, stride=2, padding=1)
def pool4x(x):
return F.avg_pool2d(x, 5, stride=4, padding=1)
def interp(x, dest):
interp_args = {'mode': 'bilinear', 'align_corners': True}
return interpolate_float32(x, dest.shape[2:], **interp_args)
class BasicMultiUpdateBlock(nn.Module):
def __init__(self, args, hidden_dims=[], out_dims=2, tuning_mode=None):
super().__init__()
self.args = args
self.n_gru_layers = args.model.decode_head.n_gru_layers # 3
self.n_downsample = args.model.decode_head.n_downsample # 3, resolution of the disparity field (1/2^K)
# self.encoder = BasicMotionEncoder(args)
# encoder_output_dim = 128 # if there is corr volume
encoder_output_dim = 6 # no corr volume
self.gru08 = ConvGRU(hidden_dims[2], encoder_output_dim + hidden_dims[1] * (self.n_gru_layers > 1), tuning_mode=tuning_mode)
self.gru16 = ConvGRU(hidden_dims[1], hidden_dims[0] * (self.n_gru_layers == 3) + hidden_dims[2], tuning_mode=tuning_mode)
self.gru32 = ConvGRU(hidden_dims[0], hidden_dims[1], tuning_mode=tuning_mode)
self.flow_head = FlowHead(hidden_dims[2], hidden_dim=2*hidden_dims[2], tuning_mode=tuning_mode)
factor = 2**self.n_downsample
self.mask = nn.Sequential(
Conv2dLoRA(hidden_dims[2], hidden_dims[2], 3, padding=1, r = 8 if tuning_mode == 'lora' else 0),
nn.ReLU(inplace=True),
Conv2dLoRA(hidden_dims[2], (factor**2)*9, 1, padding=0, r = 8 if tuning_mode == 'lora' else 0))
def forward(self, net, inp, corr=None, flow=None, iter08=True, iter16=True, iter32=True, update=True):
if iter32:
net[2] = self.gru32(net[2], *(inp[2]), pool2x(net[1]))
if iter16:
if self.n_gru_layers > 2:
net[1] = self.gru16(net[1], *(inp[1]), interp(pool2x(net[0]), net[1]), interp(net[2], net[1]))
else:
net[1] = self.gru16(net[1], *(inp[1]), interp(pool2x(net[0]), net[1]))
if iter08:
if corr is not None:
motion_features = self.encoder(flow, corr)
else:
motion_features = flow
if self.n_gru_layers > 1:
net[0] = self.gru08(net[0], *(inp[0]), motion_features, interp(net[1], net[0]))
else:
net[0] = self.gru08(net[0], *(inp[0]), motion_features)
if not update:
return net
delta_flow = self.flow_head(net[0])
# scale mask to balence gradients
mask = .25 * self.mask(net[0])
return net, mask, delta_flow
class LayerNorm2d(nn.LayerNorm):
def __init__(self, dim):
super(LayerNorm2d, self).__init__(dim)
def forward(self, x):
x = x.permute(0, 2, 3, 1).contiguous()
x = super(LayerNorm2d, self).forward(x)
x = x.permute(0, 3, 1, 2).contiguous()
return x
class ResidualBlock(nn.Module):
def __init__(self, in_planes, planes, norm_fn='group', stride=1, tuning_mode=None):
super(ResidualBlock, self).__init__()
self.conv1 = Conv2dLoRA(in_planes, planes, kernel_size=3, padding=1, stride=stride, r = 8 if tuning_mode == 'lora' else 0)
self.conv2 = Conv2dLoRA(planes, planes, kernel_size=3, padding=1, r = 8 if tuning_mode == 'lora' else 0)
self.relu = nn.ReLU(inplace=True)
num_groups = planes // 8
if norm_fn == 'group':
self.norm1 = nn.GroupNorm(num_groups=num_groups, num_channels=planes)
self.norm2 = nn.GroupNorm(num_groups=num_groups, num_channels=planes)
if not (stride == 1 and in_planes == planes):
self.norm3 = nn.GroupNorm(num_groups=num_groups, num_channels=planes)
elif norm_fn == 'batch':
self.norm1 = nn.BatchNorm2d(planes)
self.norm2 = nn.BatchNorm2d(planes)
if not (stride == 1 and in_planes == planes):
self.norm3 = nn.BatchNorm2d(planes)
elif norm_fn == 'instance':
self.norm1 = nn.InstanceNorm2d(planes)
self.norm2 = nn.InstanceNorm2d(planes)
if not (stride == 1 and in_planes == planes):
self.norm3 = nn.InstanceNorm2d(planes)
elif norm_fn == 'layer':
self.norm1 = LayerNorm2d(planes)
self.norm2 = LayerNorm2d(planes)
if not (stride == 1 and in_planes == planes):
self.norm3 = LayerNorm2d(planes)
elif norm_fn == 'none':
self.norm1 = nn.Sequential()
self.norm2 = nn.Sequential()
if not (stride == 1 and in_planes == planes):
self.norm3 = nn.Sequential()
if stride == 1 and in_planes == planes:
self.downsample = None
else:
self.downsample = nn.Sequential(
Conv2dLoRA(in_planes, planes, kernel_size=1, stride=stride, r = 8 if tuning_mode == 'lora' else 0), self.norm3)
def forward(self, x):
y = x
y = self.conv1(y)
y = self.norm1(y)
y = self.relu(y)
y = self.conv2(y)
y = self.norm2(y)
y = self.relu(y)
if self.downsample is not None:
x = self.downsample(x)
return self.relu(x+y)
class ContextFeatureEncoder(nn.Module):
'''
Encoder features are used to:
1. initialize the hidden state of the update operator
2. and also injected into the GRU during each iteration of the update operator
'''
def __init__(self, in_dim, output_dim, tuning_mode=None):
'''
in_dim = [x4, x8, x16, x32]
output_dim = [hindden_dims, context_dims]
[[x4,x8,x16,x32],[x4,x8,x16,x32]]
'''
super().__init__()
output_list = []
for dim in output_dim:
conv_out = nn.Sequential(
ResidualBlock(in_dim[0], dim[0], 'layer', stride=1, tuning_mode=tuning_mode),
Conv2dLoRA(dim[0], dim[0], 3, padding=1, r = 8 if tuning_mode == 'lora' else 0))
output_list.append(conv_out)
self.outputs04 = nn.ModuleList(output_list)
output_list = []
for dim in output_dim:
conv_out = nn.Sequential(
ResidualBlock(in_dim[1], dim[1], 'layer', stride=1, tuning_mode=tuning_mode),
Conv2dLoRA(dim[1], dim[1], 3, padding=1, r = 8 if tuning_mode == 'lora' else 0))
output_list.append(conv_out)
self.outputs08 = nn.ModuleList(output_list)
output_list = []
for dim in output_dim:
conv_out = nn.Sequential(
ResidualBlock(in_dim[2], dim[2], 'layer', stride=1, tuning_mode=tuning_mode),
Conv2dLoRA(dim[2], dim[2], 3, padding=1, r = 8 if tuning_mode == 'lora' else 0))
output_list.append(conv_out)
self.outputs16 = nn.ModuleList(output_list)
# output_list = []
# for dim in output_dim:
# conv_out = Conv2dLoRA(in_dim[3], dim[3], 3, padding=1)
# output_list.append(conv_out)
# self.outputs32 = nn.ModuleList(output_list)
def forward(self, encoder_features):
x_4, x_8, x_16, x_32 = encoder_features
outputs04 = [f(x_4) for f in self.outputs04]
outputs08 = [f(x_8) for f in self.outputs08]
outputs16 = [f(x_16)for f in self.outputs16]
# outputs32 = [f(x_32) for f in self.outputs32]
return (outputs04, outputs08, outputs16)
class ConvBlock(nn.Module):
# reimplementation of DPT
def __init__(self, channels, tuning_mode=None):
super(ConvBlock, self).__init__()
self.act = nn.ReLU(inplace=True)
self.conv1 = Conv2dLoRA(
channels,
channels,
kernel_size=3,
stride=1,
padding=1,
r = 8 if tuning_mode == 'lora' else 0
)
self.conv2 = Conv2dLoRA(
channels,
channels,
kernel_size=3,
stride=1,
padding=1,
r = 8 if tuning_mode == 'lora' else 0
)
def forward(self, x):
out = self.act(x)
out = self.conv1(out)
out = self.act(out)
out = self.conv2(out)
return x + out
class FuseBlock(nn.Module):
# reimplementation of DPT
def __init__(self, in_channels, out_channels, fuse=True, upsample=True, scale_factor=2, tuning_mode=None):
super(FuseBlock, self).__init__()
self.fuse = fuse
self.scale_factor = scale_factor
self.way_trunk = ConvBlock(in_channels, tuning_mode=tuning_mode)
if self.fuse:
self.way_branch = ConvBlock(in_channels, tuning_mode=tuning_mode)
self.out_conv = Conv2dLoRA(
in_channels,
out_channels,
kernel_size=1,
stride=1,
padding=0,
r = 8 if tuning_mode == 'lora' else 0
)
self.upsample = upsample
def forward(self, x1, x2=None):
if x2 is not None:
x2 = self.way_branch(x2)
x1 = x1 + x2
out = self.way_trunk(x1)
if self.upsample:
out = interpolate_float32(
out, scale_factor=self.scale_factor, mode="bilinear", align_corners=True
)
out = self.out_conv(out)
return out
class Readout(nn.Module):
# From DPT
def __init__(self, in_features, use_cls_token=True, num_register_tokens=0, tuning_mode=None):
super(Readout, self).__init__()
self.use_cls_token = use_cls_token
if self.use_cls_token == True:
self.project_patch = LoRALinear(in_features, in_features, r = 8 if tuning_mode == 'lora' else 0)
self.project_learn = LoRALinear((1 + num_register_tokens) * in_features, in_features, bias=False, r = 8 if tuning_mode == 'lora' else 0)
self.act = nn.GELU()
else:
self.project = nn.Identity()
def forward(self, x):
if self.use_cls_token == True:
x_patch = self.project_patch(x[0])
x_learn = self.project_learn(x[1])
x_learn = x_learn.expand_as(x_patch).contiguous()
features = x_patch + x_learn
return self.act(features)
else:
return self.project(x)
class Token2Feature(nn.Module):
# From DPT
def __init__(self, vit_channel, feature_channel, scale_factor, use_cls_token=True, num_register_tokens=0, tuning_mode=None):
super(Token2Feature, self).__init__()
self.scale_factor = scale_factor
self.readoper = Readout(in_features=vit_channel, use_cls_token=use_cls_token, num_register_tokens=num_register_tokens, tuning_mode=tuning_mode)
if scale_factor > 1 and isinstance(scale_factor, int):
self.sample = ConvTranspose2dLoRA(r = 8 if tuning_mode == 'lora' else 0,
in_channels=vit_channel,
out_channels=feature_channel,
kernel_size=scale_factor,
stride=scale_factor,
padding=0,
)
elif scale_factor > 1:
self.sample = nn.Sequential(
# Upsample2(upscale=scale_factor),
# nn.Upsample(scale_factor=scale_factor),
Conv2dLoRA(r = 8 if tuning_mode == 'lora' else 0,
in_channels=vit_channel,
out_channels=feature_channel,
kernel_size=1,
stride=1,
padding=0,
),
)
elif scale_factor < 1:
scale_factor = int(1.0 / scale_factor)
self.sample = Conv2dLoRA(r = 8 if tuning_mode == 'lora' else 0,
in_channels=vit_channel,
out_channels=feature_channel,
kernel_size=scale_factor+1,
stride=scale_factor,
padding=1,
)
else:
self.sample = nn.Identity()
def forward(self, x):
x = self.readoper(x)
#if use_cls_token == True:
x = x.permute(0, 3, 1, 2).contiguous()
if isinstance(self.scale_factor, float):
x = interpolate_float32(x.float(), scale_factor=self.scale_factor, mode='nearest')
x = self.sample(x)
return x
class EncoderFeature(nn.Module):
def __init__(self, vit_channel, num_ch_dec=[256, 512, 1024, 1024], use_cls_token=True, num_register_tokens=0, tuning_mode=None):
super(EncoderFeature, self).__init__()
self.vit_channel = vit_channel
self.num_ch_dec = num_ch_dec
self.read_3 = Token2Feature(self.vit_channel, self.num_ch_dec[3], scale_factor=1, use_cls_token=use_cls_token, num_register_tokens=num_register_tokens, tuning_mode=tuning_mode)
self.read_2 = Token2Feature(self.vit_channel, self.num_ch_dec[2], scale_factor=1, use_cls_token=use_cls_token, num_register_tokens=num_register_tokens, tuning_mode=tuning_mode)
self.read_1 = Token2Feature(self.vit_channel, self.num_ch_dec[1], scale_factor=2, use_cls_token=use_cls_token, num_register_tokens=num_register_tokens, tuning_mode=tuning_mode)
self.read_0 = Token2Feature(self.vit_channel, self.num_ch_dec[0], scale_factor=7/2, use_cls_token=use_cls_token, num_register_tokens=num_register_tokens, tuning_mode=tuning_mode)
def forward(self, ref_feature):
x = self.read_3(ref_feature[3]) # 1/14
x2 = self.read_2(ref_feature[2]) # 1/14
x1 = self.read_1(ref_feature[1]) # 1/7
x0 = self.read_0(ref_feature[0]) # 1/4
return x, x2, x1, x0
class DecoderFeature(nn.Module):
def __init__(self, vit_channel, num_ch_dec=[128, 256, 512, 1024, 1024], use_cls_token=True, tuning_mode=None):
super(DecoderFeature, self).__init__()
self.vit_channel = vit_channel
self.num_ch_dec = num_ch_dec
self.upconv_3 = FuseBlock(
self.num_ch_dec[4],
self.num_ch_dec[3],
fuse=False, upsample=False, tuning_mode=tuning_mode)
self.upconv_2 = FuseBlock(
self.num_ch_dec[3],
self.num_ch_dec[2],
tuning_mode=tuning_mode)
self.upconv_1 = FuseBlock(
self.num_ch_dec[2],
self.num_ch_dec[1] + 2,
scale_factor=7/4,
tuning_mode=tuning_mode)
# self.upconv_0 = FuseBlock(
# self.num_ch_dec[1],
# self.num_ch_dec[0] + 1,
# )
def forward(self, ref_feature):
x, x2, x1, x0 = ref_feature # 1/14 1/14 1/7 1/4
x = self.upconv_3(x) # 1/14
x = self.upconv_2(x, x2) # 1/7
x = self.upconv_1(x, x1) # 1/4
# x = self.upconv_0(x, x0) # 4/7
return x
class RAFTDepthNormalDPT5(nn.Module):
def __init__(self, cfg):
super().__init__()
self.in_channels = cfg.model.decode_head.in_channels # [1024, 1024, 1024, 1024]
self.feature_channels = cfg.model.decode_head.feature_channels # [256, 512, 1024, 1024] [2/7, 1/7, 1/14, 1/14]
self.decoder_channels = cfg.model.decode_head.decoder_channels # [128, 256, 512, 1024, 1024] [-, 1/4, 1/7, 1/14, 1/14]
self.use_cls_token = cfg.model.decode_head.use_cls_token
self.up_scale = cfg.model.decode_head.up_scale
self.num_register_tokens = cfg.model.decode_head.num_register_tokens
self.min_val = cfg.data_basic.depth_normalize[0]
self.max_val = cfg.data_basic.depth_normalize[1]
self.regress_scale = 100.0\
try:
tuning_mode = cfg.model.decode_head.tuning_mode
except:
tuning_mode = None
self.tuning_mode = tuning_mode
self.hidden_dims = self.context_dims = cfg.model.decode_head.hidden_channels # [128, 128, 128, 128]
self.n_gru_layers = cfg.model.decode_head.n_gru_layers # 3
self.n_downsample = cfg.model.decode_head.n_downsample # 3, resolution of the disparity field (1/2^K)
self.iters = cfg.model.decode_head.iters # 22
self.slow_fast_gru = cfg.model.decode_head.slow_fast_gru # True
self.num_depth_regressor_anchor = 256 # 512
self.used_res_channel = self.decoder_channels[1] # now, use 2/7 res
self.token2feature = EncoderFeature(self.in_channels[0], self.feature_channels, self.use_cls_token, self.num_register_tokens, tuning_mode=tuning_mode)
self.decoder_mono = DecoderFeature(self.in_channels, self.decoder_channels, tuning_mode=tuning_mode)
self.depth_regressor = nn.Sequential(
Conv2dLoRA(self.used_res_channel,
self.num_depth_regressor_anchor,
kernel_size=3,
padding=1, r = 8 if tuning_mode == 'lora' else 0),
# nn.BatchNorm2d(self.num_depth_regressor_anchor),
nn.ReLU(inplace=True),
Conv2dLoRA(self.num_depth_regressor_anchor,
self.num_depth_regressor_anchor,
kernel_size=1, r = 8 if tuning_mode == 'lora' else 0),
)
self.normal_predictor = nn.Sequential(
Conv2dLoRA(self.used_res_channel,
128,
kernel_size=3,
padding=1, r = 8 if tuning_mode == 'lora' else 0,),
# nn.BatchNorm2d(128),
nn.ReLU(inplace=True),
Conv2dLoRA(128, 128, kernel_size=1, r = 8 if tuning_mode == 'lora' else 0), nn.ReLU(inplace=True),
Conv2dLoRA(128, 128, kernel_size=1, r = 8 if tuning_mode == 'lora' else 0), nn.ReLU(inplace=True),
Conv2dLoRA(128, 3, kernel_size=1, r = 8 if tuning_mode == 'lora' else 0),
)
self.context_feature_encoder = ContextFeatureEncoder(self.feature_channels, [self.hidden_dims, self.context_dims], tuning_mode=tuning_mode)
self.context_zqr_convs = nn.ModuleList([Conv2dLoRA(self.context_dims[i], self.hidden_dims[i]*3, 3, padding=3//2, r = 8 if tuning_mode == 'lora' else 0) for i in range(self.n_gru_layers)])
self.update_block = BasicMultiUpdateBlock(cfg, hidden_dims=self.hidden_dims, out_dims=6, tuning_mode=tuning_mode)
self.relu = nn.ReLU(inplace=True)
def get_bins(self, bins_num):
#depth_bins_vec = torch.linspace(math.log(self.min_val), math.log(self.max_val), bins_num, device="cuda")
depth_bins_vec = torch.linspace(math.log(self.min_val), math.log(self.max_val), bins_num, device="cpu")
depth_bins_vec = torch.exp(depth_bins_vec)
return depth_bins_vec
def register_depth_expectation_anchor(self, bins_num, B):
depth_bins_vec = self.get_bins(bins_num)
depth_bins_vec = depth_bins_vec.unsqueeze(0).repeat(B, 1)
self.register_buffer('depth_expectation_anchor', depth_bins_vec, persistent=False)
def clamp(self, x):
y = self.relu(x - self.min_val) + self.min_val
y = self.max_val - self.relu(self.max_val - y)
return y
def regress_depth(self, feature_map_d):
prob_feature = self.depth_regressor(feature_map_d)
prob = prob_feature.softmax(dim=1)
#prob = prob_feature.float().softmax(dim=1)
## Error logging
if torch.isnan(prob).any():
print('prob_feat_nan!!!')
if torch.isinf(prob).any():
print('prob_feat_inf!!!')
# h = prob[0,:,0,0].cpu().numpy().reshape(-1)
# import matplotlib.pyplot as plt
# plt.bar(range(len(h)), h)
B = prob.shape[0]
if "depth_expectation_anchor" not in self._buffers:
self.register_depth_expectation_anchor(self.num_depth_regressor_anchor, B)
d = compute_depth_expectation(
prob,
self.depth_expectation_anchor[:B, ...]).unsqueeze(1)
## Error logging
if torch.isnan(d ).any():
print('d_nan!!!')
if torch.isinf(d ).any():
print('d_inf!!!')
return (self.clamp(d) - self.max_val)/ self.regress_scale, prob_feature
def pred_normal(self, feature_map, confidence):
normal_out = self.normal_predictor(feature_map)
## Error logging
if torch.isnan(normal_out).any():
print('norm_nan!!!')
if torch.isinf(normal_out).any():
print('norm_feat_inf!!!')
return norm_normalize(torch.cat([normal_out, confidence], dim=1))
#return norm_normalize(torch.cat([normal_out, confidence], dim=1).float())
def create_mesh_grid(self, height, width, batch, device="cpu", set_buffer=True):
y, x = torch.meshgrid([torch.arange(0, height, dtype=torch.float32, device=device),
torch.arange(0, width, dtype=torch.float32, device=device)], indexing='ij')
meshgrid = torch.stack((x, y))
meshgrid = meshgrid.unsqueeze(0).repeat(batch, 1, 1, 1)
#self.register_buffer('meshgrid', meshgrid, persistent=False)
return meshgrid
def upsample_flow(self, flow, mask):
""" Upsample flow field [H/8, W/8, 2] -> [H, W, 2] using convex combination """
N, D, H, W = flow.shape
factor = 2 ** self.n_downsample
mask = mask.view(N, 1, 9, factor, factor, H, W)
mask = torch.softmax(mask, dim=2)
#mask = torch.softmax(mask.float(), dim=2)
#up_flow = F.unfold(factor * flow, [3,3], padding=1)
up_flow = F.unfold(flow, [3,3], padding=1)
up_flow = up_flow.view(N, D, 9, 1, 1, H, W)
up_flow = torch.sum(mask * up_flow, dim=2)
up_flow = up_flow.permute(0, 1, 4, 2, 5, 3)
return up_flow.reshape(N, D, factor*H, factor*W)
def initialize_flow(self, img):
""" Flow is represented as difference between two coordinate grids flow = coords1 - coords0"""
N, _, H, W = img.shape
coords0 = coords_grid(N, H, W).to(img.device)
coords1 = coords_grid(N, H, W).to(img.device)
return coords0, coords1
def upsample(self, x, scale_factor=2):
"""Upsample input tensor by a factor of 2
"""
return interpolate_float32(x, scale_factor=scale_factor*self.up_scale/8, mode="nearest")
def forward(self, vit_features, **kwargs):
## read vit token to multi-scale features
B, H, W, _, _, num_register_tokens = vit_features[1]
vit_features = vit_features[0]
## Error logging
if torch.isnan(vit_features[0]).any():
print('vit_feature_nan!!!')
if torch.isinf(vit_features[0]).any():
print('vit_feature_inf!!!')
if self.use_cls_token == True:
vit_features = [[ft[:, 1+num_register_tokens:, :].view(B, H, W, self.in_channels[0]), \
ft[:, 0:1+num_register_tokens, :].view(B, 1, 1, self.in_channels[0] * (1+num_register_tokens))] for ft in vit_features]
else:
vit_features = [ft.view(B, H, W, self.in_channels[0]) for ft in vit_features]
encoder_features = self.token2feature(vit_features) # 1/14, 1/14, 1/7, 1/4
## Error logging
for en_ft in encoder_features:
if torch.isnan(en_ft).any():
print('decoder_feature_nan!!!')
print(en_ft.shape)
if torch.isinf(en_ft).any():
print('decoder_feature_inf!!!')
print(en_ft.shape)
## decode features to init-depth (and confidence)
ref_feat= self.decoder_mono(encoder_features) # now, 1/4 for depth
## Error logging
if torch.isnan(ref_feat).any():
print('ref_feat_nan!!!')
if torch.isinf(ref_feat).any():
print('ref_feat_inf!!!')
feature_map = ref_feat[:, :-2, :, :] # feature map share of depth and normal prediction
depth_confidence_map = ref_feat[:, -2:-1, :, :]
normal_confidence_map = ref_feat[:, -1:, :, :]
depth_pred, binmap = self.regress_depth(feature_map) # regress bin for depth
normal_pred = self.pred_normal(feature_map, normal_confidence_map) # mlp for normal
depth_init = torch.cat((depth_pred, depth_confidence_map, normal_pred), dim=1) # (N, 1+1+4, H, W)
## encoder features to context-feature for init-hidden-state and contex-features
cnet_list = self.context_feature_encoder(encoder_features[::-1])
net_list = [torch.tanh(x[0]) for x in cnet_list] # x_4, x_8, x_16 of hidden state
inp_list = [torch.relu(x[1]) for x in cnet_list] # x_4, x_8, x_16 context features
# Rather than running the GRU's conv layers on the context features multiple times, we do it once at the beginning
inp_list = [list(conv(i).split(split_size=conv.out_channels//3, dim=1)) for i,conv in zip(inp_list, self.context_zqr_convs)]
coords0, coords1 = self.initialize_flow(net_list[0])
if depth_init is not None:
coords1 = coords1 + depth_init
if self.training:
low_resolution_init = [self.clamp(depth_init[:,:1] * self.regress_scale + self.max_val), depth_init[:,1:2], norm_normalize(depth_init[:,2:].clone())]
init_depth = upflow4(depth_init)
flow_predictions = [self.clamp(init_depth[:,:1] * self.regress_scale + self.max_val)]
conf_predictions = [init_depth[:,1:2]]
normal_outs = [norm_normalize(init_depth[:,2:].clone())]
else:
flow_predictions = []
conf_predictions = []
samples_pred_list = []
coord_list = []
normal_outs = []
low_resolution_init = []
for itr in range(self.iters):
# coords1 = coords1.detach()
flow = coords1 - coords0
if self.n_gru_layers == 3 and self.slow_fast_gru: # Update low-res GRU
net_list = self.update_block(net_list, inp_list, iter32=True, iter16=False, iter08=False, update=False)
if self.n_gru_layers >= 2 and self.slow_fast_gru:# Update low-res GRU and mid-res GRU
net_list = self.update_block(net_list, inp_list, iter32=self.n_gru_layers==3, iter16=True, iter08=False, update=False)
net_list, up_mask, delta_flow = self.update_block(net_list, inp_list, None, flow, iter32=self.n_gru_layers==3, iter16=self.n_gru_layers>=2)
# F(t+1) = F(t) + \Delta(t)
coords1 = coords1 + delta_flow
# We do not need to upsample or output intermediate results in test_mode
#if (not self.training) and itr < self.iters-1:
#continue
# upsample predictions
if up_mask is None:
flow_up = self.upsample(coords1-coords0, 4)
else:
flow_up = self.upsample_flow(coords1 - coords0, up_mask)
# flow_up = self.upsample(coords1-coords0, 4)
flow_predictions.append(self.clamp(flow_up[:,:1] * self.regress_scale + self.max_val))
conf_predictions.append(flow_up[:,1:2])
normal_outs.append(norm_normalize(flow_up[:,2:].clone()))
outputs=dict(
prediction=flow_predictions[-1],
predictions_list=flow_predictions,
confidence=conf_predictions[-1],
confidence_list=conf_predictions,
pred_logit=None,
# samples_pred_list=samples_pred_list,
# coord_list=coord_list,
prediction_normal=normal_outs[-1],
normal_out_list=normal_outs,
low_resolution_init=low_resolution_init,
)
return outputs
if __name__ == "__main__":
try:
from mmcv.utils import Config
except:
from mmengine import Config
cfg = Config.fromfile('/cpfs01/shared/public/users/mu.hu/monodepth/mono/configs/RAFTDecoder/vit.raft.full2t.py')
cfg.model.decode_head.in_channels = [384, 384, 384, 384]
cfg.model.decode_head.feature_channels = [96, 192, 384, 768]
cfg.model.decode_head.decoder_channels = [48, 96, 192, 384, 384]
cfg.model.decode_head.hidden_channels = [48, 48, 48, 48, 48]
cfg.model.decode_head.up_scale = 7
# cfg.model.decode_head.use_cls_token = True
# vit_feature = [[torch.rand((2, 20, 60, 384)).cuda(), torch.rand(2, 384).cuda()], \
# [torch.rand((2, 20, 60, 384)).cuda(), torch.rand(2, 384).cuda()], \
# [torch.rand((2, 20, 60, 384)).cuda(), torch.rand(2, 384).cuda()], \
# [torch.rand((2, 20, 60, 384)).cuda(), torch.rand(2, 384).cuda()]]
cfg.model.decode_head.use_cls_token = True
cfg.model.decode_head.num_register_tokens = 4
vit_feature = [[torch.rand((2, (74 * 74) + 5, 384)).cuda(),\
torch.rand((2, (74 * 74) + 5, 384)).cuda(), \
torch.rand((2, (74 * 74) + 5, 384)).cuda(), \
torch.rand((2, (74 * 74) + 5, 384)).cuda()], (2, 74, 74, 1036, 1036, 4)]
decoder = RAFTDepthNormalDPT5(cfg).cuda()
output = decoder(vit_feature)
temp = 1