subm / winner_candidates.py
Neritz's picture
Add handcrafted_submission_2026 contents
5b6e3d8 verified
"""3D vertex candidate generation in the style of the S23DR 2025 winner.
The original baseline (and our v11) detects 2D corners on gestalt images
then unprojects them via depth — which introduces 30–100 cm of error from
the monocular depth ambiguity.
The winner generates candidates **directly in 3D** by selecting the COLMAP
points whose projection lands inside a gestalt corner-class blob:
1. Per view, per gestalt corner class (apex, eave_end_point, flashing_end_point):
a. Find connected components of the class mask.
b. For each blob, iteratively binary-dilate it until at least
``min_colmap_points`` projected COLMAP points fall inside.
c. Record those COLMAP point indices as a "cluster" tagged with class+view.
2. Globally:
a. Take the union of all clustered point indices.
b. For each cluster compute its 3D centroid, then redefine it as all
filtered points within ``cluster_radius`` of that centroid.
c. Merge any pair of clusters whose smaller member shares >50% of its
points with the other.
The output is a list of 3D vertex candidates with sub-decimetre accuracy
(limited only by COLMAP triangulation precision).
Entry point: ``generate_winner_candidates(entry)``.
"""
from __future__ import annotations
import numpy as np
import cv2
from dataclasses import dataclass
from hoho2025.example_solutions import convert_entry_to_human_readable
from hoho2025.color_mappings import gestalt_color_mapping
try:
from mvs_utils import collect_views, project_world_to_image
except ImportError:
from submission.mvs_utils import collect_views, project_world_to_image
# Gestalt corner classes used for candidate generation. Each name is looked up
# in ``gestalt_color_mapping`` by ``_per_view_clusters`` to build one colour
# mask per class and view.
VERTEX_CLASSES = ['apex', 'eave_end_point', 'flashing_end_point']
@dataclass
class WinnerCandidate:
    """A 3D vertex candidate produced by the winner-2025 algorithm.

    Instances are created by ``_merge_clusters`` and mutated in place when two
    candidates are merged (the surviving candidate absorbs the other one).
    """
    # (3,) world coords. Initially the mean of the seeding per-view cluster;
    # recomputed as the mean of all owned points after a merge.
    centroid: np.ndarray
    # Points this candidate owns, stored as row positions in the ``colmap_xyz``
    # array built by ``generate_winner_candidates`` (enumeration order of
    # ``points3D.values()``), not COLMAP point3D ids.
    point_indices: set[int]
    # Gestalt vertex classes (entries of ``VERTEX_CLASSES``) that voted for it.
    classes: set[str]
    # View support of the candidate; starts at 1 and is updated by
    # ``_merge_clusters`` whenever candidates are merged.
    view_count: int
def _project_colmap_to_view(
    colmap_xyz: np.ndarray, P: np.ndarray, W: int, H: int,
) -> tuple[np.ndarray, np.ndarray]:
    """Project the COLMAP cloud into one view and flag the usable points.

    Args:
        colmap_xyz: world-space points, forwarded unchanged to
            ``project_world_to_image``.
        P: projection for the view, forwarded unchanged to
            ``project_world_to_image`` (presumably a camera projection
            matrix — confirm against ``mvs_utils``).
        W, H: image width and height in pixels used for the bounds test.

    Returns:
        ``(uv_int, valid)`` where ``uv_int`` holds the projected pixel
        coordinates rounded to ``int64`` (column 0 = x, column 1 = y) and
        ``valid`` is a boolean mask that is True only for points that are both
        in front of the camera (``z > 0``) and inside the ``W`` x ``H`` image.
        Rows of ``uv_int`` where ``valid`` is False must not be used as pixel
        indices.
    """
    uv, z = project_world_to_image(P, colmap_xyz)
    uv_rounded = np.round(uv)

    # Test the bounds on the rounded *float* values. NaN/inf projections
    # (z at or near zero) then compare False on every platform, whereas the
    # result of casting them to an integer first is implementation-defined.
    in_bounds = (
        (uv_rounded[:, 0] >= 0) & (uv_rounded[:, 0] < W) &
        (uv_rounded[:, 1] >= 0) & (uv_rounded[:, 1] < H)
    )
    in_front = z > 0

    # Non-finite rows are already excluded by the mask; just silence the
    # "invalid value encountered in cast" warning they would trigger.
    with np.errstate(invalid='ignore'):
        uv_int = uv_rounded.astype(np.int64)

    return uv_int, in_bounds & in_front
def _expand_blob_to_min_colmap(
    blob_mask: np.ndarray,
    uv_int: np.ndarray,
    valid_mask: np.ndarray,
    min_points: int = 5,
    max_iters: int = 20,
) -> tuple[np.ndarray, np.ndarray]:
    """Grow a 2D blob until it covers at least ``min_points`` COLMAP points.

    Args:
        blob_mask: 2D mask of a single blob; any non-zero pixel counts as
            inside (so both 0/1 and 0/255 masks work).
        uv_int: integer pixel coordinates per point, column 0 = x, column 1 = y.
        valid_mask: boolean mask selecting the rows of ``uv_int`` that may be
            used as pixel indices.
        min_points: number of covered points required to stop growing.
        max_iters: maximum number of 3x3 dilation passes.

    Returns:
        ``(final_mask, point_indices_inside)``. ``point_indices_inside`` are
        row indices into ``uv_int``. If the blob already covers enough points
        the input mask object itself is returned. If ``min_points`` is never
        reached, the mask after the last dilation and the (too few) points it
        covers are returned, so callers must check the count themselves.
    """
    valid_idx = np.where(valid_mask)[0]
    # Split the coordinates once; every dilation pass reuses them.
    cols = uv_int[valid_idx, 0]
    rows = uv_int[valid_idx, 1]

    def points_inside(mask):
        # Compare against zero rather than using the mask values directly:
        # indexing with the raw uint8 values would be integer fancy indexing.
        return valid_idx[mask[rows, cols] > 0]

    inside = points_inside(blob_mask)
    if len(inside) >= min_points:
        return blob_mask, inside

    kernel = np.ones((3, 3), np.uint8)
    grown = blob_mask.copy()
    for _ in range(max_iters):
        grown = cv2.dilate(grown, kernel, iterations=1)
        inside = points_inside(grown)
        if len(inside) >= min_points:
            break
    return grown, inside
def _per_view_clusters(
    gest_np: np.ndarray,
    colmap_xyz: np.ndarray,
    P: np.ndarray,
    W: int, H: int,
    view_id: str,
    min_colmap_points: int = 5,
    min_blob_area: int = 4,
) -> list[tuple[set[int], str, str]]:
    """Collect the COLMAP point clusters seen in a single view.

    For every class in ``VERTEX_CLASSES`` the gestalt image is thresholded on
    the class colour, split into 8-connected blobs, and each sufficiently
    large blob is grown until it covers ``min_colmap_points`` projected
    points. Blobs that never reach that count are dropped.

    Returns:
        A list of ``(point_indices_set, gestalt_class, view_id)`` tuples, in
        class order and then blob-label order.
    """
    clusters: list[tuple[set[int], str, str]] = []

    uv_int, valid = _project_colmap_to_view(colmap_xyz, P, W, H)
    if not np.any(valid):
        # Nothing from the cloud is visible in this view.
        return clusters

    for v_class in VERTEX_CLASSES:
        class_color = np.array(gestalt_color_mapping[v_class])
        # +/- 0.5 around the colour selects exactly that colour.
        class_mask = cv2.inRange(gest_np, class_color - 0.5, class_color + 0.5)
        if not class_mask.any():
            continue

        n_labels, labels, stats, _ = cv2.connectedComponentsWithStats(
            class_mask, 8, cv2.CV_32S
        )
        # Label 0 is the background component, so start at 1.
        for label in range(1, n_labels):
            if int(stats[label, cv2.CC_STAT_AREA]) < min_blob_area:
                continue
            single_blob = (labels == label).astype(np.uint8)
            _, covered = _expand_blob_to_min_colmap(
                single_blob, uv_int, valid,
                min_points=min_colmap_points,
            )
            if len(covered) < min_colmap_points:
                continue
            clusters.append((set(covered.tolist()), v_class, view_id))

    return clusters
def _merge_clusters(
    raw_clusters: list[tuple[set[int], str, str]],
    colmap_xyz: np.ndarray,
    cluster_radius: float = 0.5,
    overlap_threshold: float = 0.5,
) -> list[WinnerCandidate]:
    """Turn per-view clusters into merged 3D vertex candidates.

    Steps:
      1. Restrict the cloud to points that appear in at least one cluster.
      2. For each cluster, take the centroid of its points and redefine the
         cluster as every restricted point within ``cluster_radius`` of it.
      3. Repeatedly merge any pair of candidates whose shared points exceed
         ``overlap_threshold`` as a fraction of the smaller candidate.

    Args:
        raw_clusters: ``(point_indices, gestalt_class, view_id)`` tuples;
            indices are rows of ``colmap_xyz``.
        colmap_xyz: ``(N, 3)`` array of point coordinates.
        cluster_radius: ball-query radius, in the units of ``colmap_xyz``.
        overlap_threshold: strict lower bound on the overlap ratio that
            triggers a merge.

    Returns:
        The surviving candidates, in the order their first contributing
        cluster appeared in ``raw_clusters``. ``view_count`` is the number of
        *distinct* view ids that contributed to the candidate.
    """
    if not raw_clusters:
        return []

    used_idx: set[int] = set()
    for pts, _, _ in raw_clusters:
        used_idx.update(pts)
    if not used_idx:
        return []
    used_idx_arr = np.array(sorted(used_idx), dtype=np.int64)
    filtered_xyz = colmap_xyz[used_idx_arr]

    # KD-tree over the restricted cloud only, so ball queries never pull in
    # points that no gestalt blob voted for.
    from scipy.spatial import cKDTree
    tree = cKDTree(filtered_xyz)

    # Step 2: redefine each cluster by a ball query around its centroid.
    candidates: list[WinnerCandidate] = []
    # View ids behind each candidate, kept parallel to ``candidates``. Counting
    # merged clusters instead would over-count when one view contributes
    # several blobs (e.g. two classes on the same corner).
    candidate_views: list[set[str]] = []
    for pts, cls, vid in raw_clusters:
        if not pts:
            continue
        # Sorted so the centroid does not depend on set iteration order.
        seed_idx = np.array(sorted(pts), dtype=np.int64)
        centroid = colmap_xyz[seed_idx].mean(axis=0)
        nbr_f_idx = tree.query_ball_point(centroid, cluster_radius)
        if not nbr_f_idx:
            # The centroid of a spread-out cluster can lie farther than
            # ``cluster_radius`` from every point; such clusters are dropped.
            continue
        candidates.append(WinnerCandidate(
            centroid=centroid,
            point_indices={int(used_idx_arr[i]) for i in nbr_f_idx},
            classes={cls},
            view_count=1,
        ))
        candidate_views.append({vid})
    if not candidates:
        return []

    # Step 3: greedy pairwise merge. A merge enlarges the survivor, which can
    # create new overlaps with candidates already compared, so sweep until a
    # full pass makes no change.
    changed = True
    while changed:
        changed = False
        i = 0
        while i < len(candidates):
            a = candidates[i]
            j = i + 1
            while j < len(candidates):
                b = candidates[j]
                shared = len(a.point_indices & b.point_indices)
                smaller = min(len(a.point_indices), len(b.point_indices))
                if smaller > 0 and shared / smaller > overlap_threshold:
                    # Merge b into a and drop b; j now points at the next one.
                    merged_pts = a.point_indices | b.point_indices
                    merged_rows = np.array(sorted(merged_pts), dtype=np.int64)
                    a.centroid = colmap_xyz[merged_rows].mean(axis=0)
                    a.point_indices = merged_pts
                    a.classes |= b.classes
                    candidate_views[i] |= candidate_views[j]
                    a.view_count = len(candidate_views[i])
                    del candidates[j]
                    del candidate_views[j]
                    changed = True
                else:
                    j += 1
            i += 1
    return candidates
def generate_winner_candidates(
    entry,
    min_colmap_points: int = 5,
    cluster_radius: float = 0.5,
    overlap_threshold: float = 0.5,
    min_blob_area: int = 4,
) -> tuple[list[WinnerCandidate], dict]:
    """Run the winner-2025 3D vertex candidate generator on one entry.

    Args:
        entry: dataset sample, decoded with
            ``convert_entry_to_human_readable``.
        min_colmap_points: points a blob must cover to form a cluster.
        cluster_radius: ball-query radius used when re-defining clusters.
        overlap_threshold: overlap ratio above which candidates are merged.
        min_blob_area: blobs with a smaller pixel area are ignored.

    Returns:
        ``(candidates, good_entry)`` where ``good_entry`` is the decoded
        entry. ``candidates`` is empty when the entry has no reconstruction
        or the reconstruction has no 3D points.
    """
    good = convert_entry_to_human_readable(entry)

    reconstruction = good.get('colmap')
    if not reconstruction:
        reconstruction = good.get('colmap_binary')
    if reconstruction is None:
        return [], good

    # Row order follows ``points3D.values()``; every point index used
    # downstream is a row of this array.
    xyz_rows = [pt.xyz for pt in reconstruction.points3D.values()]
    colmap_xyz = np.array(xyz_rows, dtype=np.float64)
    if len(colmap_xyz) == 0:
        return [], good

    image_ids = good['image_ids']
    views = collect_views(reconstruction, image_ids)

    raw_clusters: list[tuple[set[int], str, str]] = []
    for gest, depth, img_id in zip(good['gestalt'], good['depth'], image_ids):
        view_info = views.get(img_id)
        if view_info is None:
            # No entry for this image in ``views``; skip it.
            continue

        # The depth map is only used for its resolution; the gestalt image is
        # resized to match it.
        H, W = np.array(depth).shape[:2]
        # NOTE(review): if ``gest`` is a PIL image, ``resize`` uses an
        # interpolating filter by default, which can blend class colours at
        # blob borders — confirm whether nearest-neighbour is intended.
        gest_np = np.array(gest.resize((W, H))).astype(np.uint8)

        raw_clusters += _per_view_clusters(
            gest_np, colmap_xyz, view_info['P'], W, H, img_id,
            min_colmap_points=min_colmap_points,
            min_blob_area=min_blob_area,
        )

    merged = _merge_clusters(
        raw_clusters, colmap_xyz,
        cluster_radius=cluster_radius,
        overlap_threshold=overlap_threshold,
    )
    return merged, good