Spaces:
Running
Running
import argparse | |
import contextlib | |
import io | |
import sys | |
from pathlib import Path | |
from typing import Any, Dict, List, Optional | |
import numpy as np | |
import pycolmap | |
from tqdm import tqdm | |
from . import logger | |
from .utils.database import COLMAPDatabase | |
from .utils.geometry import compute_epipolar_errors | |
from .utils.io import get_keypoints, get_matches | |
from .utils.parsers import parse_retrieval | |
class OutputCapture: | |
def __init__(self, verbose: bool): | |
self.verbose = verbose | |
def __enter__(self): | |
if not self.verbose: | |
self.capture = contextlib.redirect_stdout(io.StringIO()) | |
self.out = self.capture.__enter__() | |
def __exit__(self, exc_type, *args): | |
if not self.verbose: | |
self.capture.__exit__(exc_type, *args) | |
if exc_type is not None: | |
logger.error("Failed with output:\n%s", self.out.getvalue()) | |
sys.stdout.flush() | |
def create_db_from_model( | |
reconstruction: pycolmap.Reconstruction, database_path: Path | |
) -> Dict[str, int]: | |
if database_path.exists(): | |
logger.warning("The database already exists, deleting it.") | |
database_path.unlink() | |
db = COLMAPDatabase.connect(database_path) | |
db.create_tables() | |
for i, camera in reconstruction.cameras.items(): | |
db.add_camera( | |
camera.model.value, | |
camera.width, | |
camera.height, | |
camera.params, | |
camera_id=i, | |
prior_focal_length=True, | |
) | |
for i, image in reconstruction.images.items(): | |
db.add_image(image.name, image.camera_id, image_id=i) | |
db.commit() | |
db.close() | |
return {image.name: i for i, image in reconstruction.images.items()} | |
def import_features( | |
image_ids: Dict[str, int], database_path: Path, features_path: Path | |
): | |
logger.info("Importing features into the database...") | |
db = COLMAPDatabase.connect(database_path) | |
for image_name, image_id in tqdm(image_ids.items()): | |
keypoints = get_keypoints(features_path, image_name) | |
keypoints += 0.5 # COLMAP origin | |
db.add_keypoints(image_id, keypoints) | |
db.commit() | |
db.close() | |
def import_matches( | |
image_ids: Dict[str, int], | |
database_path: Path, | |
pairs_path: Path, | |
matches_path: Path, | |
min_match_score: Optional[float] = None, | |
skip_geometric_verification: bool = False, | |
): | |
logger.info("Importing matches into the database...") | |
with open(str(pairs_path), "r") as f: | |
pairs = [p.split() for p in f.readlines()] | |
db = COLMAPDatabase.connect(database_path) | |
matched = set() | |
for name0, name1 in tqdm(pairs): | |
id0, id1 = image_ids[name0], image_ids[name1] | |
if len({(id0, id1), (id1, id0)} & matched) > 0: | |
continue | |
matches, scores = get_matches(matches_path, name0, name1) | |
if min_match_score: | |
matches = matches[scores > min_match_score] | |
db.add_matches(id0, id1, matches) | |
matched |= {(id0, id1), (id1, id0)} | |
if skip_geometric_verification: | |
db.add_two_view_geometry(id0, id1, matches) | |
db.commit() | |
db.close() | |
def estimation_and_geometric_verification( | |
database_path: Path, pairs_path: Path, verbose: bool = False | |
): | |
logger.info("Performing geometric verification of the matches...") | |
with OutputCapture(verbose): | |
with pycolmap.ostream(): | |
pycolmap.verify_matches( | |
database_path, | |
pairs_path, | |
options=dict(ransac=dict(max_num_trials=20000, min_inlier_ratio=0.1)), | |
) | |
def geometric_verification( | |
image_ids: Dict[str, int], | |
reference: pycolmap.Reconstruction, | |
database_path: Path, | |
features_path: Path, | |
pairs_path: Path, | |
matches_path: Path, | |
max_error: float = 4.0, | |
): | |
logger.info("Performing geometric verification of the matches...") | |
pairs = parse_retrieval(pairs_path) | |
db = COLMAPDatabase.connect(database_path) | |
inlier_ratios = [] | |
matched = set() | |
for name0 in tqdm(pairs): | |
id0 = image_ids[name0] | |
image0 = reference.images[id0] | |
cam0 = reference.cameras[image0.camera_id] | |
kps0, noise0 = get_keypoints(features_path, name0, return_uncertainty=True) | |
noise0 = 1.0 if noise0 is None else noise0 | |
if len(kps0) > 0: | |
kps0 = np.stack(cam0.cam_from_img(kps0)) | |
else: | |
kps0 = np.zeros((0, 2)) | |
for name1 in pairs[name0]: | |
id1 = image_ids[name1] | |
image1 = reference.images[id1] | |
cam1 = reference.cameras[image1.camera_id] | |
kps1, noise1 = get_keypoints(features_path, name1, return_uncertainty=True) | |
noise1 = 1.0 if noise1 is None else noise1 | |
if len(kps1) > 0: | |
kps1 = np.stack(cam1.cam_from_img(kps1)) | |
else: | |
kps1 = np.zeros((0, 2)) | |
matches = get_matches(matches_path, name0, name1)[0] | |
if len({(id0, id1), (id1, id0)} & matched) > 0: | |
continue | |
matched |= {(id0, id1), (id1, id0)} | |
if matches.shape[0] == 0: | |
db.add_two_view_geometry(id0, id1, matches) | |
continue | |
cam1_from_cam0 = image1.cam_from_world * image0.cam_from_world.inverse() | |
errors0, errors1 = compute_epipolar_errors( | |
cam1_from_cam0, kps0[matches[:, 0]], kps1[matches[:, 1]] | |
) | |
valid_matches = np.logical_and( | |
errors0 <= cam0.cam_from_img_threshold(noise0 * max_error), | |
errors1 <= cam1.cam_from_img_threshold(noise1 * max_error), | |
) | |
# TODO: We could also add E to the database, but we need | |
# to reverse the transformations if id0 > id1 in utils/database.py. | |
db.add_two_view_geometry(id0, id1, matches[valid_matches, :]) | |
inlier_ratios.append(np.mean(valid_matches)) | |
logger.info( | |
"mean/med/min/max valid matches %.2f/%.2f/%.2f/%.2f%%.", | |
np.mean(inlier_ratios) * 100, | |
np.median(inlier_ratios) * 100, | |
np.min(inlier_ratios) * 100, | |
np.max(inlier_ratios) * 100, | |
) | |
db.commit() | |
db.close() | |
def run_triangulation( | |
model_path: Path, | |
database_path: Path, | |
image_dir: Path, | |
reference_model: pycolmap.Reconstruction, | |
verbose: bool = False, | |
options: Optional[Dict[str, Any]] = None, | |
) -> pycolmap.Reconstruction: | |
model_path.mkdir(parents=True, exist_ok=True) | |
logger.info("Running 3D triangulation...") | |
if options is None: | |
options = {} | |
with OutputCapture(verbose): | |
with pycolmap.ostream(): | |
reconstruction = pycolmap.triangulate_points( | |
reference_model, database_path, image_dir, model_path, options=options | |
) | |
return reconstruction | |
def main( | |
sfm_dir: Path, | |
reference_model: Path, | |
image_dir: Path, | |
pairs: Path, | |
features: Path, | |
matches: Path, | |
skip_geometric_verification: bool = False, | |
estimate_two_view_geometries: bool = False, | |
min_match_score: Optional[float] = None, | |
verbose: bool = False, | |
mapper_options: Optional[Dict[str, Any]] = None, | |
) -> pycolmap.Reconstruction: | |
assert reference_model.exists(), reference_model | |
assert features.exists(), features | |
assert pairs.exists(), pairs | |
assert matches.exists(), matches | |
sfm_dir.mkdir(parents=True, exist_ok=True) | |
database = sfm_dir / "database.db" | |
reference = pycolmap.Reconstruction(reference_model) | |
image_ids = create_db_from_model(reference, database) | |
import_features(image_ids, database, features) | |
import_matches( | |
image_ids, | |
database, | |
pairs, | |
matches, | |
min_match_score, | |
skip_geometric_verification, | |
) | |
if not skip_geometric_verification: | |
if estimate_two_view_geometries: | |
estimation_and_geometric_verification(database, pairs, verbose) | |
else: | |
geometric_verification( | |
image_ids, reference, database, features, pairs, matches | |
) | |
reconstruction = run_triangulation( | |
sfm_dir, database, image_dir, reference, verbose, mapper_options | |
) | |
logger.info( | |
"Finished the triangulation with statistics:\n%s", reconstruction.summary() | |
) | |
return reconstruction | |
def parse_option_args(args: List[str], default_options) -> Dict[str, Any]: | |
options = {} | |
for arg in args: | |
idx = arg.find("=") | |
if idx == -1: | |
raise ValueError("Options format: key1=value1 key2=value2 etc.") | |
key, value = arg[:idx], arg[idx + 1 :] | |
if not hasattr(default_options, key): | |
raise ValueError( | |
f'Unknown option "{key}", allowed options and default values' | |
f" for {default_options.summary()}" | |
) | |
value = eval(value) | |
target_type = type(getattr(default_options, key)) | |
if not isinstance(value, target_type): | |
raise ValueError( | |
f'Incorrect type for option "{key}":' f" {type(value)} vs {target_type}" | |
) | |
options[key] = value | |
return options | |
if __name__ == "__main__": | |
parser = argparse.ArgumentParser() | |
parser.add_argument("--sfm_dir", type=Path, required=True) | |
parser.add_argument("--reference_sfm_model", type=Path, required=True) | |
parser.add_argument("--image_dir", type=Path, required=True) | |
parser.add_argument("--pairs", type=Path, required=True) | |
parser.add_argument("--features", type=Path, required=True) | |
parser.add_argument("--matches", type=Path, required=True) | |
parser.add_argument("--skip_geometric_verification", action="store_true") | |
parser.add_argument("--min_match_score", type=float) | |
parser.add_argument("--verbose", action="store_true") | |
args = parser.parse_args().__dict__ | |
mapper_options = parse_option_args( | |
args.pop("mapper_options"), pycolmap.IncrementalMapperOptions() | |
) | |
main(**args, mapper_options=mapper_options) | |