import argparse import sqlite3 from collections import defaultdict from pathlib import Path import numpy as np from tqdm import tqdm from . import logger from .utils.read_write_model import ( CAMERA_MODEL_NAMES, Camera, Image, Point3D, write_model, ) def recover_database_images_and_ids(database_path): images = {} cameras = {} db = sqlite3.connect(str(database_path)) ret = db.execute("SELECT name, image_id, camera_id FROM images;") for name, image_id, camera_id in ret: images[name] = image_id cameras[name] = camera_id db.close() logger.info(f"Found {len(images)} images and {len(cameras)} cameras in database.") return images, cameras def quaternion_to_rotation_matrix(qvec): qvec = qvec / np.linalg.norm(qvec) w, x, y, z = qvec R = np.array( [ [1 - 2 * y * y - 2 * z * z, 2 * x * y - 2 * z * w, 2 * x * z + 2 * y * w], [2 * x * y + 2 * z * w, 1 - 2 * x * x - 2 * z * z, 2 * y * z - 2 * x * w], [2 * x * z - 2 * y * w, 2 * y * z + 2 * x * w, 1 - 2 * x * x - 2 * y * y], ] ) return R def camera_center_to_translation(c, qvec): R = quaternion_to_rotation_matrix(qvec) return (-1) * np.matmul(R, c) def read_nvm_model(nvm_path, intrinsics_path, image_ids, camera_ids, skip_points=False): with open(intrinsics_path, "r") as f: raw_intrinsics = f.readlines() logger.info(f"Reading {len(raw_intrinsics)} cameras...") cameras = {} for intrinsics in raw_intrinsics: intrinsics = intrinsics.strip("\n").split(" ") name, camera_model, width, height = intrinsics[:4] params = [float(p) for p in intrinsics[4:]] camera_model = CAMERA_MODEL_NAMES[camera_model] assert len(params) == camera_model.num_params camera_id = camera_ids[name] camera = Camera( id=camera_id, model=camera_model.model_name, width=int(width), height=int(height), params=params, ) cameras[camera_id] = camera nvm_f = open(nvm_path, "r") line = nvm_f.readline() while line == "\n" or line.startswith("NVM_V3"): line = nvm_f.readline() num_images = int(line) assert num_images == len(cameras) logger.info(f"Reading {num_images} images...") image_idx_to_db_image_id = [] image_data = [] i = 0 while i < num_images: line = nvm_f.readline() if line == "\n": continue data = line.strip("\n").split(" ") image_data.append(data) image_idx_to_db_image_id.append(image_ids[data[0]]) i += 1 line = nvm_f.readline() while line == "\n": line = nvm_f.readline() num_points = int(line) if skip_points: logger.info(f"Skipping {num_points} points.") num_points = 0 else: logger.info(f"Reading {num_points} points...") points3D = {} image_idx_to_keypoints = defaultdict(list) i = 0 pbar = tqdm(total=num_points, unit="pts") while i < num_points: line = nvm_f.readline() if line == "\n": continue data = line.strip("\n").split(" ") x, y, z, r, g, b, num_observations = data[:7] obs_image_ids, point2D_idxs = [], [] for j in range(int(num_observations)): s = 7 + 4 * j img_index, kp_index, kx, ky = data[s : s + 4] image_idx_to_keypoints[int(img_index)].append( (int(kp_index), float(kx), float(ky), i) ) db_image_id = image_idx_to_db_image_id[int(img_index)] obs_image_ids.append(db_image_id) point2D_idxs.append(kp_index) point = Point3D( id=i, xyz=np.array([x, y, z], float), rgb=np.array([r, g, b], int), error=1.0, # fake image_ids=np.array(obs_image_ids, int), point2D_idxs=np.array(point2D_idxs, int), ) points3D[i] = point i += 1 pbar.update(1) pbar.close() logger.info("Parsing image data...") images = {} for i, data in enumerate(image_data): # Skip the focal length. Skip the distortion and terminal 0. name, _, qw, qx, qy, qz, cx, cy, cz, _, _ = data qvec = np.array([qw, qx, qy, qz], float) c = np.array([cx, cy, cz], float) t = camera_center_to_translation(c, qvec) if i in image_idx_to_keypoints: # NVM only stores triangulated 2D keypoints: add dummy ones keypoints = image_idx_to_keypoints[i] point2D_idxs = np.array([d[0] for d in keypoints]) tri_xys = np.array([[x, y] for _, x, y, _ in keypoints]) tri_ids = np.array([i for _, _, _, i in keypoints]) num_2Dpoints = max(point2D_idxs) + 1 xys = np.zeros((num_2Dpoints, 2), float) point3D_ids = np.full(num_2Dpoints, -1, int) xys[point2D_idxs] = tri_xys point3D_ids[point2D_idxs] = tri_ids else: xys = np.zeros((0, 2), float) point3D_ids = np.full(0, -1, int) image_id = image_ids[name] image = Image( id=image_id, qvec=qvec, tvec=t, camera_id=camera_ids[name], name=name, xys=xys, point3D_ids=point3D_ids, ) images[image_id] = image return cameras, images, points3D def main(nvm, intrinsics, database, output, skip_points=False): assert nvm.exists(), nvm assert intrinsics.exists(), intrinsics assert database.exists(), database image_ids, camera_ids = recover_database_images_and_ids(database) logger.info("Reading the NVM model...") model = read_nvm_model( nvm, intrinsics, image_ids, camera_ids, skip_points=skip_points ) logger.info("Writing the COLMAP model...") output.mkdir(exist_ok=True, parents=True) write_model(*model, path=str(output), ext=".bin") logger.info("Done.") if __name__ == "__main__": parser = argparse.ArgumentParser() parser.add_argument("--nvm", required=True, type=Path) parser.add_argument("--intrinsics", required=True, type=Path) parser.add_argument("--database", required=True, type=Path) parser.add_argument("--output", required=True, type=Path) parser.add_argument("--skip_points", action="store_true") args = parser.parse_args() main(**args.__dict__)