"""3D vertex candidate generation in the style of the S23DR 2025 winner. The original baseline (and our v11) detects 2D corners on gestalt images then unprojects them via depth — which introduces 30–100 cm of error from the monocular depth ambiguity. The winner generates candidates **directly in 3D** by selecting the COLMAP points whose projection lands inside a gestalt corner-class blob: 1. Per view, per gestalt corner class (apex, eave_end_point, flashing_end_point): a. Find connected components of the class mask. b. For each blob, iteratively binary-dilate it until at least ``min_colmap_points`` projected COLMAP points fall inside. c. Record those COLMAP point indices as a "cluster" tagged with class+view. 2. Globally: a. Take the union of all clustered point indices. b. For each cluster compute its 3D centroid, then redefine it as all filtered points within ``cluster_radius`` of that centroid. c. Merge any pair of clusters whose smaller member shares >50% of its points with the other. The output is a list of 3D vertex candidates with sub-decimetre accuracy (limited only by COLMAP triangulation precision). Entry point: ``generate_winner_candidates(entry)``. """ from __future__ import annotations import numpy as np import cv2 from dataclasses import dataclass from hoho2025.example_solutions import convert_entry_to_human_readable from hoho2025.color_mappings import gestalt_color_mapping try: from mvs_utils import collect_views, project_world_to_image except ImportError: from submission.mvs_utils import collect_views, project_world_to_image VERTEX_CLASSES = ['apex', 'eave_end_point', 'flashing_end_point'] @dataclass class WinnerCandidate: """A 3D vertex candidate produced by the winner-2025 algorithm.""" centroid: np.ndarray # (3,) world coords point_indices: set[int] # COLMAP point3D indices it owns classes: set[str] # gestalt vertex classes that voted for it view_count: int # how many views the cluster came from def _project_colmap_to_view(colmap_xyz: np.ndarray, P: np.ndarray, W: int, H: int): """Return (uv int, in_bounds_mask, in_front_mask).""" uv, z = project_world_to_image(P, colmap_xyz) in_front = z > 0 uv_int = np.round(uv).astype(np.int64) in_bounds = ( (uv_int[:, 0] >= 0) & (uv_int[:, 0] < W) & (uv_int[:, 1] >= 0) & (uv_int[:, 1] < H) ) return uv_int, in_bounds & in_front def _expand_blob_to_min_colmap( blob_mask: np.ndarray, uv_int: np.ndarray, valid_mask: np.ndarray, min_points: int = 5, max_iters: int = 20, ) -> tuple[np.ndarray, np.ndarray]: """Iteratively dilate a 2D blob mask until at least ``min_points`` of the valid projected COLMAP points fall inside it. Returns (final_mask, point_indices_inside). """ H, W = blob_mask.shape valid_uv = uv_int[valid_mask] valid_idx = np.where(valid_mask)[0] def hit_indices(mask): # Indices into valid_uv that fall inside the mask. # Critical: cast to bool — masks are uint8 0/255 and integer # indexing would otherwise be silently wrong (fancy indexing). h_inside = mask[valid_uv[:, 1], valid_uv[:, 0]] > 0 return valid_idx[h_inside] inside = hit_indices(blob_mask) if len(inside) >= min_points: return blob_mask, inside kernel = np.ones((3, 3), np.uint8) cur = blob_mask.copy() for _ in range(max_iters): cur = cv2.dilate(cur, kernel, iterations=1) inside = hit_indices(cur) if len(inside) >= min_points: return cur, inside return cur, inside def _per_view_clusters( gest_np: np.ndarray, colmap_xyz: np.ndarray, P: np.ndarray, W: int, H: int, view_id: str, min_colmap_points: int = 5, min_blob_area: int = 4, ) -> list[tuple[set[int], str, str]]: """Yield clusters from a single view. Returns list of (point_indices_set, gestalt_class, view_id). """ uv_int, valid = _project_colmap_to_view(colmap_xyz, P, W, H) out: list[tuple[set[int], str, str]] = [] if not np.any(valid): return out for v_class in VERTEX_CLASSES: color = np.array(gestalt_color_mapping[v_class]) mask = cv2.inRange(gest_np, color - 0.5, color + 0.5) if mask.sum() == 0: continue n_lbl, lbl, stats, _ = cv2.connectedComponentsWithStats(mask, 8, cv2.CV_32S) for i in range(1, n_lbl): area = int(stats[i, cv2.CC_STAT_AREA]) if area < min_blob_area: continue blob_mask = (lbl == i).astype(np.uint8) _, inside = _expand_blob_to_min_colmap( blob_mask, uv_int, valid, min_points=min_colmap_points, ) if len(inside) >= min_colmap_points: out.append((set(inside.tolist()), v_class, view_id)) return out def _merge_clusters( raw_clusters: list[tuple[set[int], str, str]], colmap_xyz: np.ndarray, cluster_radius: float = 0.5, overlap_threshold: float = 0.5, ) -> list[WinnerCandidate]: """Global merge step. 1. Filter the global cloud to points that appear in at least one cluster. 2. For each cluster: centroid → all filtered points within cluster_radius. 3. Merge any pair sharing >50% of its points (smaller side). """ if not raw_clusters: return [] used_idx = set() for pts, _, _ in raw_clusters: used_idx.update(pts) used_idx_arr = np.array(sorted(used_idx), dtype=np.int64) if len(used_idx_arr) == 0: return [] filtered_xyz = colmap_xyz[used_idx_arr] # Map global → filtered index for fast neighbour query g_to_f = -np.ones(len(colmap_xyz), dtype=np.int64) g_to_f[used_idx_arr] = np.arange(len(used_idx_arr)) # Build KDTree on filtered cloud from scipy.spatial import cKDTree tree = cKDTree(filtered_xyz) # Step 2: redefine each cluster by ball query around its centroid candidates: list[WinnerCandidate] = [] for pts, cls, vid in raw_clusters: if not pts: continue pts_arr = np.array([p for p in pts if g_to_f[p] >= 0]) if len(pts_arr) == 0: continue local = filtered_xyz[g_to_f[pts_arr]] centroid = local.mean(axis=0) # Ball query in 0.5 m nbr_f_idx = tree.query_ball_point(centroid, cluster_radius) if not nbr_f_idx: continue nbr_global = set(int(used_idx_arr[i]) for i in nbr_f_idx) candidates.append(WinnerCandidate( centroid=centroid, point_indices=nbr_global, classes={cls}, view_count=1, )) if not candidates: return [] # Step 3: greedy merge by overlap > 50% changed = True while changed: changed = False i = 0 while i < len(candidates): j = i + 1 while j < len(candidates): a, b = candidates[i], candidates[j] inter = len(a.point_indices & b.point_indices) smaller = min(len(a.point_indices), len(b.point_indices)) if smaller > 0 and inter / smaller > overlap_threshold: # Merge b into a merged_pts = a.point_indices | b.point_indices merged_xyz = colmap_xyz[np.array(sorted(merged_pts))] a.centroid = merged_xyz.mean(axis=0) a.point_indices = merged_pts a.classes |= b.classes a.view_count = a.view_count + b.view_count candidates.pop(j) changed = True else: j += 1 i += 1 return candidates def generate_winner_candidates( entry, min_colmap_points: int = 5, cluster_radius: float = 0.5, overlap_threshold: float = 0.5, min_blob_area: int = 4, ) -> tuple[list[WinnerCandidate], dict]: """Run the winner-2025 3D vertex candidate generator. Returns (candidates, good_entry). """ good = convert_entry_to_human_readable(entry) colmap_rec = good.get('colmap') or good.get('colmap_binary') if colmap_rec is None: return [], good colmap_xyz = np.array( [p.xyz for p in colmap_rec.points3D.values()], dtype=np.float64 ) if len(colmap_xyz) == 0: return [], good views = collect_views(colmap_rec, good['image_ids']) raw_clusters: list[tuple[set[int], str, str]] = [] for gest, depth, img_id in zip(good['gestalt'], good['depth'], good['image_ids']): info = views.get(img_id) if info is None: continue depth_np = np.array(depth) H, W = depth_np.shape[:2] gest_np = np.array(gest.resize((W, H))).astype(np.uint8) view_clusters = _per_view_clusters( gest_np, colmap_xyz, info['P'], W, H, img_id, min_colmap_points=min_colmap_points, min_blob_area=min_blob_area, ) raw_clusters.extend(view_clusters) candidates = _merge_clusters( raw_clusters, colmap_xyz, cluster_radius=cluster_radius, overlap_threshold=overlap_threshold, ) return candidates, good