SalazarPevelll
be
f291f4a
# Copyright 2017 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Returns points that minimizes the maximum distance of any point to a center. namely hausdorff distance
Implements the k-Center-Greedy method in
Ozan Sener and Silvio Savarese. A Geometric Approach to Active Learning for
Convolutional Neural Networks. https://arxiv.org/abs/1708.00489 2017
Distance metric defaults to l2 distance. Features used to calculate distance
are either raw features or if a model has transform method then uses the output
of model.transform(X).
Can be extended to a robust k centers algorithm that ignores a certain number of
outlier datapoints. Resulting centers are solution to multiple integer program.
"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import time
import numpy as np
import math
from sklearn.metrics import pairwise_distances
class kCenterGreedy(object):
def __init__(self, X, metric='euclidean'):
self.features = X.reshape(len(X), -1)
self.name = 'kcenter'
self.metric = metric
self.min_distances = None
self.n_obs = self.features.shape[0]
self.already_selected = []
def update_distances(self, cluster_centers, only_new=True, reset_dist=False):
"""Update min distances given cluster centers.
Args:
cluster_centers: indices of cluster centers
only_new: only calculate distance for newly selected points and update
min_distances.
rest_dist: whether to reset min_distances.
"""
if reset_dist:
self.min_distances = None
if only_new:
cluster_centers = [d for d in cluster_centers
if d not in self.already_selected]
if cluster_centers is not None:
# Update min_distances for all examples given new cluster center.
x = self.features[cluster_centers]
dist = pairwise_distances(self.features, x, metric=self.metric)
if self.min_distances is None:
self.min_distances = np.min(dist, axis=1).reshape(-1,1)
else:
self.min_distances = np.minimum(self.min_distances, dist)
def select_batch_with_budgets(self, already_selected, budgets, return_min=False):
"""
Diversity promoting active learning method that greedily forms a batch
to minimize the maximum distance to a cluster center among all unlabeled
datapoints.
Args:
budgets: batch size
Returns:
indices of points selected to minimize distance to cluster centers
"""
print('Calculating distances...')
t0 = time.time()
self.update_distances(already_selected, only_new=False, reset_dist=True)
t1 = time.time()
print("calculating distances for {:d} points within {:.2f} seconds...".format(len(already_selected), t1 - t0))
new_batch = []
for _ in range(budgets):
ind = np.argmax(self.min_distances)
# New examples should not be in already selected since those points
# should have min_distance of zero to a cluster center.
assert ind not in already_selected
self.update_distances([ind], only_new=True, reset_dist=False)
new_batch.append(ind)
print('Hausdorff distance is {:.2f} with {:d} points'.format(self.min_distances.max(), len(already_selected)+len(new_batch)))
self.already_selected = np.concatenate((already_selected, np.array(new_batch)))
if return_min:
return new_batch, self.min_distances.max()
return new_batch
def covering_perc(self, p):
"""given a dist, return the covering percentage"""
sorted = np.sort(self.min_distances.reshape(-1))
return sorted[int(self.n_obs*p)]
def hausdorff(self):
return self.min_distances.max()
def select_batch_with_cn(self, already_selected, r_max, c_c0, d_d0, p=0.95, return_min=False):
"""
Diversity promoting active learning method that greedily forms a batch
to minimize the maximum distance to a cluster center among all unlabeled
datapoints.
Args:
budgets: batch size
Returns:
indices of points selected to minimize distance to cluster centers
"""
print('Calculating distances...')
t0 = time.time()
self.update_distances(already_selected, only_new=False, reset_dist=True)
t1 = time.time()
print("calculating distances for {:d} points within {:.2f} seconds...".format(len(already_selected), t1 - t0))
new_batch = []
while True:
ind = np.argmax(self.min_distances)
r_cover = self.min_distances[ind][0]
if r_cover/c_c0/d_d0 < r_max:
break
# New examples should not be in already selected since those points
# should have min_distance of zero to a cluster center.
assert ind not in already_selected
self.update_distances([ind], only_new=True, reset_dist=False)
new_batch.append(ind)
# sorted = np.sort(self.min_distances.reshape(-1))
# r_cover = sorted[int(self.n_obs*p)-1]
print('Hausdorff distance is {:.2f} with {:d} points'.format(r_cover, len(already_selected)+len(new_batch)))
self.already_selected = np.concatenate((already_selected, np.array(new_batch)))
if return_min:
return new_batch, self.min_distances.max()
return new_batch
def fps(data, num):
import torch
from dgl.geometry import farthest_point_sampler
data = torch.from_numpy(data[np.newaxis,:,:]).to(device=torch.device("cuda"))
point_idxs = farthest_point_sampler(data, num).squeeze(axis=0)
point_idxs = point_idx.cpu().numpy()
return point_idxs
if __name__ == "__main__":
data = np.random.rand(500,10)
selected_idxs = np.random.choice(500, size=10, replace=False)
kc = kCenterGreedy(data)
_ = kc.select_batch_with_budgets(selected_idxs, 50)
c0 = kc.hausdorff()