Linly-Talker / pytorch3d /tests /test_chamfer.py
linxianzhong0128's picture
Upload folder using huggingface_hub
7088d16 verified
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.
import unittest
from collections import namedtuple
import numpy as np
import torch
import torch.nn.functional as F
from pytorch3d.loss import chamfer_distance
from pytorch3d.structures.pointclouds import Pointclouds
from .common_testing import get_random_cuda_device, TestCaseMixin
# Output of init_pointclouds
points_normals = namedtuple(
"points_normals", "p1_lengths p2_lengths cloud1 cloud2 p1 p2 n1 n2 weights"
)
class TestChamfer(TestCaseMixin, unittest.TestCase):
def setUp(self) -> None:
super().setUp()
torch.manual_seed(1)
@staticmethod
def init_pointclouds(
N, P1, P2, device, requires_grad: bool = True, allow_empty: bool = True
):
"""
Create 2 pointclouds object and associated padded points/normals tensors by
starting from lists. The clouds and tensors have the same data. The
leaf nodes for the clouds are a list of tensors. The padded tensor can be
used directly as a leaf node.
"""
low = 0 if allow_empty else 1
p1_lengths = torch.randint(low, P1, size=(N,), dtype=torch.int64, device=device)
p2_lengths = torch.randint(low, P2, size=(N,), dtype=torch.int64, device=device)
P1 = p1_lengths.max().item()
P2 = p2_lengths.max().item()
weights = torch.rand((N,), dtype=torch.float32, device=device)
# list of points and normals tensors
p1 = torch.rand((N, P1, 3), dtype=torch.float32, device=device)
p2 = torch.rand((N, P2, 3), dtype=torch.float32, device=device)
n1 = torch.rand((N, P1, 3), dtype=torch.float32, device=device)
n2 = torch.rand((N, P2, 3), dtype=torch.float32, device=device)
n1 /= n1.norm(dim=-1, p=2, keepdim=True)
n2 /= n2.norm(dim=-1, p=2, keepdim=True)
p1_list = []
p2_list = []
n1_list = []
n2_list = []
for i in range(N):
l1 = p1_lengths[i]
l2 = p2_lengths[i]
p1_list.append(p1[i, :l1].clone())
p2_list.append(p2[i, :l2].clone())
n1_list.append(n1[i, :l1].clone())
n2_list.append(n2[i, :l2].clone())
# Set requires_grad for all tensors in the lists and
# padded tensors.
if requires_grad:
for p in p2_list + p1_list + n1_list + n2_list + [p1, p2, n1, n2]:
p.requires_grad = True
# Create pointclouds objects
cloud1 = Pointclouds(points=p1_list, normals=n1_list)
cloud2 = Pointclouds(points=p2_list, normals=n2_list)
# Return pointclouds objects and padded tensors
return points_normals(
p1_lengths=p1_lengths,
p2_lengths=p2_lengths,
cloud1=cloud1,
cloud2=cloud2,
p1=p1,
p2=p2,
n1=n1,
n2=n2,
weights=weights,
)
@staticmethod
def chamfer_distance_naive_pointclouds(
p1, p2, norm: int = 2, device="cpu", abs_cosine=True
):
"""
Naive iterative implementation of nearest neighbor and chamfer distance.
x and y are assumed to be pointclouds objects with points and optionally normals.
This functions supports heterogeneous pointclouds in a batch.
Returns lists of the unreduced loss and loss_normals.
"""
x = p1.points_padded()
y = p2.points_padded()
N, P1, D = x.shape
P2 = y.size(1)
x_lengths = p1.num_points_per_cloud()
y_lengths = p2.num_points_per_cloud()
x_normals = p1.normals_padded()
y_normals = p2.normals_padded()
return_normals = x_normals is not None and y_normals is not None
# Initialize all distances to + inf
dist = torch.ones((N, P1, P2), dtype=torch.float32, device=device) * np.inf
x_mask = (
torch.arange(P1, device=x.device)[None] >= x_lengths[:, None]
) # shape [N, P1]
y_mask = (
torch.arange(P2, device=y.device)[None] >= y_lengths[:, None]
) # shape [N, P2]
is_x_heterogeneous = (x_lengths != P1).any()
is_y_heterogeneous = (y_lengths != P2).any()
# Only calculate the distances for the points which are not masked
for n in range(N):
for i1 in range(x_lengths[n]):
for i2 in range(y_lengths[n]):
if norm == 2:
dist[n, i1, i2] = torch.sum((x[n, i1, :] - y[n, i2, :]) ** 2)
elif norm == 1:
dist[n, i1, i2] = torch.sum(
torch.abs(x[n, i1, :] - y[n, i2, :])
)
else:
raise ValueError("No support for norm %d" % (norm))
x_dist = torch.min(dist, dim=2)[0] # (N, P1)
y_dist = torch.min(dist, dim=1)[0] # (N, P2)
if is_x_heterogeneous:
x_dist[x_mask] = 0.0
if is_y_heterogeneous:
y_dist[y_mask] = 0.0
loss = [x_dist, y_dist]
lnorm = [x.new_zeros(()), x.new_zeros(())]
if return_normals:
x_index = dist.argmin(2).view(N, P1, 1).expand(N, P1, 3)
y_index = dist.argmin(1).view(N, P2, 1).expand(N, P2, 3)
cosine_sim1 = F.cosine_similarity(
x_normals, y_normals.gather(1, x_index), dim=2, eps=1e-6
)
cosine_sim2 = F.cosine_similarity(
y_normals, x_normals.gather(1, y_index), dim=2, eps=1e-6
)
if abs_cosine:
lnorm1 = 1 - torch.abs(cosine_sim1)
lnorm2 = 1 - torch.abs(cosine_sim2)
else:
lnorm1 = 1 - cosine_sim1
lnorm2 = 1 - cosine_sim2
if is_x_heterogeneous:
lnorm1[x_mask] = 0.0
if is_y_heterogeneous:
lnorm2[y_mask] = 0.0
lnorm = [lnorm1, lnorm2] # [(N, P1), (N, P2)]
return loss, lnorm
@staticmethod
def chamfer_distance_naive(
x, y, x_normals=None, y_normals=None, norm: int = 2, abs_cosine=True
):
"""
Naive iterative implementation of nearest neighbor and chamfer distance.
Returns lists of the unreduced loss and loss_normals. This naive
version only supports homogeneous pointcouds in a batch.
"""
N, P1, D = x.shape
P2 = y.size(1)
device = x.device
return_normals = x_normals is not None and y_normals is not None
dist = torch.zeros((N, P1, P2), dtype=torch.float32, device=device)
for n in range(N):
for i1 in range(P1):
for i2 in range(P2):
if norm == 2:
dist[n, i1, i2] = torch.sum((x[n, i1, :] - y[n, i2, :]) ** 2)
elif norm == 1:
dist[n, i1, i2] = torch.sum(
torch.abs(x[n, i1, :] - y[n, i2, :])
)
else:
raise ValueError("No support for norm %d" % (norm))
loss = [
torch.min(dist, dim=2)[0], # (N, P1)
torch.min(dist, dim=1)[0], # (N, P2)
]
lnorm = [x.new_zeros(()), x.new_zeros(())]
if return_normals:
x_index = dist.argmin(2).view(N, P1, 1).expand(N, P1, 3)
y_index = dist.argmin(1).view(N, P2, 1).expand(N, P2, 3)
cosine_sim1 = F.cosine_similarity(
x_normals, y_normals.gather(1, x_index), dim=2, eps=1e-6
)
cosine_sim2 = F.cosine_similarity(
y_normals, x_normals.gather(1, y_index), dim=2, eps=1e-6
)
if abs_cosine:
lnorm1 = 1 - torch.abs(cosine_sim1)
lnorm2 = 1 - torch.abs(cosine_sim2)
else:
lnorm1 = 1 - cosine_sim1
lnorm2 = 1 - cosine_sim2
lnorm = [lnorm1, lnorm2] # [(N, P1), (N, P2)]
return loss, lnorm
def test_chamfer_point_batch_reduction_mean(self):
"""
Compare output of vectorized chamfer loss with naive implementation
for the default settings (point_reduction = "mean" and batch_reduction = "mean")
and no normals.
This tests only uses homogeneous pointclouds.
"""
N, max_P1, max_P2 = 7, 10, 18
device = get_random_cuda_device()
for norm in [1, 2]:
points_normals = TestChamfer.init_pointclouds(N, max_P1, max_P2, device)
p1 = points_normals.p1
p2 = points_normals.p2
weights = points_normals.weights
p11 = p1.detach().clone()
p22 = p2.detach().clone()
p11.requires_grad = True
p22.requires_grad = True
P1 = p1.shape[1]
P2 = p2.shape[1]
pred_loss, pred_loss_norm = TestChamfer.chamfer_distance_naive(
p1, p2, norm=norm
)
# point_reduction = "mean".
loss, loss_norm = chamfer_distance(p11, p22, weights=weights, norm=norm)
pred_loss = pred_loss[0].sum(1) / P1 + pred_loss[1].sum(1) / P2
pred_loss *= weights
pred_loss = pred_loss.sum() / weights.sum()
self.assertClose(loss, pred_loss)
self.assertTrue(loss_norm is None)
# Check gradients
self._check_gradients(loss, None, pred_loss, None, p1, p11, p2, p22)
def test_chamfer_vs_naive_pointcloud(self):
"""
Test the default settings for chamfer_distance
(point reduction = "mean" and batch_reduction="mean") but with heterogeneous
pointclouds as input. Compare with the naive implementation of chamfer
which supports heterogeneous pointcloud objects.
"""
N, max_P1, max_P2 = 3, 70, 70
device = get_random_cuda_device()
for norm in [1, 2]:
points_normals = TestChamfer.init_pointclouds(N, max_P1, max_P2, device)
weights = points_normals.weights
x_lengths = points_normals.p1_lengths
y_lengths = points_normals.p2_lengths
# Chamfer with tensors as input for heterogeneous pointclouds.
cham_tensor, norm_tensor = chamfer_distance(
points_normals.p1,
points_normals.p2,
x_normals=points_normals.n1,
y_normals=points_normals.n2,
x_lengths=points_normals.p1_lengths,
y_lengths=points_normals.p2_lengths,
weights=weights,
norm=norm,
)
# Chamfer with pointclouds as input.
pred_loss, pred_norm_loss = TestChamfer.chamfer_distance_naive_pointclouds(
points_normals.cloud1, points_normals.cloud2, norm=norm, device=device
)
# Mean reduction point loss.
pred_loss[0] *= weights.view(N, 1)
pred_loss[1] *= weights.view(N, 1)
pred_loss_mean = (
pred_loss[0].sum(1) / x_lengths + pred_loss[1].sum(1) / y_lengths
)
pred_loss_mean = pred_loss_mean.sum()
pred_loss_mean /= weights.sum()
# Mean reduction norm loss.
pred_norm_loss[0] *= weights.view(N, 1)
pred_norm_loss[1] *= weights.view(N, 1)
pred_norm_loss_mean = (
pred_norm_loss[0].sum(1) / x_lengths
+ pred_norm_loss[1].sum(1) / y_lengths
)
pred_norm_loss_mean = pred_norm_loss_mean.sum() / weights.sum()
self.assertClose(pred_loss_mean, cham_tensor)
self.assertClose(pred_norm_loss_mean, norm_tensor)
self._check_gradients(
cham_tensor,
norm_tensor,
pred_loss_mean,
pred_norm_loss_mean,
points_normals.cloud1.points_list(),
points_normals.p1,
points_normals.cloud2.points_list(),
points_normals.p2,
points_normals.cloud1.normals_list(),
points_normals.n1,
points_normals.cloud2.normals_list(),
points_normals.n2,
x_lengths,
y_lengths,
)
def test_single_directional_chamfer_vs_naive_pointcloud(self):
"""
Test the single directional settings for chamfer_distance
(point reduction = "mean" and batch_reduction="mean") but with heterogeneous
pointclouds as input. Compare with the naive implementation of chamfer
which supports heterogeneous pointcloud objects.
"""
N, max_P1, max_P2 = 3, 70, 70
device = get_random_cuda_device()
for norm in [1, 2]:
for abs_cosine in [True, False]:
points_normals = TestChamfer.init_pointclouds(N, max_P1, max_P2, device)
weights = points_normals.weights
x_lengths = points_normals.p1_lengths
y_lengths = points_normals.p2_lengths
# Chamfer with tensors as input for heterogeneous pointclouds.
cham_tensor, norm_tensor = chamfer_distance(
points_normals.p1,
points_normals.p2,
x_normals=points_normals.n1,
y_normals=points_normals.n2,
x_lengths=points_normals.p1_lengths,
y_lengths=points_normals.p2_lengths,
weights=weights,
norm=norm,
single_directional=True,
abs_cosine=abs_cosine,
)
# Chamfer with pointclouds as input.
(
pred_loss,
pred_norm_loss,
) = TestChamfer.chamfer_distance_naive_pointclouds(
points_normals.cloud1,
points_normals.cloud2,
norm=norm,
device=device,
abs_cosine=abs_cosine,
)
# Mean reduction point loss.
pred_loss[0] *= weights.view(N, 1)
pred_loss_mean = pred_loss[0].sum(1) / x_lengths
pred_loss_mean = pred_loss_mean.sum()
pred_loss_mean /= weights.sum()
# Mean reduction norm loss.
pred_norm_loss[0] *= weights.view(N, 1)
pred_norm_loss_mean = pred_norm_loss[0].sum(1) / x_lengths
pred_norm_loss_mean = pred_norm_loss_mean.sum() / weights.sum()
self.assertClose(pred_loss_mean, cham_tensor)
self.assertClose(pred_norm_loss_mean, norm_tensor)
self._check_gradients(
cham_tensor,
norm_tensor,
pred_loss_mean,
pred_norm_loss_mean,
points_normals.cloud1.points_list(),
points_normals.p1,
points_normals.cloud2.points_list(),
points_normals.p2,
points_normals.cloud1.normals_list(),
points_normals.n1,
points_normals.cloud2.normals_list(),
points_normals.n2,
x_lengths,
y_lengths,
)
def test_chamfer_pointcloud_object_withnormals(self):
N = 5
P1, P2 = 100, 100
device = get_random_cuda_device()
reductions = [
("sum", "sum"),
("mean", "sum"),
("sum", "mean"),
("mean", "mean"),
("sum", None),
("mean", None),
(None, None),
]
for point_reduction, batch_reduction in reductions:
# Reinitialize all the tensors so that the
# backward pass can be computed.
points_normals = TestChamfer.init_pointclouds(
N, P1, P2, device, allow_empty=False
)
# Chamfer with pointclouds as input.
cham_cloud, norm_cloud = chamfer_distance(
points_normals.cloud1,
points_normals.cloud2,
point_reduction=point_reduction,
batch_reduction=batch_reduction,
)
# Chamfer with tensors as input.
cham_tensor, norm_tensor = chamfer_distance(
points_normals.p1,
points_normals.p2,
x_lengths=points_normals.p1_lengths,
y_lengths=points_normals.p2_lengths,
x_normals=points_normals.n1,
y_normals=points_normals.n2,
point_reduction=point_reduction,
batch_reduction=batch_reduction,
)
if point_reduction is None:
cham_tensor_bidirectional = torch.hstack(
[cham_tensor[0], cham_tensor[1]]
)
norm_tensor_bidirectional = torch.hstack(
[norm_tensor[0], norm_tensor[1]]
)
cham_cloud_bidirectional = torch.hstack([cham_cloud[0], cham_cloud[1]])
norm_cloud_bidirectional = torch.hstack([norm_cloud[0], norm_cloud[1]])
self.assertClose(cham_cloud_bidirectional, cham_tensor_bidirectional)
self.assertClose(norm_cloud_bidirectional, norm_tensor_bidirectional)
self._check_gradients(
cham_tensor_bidirectional,
norm_tensor_bidirectional,
cham_cloud_bidirectional,
norm_cloud_bidirectional,
points_normals.cloud1.points_list(),
points_normals.p1,
points_normals.cloud2.points_list(),
points_normals.p2,
points_normals.cloud1.normals_list(),
points_normals.n1,
points_normals.cloud2.normals_list(),
points_normals.n2,
points_normals.p1_lengths,
points_normals.p2_lengths,
)
else:
self.assertClose(cham_cloud, cham_tensor)
self.assertClose(norm_cloud, norm_tensor)
self._check_gradients(
cham_tensor,
norm_tensor,
cham_cloud,
norm_cloud,
points_normals.cloud1.points_list(),
points_normals.p1,
points_normals.cloud2.points_list(),
points_normals.p2,
points_normals.cloud1.normals_list(),
points_normals.n1,
points_normals.cloud2.normals_list(),
points_normals.n2,
points_normals.p1_lengths,
points_normals.p2_lengths,
)
def test_chamfer_pointcloud_object_nonormals(self):
N = 5
P1, P2 = 100, 100
device = get_random_cuda_device()
reductions = [
("sum", "sum"),
("mean", "sum"),
("sum", "mean"),
("mean", "mean"),
("sum", None),
("mean", None),
(None, None),
]
for point_reduction, batch_reduction in reductions:
# Reinitialize all the tensors so that the
# backward pass can be computed.
points_normals = TestChamfer.init_pointclouds(
N, P1, P2, device, allow_empty=False
)
# Chamfer with pointclouds as input.
cham_cloud, _ = chamfer_distance(
points_normals.cloud1,
points_normals.cloud2,
point_reduction=point_reduction,
batch_reduction=batch_reduction,
)
# Chamfer with tensors as input.
cham_tensor, _ = chamfer_distance(
points_normals.p1,
points_normals.p2,
x_lengths=points_normals.p1_lengths,
y_lengths=points_normals.p2_lengths,
point_reduction=point_reduction,
batch_reduction=batch_reduction,
)
if point_reduction is None:
cham_tensor_bidirectional = torch.hstack(
[cham_tensor[0], cham_tensor[1]]
)
cham_cloud_bidirectional = torch.hstack([cham_cloud[0], cham_cloud[1]])
self.assertClose(cham_cloud_bidirectional, cham_tensor_bidirectional)
self._check_gradients(
cham_tensor_bidirectional,
None,
cham_cloud_bidirectional,
None,
points_normals.cloud1.points_list(),
points_normals.p1,
points_normals.cloud2.points_list(),
points_normals.p2,
lengths1=points_normals.p1_lengths,
lengths2=points_normals.p2_lengths,
)
else:
self.assertClose(cham_cloud, cham_tensor)
self._check_gradients(
cham_tensor,
None,
cham_cloud,
None,
points_normals.cloud1.points_list(),
points_normals.p1,
points_normals.cloud2.points_list(),
points_normals.p2,
lengths1=points_normals.p1_lengths,
lengths2=points_normals.p2_lengths,
)
def test_chamfer_point_reduction_mean(self):
"""
Compare output of vectorized chamfer loss with naive implementation
for point_reduction = "mean" and batch_reduction = None.
"""
N, max_P1, max_P2 = 7, 10, 18
device = get_random_cuda_device()
points_normals = TestChamfer.init_pointclouds(N, max_P1, max_P2, device)
p1 = points_normals.p1
p2 = points_normals.p2
p1_normals = points_normals.n1
p2_normals = points_normals.n2
weights = points_normals.weights
p11 = p1.detach().clone()
p22 = p2.detach().clone()
p11.requires_grad = True
p22.requires_grad = True
P1 = p1.shape[1]
P2 = p2.shape[1]
pred_loss, pred_loss_norm = TestChamfer.chamfer_distance_naive(
p1, p2, x_normals=p1_normals, y_normals=p2_normals
)
# point_reduction = "mean".
loss, loss_norm = chamfer_distance(
p11,
p22,
x_normals=p1_normals,
y_normals=p2_normals,
weights=weights,
batch_reduction=None,
point_reduction="mean",
)
pred_loss_mean = pred_loss[0].sum(1) / P1 + pred_loss[1].sum(1) / P2
pred_loss_mean *= weights
self.assertClose(loss, pred_loss_mean)
pred_loss_norm_mean = (
pred_loss_norm[0].sum(1) / P1 + pred_loss_norm[1].sum(1) / P2
)
pred_loss_norm_mean *= weights
self.assertClose(loss_norm, pred_loss_norm_mean)
# Check gradients
self._check_gradients(
loss, loss_norm, pred_loss_mean, pred_loss_norm_mean, p1, p11, p2, p22
)
def test_single_direction_chamfer_point_reduction_mean(self):
"""
Compare output of vectorized chamfer loss with naive implementation
for point_reduction = "mean" and batch_reduction = None.
"""
N, max_P1, max_P2 = 7, 10, 18
device = get_random_cuda_device()
points_normals = TestChamfer.init_pointclouds(N, max_P1, max_P2, device)
p1 = points_normals.p1
p2 = points_normals.p2
p1_normals = points_normals.n1
p2_normals = points_normals.n2
weights = points_normals.weights
p11 = p1.detach().clone()
p22 = p2.detach().clone()
p11.requires_grad = True
p22.requires_grad = True
P1 = p1.shape[1]
pred_loss, pred_loss_norm = TestChamfer.chamfer_distance_naive(
p1, p2, x_normals=p1_normals, y_normals=p2_normals
)
# point_reduction = "mean".
loss, loss_norm = chamfer_distance(
p11,
p22,
x_normals=p1_normals,
y_normals=p2_normals,
weights=weights,
batch_reduction=None,
point_reduction="mean",
single_directional=True,
)
pred_loss_mean = pred_loss[0].sum(1) / P1
pred_loss_mean *= weights
self.assertClose(loss, pred_loss_mean)
pred_loss_norm_mean = pred_loss_norm[0].sum(1) / P1
pred_loss_norm_mean *= weights
self.assertClose(loss_norm, pred_loss_norm_mean)
# Check gradients
self._check_gradients(
loss, loss_norm, pred_loss_mean, pred_loss_norm_mean, p1, p11, p2, p22
)
def test_chamfer_point_reduction_sum(self):
"""
Compare output of vectorized chamfer loss with naive implementation
for point_reduction = "sum" and batch_reduction = None.
"""
N, P1, P2 = 7, 10, 18
device = get_random_cuda_device()
points_normals = TestChamfer.init_pointclouds(N, P1, P2, device)
p1 = points_normals.p1
p2 = points_normals.p2
p1_normals = points_normals.n1
p2_normals = points_normals.n2
weights = points_normals.weights
p11 = p1.detach().clone()
p22 = p2.detach().clone()
p11.requires_grad = True
p22.requires_grad = True
pred_loss, pred_loss_norm = TestChamfer.chamfer_distance_naive(
p1, p2, x_normals=p1_normals, y_normals=p2_normals
)
loss, loss_norm = chamfer_distance(
p11,
p22,
x_normals=p1_normals,
y_normals=p2_normals,
weights=weights,
batch_reduction=None,
point_reduction="sum",
)
pred_loss_sum = pred_loss[0].sum(1) + pred_loss[1].sum(1)
pred_loss_sum *= weights
self.assertClose(loss, pred_loss_sum)
pred_loss_norm_sum = pred_loss_norm[0].sum(1) + pred_loss_norm[1].sum(1)
pred_loss_norm_sum *= weights
self.assertClose(loss_norm, pred_loss_norm_sum)
# Check gradients
self._check_gradients(
loss, loss_norm, pred_loss_sum, pred_loss_norm_sum, p1, p11, p2, p22
)
def test_single_directional_chamfer_point_reduction_sum(self):
"""
Compare output of vectorized single directional chamfer loss with naive implementation
for point_reduction = "sum" and batch_reduction = None.
"""
N, P1, P2 = 7, 10, 18
device = get_random_cuda_device()
points_normals = TestChamfer.init_pointclouds(N, P1, P2, device)
p1 = points_normals.p1
p2 = points_normals.p2
p1_normals = points_normals.n1
p2_normals = points_normals.n2
weights = points_normals.weights
p11 = p1.detach().clone()
p22 = p2.detach().clone()
p11.requires_grad = True
p22.requires_grad = True
pred_loss, pred_loss_norm = TestChamfer.chamfer_distance_naive(
p1, p2, x_normals=p1_normals, y_normals=p2_normals
)
loss, loss_norm = chamfer_distance(
p11,
p22,
x_normals=p1_normals,
y_normals=p2_normals,
weights=weights,
batch_reduction=None,
point_reduction="sum",
single_directional=True,
)
pred_loss_sum = pred_loss[0].sum(1)
pred_loss_sum *= weights
self.assertClose(loss, pred_loss_sum)
pred_loss_norm_sum = pred_loss_norm[0].sum(1)
pred_loss_norm_sum *= weights
self.assertClose(loss_norm, pred_loss_norm_sum)
# Check gradients
self._check_gradients(
loss, loss_norm, pred_loss_sum, pred_loss_norm_sum, p1, p11, p2, p22
)
def test_chamfer_point_reduction_none(self):
"""
Compare output of vectorized chamfer loss with naive implementation
for point_reduction = None and batch_reduction = None.
"""
N, max_P1, max_P2 = 7, 10, 18
device = get_random_cuda_device()
points_normals = TestChamfer.init_pointclouds(N, max_P1, max_P2, device)
p1 = points_normals.p1
p2 = points_normals.p2
p1_normals = points_normals.n1
p2_normals = points_normals.n2
p11 = p1.detach().clone()
p22 = p2.detach().clone()
p11.requires_grad = True
p22.requires_grad = True
pred_loss, pred_loss_norm = TestChamfer.chamfer_distance_naive(
p1, p2, x_normals=p1_normals, y_normals=p2_normals
)
# point_reduction = None
loss, loss_norm = chamfer_distance(
p11,
p22,
x_normals=p1_normals,
y_normals=p2_normals,
batch_reduction=None,
point_reduction=None,
)
loss_bidirectional = torch.hstack([loss[0], loss[1]])
pred_loss_bidirectional = torch.hstack([pred_loss[0], pred_loss[1]])
loss_norm_bidirectional = torch.hstack([loss_norm[0], loss_norm[1]])
pred_loss_norm_bidirectional = torch.hstack(
[pred_loss_norm[0], pred_loss_norm[1]]
)
self.assertClose(loss_bidirectional, pred_loss_bidirectional)
self.assertClose(loss_norm_bidirectional, pred_loss_norm_bidirectional)
# Check gradients
self._check_gradients(
loss_bidirectional,
loss_norm_bidirectional,
pred_loss_bidirectional,
pred_loss_norm_bidirectional,
p1,
p11,
p2,
p22,
)
def test_single_direction_chamfer_point_reduction_none(self):
"""
Compare output of vectorized chamfer loss with naive implementation
for point_reduction = None and batch_reduction = None.
"""
N, max_P1, max_P2 = 7, 10, 18
device = get_random_cuda_device()
points_normals = TestChamfer.init_pointclouds(N, max_P1, max_P2, device)
p1 = points_normals.p1
p2 = points_normals.p2
p1_normals = points_normals.n1
p2_normals = points_normals.n2
p11 = p1.detach().clone()
p22 = p2.detach().clone()
p11.requires_grad = True
p22.requires_grad = True
pred_loss, pred_loss_norm = TestChamfer.chamfer_distance_naive(
p1, p2, x_normals=p1_normals, y_normals=p2_normals
)
# point_reduction = None
loss, loss_norm = chamfer_distance(
p11,
p22,
x_normals=p1_normals,
y_normals=p2_normals,
batch_reduction=None,
point_reduction=None,
single_directional=True,
)
self.assertClose(loss, pred_loss[0])
self.assertClose(loss_norm, pred_loss_norm[0])
# Check gradients
self._check_gradients(
loss, loss_norm, pred_loss[0], pred_loss_norm[0], p1, p11, p2, p22
)
def _check_gradients(
self,
loss,
loss_norm,
pred_loss,
pred_loss_norm,
x1,
x2,
y1,
y2,
xn1=None, # normals
xn2=None, # normals
yn1=None, # normals
yn2=None, # normals
lengths1=None,
lengths2=None,
):
"""
x1 and x2 can have different types based on the leaf node used in the calculation:
e.g. x1 may be a list of tensors whereas x2 is a padded tensor.
This also applies for the pairs: (y1, y2), (xn1, xn2), (yn1, yn2).
"""
grad_loss = torch.rand(loss.shape, device=loss.device, dtype=loss.dtype)
# Loss for normals is optional. Iniitalize to 0.
norm_loss_term = pred_norm_loss_term = 0.0
if loss_norm is not None and pred_loss_norm is not None:
grad_normals = torch.rand(
loss_norm.shape, device=loss.device, dtype=loss.dtype
)
norm_loss_term = loss_norm * grad_normals
pred_norm_loss_term = pred_loss_norm * grad_normals
l1 = (loss * grad_loss) + norm_loss_term
l1.sum().backward()
l2 = (pred_loss * grad_loss) + pred_norm_loss_term
l2.sum().backward()
self._check_grad_by_type(x1, x2, lengths1)
self._check_grad_by_type(y1, y2, lengths2)
# If leaf nodes for normals are passed in, check their gradients.
if all(n is not None for n in [xn1, xn2, yn1, yn2]):
self._check_grad_by_type(xn1, xn2, lengths1)
self._check_grad_by_type(yn1, yn2, lengths2)
def _check_grad_by_type(self, x1, x2, lengths=None):
"""
x1 and x2 can be of different types e.g. list or tensor - compare appropriately
based on the types.
"""
error_msg = "All values for gradient checks must be tensors or lists of tensors"
if all(isinstance(p, list) for p in [x1, x2]):
# Lists of tensors
for i in range(len(x1)):
self.assertClose(x1[i].grad, x2[i].grad)
elif isinstance(x1, list) and torch.is_tensor(x2):
self.assertIsNotNone(lengths) # lengths is required
# List of tensors vs padded tensor
for i in range(len(x1)):
self.assertClose(x1[i].grad, x2.grad[i, : lengths[i]], atol=1e-7)
self.assertTrue(x2.grad[i, lengths[i] :].sum().item() == 0.0)
elif all(torch.is_tensor(p) for p in [x1, x2]):
# Two tensors
self.assertClose(x1.grad, x2.grad)
else:
raise ValueError(error_msg)
def test_chamfer_joint_reduction(self):
"""
Compare output of vectorized chamfer loss with naive implementation
when batch_reduction in ["mean", "sum"] and
point_reduction in ["mean", "sum"].
"""
N, max_P1, max_P2 = 7, 10, 18
device = get_random_cuda_device()
points_normals = TestChamfer.init_pointclouds(N, max_P1, max_P2, device)
p1 = points_normals.p1
p2 = points_normals.p2
p1_normals = points_normals.n1
p2_normals = points_normals.n2
weights = points_normals.weights
P1 = p1.shape[1]
P2 = p2.shape[1]
pred_loss, pred_loss_norm = TestChamfer.chamfer_distance_naive(
p1, p2, x_normals=p1_normals, y_normals=p2_normals
)
# batch_reduction = "sum", point_reduction = "sum".
loss, loss_norm = chamfer_distance(
p1,
p2,
x_normals=p1_normals,
y_normals=p2_normals,
weights=weights,
batch_reduction="sum",
point_reduction="sum",
)
pred_loss[0] *= weights.view(N, 1)
pred_loss[1] *= weights.view(N, 1)
pred_loss_sum = pred_loss[0].sum(1) + pred_loss[1].sum(1) # point sum
pred_loss_sum = pred_loss_sum.sum() # batch sum
self.assertClose(loss, pred_loss_sum)
pred_loss_norm[0] *= weights.view(N, 1)
pred_loss_norm[1] *= weights.view(N, 1)
pred_loss_norm_sum = pred_loss_norm[0].sum(1) + pred_loss_norm[1].sum(
1
) # point sum.
pred_loss_norm_sum = pred_loss_norm_sum.sum() # batch sum
self.assertClose(loss_norm, pred_loss_norm_sum)
# batch_reduction = "mean", point_reduction = "sum".
loss, loss_norm = chamfer_distance(
p1,
p2,
x_normals=p1_normals,
y_normals=p2_normals,
weights=weights,
batch_reduction="mean",
point_reduction="sum",
)
pred_loss_sum /= weights.sum()
self.assertClose(loss, pred_loss_sum)
pred_loss_norm_sum /= weights.sum()
self.assertClose(loss_norm, pred_loss_norm_sum)
# batch_reduction = "sum", point_reduction = "mean".
loss, loss_norm = chamfer_distance(
p1,
p2,
x_normals=p1_normals,
y_normals=p2_normals,
weights=weights,
batch_reduction="sum",
point_reduction="mean",
)
pred_loss_mean = pred_loss[0].sum(1) / P1 + pred_loss[1].sum(1) / P2
pred_loss_mean = pred_loss_mean.sum()
self.assertClose(loss, pred_loss_mean)
pred_loss_norm_mean = (
pred_loss_norm[0].sum(1) / P1 + pred_loss_norm[1].sum(1) / P2
)
pred_loss_norm_mean = pred_loss_norm_mean.sum()
self.assertClose(loss_norm, pred_loss_norm_mean)
# batch_reduction = "mean", point_reduction = "mean". This is the default.
loss, loss_norm = chamfer_distance(
p1,
p2,
x_normals=p1_normals,
y_normals=p2_normals,
weights=weights,
batch_reduction="mean",
point_reduction="mean",
)
pred_loss_mean /= weights.sum()
self.assertClose(loss, pred_loss_mean)
pred_loss_norm_mean /= weights.sum()
self.assertClose(loss_norm, pred_loss_norm_mean)
# Error when batch_reduction is not in ["mean", "sum"] or None.
with self.assertRaisesRegex(ValueError, "batch_reduction must be one of"):
chamfer_distance(p1, p2, weights=weights, batch_reduction="max")
# Error when point_reduction is not in ["mean", "sum"] or None.
with self.assertRaisesRegex(ValueError, "point_reduction must be one of"):
chamfer_distance(p1, p2, weights=weights, point_reduction="max")
def test_incorrect_weights(self):
N, P1, P2 = 16, 64, 128
device = get_random_cuda_device()
p1 = torch.rand(
(N, P1, 3), dtype=torch.float32, device=device, requires_grad=True
)
p2 = torch.rand(
(N, P2, 3), dtype=torch.float32, device=device, requires_grad=True
)
weights = torch.zeros((N,), dtype=torch.float32, device=device)
loss, loss_norm = chamfer_distance(
p1, p2, weights=weights, batch_reduction="mean"
)
self.assertClose(loss.cpu(), torch.zeros(()))
self.assertTrue(loss.requires_grad)
self.assertClose(loss_norm.cpu(), torch.zeros(()))
self.assertTrue(loss_norm.requires_grad)
loss, loss_norm = chamfer_distance(
p1, p2, weights=weights, batch_reduction=None
)
self.assertClose(loss.cpu(), torch.zeros((N, N)))
self.assertTrue(loss.requires_grad)
self.assertClose(loss_norm.cpu(), torch.zeros((N, N)))
self.assertTrue(loss_norm.requires_grad)
weights = torch.ones((N,), dtype=torch.float32, device=device) * -1
with self.assertRaises(ValueError):
loss, loss_norm = chamfer_distance(p1, p2, weights=weights)
weights = torch.zeros((N - 1,), dtype=torch.float32, device=device)
with self.assertRaises(ValueError):
loss, loss_norm = chamfer_distance(p1, p2, weights=weights)
def test_incorrect_inputs(self):
N, P1, P2 = 7, 10, 18
device = get_random_cuda_device()
points_normals = TestChamfer.init_pointclouds(N, P1, P2, device)
p1 = points_normals.p1
p2 = points_normals.p2
p1_normals = points_normals.n1
# Normals of wrong shape
with self.assertRaisesRegex(ValueError, "Expected normals to be of shape"):
chamfer_distance(p1, p2, x_normals=p1_normals[None])
# Points of wrong shape
with self.assertRaisesRegex(ValueError, "Expected points to be of shape"):
chamfer_distance(p1[None], p2)
# Lengths of wrong shape
with self.assertRaisesRegex(ValueError, "Expected lengths to be of shape"):
chamfer_distance(p1, p2, x_lengths=torch.tensor([1, 2, 3], device=device))
# Points are not a tensor or Pointclouds
with self.assertRaisesRegex(ValueError, "Pointclouds objects or torch.Tensor"):
chamfer_distance(x=[1, 1, 1], y=[1, 1, 1])
def test_invalid_norm(self):
N, P1, P2 = 7, 10, 18
device = get_random_cuda_device()
points_normals = TestChamfer.init_pointclouds(N, P1, P2, device)
p1 = points_normals.p1
p2 = points_normals.p2
with self.assertRaisesRegex(ValueError, "Support for 1 or 2 norm."):
chamfer_distance(p1, p2, norm=0)
with self.assertRaisesRegex(ValueError, "Support for 1 or 2 norm."):
chamfer_distance(p1, p2, norm=3)
def test_empty_clouds(self):
# Check that point_reduction doesn't divide by zero
points1 = Pointclouds(points=[torch.zeros(0, 3), torch.zeros(10, 3)])
points2 = Pointclouds(points=torch.ones(2, 40, 3))
loss, _ = chamfer_distance(points1, points2, batch_reduction=None)
self.assertClose(loss, torch.tensor([0.0, 6.0]))
# Check that batch_reduction doesn't divide by zero
loss2, _ = chamfer_distance(Pointclouds([]), Pointclouds([]))
self.assertClose(loss2, torch.tensor(0.0))
@staticmethod
def chamfer_with_init(
batch_size: int,
P1: int,
P2: int,
return_normals: bool,
homogeneous: bool,
device="cpu",
):
points_normals = TestChamfer.init_pointclouds(batch_size, P1, P2, device=device)
l1 = points_normals.p1_lengths
l2 = points_normals.p2_lengths
if homogeneous:
# Set lengths to None so in Chamfer it assumes
# there is no padding.
l1 = l2 = None
torch.cuda.synchronize()
def loss():
loss, loss_normals = chamfer_distance(
points_normals.p1,
points_normals.p2,
x_lengths=l1,
y_lengths=l2,
x_normals=points_normals.n1,
y_normals=points_normals.n2,
weights=points_normals.weights,
)
torch.cuda.synchronize()
return loss
@staticmethod
def chamfer_naive_with_init(
batch_size: int, P1: int, P2: int, return_normals: bool, device="cpu"
):
points_normals = TestChamfer.init_pointclouds(batch_size, P1, P2, device=device)
torch.cuda.synchronize()
def loss():
loss, loss_normals = TestChamfer.chamfer_distance_naive(
points_normals.p1,
points_normals.p2,
x_normals=points_normals.n1,
y_normals=points_normals.n2,
)
torch.cuda.synchronize()
return loss