| """3D vertex candidate generation in the style of the S23DR 2025 winner. |
| |
| The original baseline (and our v11) detects 2D corners on gestalt images |
| then unprojects them via depth — which introduces 30–100 cm of error from |
| the monocular depth ambiguity. |
| |
| The winner generates candidates **directly in 3D** by selecting the COLMAP |
| points whose projection lands inside a gestalt corner-class blob: |
| |
| 1. Per view, per gestalt corner class (apex, eave_end_point, flashing_end_point): |
| a. Find connected components of the class mask. |
| b. For each blob, iteratively binary-dilate it until at least |
| ``min_colmap_points`` projected COLMAP points fall inside. |
| c. Record those COLMAP point indices as a "cluster" tagged with class+view. |
| |
| 2. Globally: |
| a. Take the union of all clustered point indices. |
| b. For each cluster compute its 3D centroid, then redefine it as all |
| filtered points within ``cluster_radius`` of that centroid. |
| c. Merge any pair of clusters whose smaller member shares >50% of its |
| points with the other. |
| |
| The output is a list of 3D vertex candidates with sub-decimetre accuracy |
| (limited only by COLMAP triangulation precision). |
| |
| Entry point: ``generate_winner_candidates(entry)``. |
| """ |
|
|
| from __future__ import annotations |
|
|
| import numpy as np |
| import cv2 |
| from dataclasses import dataclass |
|
|
| from hoho2025.example_solutions import convert_entry_to_human_readable |
| from hoho2025.color_mappings import gestalt_color_mapping |
|
|
| try: |
| from mvs_utils import collect_views, project_world_to_image |
| except ImportError: |
| from submission.mvs_utils import collect_views, project_world_to_image |
|
|
|
|
# Gestalt segmentation classes whose blobs are searched for vertex evidence;
# each name is used as a key into ``gestalt_color_mapping``.
VERTEX_CLASSES = [
    'apex',
    'eave_end_point',
    'flashing_end_point',
]
|
|
|
|
@dataclass
class WinnerCandidate:
    """One 3D roof-vertex hypothesis backed by a group of COLMAP points.

    Instances are built and fused by ``_merge_clusters``.
    """

    # 3D position of the candidate (a mean over COLMAP point coordinates).
    centroid: np.ndarray
    # Indices into the global COLMAP point array that support the candidate.
    point_indices: set[int]
    # Names of the gestalt corner classes that contributed to the candidate.
    classes: set[str]
    # Support tally maintained by the merge step; 1 for an unmerged cluster.
    view_count: int
|
|
|
|
def _project_colmap_to_view(
    colmap_xyz: np.ndarray, P: np.ndarray, W: int, H: int,
) -> tuple[np.ndarray, np.ndarray]:
    """Project COLMAP points into one view and flag the usable ones.

    Args:
        colmap_xyz: World-space points handed to ``project_world_to_image``
            (presumably shape (N, 3) -- confirm against ``mvs_utils``).
        P: Projection for the view, passed through to
            ``project_world_to_image`` unchanged.
        W: Image width in pixels.
        H: Image height in pixels.

    Returns:
        ``(uv_int, valid)``. ``uv_int`` is an int64 array of rounded pixel
        coordinates with the column in ``[:, 0]`` and the row in ``[:, 1]``.
        ``valid`` is a boolean mask that is True only for points with
        positive depth whose rounded pixel lies inside the ``W`` x ``H``
        image. Rows of ``uv_int`` where ``valid`` is False are set to -1 and
        must not be used for indexing.
    """
    uv, z = project_world_to_image(P, colmap_xyz)
    uv_rounded = np.round(np.asarray(uv, dtype=np.float64))

    # Bounds are tested on the float values: NaN and +/-inf (e.g. from points
    # at or near zero depth) compare False here, whereas casting them to an
    # integer first gives platform-dependent results.
    in_bounds = (
        (uv_rounded[:, 0] >= 0) & (uv_rounded[:, 0] < W)
        & (uv_rounded[:, 1] >= 0) & (uv_rounded[:, 1] < H)
    )
    valid = in_bounds & (z > 0)

    # Only values already known to be small, finite pixel coordinates reach
    # the integer cast; every other row gets the -1 sentinel.
    uv_int = np.where(valid[:, None], uv_rounded, -1.0).astype(np.int64)
    return uv_int, valid
|
|
|
|
def _expand_blob_to_min_colmap(
    blob_mask: np.ndarray,
    uv_int: np.ndarray,
    valid_mask: np.ndarray,
    min_points: int = 5,
    max_iters: int = 20,
) -> tuple[np.ndarray, np.ndarray]:
    """Grow a 2D blob until enough projected COLMAP points fall inside it.

    The blob is dilated with a 3x3 kernel, one step at a time, until at least
    ``min_points`` of the valid projected points land on a non-zero mask
    pixel or ``max_iters`` dilations have been applied.

    Args:
        blob_mask: 2D mask of the blob, indexed as ``mask[row, col]``;
            non-zero pixels belong to the blob.
        uv_int: Integer pixel coordinates per COLMAP point, column in
            ``[:, 0]`` and row in ``[:, 1]``.
        valid_mask: Boolean mask selecting the rows of ``uv_int`` that may be
            used to index ``blob_mask``.
        min_points: Number of points required inside the blob.
        max_iters: Maximum number of dilation steps.

    Returns:
        ``(final_mask, point_indices_inside)``. ``point_indices_inside`` are
        indices into ``uv_int`` and may hold fewer than ``min_points``
        entries when the target was not reached. ``blob_mask`` itself is
        returned when no dilation was performed.
    """
    valid_idx = np.where(valid_mask)[0]
    cols = uv_int[valid_idx, 0]
    rows = uv_int[valid_idx, 1]

    def points_inside(mask: np.ndarray) -> np.ndarray:
        # Indices of the valid points that sit on a non-zero mask pixel.
        return valid_idx[mask[rows, cols] > 0]

    inside = points_inside(blob_mask)
    if len(inside) >= min_points:
        return blob_mask, inside

    # With fewer than ``min_points`` valid projections in the whole view the
    # target is unreachable, so skip the full-image dilations entirely.
    if len(valid_idx) < min_points:
        return blob_mask, inside

    kernel = np.ones((3, 3), np.uint8)
    grown = blob_mask.copy()
    for _ in range(max_iters):
        grown = cv2.dilate(grown, kernel, iterations=1)
        inside = points_inside(grown)
        if len(inside) >= min_points:
            break
    return grown, inside
|
|
|
|
def _per_view_clusters(
    gest_np: np.ndarray,
    colmap_xyz: np.ndarray,
    P: np.ndarray,
    W: int, H: int,
    view_id: str,
    min_colmap_points: int = 5,
    min_blob_area: int = 4,
) -> list[tuple[set[int], str, str]]:
    """Collect the COLMAP point clusters supported by a single view.

    For every class in ``VERTEX_CLASSES`` the pixels matching that class's
    colour are split into 8-connected blobs. Each blob of at least
    ``min_blob_area`` pixels is grown until it covers ``min_colmap_points``
    projected COLMAP points; blobs that never reach that count are dropped.

    Returns a list of ``(point_indices_set, gestalt_class, view_id)`` tuples,
    empty when no COLMAP point projects validly into the view.
    """
    clusters: list[tuple[set[int], str, str]] = []
    uv_int, valid = _project_colmap_to_view(colmap_xyz, P, W, H)
    if not np.any(valid):
        return clusters

    for class_name in VERTEX_CLASSES:
        class_color = np.array(gestalt_color_mapping[class_name])
        # The +/-0.5 window amounts to an exact colour match on integer data.
        class_mask = cv2.inRange(gest_np, class_color - 0.5, class_color + 0.5)
        if not class_mask.any():
            continue

        n_labels, labels, stats, _ = cv2.connectedComponentsWithStats(
            class_mask, 8, cv2.CV_32S,
        )
        # Labels start at 1 here; label 0 is never treated as a blob.
        big_enough = [
            label for label in range(1, n_labels)
            if int(stats[label, cv2.CC_STAT_AREA]) >= min_blob_area
        ]
        for label in big_enough:
            blob = (labels == label).astype(np.uint8)
            _, hits = _expand_blob_to_min_colmap(
                blob, uv_int, valid,
                min_points=min_colmap_points,
            )
            if len(hits) < min_colmap_points:
                continue
            clusters.append((set(hits.tolist()), class_name, view_id))
    return clusters
|
|
|
|
def _merge_clusters(
    raw_clusters: list[tuple[set[int], str, str]],
    colmap_xyz: np.ndarray,
    cluster_radius: float = 0.5,
    overlap_threshold: float = 0.5,
) -> list[WinnerCandidate]:
    """Fuse per-view clusters into global 3D vertex candidates.

    Steps:
        1. Restrict the cloud to points that appear in at least one cluster.
        2. Replace every cluster by all restricted points that lie within
           ``cluster_radius`` of the cluster's centroid.
        3. Repeatedly merge any two candidates whose shared points exceed
           ``overlap_threshold`` of the smaller candidate's size, until no
           pair qualifies.

    Args:
        raw_clusters: ``(point_indices, gestalt_class, view_id)`` tuples;
            the indices refer to rows of ``colmap_xyz``.
        colmap_xyz: Array of COLMAP point coordinates, one point per row.
        cluster_radius: Ball radius used in step 2, in the units of
            ``colmap_xyz``.
        overlap_threshold: Overlap fraction (relative to the smaller
            candidate) above which two candidates are merged.

    Returns:
        The merged candidates. ``view_count`` is the number of *distinct*
        view ids that contributed to a candidate, so several clusters coming
        from the same view count once.
    """
    if not raw_clusters:
        return []

    used_idx: set[int] = set()
    for pts, _, _ in raw_clusters:
        used_idx.update(pts)
    if not used_idx:
        return []
    used_idx_arr = np.array(sorted(used_idx), dtype=np.int64)
    filtered_xyz = colmap_xyz[used_idx_arr]

    # Function-scope import, as in the original: SciPy is only needed here.
    from scipy.spatial import cKDTree
    tree = cKDTree(filtered_xyz)

    candidates: list[WinnerCandidate] = []
    # Parallel to ``candidates``: the view ids backing each candidate.
    support_views: list[set[str]] = []
    for pts, cls, vid in raw_clusters:
        if not pts:
            continue
        pts_arr = np.array(list(pts), dtype=np.int64)
        centroid = colmap_xyz[pts_arr].mean(axis=0)
        # Tree indices address ``filtered_xyz``; map them back to global ids.
        nbr_f_idx = tree.query_ball_point(centroid, cluster_radius)
        if not nbr_f_idx:
            continue
        candidates.append(WinnerCandidate(
            centroid=centroid,
            point_indices=set(used_idx_arr[nbr_f_idx].tolist()),
            classes={cls},
            view_count=1,
        ))
        support_views.append({vid})

    if not candidates:
        return []

    # A merge enlarges candidate ``i``, which can create new overlaps with
    # candidates already compared, so sweep until a full pass changes nothing.
    changed = True
    while changed:
        changed = False
        i = 0
        while i < len(candidates):
            j = i + 1
            while j < len(candidates):
                a, b = candidates[i], candidates[j]
                shared = len(a.point_indices & b.point_indices)
                smaller = min(len(a.point_indices), len(b.point_indices))
                if smaller > 0 and shared / smaller > overlap_threshold:
                    merged_pts = a.point_indices | b.point_indices
                    merged_idx = np.array(sorted(merged_pts), dtype=np.int64)
                    a.centroid = colmap_xyz[merged_idx].mean(axis=0)
                    a.point_indices = merged_pts
                    a.classes |= b.classes
                    # Count distinct views rather than summing cluster counts.
                    support_views[i] |= support_views[j]
                    a.view_count = len(support_views[i])
                    del candidates[j]
                    del support_views[j]
                    changed = True
                else:
                    j += 1
            i += 1

    return candidates
|
|
|
|
def generate_winner_candidates(
    entry,
    min_colmap_points: int = 5,
    cluster_radius: float = 0.5,
    overlap_threshold: float = 0.5,
    min_blob_area: int = 4,
) -> tuple[list[WinnerCandidate], dict]:
    """Run the winner-2025 3D vertex candidate generator on one entry.

    Args:
        entry: Dataset entry, passed to ``convert_entry_to_human_readable``.
            The converted entry is read through the keys ``'colmap'`` /
            ``'colmap_binary'``, ``'image_ids'``, ``'gestalt'`` and
            ``'depth'``.
        min_colmap_points: Minimum number of COLMAP points a blob must cover
            to produce a cluster.
        cluster_radius: Radius used when re-collecting points around each
            cluster centroid.
        overlap_threshold: Overlap fraction above which two clusters merge.
        min_blob_area: Minimum pixel area for a gestalt blob to be used.

    Returns:
        ``(candidates, good_entry)``. ``candidates`` is empty when the entry
        has no COLMAP reconstruction or the reconstruction has no 3D points.
    """
    good = convert_entry_to_human_readable(entry)
    colmap_rec = good.get('colmap') or good.get('colmap_binary')
    if colmap_rec is None:
        return [], good

    colmap_xyz = np.array(
        [p.xyz for p in colmap_rec.points3D.values()], dtype=np.float64
    )
    if len(colmap_xyz) == 0:
        return [], good

    views = collect_views(colmap_rec, good['image_ids'])
    raw_clusters: list[tuple[set[int], str, str]] = []

    for gest, depth, img_id in zip(good['gestalt'], good['depth'], good['image_ids']):
        info = views.get(img_id)
        if info is None:
            # No view information for this image: it cannot be projected into.
            continue
        # The depth map is used only to fix the working resolution.
        H, W = np.array(depth).shape[:2]
        # The gestalt image is colour-coded and later matched by exact
        # colour, so it must not be interpolated: a smoothing filter would
        # blend class colours along blob borders. ``resample=0`` is Pillow's
        # NEAREST filter, given as an int because PIL is not imported here.
        # NOTE(review): assumes ``gest`` is a PIL image -- confirm.
        gest_np = np.array(gest.resize((W, H), resample=0)).astype(np.uint8)
        raw_clusters.extend(_per_view_clusters(
            gest_np, colmap_xyz, info['P'], W, H, img_id,
            min_colmap_points=min_colmap_points,
            min_blob_area=min_blob_area,
        ))

    candidates = _merge_clusters(
        raw_clusters, colmap_xyz,
        cluster_radius=cluster_radius,
        overlap_threshold=overlap_threshold,
    )
    return candidates, good
|
|