File size: 3,643 Bytes
a80d6bb
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
import torch
import numpy as np
import os
from collections import OrderedDict,namedtuple
import sys
ROOT_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
sys.path.insert(0, ROOT_DIR)

from sgmnet import matcher as SGM_Model
from superglue import matcher as SG_Model
from utils import evaluation_utils

class GNN_Matcher(object):

    def __init__(self,config,model_name):
        assert model_name=='SGM' or model_name=='SG'

        config=namedtuple('config',config.keys())(*config.values())
        self.p_th=config.p_th
        self.model = SGM_Model(config) if model_name=='SGM' else SG_Model(config) 
        self.model.cuda(),self.model.eval()
        checkpoint = torch.load(os.path.join(config.model_dir, 'model_best.pth'))
        #for ddp model
        if list(checkpoint['state_dict'].items())[0][0].split('.')[0]=='module':
            new_stat_dict=OrderedDict()
            for key,value in checkpoint['state_dict'].items():
                new_stat_dict[key[7:]]=value
            checkpoint['state_dict']=new_stat_dict
        self.model.load_state_dict(checkpoint['state_dict'])

    def run(self,test_data):
        norm_x1,norm_x2=evaluation_utils.normalize_size(test_data['x1'][:,:2],test_data['size1']),\
                                                    evaluation_utils.normalize_size(test_data['x2'][:,:2],test_data['size2'])
        x1,x2=np.concatenate([norm_x1,test_data['x1'][:,2,np.newaxis]],axis=-1),np.concatenate([norm_x2,test_data['x2'][:,2,np.newaxis]],axis=-1)
        feed_data={'x1':torch.from_numpy(x1[np.newaxis]).cuda().float(),
                   'x2':torch.from_numpy(x2[np.newaxis]).cuda().float(),
                   'desc1':torch.from_numpy(test_data['desc1'][np.newaxis]).cuda().float(),
                   'desc2':torch.from_numpy(test_data['desc2'][np.newaxis]).cuda().float()}
        with torch.no_grad():
            res=self.model(feed_data,test_mode=True)
            p=res['p']
        index1,index2=self.match_p(p[0,:-1,:-1])
        corr1,corr2=test_data['x1'][:,:2][index1.cpu()],test_data['x2'][:,:2][index2.cpu()]
        if len(corr1.shape)==1:
            corr1,corr2=corr1[np.newaxis],corr2[np.newaxis]
        return corr1,corr2
    
    def match_p(self,p):#p N*M
        score,index=torch.topk(p,k=1,dim=-1)
        _,index2=torch.topk(p,k=1,dim=-2)
        mask_th,index,index2=score[:,0]>self.p_th,index[:,0],index2.squeeze(0)
        mask_mc=index2[index] == torch.arange(len(p)).cuda()
        mask=mask_th&mask_mc
        index1,index2=torch.nonzero(mask).squeeze(1),index[mask]
        return index1,index2


class NN_Matcher(object):

    def __init__(self,config):
        config=namedtuple('config',config.keys())(*config.values())
        self.mutual_check=config.mutual_check
        self.ratio_th=config.ratio_th

    def run(self,test_data):
        desc1,desc2,x1,x2=test_data['desc1'],test_data['desc2'],test_data['x1'],test_data['x2']
        desc_mat=np.sqrt(abs((desc1**2).sum(-1)[:,np.newaxis]+(desc2**2).sum(-1)[np.newaxis]-2*desc1@desc2.T))
        nn_index=np.argpartition(desc_mat,kth=(1,2),axis=-1)
        dis_value12=np.take_along_axis(desc_mat,nn_index, axis=-1)
        ratio_score=dis_value12[:,0]/dis_value12[:,1]
        nn_index1=nn_index[:,0]
        nn_index2=np.argmin(desc_mat,axis=0)
        mask_ratio,mask_mutual=ratio_score<self.ratio_th,np.arange(len(x1))==nn_index2[nn_index1]
        corr1,corr2=x1[:,:2],x2[:,:2][nn_index1]
        if self.mutual_check:
            mask=mask_ratio&mask_mutual
        else:
            mask=mask_ratio
        corr1,corr2=corr1[mask],corr2[mask]
        return corr1,corr2