image-matching-webui / hloc /match_dense.py
Vincentqyw
update: sync with hloc
4c88343
raw
history blame
35.6 kB
from types import SimpleNamespace
import cv2
import numpy as np
import torch
import torchvision.transforms.functional as F
from .extract_features import read_image, resize_image
device = "cuda" if torch.cuda.is_available() else "cpu"
confs = {
# Best quality but loads of points. Only use for small scenes
"loftr": {
"output": "matches-loftr",
"model": {
"name": "loftr",
"weights": "outdoor",
"max_keypoints": 2000,
"match_threshold": 0.2,
},
"preprocessing": {
"grayscale": True,
"resize_max": 1024,
"dfactor": 8,
"width": 640,
"height": 480,
"force_resize": True,
},
"max_error": 1, # max error for assigned keypoints (in px)
"cell_size": 1, # size of quantization patch (max 1 kp/patch)
},
# "loftr_quadtree": {
# "output": "matches-loftr-quadtree",
# "model": {
# "name": "quadtree",
# "weights": "outdoor",
# "max_keypoints": 2000,
# "match_threshold": 0.2,
# },
# "preprocessing": {
# "grayscale": True,
# "resize_max": 1024,
# "dfactor": 8,
# "width": 640,
# "height": 480,
# "force_resize": True,
# },
# "max_error": 1, # max error for assigned keypoints (in px)
# "cell_size": 1, # size of quantization patch (max 1 kp/patch)
# },
"cotr": {
"output": "matches-cotr",
"model": {
"name": "cotr",
"weights": "out/default",
"max_keypoints": 2000,
"match_threshold": 0.2,
},
"preprocessing": {
"grayscale": False,
"resize_max": 1024,
"dfactor": 8,
"width": 640,
"height": 480,
"force_resize": True,
},
"max_error": 1, # max error for assigned keypoints (in px)
"cell_size": 1, # size of quantization patch (max 1 kp/patch)
},
# Semi-scalable loftr which limits detected keypoints
"loftr_aachen": {
"output": "matches-loftr_aachen",
"model": {
"name": "loftr",
"weights": "outdoor",
"max_keypoints": 2000,
"match_threshold": 0.2,
},
"preprocessing": {"grayscale": True, "resize_max": 1024, "dfactor": 8},
"max_error": 2, # max error for assigned keypoints (in px)
"cell_size": 8, # size of quantization patch (max 1 kp/patch)
},
# Use for matching superpoint feats with loftr
"loftr_superpoint": {
"output": "matches-loftr_aachen",
"model": {
"name": "loftr",
"weights": "outdoor",
"max_keypoints": 2000,
"match_threshold": 0.2,
},
"preprocessing": {"grayscale": True, "resize_max": 1024, "dfactor": 8},
"max_error": 4, # max error for assigned keypoints (in px)
"cell_size": 4, # size of quantization patch (max 1 kp/patch)
},
# Use topicfm for matching feats
"topicfm": {
"output": "matches-topicfm",
"model": {
"name": "topicfm",
"weights": "outdoor",
"max_keypoints": 2000,
"match_threshold": 0.2,
},
"preprocessing": {
"grayscale": True,
"force_resize": True,
"resize_max": 1024,
"dfactor": 8,
"width": 640,
"height": 480,
},
},
# Use aspanformer for matching feats
"aspanformer": {
"output": "matches-aspanformer",
"model": {
"name": "aspanformer",
"weights": "outdoor",
"max_keypoints": 2000,
"match_threshold": 0.2,
},
"preprocessing": {
"grayscale": True,
"force_resize": True,
"resize_max": 1024,
"width": 640,
"height": 480,
"dfactor": 8,
},
},
"duster": {
"output": "matches-duster",
"model": {
"name": "duster",
"weights": "vit_large",
"max_keypoints": 2000,
"match_threshold": 0.2,
},
"preprocessing": {
"grayscale": False,
"resize_max": 512,
"dfactor": 16,
},
},
"mast3r": {
"output": "matches-mast3r",
"model": {
"name": "mast3r",
"weights": "vit_large",
"max_keypoints": 2000,
"match_threshold": 0.2,
},
"preprocessing": {
"grayscale": False,
"resize_max": 512,
"dfactor": 16,
},
},
"xfeat_dense": {
"output": "matches-xfeat_dense",
"model": {
"name": "xfeat_dense",
"max_keypoints": 8000,
},
"preprocessing": {
"grayscale": False,
"force_resize": False,
"resize_max": 1024,
"width": 640,
"height": 480,
"dfactor": 8,
},
},
"dkm": {
"output": "matches-dkm",
"model": {
"name": "dkm",
"weights": "outdoor",
"max_keypoints": 2000,
"match_threshold": 0.2,
},
"preprocessing": {
"grayscale": False,
"force_resize": True,
"resize_max": 1024,
"width": 80,
"height": 60,
"dfactor": 8,
},
},
"roma": {
"output": "matches-roma",
"model": {
"name": "roma",
"weights": "outdoor",
"max_keypoints": 2000,
"match_threshold": 0.2,
},
"preprocessing": {
"grayscale": False,
"force_resize": True,
"resize_max": 1024,
"width": 320,
"height": 240,
"dfactor": 8,
},
},
"gim(dkm)": {
"output": "matches-gim",
"model": {
"name": "gim",
"weights": "gim_dkm_100h.ckpt",
"max_keypoints": 2000,
"match_threshold": 0.2,
},
"preprocessing": {
"grayscale": False,
"force_resize": True,
"resize_max": 1024,
"width": 320,
"height": 240,
"dfactor": 8,
},
},
"omniglue": {
"output": "matches-omniglue",
"model": {
"name": "omniglue",
"match_threshold": 0.2,
"max_keypoints": 2000,
"features": "null",
},
"preprocessing": {
"grayscale": False,
"resize_max": 1024,
"dfactor": 8,
"force_resize": False,
},
},
"sold2": {
"output": "matches-sold2",
"model": {
"name": "sold2",
"max_keypoints": 2000,
"match_threshold": 0.2,
},
"preprocessing": {
"grayscale": True,
"force_resize": True,
"resize_max": 1024,
"width": 640,
"height": 480,
"dfactor": 8,
},
},
"gluestick": {
"output": "matches-gluestick",
"model": {
"name": "gluestick",
"use_lines": True,
"max_keypoints": 1000,
"max_lines": 300,
"force_num_keypoints": False,
},
"preprocessing": {
"grayscale": True,
"force_resize": True,
"resize_max": 1024,
"width": 640,
"height": 480,
"dfactor": 8,
},
},
}
def to_cpts(kpts, ps):
if ps > 0.0:
kpts = np.round(np.round((kpts + 0.5) / ps) * ps - 0.5, 2)
return [tuple(cpt) for cpt in kpts]
def assign_keypoints(
kpts: np.ndarray,
other_cpts: Union[List[Tuple], np.ndarray],
max_error: float,
update: bool = False,
ref_bins: Optional[List[Counter]] = None,
scores: Optional[np.ndarray] = None,
cell_size: Optional[int] = None,
):
if not update:
# Without update this is just a NN search
if len(other_cpts) == 0 or len(kpts) == 0:
return np.full(len(kpts), -1)
dist, kpt_ids = KDTree(np.array(other_cpts)).query(kpts)
valid = dist <= max_error
kpt_ids[~valid] = -1
return kpt_ids
else:
ps = cell_size if cell_size is not None else max_error
ps = max(ps, max_error)
# With update we quantize and bin (optionally)
assert isinstance(other_cpts, list)
kpt_ids = []
cpts = to_cpts(kpts, ps)
bpts = to_cpts(kpts, int(max_error))
cp_to_id = {val: i for i, val in enumerate(other_cpts)}
for i, (cpt, bpt) in enumerate(zip(cpts, bpts)):
try:
kid = cp_to_id[cpt]
except KeyError:
kid = len(cp_to_id)
cp_to_id[cpt] = kid
other_cpts.append(cpt)
if ref_bins is not None:
ref_bins.append(Counter())
if ref_bins is not None:
score = scores[i] if scores is not None else 1
ref_bins[cp_to_id[cpt]][bpt] += score
kpt_ids.append(kid)
return np.array(kpt_ids)
def get_grouped_ids(array):
# Group array indices based on its values
# all duplicates are grouped as a set
idx_sort = np.argsort(array)
sorted_array = array[idx_sort]
_, ids, _ = np.unique(sorted_array, return_counts=True, return_index=True)
res = np.split(idx_sort, ids[1:])
return res
def get_unique_matches(match_ids, scores):
if len(match_ids.shape) == 1:
return [0]
isets1 = get_grouped_ids(match_ids[:, 0])
isets2 = get_grouped_ids(match_ids[:, 1])
uid1s = [ids[scores[ids].argmax()] for ids in isets1 if len(ids) > 0]
uid2s = [ids[scores[ids].argmax()] for ids in isets2 if len(ids) > 0]
uids = list(set(uid1s).intersection(uid2s))
return match_ids[uids], scores[uids]
def matches_to_matches0(matches, scores):
if len(matches) == 0:
return np.zeros(0, dtype=np.int32), np.zeros(0, dtype=np.float16)
n_kps0 = np.max(matches[:, 0]) + 1
matches0 = -np.ones((n_kps0,))
scores0 = np.zeros((n_kps0,))
matches0[matches[:, 0]] = matches[:, 1]
scores0[matches[:, 0]] = scores
return matches0.astype(np.int32), scores0.astype(np.float16)
def kpids_to_matches0(kpt_ids0, kpt_ids1, scores):
valid = (kpt_ids0 != -1) & (kpt_ids1 != -1)
matches = np.dstack([kpt_ids0[valid], kpt_ids1[valid]])
matches = matches.reshape(-1, 2)
scores = scores[valid]
# Remove n-to-1 matches
matches, scores = get_unique_matches(matches, scores)
return matches_to_matches0(matches, scores)
def scale_keypoints(kpts, scale):
if np.any(scale != 1.0):
kpts *= kpts.new_tensor(scale)
return kpts
class ImagePairDataset(torch.utils.data.Dataset):
default_conf = {
"grayscale": True,
"resize_max": 1024,
"dfactor": 8,
"cache_images": False,
}
def __init__(self, image_dir, conf, pairs):
self.image_dir = image_dir
self.conf = conf = SimpleNamespace(**{**self.default_conf, **conf})
self.pairs = pairs
if self.conf.cache_images:
image_names = set(sum(pairs, ())) # unique image names in pairs
logger.info(f"Loading and caching {len(image_names)} unique images.")
self.images = {}
self.scales = {}
for name in tqdm(image_names):
image = read_image(self.image_dir / name, self.conf.grayscale)
self.images[name], self.scales[name] = self.preprocess(image)
def preprocess(self, image: np.ndarray):
image = image.astype(np.float32, copy=False)
size = image.shape[:2][::-1]
scale = np.array([1.0, 1.0])
if self.conf.resize_max:
scale = self.conf.resize_max / max(size)
if scale < 1.0:
size_new = tuple(int(round(x * scale)) for x in size)
image = resize_image(image, size_new, "cv2_area")
scale = np.array(size) / np.array(size_new)
if self.conf.grayscale:
assert image.ndim == 2, image.shape
image = image[None]
else:
image = image.transpose((2, 0, 1)) # HxWxC to CxHxW
image = torch.from_numpy(image / 255.0).float()
# assure that the size is divisible by dfactor
size_new = tuple(
map(
lambda x: int(x // self.conf.dfactor * self.conf.dfactor),
image.shape[-2:],
)
)
image = F.resize(image, size=size_new)
scale = np.array(size) / np.array(size_new)[::-1]
return image, scale
def __len__(self):
return len(self.pairs)
def __getitem__(self, idx):
name0, name1 = self.pairs[idx]
if self.conf.cache_images:
image0, scale0 = self.images[name0], self.scales[name0]
image1, scale1 = self.images[name1], self.scales[name1]
else:
image0 = read_image(self.image_dir / name0, self.conf.grayscale)
image1 = read_image(self.image_dir / name1, self.conf.grayscale)
image0, scale0 = self.preprocess(image0)
image1, scale1 = self.preprocess(image1)
return image0, image1, scale0, scale1, name0, name1
@torch.no_grad()
def match_dense(
conf: Dict,
pairs: List[Tuple[str, str]],
image_dir: Path,
match_path: Path, # out
existing_refs: Optional[List] = [],
):
device = "cuda" if torch.cuda.is_available() else "cpu"
Model = dynamic_load(matchers, conf["model"]["name"])
model = Model(conf["model"]).eval().to(device)
dataset = ImagePairDataset(image_dir, conf["preprocessing"], pairs)
loader = torch.utils.data.DataLoader(
dataset, num_workers=16, batch_size=1, shuffle=False
)
logger.info("Performing dense matching...")
with h5py.File(str(match_path), "a") as fd:
for data in tqdm(loader, smoothing=0.1):
# load image-pair data
image0, image1, scale0, scale1, (name0,), (name1,) = data
scale0, scale1 = scale0[0].numpy(), scale1[0].numpy()
image0, image1 = image0.to(device), image1.to(device)
# match semi-dense
# for consistency with pairs_from_*: refine kpts of image0
if name0 in existing_refs:
# special case: flip to enable refinement in query image
pred = model({"image0": image1, "image1": image0})
pred = {
**pred,
"keypoints0": pred["keypoints1"],
"keypoints1": pred["keypoints0"],
}
else:
# usual case
pred = model({"image0": image0, "image1": image1})
# Rescale keypoints and move to cpu
kpts0, kpts1 = pred["keypoints0"], pred["keypoints1"]
kpts0 = scale_keypoints(kpts0 + 0.5, scale0) - 0.5
kpts1 = scale_keypoints(kpts1 + 0.5, scale1) - 0.5
kpts0 = kpts0.cpu().numpy()
kpts1 = kpts1.cpu().numpy()
scores = pred["scores"].cpu().numpy()
# Write matches and matching scores in hloc format
pair = names_to_pair(name0, name1)
if pair in fd:
del fd[pair]
grp = fd.create_group(pair)
# Write dense matching output
grp.create_dataset("keypoints0", data=kpts0)
grp.create_dataset("keypoints1", data=kpts1)
grp.create_dataset("scores", data=scores)
del model, loader
# default: quantize all!
def load_keypoints(
conf: Dict, feature_paths_refs: List[Path], quantize: Optional[set] = None
):
name2ref = {
n: i for i, p in enumerate(feature_paths_refs) for n in list_h5_names(p)
}
existing_refs = set(name2ref.keys())
if quantize is None:
quantize = existing_refs # quantize all
if len(existing_refs) > 0:
logger.info(f"Loading keypoints from {len(existing_refs)} images.")
# Load query keypoints
cpdict = defaultdict(list)
bindict = defaultdict(list)
for name in existing_refs:
with h5py.File(str(feature_paths_refs[name2ref[name]]), "r") as fd:
kps = fd[name]["keypoints"].__array__()
if name not in quantize:
cpdict[name] = kps
else:
if "scores" in fd[name].keys():
kp_scores = fd[name]["scores"].__array__()
else:
# we set the score to 1.0 if not provided
# increase for more weight on reference keypoints for
# stronger anchoring
kp_scores = [1.0 for _ in range(kps.shape[0])]
# bin existing keypoints of reference images for association
assign_keypoints(
kps,
cpdict[name],
conf["max_error"],
True,
bindict[name],
kp_scores,
conf["cell_size"],
)
return cpdict, bindict
def aggregate_matches(
conf: Dict,
pairs: List[Tuple[str, str]],
match_path: Path,
feature_path: Path,
required_queries: Optional[Set[str]] = None,
max_kps: Optional[int] = None,
cpdict: Dict[str, Iterable] = defaultdict(list),
bindict: Dict[str, List[Counter]] = defaultdict(list),
):
if required_queries is None:
required_queries = set(sum(pairs, ()))
# default: do not overwrite existing features in feature_path!
required_queries -= set(list_h5_names(feature_path))
# if an entry in cpdict is provided as np.ndarray we assume it is fixed
required_queries -= set([k for k, v in cpdict.items() if isinstance(v, np.ndarray)])
# sort pairs for reduced RAM
pairs_per_q = Counter(list(chain(*pairs)))
pairs_score = [min(pairs_per_q[i], pairs_per_q[j]) for i, j in pairs]
pairs = [p for _, p in sorted(zip(pairs_score, pairs))]
if len(required_queries) > 0:
logger.info(f"Aggregating keypoints for {len(required_queries)} images.")
n_kps = 0
with h5py.File(str(match_path), "a") as fd:
for name0, name1 in tqdm(pairs, smoothing=0.1):
pair = names_to_pair(name0, name1)
grp = fd[pair]
kpts0 = grp["keypoints0"].__array__()
kpts1 = grp["keypoints1"].__array__()
scores = grp["scores"].__array__()
# Aggregate local features
update0 = name0 in required_queries
update1 = name1 in required_queries
# in localization we do not want to bin the query kp
# assumes that the query is name0!
if update0 and not update1 and max_kps is None:
max_error0 = cell_size0 = 0.0
else:
max_error0 = conf["max_error"]
cell_size0 = conf["cell_size"]
# Get match ids and extend query keypoints (cpdict)
mkp_ids0 = assign_keypoints(
kpts0,
cpdict[name0],
max_error0,
update0,
bindict[name0],
scores,
cell_size0,
)
mkp_ids1 = assign_keypoints(
kpts1,
cpdict[name1],
conf["max_error"],
update1,
bindict[name1],
scores,
conf["cell_size"],
)
# Build matches from assignments
matches0, scores0 = kpids_to_matches0(mkp_ids0, mkp_ids1, scores)
assert kpts0.shape[0] == scores.shape[0]
grp.create_dataset("matches0", data=matches0)
grp.create_dataset("matching_scores0", data=scores0)
# Convert bins to kps if finished, and store them
for name in (name0, name1):
pairs_per_q[name] -= 1
if pairs_per_q[name] > 0 or name not in required_queries:
continue
kp_score = [c.most_common(1)[0][1] for c in bindict[name]]
cpdict[name] = [c.most_common(1)[0][0] for c in bindict[name]]
cpdict[name] = np.array(cpdict[name], dtype=np.float32)
# Select top-k query kps by score (reassign matches later)
if max_kps:
top_k = min(max_kps, cpdict[name].shape[0])
top_k = np.argsort(kp_score)[::-1][:top_k]
cpdict[name] = cpdict[name][top_k]
kp_score = np.array(kp_score)[top_k]
# Write query keypoints
with h5py.File(feature_path, "a") as kfd:
if name in kfd:
del kfd[name]
kgrp = kfd.create_group(name)
kgrp.create_dataset("keypoints", data=cpdict[name])
kgrp.create_dataset("score", data=kp_score)
n_kps += cpdict[name].shape[0]
del bindict[name]
if len(required_queries) > 0:
avg_kp_per_image = round(n_kps / len(required_queries), 1)
logger.info(
f"Finished assignment, found {avg_kp_per_image} "
f"keypoints/image (avg.), total {n_kps}."
)
return cpdict
def assign_matches(
pairs: List[Tuple[str, str]],
match_path: Path,
keypoints: Union[List[Path], Dict[str, np.array]],
max_error: float,
):
if isinstance(keypoints, list):
keypoints = load_keypoints({}, keypoints, kpts_as_bin=set([]))
assert len(set(sum(pairs, ())) - set(keypoints.keys())) == 0
with h5py.File(str(match_path), "a") as fd:
for name0, name1 in tqdm(pairs):
pair = names_to_pair(name0, name1)
grp = fd[pair]
kpts0 = grp["keypoints0"].__array__()
kpts1 = grp["keypoints1"].__array__()
scores = grp["scores"].__array__()
# NN search across cell boundaries
mkp_ids0 = assign_keypoints(kpts0, keypoints[name0], max_error)
mkp_ids1 = assign_keypoints(kpts1, keypoints[name1], max_error)
matches0, scores0 = kpids_to_matches0(mkp_ids0, mkp_ids1, scores)
# overwrite matches0 and matching_scores0
del grp["matches0"], grp["matching_scores0"]
grp.create_dataset("matches0", data=matches0)
grp.create_dataset("matching_scores0", data=scores0)
@torch.no_grad()
def match_and_assign(
conf: Dict,
pairs_path: Path,
image_dir: Path,
match_path: Path, # out
feature_path_q: Path, # out
feature_paths_refs: Optional[List[Path]] = [],
max_kps: Optional[int] = 8192,
overwrite: bool = False,
) -> Path:
for path in feature_paths_refs:
if not path.exists():
raise FileNotFoundError(f"Reference feature file {path}.")
pairs = parse_retrieval(pairs_path)
pairs = [(q, r) for q, rs in pairs.items() for r in rs]
pairs = find_unique_new_pairs(pairs, None if overwrite else match_path)
required_queries = set(sum(pairs, ()))
name2ref = {
n: i for i, p in enumerate(feature_paths_refs) for n in list_h5_names(p)
}
existing_refs = required_queries.intersection(set(name2ref.keys()))
# images which require feature extraction
required_queries = required_queries - existing_refs
if feature_path_q.exists():
existing_queries = set(list_h5_names(feature_path_q))
feature_paths_refs.append(feature_path_q)
existing_refs = set.union(existing_refs, existing_queries)
if not overwrite:
required_queries = required_queries - existing_queries
if len(pairs) == 0 and len(required_queries) == 0:
logger.info("All pairs exist. Skipping dense matching.")
return
# extract semi-dense matches
match_dense(conf, pairs, image_dir, match_path, existing_refs=existing_refs)
logger.info("Assigning matches...")
# Pre-load existing keypoints
cpdict, bindict = load_keypoints(
conf, feature_paths_refs, quantize=required_queries
)
# Reassign matches by aggregation
cpdict = aggregate_matches(
conf,
pairs,
match_path,
feature_path=feature_path_q,
required_queries=required_queries,
max_kps=max_kps,
cpdict=cpdict,
bindict=bindict,
)
# Invalidate matches that are far from selected bin by reassignment
if max_kps is not None:
logger.info(f'Reassign matches with max_error={conf["max_error"]}.')
assign_matches(pairs, match_path, cpdict, max_error=conf["max_error"])
def scale_lines(lines, scale):
if np.any(scale != 1.0):
lines *= lines.new_tensor(scale)
return lines
def match(model, path_0, path_1, conf):
default_conf = {
"grayscale": True,
"resize_max": 1024,
"dfactor": 8,
"cache_images": False,
"force_resize": False,
"width": 320,
"height": 240,
}
def preprocess(image: np.ndarray):
image = image.astype(np.float32, copy=False)
size = image.shape[:2][::-1]
scale = np.array([1.0, 1.0])
if conf.resize_max:
scale = conf.resize_max / max(size)
if scale < 1.0:
size_new = tuple(int(round(x * scale)) for x in size)
image = resize_image(image, size_new, "cv2_area")
scale = np.array(size) / np.array(size_new)
if conf.force_resize:
size = image.shape[:2][::-1]
image = resize_image(image, (conf.width, conf.height), "cv2_area")
size_new = (conf.width, conf.height)
scale = np.array(size) / np.array(size_new)
if conf.grayscale:
assert image.ndim == 2, image.shape
image = image[None]
else:
image = image.transpose((2, 0, 1)) # HxWxC to CxHxW
image = torch.from_numpy(image / 255.0).float()
# assure that the size is divisible by dfactor
size_new = tuple(
map(
lambda x: int(x // conf.dfactor * conf.dfactor),
image.shape[-2:],
)
)
image = F.resize(image, size=size_new, antialias=True)
scale = np.array(size) / np.array(size_new)[::-1]
return image, scale
conf = SimpleNamespace(**{**default_conf, **conf})
image0 = read_image(path_0, conf.grayscale)
image1 = read_image(path_1, conf.grayscale)
image0, scale0 = preprocess(image0)
image1, scale1 = preprocess(image1)
image0 = image0.to(device)[None]
image1 = image1.to(device)[None]
pred = model({"image0": image0, "image1": image1})
# Rescale keypoints and move to cpu
kpts0, kpts1 = pred["keypoints0"], pred["keypoints1"]
kpts0 = scale_keypoints(kpts0 + 0.5, scale0) - 0.5
kpts1 = scale_keypoints(kpts1 + 0.5, scale1) - 0.5
ret = {
"image0": image0.squeeze().cpu().numpy(),
"image1": image1.squeeze().cpu().numpy(),
"keypoints0": kpts0.cpu().numpy(),
"keypoints1": kpts1.cpu().numpy(),
}
if "mconf" in pred.keys():
ret["mconf"] = pred["mconf"].cpu().numpy()
return ret
@torch.no_grad()
def match_images(model, image_0, image_1, conf, device="cpu"):
default_conf = {
"grayscale": True,
"resize_max": 1024,
"dfactor": 8,
"cache_images": False,
"force_resize": False,
"width": 320,
"height": 240,
}
def preprocess(image: np.ndarray):
image = image.astype(np.float32, copy=False)
size = image.shape[:2][::-1]
scale = np.array([1.0, 1.0])
if conf.resize_max:
scale = conf.resize_max / max(size)
if scale < 1.0:
size_new = tuple(int(round(x * scale)) for x in size)
image = resize_image(image, size_new, "cv2_area")
scale = np.array(size) / np.array(size_new)
if conf.force_resize:
size = image.shape[:2][::-1]
image = resize_image(image, (conf.width, conf.height), "cv2_area")
size_new = (conf.width, conf.height)
scale = np.array(size) / np.array(size_new)
if conf.grayscale:
assert image.ndim == 2, image.shape
image = image[None]
else:
image = image.transpose((2, 0, 1)) # HxWxC to CxHxW
image = torch.from_numpy(image / 255.0).float()
# assure that the size is divisible by dfactor
size_new = tuple(
map(
lambda x: int(x // conf.dfactor * conf.dfactor),
image.shape[-2:],
)
)
image = F.resize(image, size=size_new)
scale = np.array(size) / np.array(size_new)[::-1]
return image, scale
conf = SimpleNamespace(**{**default_conf, **conf})
if len(image_0.shape) == 3 and conf.grayscale:
image0 = cv2.cvtColor(image_0, cv2.COLOR_RGB2GRAY)
else:
image0 = image_0
if len(image_0.shape) == 3 and conf.grayscale:
image1 = cv2.cvtColor(image_1, cv2.COLOR_RGB2GRAY)
else:
image1 = image_1
# comment following lines, image is always RGB mode
# if not conf.grayscale and len(image0.shape) == 3:
# image0 = image0[:, :, ::-1] # BGR to RGB
# if not conf.grayscale and len(image1.shape) == 3:
# image1 = image1[:, :, ::-1] # BGR to RGB
image0, scale0 = preprocess(image0)
image1, scale1 = preprocess(image1)
image0 = image0.to(device)[None]
image1 = image1.to(device)[None]
pred = model({"image0": image0, "image1": image1})
s0 = np.array(image_0.shape[:2][::-1]) / np.array(image0.shape[-2:][::-1])
s1 = np.array(image_1.shape[:2][::-1]) / np.array(image1.shape[-2:][::-1])
# Rescale keypoints and move to cpu
if "keypoints0" in pred.keys() and "keypoints1" in pred.keys():
kpts0, kpts1 = pred["keypoints0"], pred["keypoints1"]
kpts0_origin = scale_keypoints(kpts0 + 0.5, s0) - 0.5
kpts1_origin = scale_keypoints(kpts1 + 0.5, s1) - 0.5
ret = {
"image0": image0.squeeze().cpu().numpy(),
"image1": image1.squeeze().cpu().numpy(),
"image0_orig": image_0,
"image1_orig": image_1,
"keypoints0": kpts0.cpu().numpy(),
"keypoints1": kpts1.cpu().numpy(),
"keypoints0_orig": kpts0_origin.cpu().numpy(),
"keypoints1_orig": kpts1_origin.cpu().numpy(),
"mkeypoints0": kpts0.cpu().numpy(),
"mkeypoints1": kpts1.cpu().numpy(),
"mkeypoints0_orig": kpts0_origin.cpu().numpy(),
"mkeypoints1_orig": kpts1_origin.cpu().numpy(),
"original_size0": np.array(image_0.shape[:2][::-1]),
"original_size1": np.array(image_1.shape[:2][::-1]),
"new_size0": np.array(image0.shape[-2:][::-1]),
"new_size1": np.array(image1.shape[-2:][::-1]),
"scale0": s0,
"scale1": s1,
}
if "mconf" in pred.keys():
ret["mconf"] = pred["mconf"].cpu().numpy()
elif "scores" in pred.keys(): # adapting loftr
ret["mconf"] = pred["scores"].cpu().numpy()
else:
ret["mconf"] = np.ones_like(kpts0.cpu().numpy()[:, 0])
if "lines0" in pred.keys() and "lines1" in pred.keys():
if "keypoints0" in pred.keys() and "keypoints1" in pred.keys():
kpts0, kpts1 = pred["keypoints0"], pred["keypoints1"]
kpts0_origin = scale_keypoints(kpts0 + 0.5, s0) - 0.5
kpts1_origin = scale_keypoints(kpts1 + 0.5, s1) - 0.5
kpts0_origin = kpts0_origin.cpu().numpy()
kpts1_origin = kpts1_origin.cpu().numpy()
else:
kpts0_origin, kpts1_origin = (
None,
None,
) # np.zeros([0]), np.zeros([0])
lines0, lines1 = pred["lines0"], pred["lines1"]
lines0_raw, lines1_raw = pred["raw_lines0"], pred["raw_lines1"]
lines0_raw = torch.from_numpy(lines0_raw.copy())
lines1_raw = torch.from_numpy(lines1_raw.copy())
lines0_raw = scale_lines(lines0_raw + 0.5, s0) - 0.5
lines1_raw = scale_lines(lines1_raw + 0.5, s1) - 0.5
lines0 = torch.from_numpy(lines0.copy())
lines1 = torch.from_numpy(lines1.copy())
lines0 = scale_lines(lines0 + 0.5, s0) - 0.5
lines1 = scale_lines(lines1 + 0.5, s1) - 0.5
ret = {
"image0_orig": image_0,
"image1_orig": image_1,
"line0": lines0_raw.cpu().numpy(),
"line1": lines1_raw.cpu().numpy(),
"line0_orig": lines0.cpu().numpy(),
"line1_orig": lines1.cpu().numpy(),
"line_keypoints0_orig": kpts0_origin,
"line_keypoints1_orig": kpts1_origin,
}
del pred
torch.cuda.empty_cache()
return ret
@torch.no_grad()
def main(
conf: Dict,
pairs: Path,
image_dir: Path,
export_dir: Optional[Path] = None,
matches: Optional[Path] = None, # out
features: Optional[Path] = None, # out
features_ref: Optional[Path] = None,
max_kps: Optional[int] = 8192,
overwrite: bool = False,
) -> Path:
logger.info(
"Extracting semi-dense features with configuration:" f"\n{pprint.pformat(conf)}"
)
if features is None:
features = "feats_"
if isinstance(features, Path):
features_q = features
if matches is None:
raise ValueError(
"Either provide both features and matches as Path" " or both as names."
)
else:
if export_dir is None:
raise ValueError(
"Provide an export_dir if features and matches"
f" are not file paths: {features}, {matches}."
)
features_q = Path(export_dir, f'{features}{conf["output"]}.h5')
if matches is None:
matches = Path(export_dir, f'{conf["output"]}_{pairs.stem}.h5')
if features_ref is None:
features_ref = []
elif isinstance(features_ref, list):
features_ref = list(features_ref)
elif isinstance(features_ref, Path):
features_ref = [features_ref]
else:
raise TypeError(str(features_ref))
match_and_assign(
conf, pairs, image_dir, matches, features_q, features_ref, max_kps, overwrite
)
return features_q, matches
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("--pairs", type=Path, required=True)
parser.add_argument("--image_dir", type=Path, required=True)
parser.add_argument("--export_dir", type=Path, required=True)
parser.add_argument("--matches", type=Path, default=confs["loftr"]["output"])
parser.add_argument(
"--features", type=str, default="feats_" + confs["loftr"]["output"]
)
parser.add_argument("--conf", type=str, default="loftr", choices=list(confs.keys()))
args = parser.parse_args()
main(
confs[args.conf],
args.pairs,
args.image_dir,
args.export_dir,
args.matches,
args.features,
)