|
import argparse |
|
import pprint |
|
from collections import Counter, defaultdict |
|
from itertools import chain |
|
from pathlib import Path |
|
from types import SimpleNamespace |
|
from typing import Dict, Iterable, List, Optional, Set, Tuple, Union |
|
|
|
import cv2 |
|
import h5py |
|
import numpy as np |
|
import torch |
|
import torchvision.transforms.functional as F |
|
from scipy.spatial import KDTree |
|
from tqdm import tqdm |
|
|
|
from . import logger, matchers |
|
from .extract_features import read_image, resize_image |
|
from .match_features import find_unique_new_pairs |
|
from .utils.base_model import dynamic_load |
|
from .utils.io import list_h5_names |
|
from .utils.parsers import names_to_pair, parse_retrieval |
|
|
|
device = "cuda" if torch.cuda.is_available() else "cpu" |
|
|
|
confs = { |
|
|
|
"loftr": { |
|
"output": "matches-loftr", |
|
"model": { |
|
"name": "loftr", |
|
"weights": "outdoor", |
|
"max_keypoints": 2000, |
|
"match_threshold": 0.2, |
|
}, |
|
"preprocessing": { |
|
"grayscale": True, |
|
"resize_max": 1024, |
|
"dfactor": 8, |
|
"width": 640, |
|
"height": 480, |
|
"force_resize": True, |
|
}, |
|
"max_error": 1, |
|
"cell_size": 1, |
|
}, |
|
"eloftr": { |
|
"output": "matches-eloftr", |
|
"model": { |
|
"name": "eloftr", |
|
"weights": "weights/eloftr_outdoor.ckpt", |
|
"max_keypoints": 2000, |
|
"match_threshold": 0.2, |
|
}, |
|
"preprocessing": { |
|
"grayscale": True, |
|
"resize_max": 1024, |
|
"dfactor": 32, |
|
"width": 640, |
|
"height": 480, |
|
"force_resize": True, |
|
}, |
|
"max_error": 1, |
|
"cell_size": 1, |
|
    },

    "cotr": {
|
"output": "matches-cotr", |
|
"model": { |
|
"name": "cotr", |
|
"weights": "out/default", |
|
"max_keypoints": 2000, |
|
"match_threshold": 0.2, |
|
}, |
|
"preprocessing": { |
|
"grayscale": False, |
|
"resize_max": 1024, |
|
"dfactor": 8, |
|
"width": 640, |
|
"height": 480, |
|
"force_resize": True, |
|
}, |
|
"max_error": 1, |
|
"cell_size": 1, |
|
}, |
|
|
|
"loftr_aachen": { |
|
"output": "matches-loftr_aachen", |
|
"model": { |
|
"name": "loftr", |
|
"weights": "outdoor", |
|
"max_keypoints": 2000, |
|
"match_threshold": 0.2, |
|
}, |
|
"preprocessing": { |
|
"grayscale": True, |
|
"resize_max": 1024, |
|
"dfactor": 8, |
|
"width": 640, |
|
"height": 480, |
|
"force_resize": True, |
|
}, |
|
"max_error": 2, |
|
"cell_size": 8, |
|
}, |
|
|
|
"loftr_superpoint": { |
|
"output": "matches-loftr_aachen", |
|
"model": { |
|
"name": "loftr", |
|
"weights": "outdoor", |
|
"max_keypoints": 2000, |
|
"match_threshold": 0.2, |
|
}, |
|
"preprocessing": { |
|
"grayscale": True, |
|
"resize_max": 1024, |
|
"dfactor": 8, |
|
"width": 640, |
|
"height": 480, |
|
"force_resize": True, |
|
}, |
|
"max_error": 4, |
|
"cell_size": 4, |
|
}, |
|
|
|
"topicfm": { |
|
"output": "matches-topicfm", |
|
"model": { |
|
"name": "topicfm", |
|
"weights": "outdoor", |
|
"max_keypoints": 2000, |
|
"match_threshold": 0.2, |
|
}, |
|
"preprocessing": { |
|
"grayscale": True, |
|
"force_resize": True, |
|
"resize_max": 1024, |
|
"dfactor": 8, |
|
"width": 640, |
|
"height": 480, |
|
}, |
|
}, |
|
|
|
"aspanformer": { |
|
"output": "matches-aspanformer", |
|
"model": { |
|
"name": "aspanformer", |
|
"weights": "outdoor", |
|
"max_keypoints": 2000, |
|
"match_threshold": 0.2, |
|
}, |
|
"preprocessing": { |
|
"grayscale": True, |
|
"force_resize": True, |
|
"resize_max": 1024, |
|
"width": 640, |
|
"height": 480, |
|
"dfactor": 8, |
|
}, |
|
}, |
|
"duster": { |
|
"output": "matches-duster", |
|
"model": { |
|
"name": "duster", |
|
"weights": "vit_large", |
|
"max_keypoints": 2000, |
|
"match_threshold": 0.2, |
|
}, |
|
"preprocessing": { |
|
"grayscale": False, |
|
"resize_max": 512, |
|
"dfactor": 16, |
|
}, |
|
}, |
|
"mast3r": { |
|
"output": "matches-mast3r", |
|
"model": { |
|
"name": "mast3r", |
|
"weights": "vit_large", |
|
"max_keypoints": 2000, |
|
"match_threshold": 0.2, |
|
}, |
|
"preprocessing": { |
|
"grayscale": False, |
|
"resize_max": 512, |
|
"dfactor": 16, |
|
}, |
|
}, |
|
"xfeat_lightglue": { |
|
"output": "matches-xfeat_lightglue", |
|
"model": { |
|
"name": "xfeat_lightglue", |
|
"max_keypoints": 8000, |
|
}, |
|
"preprocessing": { |
|
"grayscale": False, |
|
"force_resize": False, |
|
"resize_max": 1024, |
|
"width": 640, |
|
"height": 480, |
|
"dfactor": 8, |
|
}, |
|
}, |
|
"xfeat_dense": { |
|
"output": "matches-xfeat_dense", |
|
"model": { |
|
"name": "xfeat_dense", |
|
"max_keypoints": 8000, |
|
}, |
|
"preprocessing": { |
|
"grayscale": False, |
|
"force_resize": False, |
|
"resize_max": 1024, |
|
"width": 640, |
|
"height": 480, |
|
"dfactor": 8, |
|
}, |
|
}, |
|
"dkm": { |
|
"output": "matches-dkm", |
|
"model": { |
|
"name": "dkm", |
|
"weights": "outdoor", |
|
"max_keypoints": 2000, |
|
"match_threshold": 0.2, |
|
}, |
|
"preprocessing": { |
|
"grayscale": False, |
|
"force_resize": True, |
|
"resize_max": 1024, |
|
"width": 80, |
|
"height": 60, |
|
"dfactor": 8, |
|
}, |
|
}, |
|
"roma": { |
|
"output": "matches-roma", |
|
"model": { |
|
"name": "roma", |
|
"weights": "outdoor", |
|
"max_keypoints": 2000, |
|
"match_threshold": 0.2, |
|
}, |
|
"preprocessing": { |
|
"grayscale": False, |
|
"force_resize": True, |
|
"resize_max": 1024, |
|
"width": 320, |
|
"height": 240, |
|
"dfactor": 8, |
|
}, |
|
}, |
|
"gim(dkm)": { |
|
"output": "matches-gim", |
|
"model": { |
|
"name": "gim", |
|
"weights": "gim_dkm_100h.ckpt", |
|
"max_keypoints": 2000, |
|
"match_threshold": 0.2, |
|
}, |
|
"preprocessing": { |
|
"grayscale": False, |
|
"force_resize": True, |
|
"resize_max": 1024, |
|
"width": 320, |
|
"height": 240, |
|
"dfactor": 8, |
|
}, |
|
}, |
|
"omniglue": { |
|
"output": "matches-omniglue", |
|
"model": { |
|
"name": "omniglue", |
|
"match_threshold": 0.2, |
|
"max_keypoints": 2000, |
|
"features": "null", |
|
        },
        "preprocessing": {
            "grayscale": False,
            "force_resize": False,
            "resize_max": 1024,
            "width": 640,
            "height": 480,
            "dfactor": 8,
|
}, |
|
}, |
|
"sold2": { |
|
"output": "matches-sold2", |
|
"model": { |
|
"name": "sold2", |
|
"max_keypoints": 2000, |
|
"match_threshold": 0.2, |
|
}, |
|
"preprocessing": { |
|
"grayscale": True, |
|
"force_resize": True, |
|
"resize_max": 1024, |
|
"width": 640, |
|
"height": 480, |
|
"dfactor": 8, |
|
}, |
|
}, |
|
"gluestick": { |
|
"output": "matches-gluestick", |
|
"model": { |
|
"name": "gluestick", |
|
"use_lines": True, |
|
"max_keypoints": 1000, |
|
"max_lines": 300, |
|
"force_num_keypoints": False, |
|
}, |
|
"preprocessing": { |
|
"grayscale": True, |
|
"force_resize": True, |
|
"resize_max": 1024, |
|
"width": 640, |
|
"height": 480, |
|
"dfactor": 8, |
|
}, |
|
}, |
|
} |
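
# Example usage (hypothetical paths), matching image pairs with the default
# LoFTR configuration and writing features/matches under `outputs/`:
#
#   from pathlib import Path
#   features_path, matches_path = main(
#       confs["loftr"],
#       pairs=Path("pairs.txt"),
#       image_dir=Path("images/"),
#       export_dir=Path("outputs/"),
#   )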
|
|
|
|
|
def to_cpts(kpts, ps): |
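    """Quantize keypoints to a grid of cell size `ps` and return hashable tuples."""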
|
if ps > 0.0: |
|
kpts = np.round(np.round((kpts + 0.5) / ps) * ps - 0.5, 2) |
|
return [tuple(cpt) for cpt in kpts] |
|
|
|
|
|
def assign_keypoints( |
|
kpts: np.ndarray, |
|
other_cpts: Union[List[Tuple], np.ndarray], |
|
max_error: float, |
|
update: bool = False, |
|
ref_bins: Optional[List[Counter]] = None, |
|
scores: Optional[np.ndarray] = None, |
|
cell_size: Optional[int] = None, |
|
): |
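    """Assign keypoints to cluster centers.

    With update=False, snap each keypoint to its nearest center in
    `other_cpts` within `max_error` pixels (id -1 if none is close enough).
    With update=True, quantize the keypoints into cells of size
    max(`cell_size`, `max_error`), append unseen cells to `other_cpts`,
    and accumulate per-cell score bins in `ref_bins`. Returns the
    per-keypoint center/cell ids.
    """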
|
if not update: |
|
|
|
if len(other_cpts) == 0 or len(kpts) == 0: |
|
return np.full(len(kpts), -1) |
|
dist, kpt_ids = KDTree(np.array(other_cpts)).query(kpts) |
|
valid = dist <= max_error |
|
kpt_ids[~valid] = -1 |
|
return kpt_ids |
|
else: |
|
ps = cell_size if cell_size is not None else max_error |
|
ps = max(ps, max_error) |
|
|
|
assert isinstance(other_cpts, list) |
|
kpt_ids = [] |
|
cpts = to_cpts(kpts, ps) |
|
bpts = to_cpts(kpts, int(max_error)) |
|
cp_to_id = {val: i for i, val in enumerate(other_cpts)} |
|
for i, (cpt, bpt) in enumerate(zip(cpts, bpts)): |
|
try: |
|
kid = cp_to_id[cpt] |
|
except KeyError: |
|
kid = len(cp_to_id) |
|
cp_to_id[cpt] = kid |
|
other_cpts.append(cpt) |
|
if ref_bins is not None: |
|
ref_bins.append(Counter()) |
|
if ref_bins is not None: |
|
score = scores[i] if scores is not None else 1 |
|
ref_bins[cp_to_id[cpt]][bpt] += score |
|
kpt_ids.append(kid) |
|
return np.array(kpt_ids) |
|
|
|
|
|
def get_grouped_ids(array): |
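    """Group the indices of `array` by value, one index array per unique value."""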
|
|
|
|
|
idx_sort = np.argsort(array) |
|
sorted_array = array[idx_sort] |
|
_, ids, _ = np.unique(sorted_array, return_counts=True, return_index=True) |
|
res = np.split(idx_sort, ids[1:]) |
|
return res |
|
|
|
|
|
def get_unique_matches(match_ids, scores): |
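    """Keep only matches that are the highest-scoring one in both images."""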
|
    if len(match_ids.shape) == 1:
        # A 1-D array has no per-image structure to deduplicate;
        # return the inputs unchanged so callers can still unpack.
        return match_ids, scores
|
|
|
isets1 = get_grouped_ids(match_ids[:, 0]) |
|
isets2 = get_grouped_ids(match_ids[:, 1]) |
|
uid1s = [ids[scores[ids].argmax()] for ids in isets1 if len(ids) > 0] |
|
uid2s = [ids[scores[ids].argmax()] for ids in isets2 if len(ids) > 0] |
|
uids = list(set(uid1s).intersection(uid2s)) |
|
return match_ids[uids], scores[uids] |
|
|
|
|
|
def matches_to_matches0(matches, scores): |
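    """Convert (N, 2) match pairs to hloc's matches0/matching_scores0 format."""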
|
if len(matches) == 0: |
|
return np.zeros(0, dtype=np.int32), np.zeros(0, dtype=np.float16) |
|
n_kps0 = np.max(matches[:, 0]) + 1 |
|
matches0 = -np.ones((n_kps0,)) |
|
scores0 = np.zeros((n_kps0,)) |
|
matches0[matches[:, 0]] = matches[:, 1] |
|
scores0[matches[:, 0]] = scores |
|
return matches0.astype(np.int32), scores0.astype(np.float16) |
|
|
|
|
|
def kpids_to_matches0(kpt_ids0, kpt_ids1, scores): |
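    """Turn per-image keypoint ids (-1 = unassigned) into unique matches0 arrays."""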
|
valid = (kpt_ids0 != -1) & (kpt_ids1 != -1) |
|
matches = np.dstack([kpt_ids0[valid], kpt_ids1[valid]]) |
|
matches = matches.reshape(-1, 2) |
|
scores = scores[valid] |
|
|
|
|
|
matches, scores = get_unique_matches(matches, scores) |
|
return matches_to_matches0(matches, scores) |
|
|
|
|
|
def scale_keypoints(kpts, scale): |
|
if np.any(scale != 1.0): |
|
kpts *= kpts.new_tensor(scale) |
|
return kpts |
|
|
|
|
|
class ImagePairDataset(torch.utils.data.Dataset): |
|
default_conf = { |
|
"grayscale": True, |
|
"resize_max": 1024, |
|
"dfactor": 8, |
|
"cache_images": False, |
|
} |
|
|
|
def __init__(self, image_dir, conf, pairs): |
|
self.image_dir = image_dir |
|
self.conf = conf = SimpleNamespace(**{**self.default_conf, **conf}) |
|
self.pairs = pairs |
|
if self.conf.cache_images: |
|
image_names = set(sum(pairs, ())) |
|
logger.info( |
|
f"Loading and caching {len(image_names)} unique images." |
|
) |
|
self.images = {} |
|
self.scales = {} |
|
for name in tqdm(image_names): |
|
image = read_image(self.image_dir / name, self.conf.grayscale) |
|
self.images[name], self.scales[name] = self.preprocess(image) |
|
|
|
def preprocess(self, image: np.ndarray): |
|
image = image.astype(np.float32, copy=False) |
|
size = image.shape[:2][::-1] |
|
scale = np.array([1.0, 1.0]) |
|
|
|
if self.conf.resize_max: |
|
scale = self.conf.resize_max / max(size) |
|
if scale < 1.0: |
|
size_new = tuple(int(round(x * scale)) for x in size) |
|
image = resize_image(image, size_new, "cv2_area") |
|
scale = np.array(size) / np.array(size_new) |
|
|
|
if self.conf.grayscale: |
|
assert image.ndim == 2, image.shape |
|
image = image[None] |
|
else: |
|
image = image.transpose((2, 0, 1)) |
|
image = torch.from_numpy(image / 255.0).float() |
|
|
|
|
|
size_new = tuple( |
|
map( |
|
lambda x: int(x // self.conf.dfactor * self.conf.dfactor), |
|
image.shape[-2:], |
|
) |
|
) |
|
        image = F.resize(image, size=size_new, antialias=True)
|
scale = np.array(size) / np.array(size_new)[::-1] |
|
return image, scale |
|
|
|
def __len__(self): |
|
return len(self.pairs) |
|
|
|
def __getitem__(self, idx): |
|
name0, name1 = self.pairs[idx] |
|
if self.conf.cache_images: |
|
image0, scale0 = self.images[name0], self.scales[name0] |
|
image1, scale1 = self.images[name1], self.scales[name1] |
|
else: |
|
image0 = read_image(self.image_dir / name0, self.conf.grayscale) |
|
image1 = read_image(self.image_dir / name1, self.conf.grayscale) |
|
image0, scale0 = self.preprocess(image0) |
|
image1, scale1 = self.preprocess(image1) |
|
return image0, image1, scale0, scale1, name0, name1 |
|
|
|
|
|
@torch.no_grad() |
|
def match_dense( |
|
conf: Dict, |
|
pairs: List[Tuple[str, str]], |
|
image_dir: Path, |
|
match_path: Path, |
|
    existing_refs: Iterable = (),
|
): |
|
device = "cuda" if torch.cuda.is_available() else "cpu" |
|
Model = dynamic_load(matchers, conf["model"]["name"]) |
|
model = Model(conf["model"]).eval().to(device) |
|
|
|
dataset = ImagePairDataset(image_dir, conf["preprocessing"], pairs) |
|
loader = torch.utils.data.DataLoader( |
|
dataset, num_workers=16, batch_size=1, shuffle=False |
|
) |
|
|
|
logger.info("Performing dense matching...") |
|
with h5py.File(str(match_path), "a") as fd: |
|
for data in tqdm(loader, smoothing=0.1): |
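            # Load the preprocessed image pair and its rescaling factors.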
|
|
|
image0, image1, scale0, scale1, (name0,), (name1,) = data |
|
scale0, scale1 = scale0[0].numpy(), scale1[0].numpy() |
|
image0, image1 = image0.to(device), image1.to(device) |
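            # If image0 already has reference keypoints, flip the pair so
            # the matcher refines new keypoints in the query image instead.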
|
|
|
|
|
|
|
if name0 in existing_refs: |
|
|
|
pred = model({"image0": image1, "image1": image0}) |
|
pred = { |
|
**pred, |
|
"keypoints0": pred["keypoints1"], |
|
"keypoints1": pred["keypoints0"], |
|
} |
|
else: |
|
|
|
pred = model({"image0": image0, "image1": image1}) |
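            # Rescale keypoints to the original image resolution
            # (pixel-center convention) and move them to the CPU.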
|
|
|
|
|
kpts0, kpts1 = pred["keypoints0"], pred["keypoints1"] |
|
kpts0 = scale_keypoints(kpts0 + 0.5, scale0) - 0.5 |
|
kpts1 = scale_keypoints(kpts1 + 0.5, scale1) - 0.5 |
|
kpts0 = kpts0.cpu().numpy() |
|
kpts1 = kpts1.cpu().numpy() |
|
scores = pred["scores"].cpu().numpy() |
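            # Write the raw dense predictions for this pair in hloc format.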
|
|
|
|
|
pair = names_to_pair(name0, name1) |
|
if pair in fd: |
|
del fd[pair] |
|
grp = fd.create_group(pair) |
|
|
|
|
|
grp.create_dataset("keypoints0", data=kpts0) |
|
grp.create_dataset("keypoints1", data=kpts1) |
|
grp.create_dataset("scores", data=scores) |
|
del model, loader |
|
|
|
|
|
|
|
def load_keypoints( |
|
conf: Dict, feature_paths_refs: List[Path], quantize: Optional[set] = None |
|
): |
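    """Load keypoints from the reference feature files.

    Names in `quantize` (default: all) are binned into `bindict` via
    `assign_keypoints`; the remaining names are returned as fixed arrays
    in `cpdict`.
    """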
|
name2ref = { |
|
n: i for i, p in enumerate(feature_paths_refs) for n in list_h5_names(p) |
|
} |
|
|
|
existing_refs = set(name2ref.keys()) |
|
if quantize is None: |
|
quantize = existing_refs |
|
if len(existing_refs) > 0: |
|
logger.info(f"Loading keypoints from {len(existing_refs)} images.") |
|
|
|
|
|
cpdict = defaultdict(list) |
|
bindict = defaultdict(list) |
|
for name in existing_refs: |
|
with h5py.File(str(feature_paths_refs[name2ref[name]]), "r") as fd: |
|
kps = fd[name]["keypoints"].__array__() |
|
if name not in quantize: |
|
cpdict[name] = kps |
|
else: |
|
if "scores" in fd[name].keys(): |
|
kp_scores = fd[name]["scores"].__array__() |
|
else: |
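                    # No stored scores: weight every keypoint equally.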
|
|
|
|
|
|
|
kp_scores = [1.0 for _ in range(kps.shape[0])] |
|
|
|
assign_keypoints( |
|
kps, |
|
cpdict[name], |
|
conf["max_error"], |
|
True, |
|
bindict[name], |
|
kp_scores, |
|
conf["cell_size"], |
|
) |
|
return cpdict, bindict |
|
|
|
|
|
def aggregate_matches( |
|
conf: Dict, |
|
pairs: List[Tuple[str, str]], |
|
match_path: Path, |
|
feature_path: Path, |
|
required_queries: Optional[Set[str]] = None, |
|
max_kps: Optional[int] = None, |
|
    cpdict: Optional[Dict[str, Iterable]] = None,
    bindict: Optional[Dict[str, List[Counter]]] = None,
):
    # Guard against shared mutable default arguments.
    if cpdict is None:
        cpdict = defaultdict(list)
    if bindict is None:
        bindict = defaultdict(list)
|
if required_queries is None: |
|
required_queries = set(sum(pairs, ())) |
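    # Skip queries whose keypoints were already written to the feature file.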
|
|
|
required_queries -= set(list_h5_names(feature_path)) |
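    # Keypoints already provided as np.ndarray are fixed and skipped too.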
|
|
|
|
|
required_queries -= set( |
|
[k for k, v in cpdict.items() if isinstance(v, np.ndarray)] |
|
) |
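    # Process low-connectivity images first so that finished images can be
    # flushed early, reducing peak memory use.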
|
|
|
|
|
pairs_per_q = Counter(list(chain(*pairs))) |
|
pairs_score = [min(pairs_per_q[i], pairs_per_q[j]) for i, j in pairs] |
|
pairs = [p for _, p in sorted(zip(pairs_score, pairs))] |
|
|
|
if len(required_queries) > 0: |
|
logger.info( |
|
f"Aggregating keypoints for {len(required_queries)} images." |
|
) |
|
n_kps = 0 |
|
with h5py.File(str(match_path), "a") as fd: |
|
for name0, name1 in tqdm(pairs, smoothing=0.1): |
|
pair = names_to_pair(name0, name1) |
|
grp = fd[pair] |
|
kpts0 = grp["keypoints0"].__array__() |
|
kpts1 = grp["keypoints1"].__array__() |
|
scores = grp["scores"].__array__() |
|
|
|
|
|
update0 = name0 in required_queries |
|
update1 = name1 in required_queries |
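            # If only image0 gains new keypoints and no top-k selection
            # follows, register them exactly (no quantization).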
|
|
|
|
|
|
|
if update0 and not update1 and max_kps is None: |
|
max_error0 = cell_size0 = 0.0 |
|
else: |
|
max_error0 = conf["max_error"] |
|
cell_size0 = conf["cell_size"] |
|
|
|
|
|
mkp_ids0 = assign_keypoints( |
|
kpts0, |
|
cpdict[name0], |
|
max_error0, |
|
update0, |
|
bindict[name0], |
|
scores, |
|
cell_size0, |
|
) |
|
mkp_ids1 = assign_keypoints( |
|
kpts1, |
|
cpdict[name1], |
|
conf["max_error"], |
|
update1, |
|
bindict[name1], |
|
scores, |
|
conf["cell_size"], |
|
) |
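            # Merge the two id assignments into matches0 format and store.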
|
|
|
|
|
matches0, scores0 = kpids_to_matches0(mkp_ids0, mkp_ids1, scores) |
|
|
|
assert kpts0.shape[0] == scores.shape[0] |
|
grp.create_dataset("matches0", data=matches0) |
|
grp.create_dataset("matching_scores0", data=scores0) |
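            # Once an image appears in no further pairs, finalize it: keep
            # the best keypoint per bin and write it to the feature file.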
|
|
|
|
|
for name in (name0, name1): |
|
pairs_per_q[name] -= 1 |
|
if pairs_per_q[name] > 0 or name not in required_queries: |
|
continue |
|
kp_score = [c.most_common(1)[0][1] for c in bindict[name]] |
|
cpdict[name] = [c.most_common(1)[0][0] for c in bindict[name]] |
|
cpdict[name] = np.array(cpdict[name], dtype=np.float32) |
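                # Keep only the max_kps best-scoring keypoints; the matches
                # are reassigned against them afterwards.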
|
|
|
|
|
if max_kps: |
|
top_k = min(max_kps, cpdict[name].shape[0]) |
|
top_k = np.argsort(kp_score)[::-1][:top_k] |
|
cpdict[name] = cpdict[name][top_k] |
|
kp_score = np.array(kp_score)[top_k] |
|
|
|
|
|
with h5py.File(feature_path, "a") as kfd: |
|
if name in kfd: |
|
del kfd[name] |
|
kgrp = kfd.create_group(name) |
|
kgrp.create_dataset("keypoints", data=cpdict[name]) |
|
kgrp.create_dataset("score", data=kp_score) |
|
n_kps += cpdict[name].shape[0] |
|
del bindict[name] |
|
|
|
if len(required_queries) > 0: |
|
avg_kp_per_image = round(n_kps / len(required_queries), 1) |
|
logger.info( |
|
f"Finished assignment, found {avg_kp_per_image} " |
|
f"keypoints/image (avg.), total {n_kps}." |
|
) |
|
return cpdict |
|
|
|
|
|
def assign_matches( |
|
pairs: List[Tuple[str, str]], |
|
match_path: Path, |
|
keypoints: Union[List[Path], Dict[str, np.array]], |
|
max_error: float, |
|
): |
|
if isinstance(keypoints, list): |
|
        keypoints, _ = load_keypoints({}, keypoints, quantize=set())
|
assert len(set(sum(pairs, ())) - set(keypoints.keys())) == 0 |
|
with h5py.File(str(match_path), "a") as fd: |
|
for name0, name1 in tqdm(pairs): |
|
pair = names_to_pair(name0, name1) |
|
grp = fd[pair] |
|
kpts0 = grp["keypoints0"].__array__() |
|
kpts1 = grp["keypoints1"].__array__() |
|
scores = grp["scores"].__array__() |
|
|
|
|
|
mkp_ids0 = assign_keypoints(kpts0, keypoints[name0], max_error) |
|
mkp_ids1 = assign_keypoints(kpts1, keypoints[name1], max_error) |
|
|
|
matches0, scores0 = kpids_to_matches0(mkp_ids0, mkp_ids1, scores) |
|
|
|
|
|
del grp["matches0"], grp["matching_scores0"] |
|
grp.create_dataset("matches0", data=matches0) |
|
grp.create_dataset("matching_scores0", data=scores0) |
|
|
|
|
|
@torch.no_grad() |
|
def match_and_assign( |
|
conf: Dict, |
|
pairs_path: Path, |
|
image_dir: Path, |
|
match_path: Path, |
|
feature_path_q: Path, |
|
    feature_paths_refs: Optional[List[Path]] = None,
|
max_kps: Optional[int] = 8192, |
|
overwrite: bool = False, |
|
) -> None:
    if feature_paths_refs is None:
        feature_paths_refs = []
    for path in feature_paths_refs:
|
if not path.exists(): |
|
            raise FileNotFoundError(
                f"Reference feature file {path} does not exist."
            )
|
pairs = parse_retrieval(pairs_path) |
|
pairs = [(q, r) for q, rs in pairs.items() for r in rs] |
|
pairs = find_unique_new_pairs(pairs, None if overwrite else match_path) |
|
required_queries = set(sum(pairs, ())) |
|
|
|
name2ref = { |
|
n: i for i, p in enumerate(feature_paths_refs) for n in list_h5_names(p) |
|
} |
|
existing_refs = required_queries.intersection(set(name2ref.keys())) |
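    # Images that already have reference features keep their keypoints fixed.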
|
|
|
|
|
required_queries = required_queries - existing_refs |
|
|
|
if feature_path_q.exists(): |
|
existing_queries = set(list_h5_names(feature_path_q)) |
|
feature_paths_refs.append(feature_path_q) |
|
existing_refs = set.union(existing_refs, existing_queries) |
|
if not overwrite: |
|
required_queries = required_queries - existing_queries |
|
|
|
if len(pairs) == 0 and len(required_queries) == 0: |
|
logger.info("All pairs exist. Skipping dense matching.") |
|
return |
|
|
|
|
|
match_dense(conf, pairs, image_dir, match_path, existing_refs=existing_refs) |
|
|
|
logger.info("Assigning matches...") |
|
|
|
|
|
cpdict, bindict = load_keypoints( |
|
conf, feature_paths_refs, quantize=required_queries |
|
) |
|
|
|
|
|
cpdict = aggregate_matches( |
|
conf, |
|
pairs, |
|
match_path, |
|
feature_path=feature_path_q, |
|
required_queries=required_queries, |
|
max_kps=max_kps, |
|
cpdict=cpdict, |
|
bindict=bindict, |
|
) |
|
|
|
|
|
if max_kps is not None: |
|
logger.info(f'Reassign matches with max_error={conf["max_error"]}.') |
|
assign_matches(pairs, match_path, cpdict, max_error=conf["max_error"]) |
|
|
|
|
|
def scale_lines(lines, scale): |
|
if np.any(scale != 1.0): |
|
lines *= lines.new_tensor(scale) |
|
return lines |
|
|
|
|
|
@torch.no_grad()
def match(model, path_0, path_1, conf):
|
default_conf = { |
|
"grayscale": True, |
|
"resize_max": 1024, |
|
"dfactor": 8, |
|
"cache_images": False, |
|
"force_resize": False, |
|
"width": 320, |
|
"height": 240, |
|
} |
|
|
|
def preprocess(image: np.ndarray): |
|
image = image.astype(np.float32, copy=False) |
|
size = image.shape[:2][::-1] |
|
scale = np.array([1.0, 1.0]) |
|
if conf.resize_max: |
|
scale = conf.resize_max / max(size) |
|
if scale < 1.0: |
|
size_new = tuple(int(round(x * scale)) for x in size) |
|
image = resize_image(image, size_new, "cv2_area") |
|
scale = np.array(size) / np.array(size_new) |
|
if conf.force_resize: |
|
size = image.shape[:2][::-1] |
|
image = resize_image(image, (conf.width, conf.height), "cv2_area") |
|
size_new = (conf.width, conf.height) |
|
scale = np.array(size) / np.array(size_new) |
|
if conf.grayscale: |
|
assert image.ndim == 2, image.shape |
|
image = image[None] |
|
else: |
|
image = image.transpose((2, 0, 1)) |
|
image = torch.from_numpy(image / 255.0).float() |
|
|
|
size_new = tuple( |
|
map( |
|
lambda x: int(x // conf.dfactor * conf.dfactor), |
|
image.shape[-2:], |
|
) |
|
) |
|
image = F.resize(image, size=size_new, antialias=True) |
|
scale = np.array(size) / np.array(size_new)[::-1] |
|
return image, scale |
|
|
|
conf = SimpleNamespace(**{**default_conf, **conf}) |
|
image0 = read_image(path_0, conf.grayscale) |
|
image1 = read_image(path_1, conf.grayscale) |
|
image0, scale0 = preprocess(image0) |
|
image1, scale1 = preprocess(image1) |
|
image0 = image0.to(device)[None] |
|
image1 = image1.to(device)[None] |
|
pred = model({"image0": image0, "image1": image1}) |
|
|
|
|
|
kpts0, kpts1 = pred["keypoints0"], pred["keypoints1"] |
|
kpts0 = scale_keypoints(kpts0 + 0.5, scale0) - 0.5 |
|
kpts1 = scale_keypoints(kpts1 + 0.5, scale1) - 0.5 |
|
|
|
ret = { |
|
"image0": image0.squeeze().cpu().numpy(), |
|
"image1": image1.squeeze().cpu().numpy(), |
|
"keypoints0": kpts0.cpu().numpy(), |
|
"keypoints1": kpts1.cpu().numpy(), |
|
} |
|
if "mconf" in pred.keys(): |
|
ret["mconf"] = pred["mconf"].cpu().numpy() |
|
return ret |
|
|
|
|
|
@torch.no_grad() |
|
def match_images(model, image_0, image_1, conf, device="cpu"): |
|
default_conf = { |
|
"grayscale": True, |
|
"resize_max": 1024, |
|
"dfactor": 8, |
|
"cache_images": False, |
|
"force_resize": False, |
|
"width": 320, |
|
"height": 240, |
|
} |
|
|
|
def preprocess(image: np.ndarray): |
|
image = image.astype(np.float32, copy=False) |
|
size = image.shape[:2][::-1] |
|
scale = np.array([1.0, 1.0]) |
|
if conf.resize_max: |
|
scale = conf.resize_max / max(size) |
|
if scale < 1.0: |
|
size_new = tuple(int(round(x * scale)) for x in size) |
|
image = resize_image(image, size_new, "cv2_area") |
|
scale = np.array(size) / np.array(size_new) |
|
if conf.force_resize: |
|
size = image.shape[:2][::-1] |
|
image = resize_image(image, (conf.width, conf.height), "cv2_area") |
|
size_new = (conf.width, conf.height) |
|
scale = np.array(size) / np.array(size_new) |
|
if conf.grayscale: |
|
assert image.ndim == 2, image.shape |
|
image = image[None] |
|
else: |
|
image = image.transpose((2, 0, 1)) |
|
image = torch.from_numpy(image / 255.0).float() |
|
|
|
|
|
size_new = tuple( |
|
map( |
|
lambda x: int(x // conf.dfactor * conf.dfactor), |
|
image.shape[-2:], |
|
) |
|
) |
|
        image = F.resize(image, size=size_new, antialias=True)
|
scale = np.array(size) / np.array(size_new)[::-1] |
|
return image, scale |
|
|
|
conf = SimpleNamespace(**{**default_conf, **conf}) |
|
|
|
if len(image_0.shape) == 3 and conf.grayscale: |
|
image0 = cv2.cvtColor(image_0, cv2.COLOR_RGB2GRAY) |
|
else: |
|
image0 = image_0 |
|
    if len(image_1.shape) == 3 and conf.grayscale:
|
image1 = cv2.cvtColor(image_1, cv2.COLOR_RGB2GRAY) |
|
else: |
|
        image1 = image_1

    image0, scale0 = preprocess(image0)
|
image1, scale1 = preprocess(image1) |
|
image0 = image0.to(device)[None] |
|
image1 = image1.to(device)[None] |
|
pred = model({"image0": image0, "image1": image1}) |
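    # Scale factors mapping the network input size back to the originals.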
|
|
|
s0 = np.array(image_0.shape[:2][::-1]) / np.array(image0.shape[-2:][::-1]) |
|
s1 = np.array(image_1.shape[:2][::-1]) / np.array(image1.shape[-2:][::-1]) |
|
|
|
|
|
if "keypoints0" in pred.keys() and "keypoints1" in pred.keys(): |
|
kpts0, kpts1 = pred["keypoints0"], pred["keypoints1"] |
|
kpts0_origin = scale_keypoints(kpts0 + 0.5, s0) - 0.5 |
|
kpts1_origin = scale_keypoints(kpts1 + 0.5, s1) - 0.5 |
|
|
|
ret = { |
|
"image0": image0.squeeze().cpu().numpy(), |
|
"image1": image1.squeeze().cpu().numpy(), |
|
"image0_orig": image_0, |
|
"image1_orig": image_1, |
|
"keypoints0": kpts0.cpu().numpy(), |
|
"keypoints1": kpts1.cpu().numpy(), |
|
"keypoints0_orig": kpts0_origin.cpu().numpy(), |
|
"keypoints1_orig": kpts1_origin.cpu().numpy(), |
|
"mkeypoints0": kpts0.cpu().numpy(), |
|
"mkeypoints1": kpts1.cpu().numpy(), |
|
"mkeypoints0_orig": kpts0_origin.cpu().numpy(), |
|
"mkeypoints1_orig": kpts1_origin.cpu().numpy(), |
|
"original_size0": np.array(image_0.shape[:2][::-1]), |
|
"original_size1": np.array(image_1.shape[:2][::-1]), |
|
"new_size0": np.array(image0.shape[-2:][::-1]), |
|
"new_size1": np.array(image1.shape[-2:][::-1]), |
|
"scale0": s0, |
|
"scale1": s1, |
|
} |
|
if "mconf" in pred.keys(): |
|
ret["mconf"] = pred["mconf"].cpu().numpy() |
|
elif "scores" in pred.keys(): |
|
ret["mconf"] = pred["scores"].cpu().numpy() |
|
else: |
|
ret["mconf"] = np.ones_like(kpts0.cpu().numpy()[:, 0]) |
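    # Line matchers (e.g. SOLD2, GlueStick) additionally return line
    # segments; rescale them back to the original image resolution.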
|
if "lines0" in pred.keys() and "lines1" in pred.keys(): |
|
if "keypoints0" in pred.keys() and "keypoints1" in pred.keys(): |
|
kpts0, kpts1 = pred["keypoints0"], pred["keypoints1"] |
|
kpts0_origin = scale_keypoints(kpts0 + 0.5, s0) - 0.5 |
|
kpts1_origin = scale_keypoints(kpts1 + 0.5, s1) - 0.5 |
|
kpts0_origin = kpts0_origin.cpu().numpy() |
|
kpts1_origin = kpts1_origin.cpu().numpy() |
|
else: |
|
            kpts0_origin, kpts1_origin = None, None
|
lines0, lines1 = pred["lines0"], pred["lines1"] |
|
lines0_raw, lines1_raw = pred["raw_lines0"], pred["raw_lines1"] |
|
|
|
lines0_raw = torch.from_numpy(lines0_raw.copy()) |
|
lines1_raw = torch.from_numpy(lines1_raw.copy()) |
|
lines0_raw = scale_lines(lines0_raw + 0.5, s0) - 0.5 |
|
lines1_raw = scale_lines(lines1_raw + 0.5, s1) - 0.5 |
|
|
|
lines0 = torch.from_numpy(lines0.copy()) |
|
lines1 = torch.from_numpy(lines1.copy()) |
|
lines0 = scale_lines(lines0 + 0.5, s0) - 0.5 |
|
lines1 = scale_lines(lines1 + 0.5, s1) - 0.5 |
|
|
|
ret = { |
|
"image0_orig": image_0, |
|
"image1_orig": image_1, |
|
"line0": lines0_raw.cpu().numpy(), |
|
"line1": lines1_raw.cpu().numpy(), |
|
"line0_orig": lines0.cpu().numpy(), |
|
"line1_orig": lines1.cpu().numpy(), |
|
"line_keypoints0_orig": kpts0_origin, |
|
"line_keypoints1_orig": kpts1_origin, |
|
} |
|
del pred |
|
torch.cuda.empty_cache() |
|
return ret |
|
|
|
|
|
@torch.no_grad() |
|
def main( |
|
conf: Dict, |
|
pairs: Path, |
|
image_dir: Path, |
|
export_dir: Optional[Path] = None, |
|
matches: Optional[Path] = None, |
|
features: Optional[Path] = None, |
|
features_ref: Optional[Path] = None, |
|
max_kps: Optional[int] = 8192, |
|
overwrite: bool = False, |
|
) -> Tuple[Path, Path]:
|
logger.info( |
|
"Extracting semi-dense features with configuration:" |
|
f"\n{pprint.pformat(conf)}" |
|
) |
|
|
|
if features is None: |
|
features = "feats_" |
|
|
|
if isinstance(features, Path): |
|
features_q = features |
|
if matches is None: |
|
raise ValueError( |
|
"Either provide both features and matches as Path" |
|
" or both as names." |
|
) |
|
else: |
|
if export_dir is None: |
|
raise ValueError( |
|
"Provide an export_dir if features and matches" |
|
f" are not file paths: {features}, {matches}." |
|
) |
|
features_q = Path(export_dir, f'{features}{conf["output"]}.h5') |
|
if matches is None: |
|
matches = Path(export_dir, f'{conf["output"]}_{pairs.stem}.h5') |
|
|
|
if features_ref is None: |
|
features_ref = [] |
|
elif isinstance(features_ref, list): |
|
features_ref = list(features_ref) |
|
elif isinstance(features_ref, Path): |
|
features_ref = [features_ref] |
|
else: |
|
raise TypeError(str(features_ref)) |
|
|
|
match_and_assign( |
|
conf, |
|
pairs, |
|
image_dir, |
|
matches, |
|
features_q, |
|
features_ref, |
|
max_kps, |
|
overwrite, |
|
) |
|
|
|
return features_q, matches |
|
|
|
|
|
if __name__ == "__main__": |
|
parser = argparse.ArgumentParser() |
|
parser.add_argument("--pairs", type=Path, required=True) |
|
parser.add_argument("--image_dir", type=Path, required=True) |
|
parser.add_argument("--export_dir", type=Path, required=True) |
|
    # Default to None so that main() derives the output paths from
    # --export_dir and the chosen configuration.
    parser.add_argument("--matches", type=Path, default=None)
    parser.add_argument("--features", type=str, default=None)
|
parser.add_argument( |
|
"--conf", type=str, default="loftr", choices=list(confs.keys()) |
|
) |
|
args = parser.parse_args() |
|
main( |
|
confs[args.conf], |
|
args.pairs, |
|
args.image_dir, |
|
args.export_dir, |
|
args.matches, |
|
args.features, |
|
) |
|
|