# Maikou's picture
# all files first commit
# 9c3a994
# raw
# history blame
# 11.5 kB
# -*- coding: utf-8 -*-
import torch
import torch.nn as nn
import torch.nn.functional as F
from typing import Optional, Tuple, Dict
from michelangelo.models.modules.distributions import DiagonalGaussianDistribution
from michelangelo.utils.eval import compute_psnr
from michelangelo.utils import misc
class KLNearFar(nn.Module):
    """Occupancy reconstruction loss with an optional KL term.

    The returned loss is
    ``vol_bce + near_weight * near_bce + kl_weight * kl``, where both BCE
    terms are computed with ``nn.BCEWithLogitsLoss`` on two column ranges of
    the same ``logits`` / ``labels`` tensors.
    """

    def __init__(self,
                 near_weight: float = 0.1,
                 kl_weight: float = 1.0,
                 num_near_samples: Optional[int] = None):
        """
        Args:
            near_weight (float): multiplier applied to the near-point BCE.
            kl_weight (float): multiplier applied to the KL term.
            num_near_samples (int, optional): number of trailing columns that
                are treated as near points. When ``None`` the columns are
                split in half.
        """
        super().__init__()

        self.near_weight = near_weight
        self.kl_weight = kl_weight
        self.num_near_samples = num_near_samples
        self.geo_criterion = nn.BCEWithLogitsLoss()

    def forward(self,
                posteriors: "Optional[DiagonalGaussianDistribution]",
                logits: torch.FloatTensor,
                labels: torch.FloatTensor,
                split: Optional[str] = "train", **kwargs) -> Tuple[torch.FloatTensor, Dict[str, torch.Tensor]]:
        """
        Args:
            posteriors (DiagonalGaussianDistribution or None): latent
                posterior; when ``None`` the KL term is a zero scalar.
            logits (torch.Tensor): [B, P]; the first ``num_vol`` columns are
                the volume points and the remaining columns the near points.
            labels (torch.Tensor): [B, P] occupancy targets laid out like
                ``logits``. Any dtype castable with ``.float()`` is accepted
                (float, integer or bool).
            split (str): prefix used for every key of the log dict.
            **kwargs: ignored.

        Returns:
            loss (torch.Tensor): (,)
            log (dict): detached scalar tensors keyed by ``"{split}/<name>"``.
        """
        # Column index separating volume points from near points.
        if self.num_near_samples is None:
            num_vol = logits.shape[1] // 2
        else:
            num_vol = logits.shape[1] - self.num_near_samples

        # Cast once so the BCE terms and the statistics below all accept
        # non-float labels.
        labels_f = labels.float()

        # occupancy loss, always evaluated in float32
        vol_bce = self.geo_criterion(logits[:, 0:num_vol].float(), labels_f[:, 0:num_vol])
        near_bce = self.geo_criterion(logits[:, num_vol:].float(), labels_f[:, num_vol:])

        if posteriors is None:
            kl_loss = torch.tensor(0.0, dtype=logits.dtype, device=logits.device)
        else:
            kl_loss = posteriors.kl(dims=(1, 2))
            kl_loss = torch.mean(kl_loss)

        loss = vol_bce + near_bce * self.near_weight + kl_loss * self.kl_weight

        with torch.no_grad():
            # A logit >= 0 counts as a positive prediction.
            preds = logits >= 0
            accuracy = (preds == labels).float()
            accuracy = accuracy.mean()
            # torch.mean rejects integer/bool inputs, so average the float copy.
            pos_ratio = labels_f.mean()

        log = {
            "{}/total_loss".format(split): loss.clone().detach(),
            "{}/near".format(split): near_bce.detach(),
            "{}/far".format(split): vol_bce.detach(),
            "{}/kl".format(split): kl_loss.detach(),
            "{}/accuracy".format(split): accuracy,
            "{}/pos_ratio".format(split): pos_ratio
        }

        if posteriors is not None:
            log[f"{split}/mean"] = posteriors.mean.mean().detach()
            log[f"{split}/std_mean"] = posteriors.std.mean().detach()
            log[f"{split}/std_max"] = posteriors.std.max().detach()

        return loss, log
class KLNearFarColor(nn.Module):
    """Occupancy + surface-colour reconstruction loss with an optional KL term.

    Total loss:
    ``vol_bce + near_weight * near_bce + color_weight * color + kl_weight * kl``.
    """

    def __init__(self,
                 near_weight: float = 0.1,
                 kl_weight: float = 1.0,
                 color_weight: float = 1.0,
                 color_criterion: str = "mse",
                 num_near_samples: Optional[int] = None):
        """
        Args:
            near_weight (float): multiplier of the near-point BCE.
            kl_weight (float): multiplier of the KL term.
            color_weight (float): multiplier of the colour term.
            color_criterion (str): ``"mse"`` selects ``nn.MSELoss``,
                ``"l1"`` selects ``nn.L1Loss``.
            num_near_samples (int, optional): number of trailing columns
                treated as near points; ``None`` splits the columns in half.

        Raises:
            ValueError: if ``color_criterion`` is neither ``"mse"`` nor ``"l1"``.
        """
        super().__init__()

        self.color_weight = color_weight
        self.near_weight = near_weight
        self.kl_weight = kl_weight
        self.num_near_samples = num_near_samples

        # Reject unknown criteria first, then build the selected one.
        if color_criterion not in ("mse", "l1"):
            raise ValueError(f"{color_criterion} must be [`mse`, `l1`].")
        self.color_criterion = nn.MSELoss() if color_criterion == "mse" else nn.L1Loss()

        self.geo_criterion = nn.BCEWithLogitsLoss()

    def forward(self,
                posteriors: Optional[DiagonalGaussianDistribution],
                logits: torch.FloatTensor,
                labels: torch.FloatTensor,
                pred_colors: torch.FloatTensor,
                gt_colors: torch.FloatTensor,
                split: Optional[str] = "train", **kwargs) -> Tuple[torch.FloatTensor, Dict[str, float]]:
        """
        Args:
            posteriors (DiagonalGaussianDistribution or None): latent
                posterior; ``None`` makes the KL term a zero scalar.
            logits (torch.FloatTensor): [B, P]; leading columns are volume
                points, trailing columns are near points.
            labels (torch.FloatTensor): [B, P], laid out like ``logits``.
            pred_colors (torch.FloatTensor): [B, M, 3]
            gt_colors (torch.FloatTensor): [B, M, 3]
            split (str): prefix of every key in the log dict.
            **kwargs: ignored.

        Returns:
            loss (torch.Tensor): (,)
            log (dict): detached scalars keyed by ``"{split}/<name>"``.
        """
        # Locate the boundary between volume columns and near columns.
        point_count = logits.shape[1]
        near_count = self.num_near_samples
        boundary = point_count // 2 if near_count is None else point_count - near_count

        far_logits, far_targets = logits[:, :boundary], labels[:, :boundary]
        close_logits, close_targets = logits[:, boundary:], labels[:, boundary:]

        # occupancy terms, evaluated in float32
        vol_bce = self.geo_criterion(far_logits.float(), far_targets.float())
        near_bce = self.geo_criterion(close_logits.float(), close_targets.float())

        # surface colour term
        color = self.color_criterion(pred_colors, gt_colors)

        # KL term: batch mean of the posterior KL, or a zero placeholder.
        if posteriors is not None:
            kl_loss = torch.mean(posteriors.kl(dims=(1, 2)))
        else:
            kl_loss = torch.tensor(0.0, dtype=pred_colors.dtype, device=pred_colors.device)

        loss = vol_bce + near_bce * self.near_weight + color * self.color_weight + kl_loss * self.kl_weight

        with torch.no_grad():
            # A logit >= 0 counts as a positive prediction.
            accuracy = ((logits >= 0) == labels).float().mean()
            psnr = compute_psnr(pred_colors, gt_colors)

        log = {
            f"{split}/total_loss": loss.clone().detach(),
            f"{split}/near": near_bce.detach(),
            f"{split}/far": vol_bce.detach(),
            f"{split}/color": color.detach(),
            f"{split}/kl": kl_loss.detach(),
            f"{split}/psnr": psnr.detach(),
            f"{split}/accuracy": accuracy,
        }

        return loss, log
class ContrastKLNearFar(nn.Module):
    """Contrastive shape/text/image loss combined with occupancy BCE and KL.

    Total loss:
    ``vol_bce + near_weight * near_bce + kl_weight * kl
    + contrast_weight * contrast``.
    """

    def __init__(self,
                 contrast_weight: float = 1.0,
                 near_weight: float = 0.1,
                 kl_weight: float = 1.0,
                 num_near_samples: Optional[int] = None):
        """
        Args:
            contrast_weight (float): multiplier of the contrastive term.
            near_weight (float): multiplier of the near-point BCE.
            kl_weight (float): multiplier of the KL term.
            num_near_samples (int, optional): number of trailing columns of
                ``shape_logits`` treated as near points; ``None`` splits the
                columns in half.
        """
        super().__init__()

        # Cross-entropy targets, cached between calls (see ``forward``).
        self.labels = None
        self.last_local_batch_size = None

        self.contrast_weight = contrast_weight
        self.near_weight = near_weight
        self.kl_weight = kl_weight
        self.num_near_samples = num_near_samples
        self.geo_criterion = nn.BCEWithLogitsLoss()

    def forward(self,
                shape_embed: torch.FloatTensor,
                text_embed: torch.FloatTensor,
                image_embed: torch.FloatTensor,
                logit_scale: torch.FloatTensor,
                posteriors: Optional[DiagonalGaussianDistribution],
                shape_logits: torch.FloatTensor,
                shape_labels: torch.FloatTensor,
                split: Optional[str] = "train", **kwargs) -> Tuple[torch.FloatTensor, Dict[str, torch.Tensor]]:
        """
        Args:
            shape_embed (torch.FloatTensor): shape embeddings, batch on dim 0.
            text_embed (torch.FloatTensor): text embeddings, batch on dim 0.
            image_embed (torch.FloatTensor): image embeddings, batch on dim 0.
            logit_scale (torch.FloatTensor): factor multiplied into the
                similarity logits.
            posteriors (DiagonalGaussianDistribution or None): latent
                posterior; ``None`` makes the KL term a zero scalar.
            shape_logits (torch.FloatTensor): [B, P]; leading columns are
                volume points, trailing columns are near points.
            shape_labels (torch.FloatTensor): [B, P], laid out like
                ``shape_logits``.
            split (str): prefix of every key in the log dict.
            **kwargs: ignored.

        Returns:
            loss (torch.Tensor): (,)
            log (dict): scalar tensors keyed by ``"{split}/<name>"``.
        """
        local_batch_size = shape_embed.size(0)
        device = shape_embed.device

        # Rebuild the cached targets when the batch size changes, and also
        # when the inputs are on a different device than the cached tensor;
        # otherwise cross_entropy would receive targets on the wrong device.
        labels_stale = (
            self.labels is None
            or local_batch_size != self.last_local_batch_size
            or self.labels.device != device
        )
        if labels_stale:
            # Offsetting by rank * batch size presumably matches the position
            # of this rank's samples in the gathered batch -- confirm against
            # misc.all_gather_batch.
            self.labels = local_batch_size * misc.get_rank() + torch.arange(
                local_batch_size, device=device
            ).long()
            self.last_local_batch_size = local_batch_size

        # normalized features
        shape_embed = F.normalize(shape_embed, dim=-1, p=2)
        text_embed = F.normalize(text_embed, dim=-1, p=2)
        image_embed = F.normalize(image_embed, dim=-1, p=2)

        # gather features from all GPUs
        shape_embed_all, text_embed_all, image_embed_all = misc.all_gather_batch(
            [shape_embed, text_embed, image_embed]
        )

        # cosine similarity as logits
        logits_per_shape_text = logit_scale * shape_embed @ text_embed_all.t()
        logits_per_text_shape = logit_scale * text_embed @ shape_embed_all.t()
        logits_per_shape_image = logit_scale * shape_embed @ image_embed_all.t()
        logits_per_image_shape = logit_scale * image_embed @ shape_embed_all.t()

        # Symmetric cross-entropy for shape<->text plus shape<->image.
        contrast_loss = (F.cross_entropy(logits_per_shape_text, self.labels) +
                         F.cross_entropy(logits_per_text_shape, self.labels)) / 2 + \
                        (F.cross_entropy(logits_per_shape_image, self.labels) +
                         F.cross_entropy(logits_per_image_shape, self.labels)) / 2

        # shape reconstruction: column index separating volume and near points
        if self.num_near_samples is None:
            num_vol = shape_logits.shape[1] // 2
        else:
            num_vol = shape_logits.shape[1] - self.num_near_samples

        vol_logits = shape_logits[:, 0:num_vol]
        vol_labels = shape_labels[:, 0:num_vol]

        near_logits = shape_logits[:, num_vol:]
        near_labels = shape_labels[:, num_vol:]

        # occupancy loss, evaluated in float32
        vol_bce = self.geo_criterion(vol_logits.float(), vol_labels.float())
        near_bce = self.geo_criterion(near_logits.float(), near_labels.float())

        if posteriors is None:
            kl_loss = torch.tensor(0.0, dtype=vol_logits.dtype, device=vol_logits.device)
        else:
            kl_loss = posteriors.kl(dims=(1, 2))
            kl_loss = torch.mean(kl_loss)

        loss = vol_bce + near_bce * self.near_weight + kl_loss * self.kl_weight + contrast_loss * self.contrast_weight

        # compute accuracy
        with torch.no_grad():
            # Retrieval accuracy in percent over the local batch.
            pred = torch.argmax(logits_per_shape_text, dim=-1)
            correct = pred.eq(self.labels).sum()
            shape_text_acc = 100 * correct / local_batch_size

            pred = torch.argmax(logits_per_shape_image, dim=-1)
            correct = pred.eq(self.labels).sum()
            shape_image_acc = 100 * correct / local_batch_size

            # Occupancy accuracy: a logit >= 0 counts as a positive prediction.
            preds = shape_logits >= 0
            accuracy = (preds == shape_labels).float()
            accuracy = accuracy.mean()

        log = {
            "{}/contrast".format(split): contrast_loss.clone().detach(),
            "{}/near".format(split): near_bce.detach(),
            "{}/far".format(split): vol_bce.detach(),
            "{}/kl".format(split): kl_loss.detach(),
            "{}/shape_text_acc".format(split): shape_text_acc,
            "{}/shape_image_acc".format(split): shape_image_acc,
            "{}/total_loss".format(split): loss.clone().detach(),
            "{}/accuracy".format(split): accuracy,
        }

        if posteriors is not None:
            log[f"{split}/mean"] = posteriors.mean.mean().detach()
            log[f"{split}/std_mean"] = posteriors.std.mean().detach()
            log[f"{split}/std_max"] = posteriors.std.max().detach()

        return loss, log