# Align3R/dust3r/losses.py
# Copyright (C) 2024-present Naver Corporation. All rights reserved.
# Licensed under CC BY-NC-SA 4.0 (non-commercial use only).
#
# --------------------------------------------------------
# Implementation of DUSt3R training losses
# --------------------------------------------------------
from copy import copy, deepcopy
import torch
import torch.nn as nn
import torch.nn.functional as F
from dust3r.inference import get_pred_pts3d, find_opt_scaling
from dust3r.utils.geometry import inv, geotrf, normalize_pointcloud
from dust3r.utils.geometry import get_joint_pointcloud_depth, get_joint_pointcloud_center_scale
def Sum(*losses_and_masks):
loss, mask = losses_and_masks[0]
if loss.ndim > 0:
        # we are actually returning the loss for every pixel
return losses_and_masks
else:
# we are returning the global loss
for loss2, mask2 in losses_and_masks[1:]:
loss = loss + loss2
return loss
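# Illustrative sketch (not part of the original file): how Sum() behaves in the two
# reduction modes used below. Scalar losses are summed into one value; per-pixel
# losses (reduction='none') are passed through as the original (loss, mask) pairs.
def _demo_sum():
    total = Sum((torch.tensor(0.5), None), (torch.tensor(0.25), None))
    assert float(total) == 0.75  # global (0-dim) losses are summed
    per_pixel = torch.rand(2, 4, 4)
    mask = torch.ones(2, 4, 4, dtype=torch.bool)
    pairs = Sum((per_pixel, mask), (per_pixel, mask))
    assert isinstance(pairs, tuple)  # pixel-wise losses pass through unreduced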
class BaseCriterion(nn.Module):
def __init__(self, reduction='mean'):
super().__init__()
self.reduction = reduction
class LLoss (BaseCriterion):
""" L-norm loss
"""
def forward(self, a, b):
assert a.shape == b.shape and a.ndim >= 2 and 1 <= a.shape[-1] <= 3, f'Bad shape = {a.shape}'
dist = self.distance(a, b)
assert dist.ndim == a.ndim - 1 # one dimension less
if self.reduction == 'none':
return dist
if self.reduction == 'sum':
return dist.sum()
if self.reduction == 'mean':
return dist.mean() if dist.numel() > 0 else dist.new_zeros(())
raise ValueError(f'bad {self.reduction=} mode')
def distance(self, a, b):
raise NotImplementedError()
class WeightedL21Loss(LLoss):
""" Euclidean distance between 3D points with weighted loss based on 1/z """
    def distance(self, a, b, z):
        """
        Compute the weighted Euclidean distance between two sets of 3D points.
        a: tensor of shape (B, H, W, 3), predicted 3D points
        b: tensor of shape (B, H, W, 3), target 3D points
        z: tensor of shape (B, H, W), depth values used for the 1/z weighting
        """
        # Euclidean distance (L2 norm) between corresponding points
        dist = torch.norm(a - b, dim=-1)  # (B, H, W)
        # weight by 1/z, clamped to [0, 1]; the epsilon prevents division by zero
        weight = torch.clamp(1.0 / (z + 1e-8), min=0, max=1)
        # constant factor of 10 scales the weighted distance
        weighted_dist = 10 * dist * weight  # element-wise multiplication, (B, H, W)
        return weighted_dist
def forward(self, a, b, z):
assert a.shape == b.shape and a.ndim >= 2 and 1 <= a.shape[-1] <= 3, f'Bad shape = {a.shape}'
dist = self.distance(a, b, z)
assert dist.ndim == a.ndim - 1 # one dimension less
if self.reduction == 'none':
return dist
if self.reduction == 'sum':
return dist.sum()
if self.reduction == 'mean':
return dist.mean() if dist.numel() > 0 else dist.new_zeros(())
raise ValueError(f'bad {self.reduction=} mode')
class L21Loss (LLoss):
""" Euclidean distance between 3d points """
def distance(self, a, b):
return torch.norm(a - b, dim=-1) # normalized L2 distance
L21 = L21Loss()
WeightedL21 = WeightedL21Loss()
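# Illustrative sketch (not part of the original file): the two ready-made criteria on
# random point maps. WeightedL21 scales the per-pixel L2 distance by clamp(1/z, 0, 1),
# so points near the camera keep full weight and far-away points are down-weighted.
def _demo_l21():
    a = torch.rand(2, 8, 8, 3)
    b = torch.rand(2, 8, 8, 3)
    z = b[..., 2]
    plain = L21Loss(reduction='none')(a, b)            # (2, 8, 8) raw distances
    weighted = WeightedL21Loss(reduction='none')(a, b, z)
    assert plain.shape == weighted.shape == (2, 8, 8)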
class Criterion (nn.Module):
def __init__(self, criterion=None):
super().__init__()
assert isinstance(criterion, BaseCriterion), f'{criterion} is not a proper criterion!'
self.criterion = copy(criterion)
def get_name(self):
return f'{type(self).__name__}({self.criterion})'
def with_reduction(self, mode='none'):
res = loss = deepcopy(self)
while loss is not None:
assert isinstance(loss, Criterion)
loss.criterion.reduction = mode # make it return the loss for each sample
            loss = loss._loss2  # we assume loss is a MultiLoss
return res
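# Illustrative sketch (not part of the original file): with_reduction() deep-copies the
# whole loss chain and switches every criterion to per-pixel output, leaving the
# original object untouched. Regr3D, defined further below, serves as the example.
def _demo_with_reduction():
    crit = Regr3D(L21)
    per_pixel = crit.with_reduction('none')
    assert per_pixel.criterion.reduction == 'none'
    assert crit.criterion.reduction == 'mean'  # original is unchanged (deepcopy)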
class MultiLoss (nn.Module):
""" Easily combinable losses (also keep track of individual loss values):
loss = MyLoss1() + 0.1*MyLoss2()
Usage:
Inherit from this class and override get_name() and compute_loss()
"""
def __init__(self):
super().__init__()
self._alpha = 1
self._loss2 = None
def compute_loss(self, *args, **kwargs):
raise NotImplementedError()
def get_name(self):
raise NotImplementedError()
def __mul__(self, alpha):
assert isinstance(alpha, (int, float))
res = copy(self)
res._alpha = alpha
return res
__rmul__ = __mul__ # same
def __add__(self, loss2):
assert isinstance(loss2, MultiLoss)
res = cur = copy(self)
# find the end of the chain
while cur._loss2 is not None:
cur = cur._loss2
cur._loss2 = loss2
return res
def __repr__(self):
name = self.get_name()
if self._alpha != 1:
name = f'{self._alpha:g}*{name}'
if self._loss2:
name = f'{name} + {self._loss2}'
return name
def forward(self, *args, **kwargs):
loss = self.compute_loss(*args, **kwargs)
if isinstance(loss, tuple):
loss, details = loss
elif loss.ndim == 0:
details = {self.get_name(): float(loss)}
else:
details = {}
loss = loss * self._alpha
if self._loss2:
loss2, details2 = self._loss2(*args, **kwargs)
loss = loss + loss2
details |= details2
return loss, details
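# Illustrative sketch (not part of the original file): the arithmetic sugar provided by
# MultiLoss. Two toy sub-losses are combined with `+` and `*`; forward() then returns
# the weighted sum together with a dict of the individual (unweighted) values.
class _DemoLossA(MultiLoss):
    def get_name(self):
        return 'DemoLossA'
    def compute_loss(self, x):
        return x.mean()
class _DemoLossB(MultiLoss):
    def get_name(self):
        return 'DemoLossB'
    def compute_loss(self, x):
        return x.abs().mean()
def _demo_multiloss():
    combined = _DemoLossA() + 0.1 * _DemoLossB()
    total, details = combined(torch.randn(4))
    assert set(details) == {'DemoLossA', 'DemoLossB'}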
class Regr3D (Criterion, MultiLoss):
""" Ensure that all 3D points are correct.
Asymmetric loss: view1 is supposed to be the anchor.
P1 = RT1 @ D1
P2 = RT2 @ D2
loss1 = (I @ pred_D1) - (RT1^-1 @ RT1 @ D1)
loss2 = (RT21 @ pred_D2) - (RT1^-1 @ P2)
= (RT21 @ pred_D2) - (RT1^-1 @ RT2 @ D2)
"""
def __init__(self, criterion, norm_mode='avg_dis', gt_scale=False):
super().__init__(criterion)
self.norm_mode = norm_mode
self.gt_scale = gt_scale
def get_all_pts3d(self, gt1, gt2, pred1, pred2, dist_clip=None):
# everything is normalized w.r.t. camera of view1
in_camera1 = inv(gt1['camera_pose'])
gt_pts1 = geotrf(in_camera1, gt1['pts3d']) # B,H,W,3
gt_pts2 = geotrf(in_camera1, gt2['pts3d']) # B,H,W,3
valid1 = gt1['valid_mask'][..., 0].clone()
valid2 = gt2['valid_mask'][..., 0].clone()
if dist_clip is not None:
# points that are too far-away == invalid
dis1 = gt_pts1.norm(dim=-1) # (B, H, W)
dis2 = gt_pts2.norm(dim=-1) # (B, H, W)
valid1 = valid1 & (dis1 <= dist_clip)
valid2 = valid2 & (dis2 <= dist_clip)
pr_pts1 = get_pred_pts3d(gt1, pred1, use_pose=False)
pr_pts2 = get_pred_pts3d(gt2, pred2, use_pose=True)
# normalize 3d points
if self.norm_mode:
pr_pts1, pr_pts2 = normalize_pointcloud(pr_pts1, pr_pts2, self.norm_mode, valid1, valid2)
if self.norm_mode and not self.gt_scale:
gt_pts1, gt_pts2 = normalize_pointcloud(gt_pts1, gt_pts2, self.norm_mode, valid1, valid2)
        return gt_pts1, gt_pts2, pr_pts1, pr_pts2, valid1, valid2, {}
    def compute_loss(self, gt1, gt2, pred1, pred2, **kw):
        gt_pts1, gt_pts2, pred_pts1, pred_pts2, mask1, mask2, monitoring = \
            self.get_all_pts3d(gt1, gt2, pred1, pred2, **kw)
        # loss on img1 side
        l1 = self.criterion(pred_pts1[mask1], gt_pts1[mask1])
        # loss on img2 side
        l2 = self.criterion(pred_pts2[mask2], gt_pts2[mask2])
        # mask losses are disabled: zero placeholders on the same device as l1/l2
        # (originally an F.mse_loss between pred_mask and valid_mask)
        l_mask1 = l1.new_zeros(())
        l_mask2 = l2.new_zeros(())
        self_name = type(self).__name__
        details = {self_name + '_pts3d_1': float(l1.mean()),
                   self_name + '_pts3d_2': float(l2.mean()),
                   'mask_loss_1': float(l_mask1.mean()),
                   'mask_loss_2': float(l_mask2.mean())}
        return Sum((l1, mask1), (l2, mask2)), (details | monitoring)
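# Illustrative sketch (not part of the original file): a minimal end-to-end call of
# Regr3D on a toy batch, assuming the dict layout used above (camera_pose, pts3d,
# valid_mask with a trailing channel dim, and pred2 already expressed in view1's
# frame via 'pts3d_in_other_view'). With identical GT and predictions, the loss
# is zero after normalization.
def _demo_regr3d():
    B, H, W = 1, 4, 4
    pose = torch.eye(4)[None].repeat(B, 1, 1)
    pts = torch.rand(B, H, W, 3) + 1.0
    valid = torch.ones(B, H, W, 2, dtype=torch.bool)
    gt1 = dict(camera_pose=pose, pts3d=pts, valid_mask=valid)
    gt2 = dict(camera_pose=pose, pts3d=pts, valid_mask=valid)
    pred1 = dict(pts3d=pts.clone(), conf=torch.ones(B, H, W))
    pred2 = dict(pts3d_in_other_view=pts.clone(), conf=torch.ones(B, H, W))
    loss, details = Regr3D(L21)(gt1, gt2, pred1, pred2)
    assert float(loss) < 1e-5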
class ConfLoss (MultiLoss):
""" Weighted regression by learned confidence.
Assuming the input pixel_loss is a pixel-level regression loss.
Principle:
high-confidence means high conf = 0.1 ==> conf_loss = x / 10 + alpha*log(10)
low confidence means low conf = 10 ==> conf_loss = x * 10 - alpha*log(10)
alpha: hyperparameter
"""
def __init__(self, pixel_loss, alpha=1):
super().__init__()
assert alpha > 0
self.alpha = alpha
self.pixel_loss = pixel_loss.with_reduction('none')
def get_name(self):
return f'ConfLoss({self.pixel_loss})'
def get_conf_log(self, x):
return x, torch.log(x)
def compute_loss(self, gt1, gt2, pred1, pred2, **kw):
# compute per-pixel loss
((loss1, msk1), (loss2, msk2)), details = self.pixel_loss(gt1, gt2, pred1, pred2, **kw)
        if loss1.numel() == 0:
            print('NO VALID POINTS in img1')
        if loss2.numel() == 0:
            print('NO VALID POINTS in img2')
# weight by confidence
conf1, log_conf1 = self.get_conf_log(pred1['conf'][msk1])
conf2, log_conf2 = self.get_conf_log(pred2['conf'][msk2])
conf_loss1 = loss1 * conf1 - self.alpha * log_conf1
conf_loss2 = loss2 * conf2 - self.alpha * log_conf2
# average + nan protection (in case of no valid pixels at all)
conf_loss1 = conf_loss1.mean() if conf_loss1.numel() > 0 else 0
conf_loss2 = conf_loss2.mean() if conf_loss2.numel() > 0 else 0
        return conf_loss1 + conf_loss2, dict(conf_loss_1=float(conf_loss1), conf_loss_2=float(conf_loss2), **details)
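# Illustrative sketch (not part of the original file): wrapping the regression loss in
# ConfLoss, reusing the toy batch layout from _demo_regr3d above. With conf == 1
# everywhere, log(conf) == 0, so the result equals the plain per-pixel loss mean.
def _demo_confloss():
    B, H, W = 1, 4, 4
    pose = torch.eye(4)[None].repeat(B, 1, 1)
    pts = torch.rand(B, H, W, 3) + 1.0
    valid = torch.ones(B, H, W, 2, dtype=torch.bool)
    gt1 = dict(camera_pose=pose, pts3d=pts, valid_mask=valid)
    gt2 = dict(camera_pose=pose, pts3d=pts, valid_mask=valid)
    pred1 = dict(pts3d=pts + 0.01 * torch.randn_like(pts), conf=torch.ones(B, H, W))
    pred2 = dict(pts3d_in_other_view=pts.clone(), conf=torch.ones(B, H, W))
    loss, details = ConfLoss(Regr3D(L21), alpha=0.2)(gt1, gt2, pred1, pred2)
    assert 'conf_loss_1' in details and 'conf_loss_2' in details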
class Regr3D_ShiftInv (Regr3D):
""" Same than Regr3D but invariant to depth shift.
"""
def get_all_pts3d(self, gt1, gt2, pred1, pred2):
# compute unnormalized points
gt_pts1, gt_pts2, pred_pts1, pred_pts2, mask1, mask2, monitoring = \
super().get_all_pts3d(gt1, gt2, pred1, pred2)
# compute median depth
gt_z1, gt_z2 = gt_pts1[..., 2], gt_pts2[..., 2]
pred_z1, pred_z2 = pred_pts1[..., 2], pred_pts2[..., 2]
gt_shift_z = get_joint_pointcloud_depth(gt_z1, gt_z2, mask1, mask2)[:, None, None]
pred_shift_z = get_joint_pointcloud_depth(pred_z1, pred_z2, mask1, mask2)[:, None, None]
# subtract the median depth
gt_z1 -= gt_shift_z
gt_z2 -= gt_shift_z
pred_z1 -= pred_shift_z
pred_z2 -= pred_shift_z
# monitoring = dict(monitoring, gt_shift_z=gt_shift_z.mean().detach(), pred_shift_z=pred_shift_z.mean().detach())
        return gt_pts1, gt_pts2, pred_pts1, pred_pts2, mask1, mask2, monitoring
class Regr3D_ScaleInv (Regr3D):
""" Same than Regr3D but invariant to depth shift.
if gt_scale == True: enforce the prediction to take the same scale than GT
"""
def get_all_pts3d(self, gt1, gt2, pred1, pred2):
# compute depth-normalized points
gt_pts1, gt_pts2, pred_pts1, pred_pts2, mask1, mask2, monitoring = super().get_all_pts3d(gt1, gt2, pred1, pred2)
# measure scene scale
_, gt_scale = get_joint_pointcloud_center_scale(gt_pts1, gt_pts2, mask1, mask2)
_, pred_scale = get_joint_pointcloud_center_scale(pred_pts1, pred_pts2, mask1, mask2)
# prevent predictions to be in a ridiculous range
pred_scale = pred_scale.clip(min=1e-3, max=1e3)
        # rescale the point clouds
if self.gt_scale:
pred_pts1 *= gt_scale / pred_scale
pred_pts2 *= gt_scale / pred_scale
# monitoring = dict(monitoring, pred_scale=(pred_scale/gt_scale).mean())
else:
gt_pts1 /= gt_scale
gt_pts2 /= gt_scale
pred_pts1 /= pred_scale
pred_pts2 /= pred_scale
# monitoring = dict(monitoring, gt_scale=gt_scale.mean(), pred_scale=pred_scale.mean().detach())
        return gt_pts1, gt_pts2, pred_pts1, pred_pts2, mask1, mask2, monitoring
class Regr3D_ScaleShiftInv (Regr3D_ScaleInv, Regr3D_ShiftInv):
# calls Regr3D_ShiftInv first, then Regr3D_ScaleInv
pass
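# Illustrative sketch (not part of the original file): how these pieces are typically
# assembled into a training criterion. The DUSt3R training scripts build the criterion
# from a string such as "ConfLoss(Regr3D(L21, norm_mode='avg_dis'), alpha=0.2)"; the
# equivalent direct construction is shown below.
def _demo_training_criterion():
    criterion = ConfLoss(Regr3D(L21, norm_mode='avg_dis'), alpha=0.2)
    print(criterion.get_name())  # e.g. "ConfLoss(Regr3D(L21Loss()))"
    return criterion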