import os
import sys
from collections import OrderedDict, namedtuple

import numpy as np
import torch

# Make the repository root importable so the local packages below resolve.
ROOT_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
sys.path.insert(0, ROOT_DIR)

from sgmnet import matcher as SGM_Model
from superglue import matcher as SG_Model
from utils import evaluation_utils


class GNN_Matcher(object):
    """Wraps a pretrained SGMNet ("SGM") or SuperGlue ("SG") matching model."""

    def __init__(self, config, model_name):
        assert model_name == "SGM" or model_name == "SG"
        config = namedtuple("config", config.keys())(*config.values())
        self.p_th = config.p_th
        self.model = SGM_Model(config) if model_name == "SGM" else SG_Model(config)
        self.model.cuda()
        self.model.eval()
        checkpoint = torch.load(os.path.join(config.model_dir, "model_best.pth"))
        # Strip the "module." prefix left by DistributedDataParallel checkpoints.
        if list(checkpoint["state_dict"].items())[0][0].split(".")[0] == "module":
            new_state_dict = OrderedDict()
            for key, value in checkpoint["state_dict"].items():
                new_state_dict[key[7:]] = value
            checkpoint["state_dict"] = new_state_dict
        self.model.load_state_dict(checkpoint["state_dict"])

    def run(self, test_data):
        # Normalize keypoint coordinates to the image size, keeping the score column.
        norm_x1 = evaluation_utils.normalize_size(test_data["x1"][:, :2], test_data["size1"])
        norm_x2 = evaluation_utils.normalize_size(test_data["x2"][:, :2], test_data["size2"])
        x1 = np.concatenate([norm_x1, test_data["x1"][:, 2, np.newaxis]], axis=-1)
        x2 = np.concatenate([norm_x2, test_data["x2"][:, 2, np.newaxis]], axis=-1)
        # Add a batch dimension of 1 and move everything to the GPU.
        feed_data = {
            "x1": torch.from_numpy(x1[np.newaxis]).cuda().float(),
            "x2": torch.from_numpy(x2[np.newaxis]).cuda().float(),
            "desc1": torch.from_numpy(test_data["desc1"][np.newaxis]).cuda().float(),
            "desc2": torch.from_numpy(test_data["desc2"][np.newaxis]).cuda().float(),
        }
        with torch.no_grad():
            res = self.model(feed_data, test_mode=True)
            p = res["p"]
        # Drop the dustbin row/column of the assignment matrix before extracting matches.
        index1, index2 = self.match_p(p[0, :-1, :-1])
        corr1 = test_data["x1"][:, :2][index1.cpu()]
        corr2 = test_data["x2"][:, :2][index2.cpu()]
        if len(corr1.shape) == 1:
            corr1, corr2 = corr1[np.newaxis], corr2[np.newaxis]
        return corr1, corr2

    def match_p(self, p):  # p: N x M assignment matrix
        # Best candidate in each row (image1 -> image2) and each column (image2 -> image1).
        score, index = torch.topk(p, k=1, dim=-1)
        _, index2 = torch.topk(p, k=1, dim=-2)
        mask_th, index, index2 = score[:, 0] > self.p_th, index[:, 0], index2.squeeze(0)
        # Keep a match only if it clears the probability threshold and is mutually nearest.
        mask_mc = index2[index] == torch.arange(len(p)).cuda()
        mask = mask_th & mask_mc
        index1, index2 = torch.nonzero(mask).squeeze(1), index[mask]
        return index1, index2


class NN_Matcher(object):
    """Brute-force nearest-neighbour matcher with ratio test and optional mutual check."""

    def __init__(self, config):
        config = namedtuple("config", config.keys())(*config.values())
        self.mutual_check = config.mutual_check
        self.ratio_th = config.ratio_th

    def run(self, test_data):
        desc1, desc2, x1, x2 = (
            test_data["desc1"],
            test_data["desc2"],
            test_data["x1"],
            test_data["x2"],
        )
        # Pairwise Euclidean distance matrix: ||a - b||^2 = |a|^2 + |b|^2 - 2 a.b
        # (abs guards against small negative values from floating-point error).
        desc_mat = np.sqrt(
            abs(
                (desc1**2).sum(-1)[:, np.newaxis]
                + (desc2**2).sum(-1)[np.newaxis]
                - 2 * desc1 @ desc2.T
            )
        )
        # Partial sort so columns 0 and 1 hold the nearest and second-nearest neighbours.
        nn_index = np.argpartition(desc_mat, kth=(1, 2), axis=-1)
        dis_value12 = np.take_along_axis(desc_mat, nn_index, axis=-1)
        # Lowe's ratio test: nearest distance over second-nearest distance.
        ratio_score = dis_value12[:, 0] / dis_value12[:, 1]
        nn_index1 = nn_index[:, 0]
        nn_index2 = np.argmin(desc_mat, axis=0)
        mask_ratio = ratio_score < self.ratio_th
        mask_mutual = np.arange(len(x1)) == nn_index2[nn_index1]
        corr1, corr2 = x1[:, :2], x2[:, :2][nn_index1]
        if self.mutual_check:
            mask = mask_ratio & mask_mutual
        else:
            mask = mask_ratio
        corr1, corr2 = corr1[mask], corr2[mask]
        return corr1, corr2
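

# --------------------------------------------------------------------------
# Minimal usage sketch (illustrative only, not part of the original code).
# It exercises NN_Matcher on random stand-in data; the dict keys and array
# shapes mirror what both matchers read above: "x1"/"x2" are N x 3 keypoint
# arrays (x, y, score), "desc1"/"desc2" are N x D descriptors, and
# "size1"/"size2" are the image sizes used for coordinate normalization.
# GNN_Matcher would additionally need a CUDA device and a config providing
# p_th and model_dir pointing at a trained model_best.pth checkpoint.
if __name__ == "__main__":
    rng = np.random.default_rng(0)
    num_kpts, desc_dim = 512, 128
    test_data = {
        "x1": np.concatenate(
            [rng.uniform(0, 640, (num_kpts, 2)), rng.uniform(0, 1, (num_kpts, 1))], axis=-1
        ).astype(np.float32),
        "x2": np.concatenate(
            [rng.uniform(0, 640, (num_kpts, 2)), rng.uniform(0, 1, (num_kpts, 1))], axis=-1
        ).astype(np.float32),
        "desc1": rng.standard_normal((num_kpts, desc_dim)).astype(np.float32),
        "desc2": rng.standard_normal((num_kpts, desc_dim)).astype(np.float32),
        "size1": np.array([640, 480]),
        "size2": np.array([640, 480]),
    }
    # Threshold values below are placeholders, not tuned settings.
    nn_matcher = NN_Matcher({"mutual_check": True, "ratio_th": 0.9})
    corr1, corr2 = nn_matcher.run(test_data)
    print("NN matches:", corr1.shape[0])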