import torch
import numpy as np
from sklearn.neighbors import NearestNeighbors
from sklearn.cluster import KMeans
from sklearn.metrics.pairwise import euclidean_distances
from sklearn.datasets import make_blobs
class SkeletonGenerator:
"""SkeletonGenerator except allows for generate skeleton"""
def __init__(self, data_provider, epoch, interval=25, base_num_samples=10):
"""
interval: int : number of radius levels (shells) to generate
base_num_samples: int : base number of samples per radius level
"""
self.data_provider = data_provider
self.epoch = epoch
self.interval = interval
self.base_num_samples= base_num_samples
def skeleton_gen(self):
torch.manual_seed(0) # fix the random seed
torch.cuda.manual_seed_all(0)
# Set the random seed for numpy
np.random.seed(0)
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False
train_data=self.data_provider.train_representation(epoch=self.epoch)
train_data = torch.Tensor(train_data)
center = train_data.mean(dim=0)
# calculate the farthest distance from the center
radius = ((train_data - center)**2).sum(dim=1).max().sqrt()
# print("radius,radius",radius)
# min_radius_log = np.log10(1e-3)
# max_radius_log = np.log10(radius.item())
# # *****************************************************************************************
# # generate 100 points in log space
# radii_log = np.linspace(max_radius_log, min_radius_log, self.interval)
# # convert back to linear space
# radii = 10 ** radii_log
# generate the radius levels in linear space (ascending from 1e-3 toward the max radius)
radii = self.create_decreasing_array(1e-3, radius.item(), self.interval)
epsilon = 1e-2
train_data_distances = ((train_data - center)**2).sum(dim=1).sqrt().cpu().detach().numpy()
# calculate the number of samples for each radius
num_samples_per_radius_l = []
for r in radii:
close_points_indices = np.where(np.abs(train_data_distances - r) < epsilon)[0]
close_points = train_data[close_points_indices].cpu().detach().numpy()
print("len()",r, len(close_points))
# the number of samples scales with the local data density (points near this shell)
# and with the radius of the shell
base_num_samples = len(close_points) + 1
num_samples = int(base_num_samples * r // 4)
num_samples_per_radius_l.append(num_samples)
# *****************************************************************************************
# radii = [radius*1.1, radius, radius / 2, radius / 4, radius / 10, 1e-3] # radii at which to sample points
# # num_samples_per_radius_l = [500, 500, 500, 500, 500, 500] # number of samples per radius
# aaa = 500
# num_samples_per_radius_l = [aaa, aaa, aaa, aaa, aaa, aaa] # number of samples per radius
print("num_samples_per_radius_l",radii)
print("num_samples_per_radssius_l",num_samples_per_radius_l)
# list to store samples at all radii
high_bom_samples = []
for i in range(len(radii)):
r = radii[i]
num_samples_per_radius = num_samples_per_radius_l[i]
# sample points on the sphere with radius r: draw standard-normal vectors and
# rescale them to norm r (512 is the hard-coded dimensionality of the representation space)
samples = torch.randn(num_samples_per_radius, 512)
samples = samples / samples.norm(dim=1, keepdim=True) * r
high_bom_samples.append(samples)
# concatenate samples from all radii
high_bom = torch.cat(high_bom_samples, dim=0)
high_bom = high_bom.cpu().detach().numpy()
print("shape", high_bom.shape)
# for each radius, find the training data points close to it and add them to high_bom
epsilon = 1e-3 # the threshold for considering a point close to the radius
for r in radii:
close_points_indices = np.where(np.abs(train_data_distances - r) < epsilon)[0]
close_points = train_data[close_points_indices].cpu().detach().numpy()
high_bom = np.concatenate((high_bom, close_points), axis=0)
return high_bom
def skeleton_gen_union(self):
torch.manual_seed(0) # fix the random seed
torch.cuda.manual_seed_all(0)
# Set the random seed for numpy
np.random.seed(0)
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False
train_data=self.data_provider.train_representation(epoch=self.epoch)
# border_data = self.data_provider.border_representation(epoch=self.epoch)
# train_data = np.concatenate((train_data,border_data),axis=0)
kmeans = KMeans(n_clusters=1) # n_clusters is the desired number of clusters
# fit the model
kmeans.fit(train_data)
# get the cluster center
center = kmeans.cluster_centers_[0]
center = torch.Tensor(center)
# convert the representations to a tensor so the distance and indexing code below works
train_data = torch.Tensor(train_data)
# calculate the farthest distance from the center
radius = ((train_data - center)**2).sum(dim=1).max().sqrt()
print("radius", radius)
min_radius_log = np.log10(1e-3)
max_radius_log = np.log10(radius.item())
# *****************************************************************************************
# generate 100 points in log space
radii_log = np.linspace(max_radius_log, min_radius_log, self.interval)
# convert back to linear space
radii = 10 ** radii_log
# calculate the number of samples for each radius
num_samples_per_radius_l = []
for r in radii:
# the number of samples per shell grows with the radius
num_samples = int(self.base_num_samples * r // 2)
num_samples_per_radius_l.append(num_samples)
# *****************************************************************************************
radius = radius.item()
# NOTE: the log-spaced radii computed above are overridden with a small fixed set
# radii = [radius*1.1, radius, radius / 2, radius / 4, radius / 10, 1e-3] # radii at which to sample points
radii = [radius / 4, radius / 10, 1e-3] # radii at which to sample points
aaa = 200
num_samples_per_radius_l = [aaa] * len(radii) # number of samples per radius
print("num_samples_per_radius_l",radii)
print("num_samples_per_radius_l",num_samples_per_radius_l)
# list to store samples at all radii
high_bom_samples = []
for i in range(len(radii)):
r = radii[i]
num_samples_per_radius = num_samples_per_radius_l[i]
# sample points on the sphere with radius r
samples = torch.randn(num_samples_per_radius, 512)
samples = samples / samples.norm(dim=1, keepdim=True) * r
high_bom_samples.append(samples)
# concatenate samples from all radii
high_bom = torch.cat(high_bom_samples, dim=0)
high_bom = high_bom.cpu().detach().numpy()
print("shape", high_bom.shape)
# calculate the distance of each training point to the center
train_data_distances = ((train_data - center)**2).sum(dim=1).sqrt().cpu().detach().numpy()
# for each radius, find the training data points close to it and add them to the high_bom
epsilon = 1e-2 # the threshold for considering a point is close to the radius
for r in radii:
close_points_indices = np.where(np.abs(train_data_distances - r) < epsilon)[0]
close_points = train_data[close_points_indices].cpu().detach().numpy()
high_bom = np.concatenate((high_bom, close_points), axis=0)
return high_bom
def skeleton_gen_use_perturb(self, _epsilon=1e-2, _per=0.7):
"""
for each radius, find the training points lying near it,
then generate new proxies by adding a small radial perturbation to those points
"""
torch.manual_seed(0) # freeze the random seed
torch.cuda.manual_seed_all(0)
np.random.seed(0)
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False
epsilon = _epsilon
train_data = self.data_provider.train_representation(epoch=self.epoch)
# border_data = self.data_provider.border_representation(epoch=self.epoch)
# train_data = np.concatenate((train_data,border_data),axis=0)
train_data = torch.Tensor(train_data)
center = train_data.mean(dim=0)
# calculate the furthest distance
max_radius = ((train_data - center)**2).sum(dim=1).max().sqrt().item()
min_radius = max_radius * _per # this is your minimum radius
interval = int(max_radius * 12.8) # scaling factor tuned for MNIST and CIFAR-10
print("max_radius", max_radius,"interval",interval)
# split the interval between max_radius and min_radius into 100 parts
radii = np.linspace(max_radius, min_radius, interval)
high_bom_samples = []
train_data_distances = ((train_data - center)**2).sum(dim=1).sqrt().cpu().detach().numpy()
print("train_data_distances", train_data_distances)
for r in radii:
# find the training data that is close to the current radius
close_points_indices = np.where(np.abs(train_data_distances - r) < epsilon)[0]
close_points = train_data[close_points_indices]
# calculate the unit radial direction from the center to each point
direction_to_center = (close_points - center) / torch.norm(close_points - center, dim=1, keepdim=True)
# push each point a small step epsilon along its radial direction to obtain proxies near the sphere of radius r
noise = direction_to_center * epsilon
# noise = direction_to_center * torch.randn_like(close_points) * epsilon
proxies = (close_points + noise).cpu().detach().numpy()
# add the proxies to the skeleton
high_bom_samples.append(proxies)
high_bom = np.concatenate(high_bom_samples, axis=0)
return high_bom
def gen_skeleton_by_center(self):
train_data = self.data_provider.train_representation(self.epoch)
kmeans = KMeans(n_clusters=1) # n_clusters is the desired number of clusters
# fit the model
kmeans.fit(train_data)
# get the cluster centers
centers = kmeans.cluster_centers_
return centers
def create_decreasing_array(self, min_val, max_val, levels, factor=0.8):
"""Return `levels` radius values between min_val and max_val, starting at min_val
and increasing toward max_val with steps that shrink geometrically by `factor`."""
# Calculate the total range
range_val = max_val - min_val
# Create an array with the specified number of levels
level_indices = np.arange(levels)
# Apply the factor to the levels
scaled_levels = factor ** level_indices
# Scale the values to fit within the range
scaled_values = scaled_levels * range_val / np.max(scaled_levels)
# Shift the values to start at the min_val
final_values = max_val - scaled_values
return final_values
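# Illustrative usage (a sketch, not part of the pipeline): it assumes a
# `data_provider` object exposing `train_representation(epoch)` that returns an
# (n_samples, 512) array, matching the dimensionality hard-coded in the sphere
# sampling above.
#
#   generator = SkeletonGenerator(data_provider, epoch=100, interval=25)
#   skeleton = generator.skeleton_gen()               # shell samples + nearby training points
#   proxies = generator.skeleton_gen_use_perturb()    # perturbed near-shell training points
#
# For reference, create_decreasing_array(1e-3, 10.0, 5) yields values of roughly
# [0.001, 2.00, 3.60, 4.88, 5.90]: ascending from min_val with steps shrinking by 0.8.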
class CenterSkeletonGenerator:
"""SkeletonGenerator except allows for generate skeleton"""
def __init__(self, data_provider, epoch, threshold=0.5, min_cluster=500):
"""
threshold: Dunn's-index value below which a cluster is split further
min_cluster: minimum cluster size for a split to be considered
"""
self.data_provider = data_provider
self.epoch = epoch
self.threshold = threshold
self.min_cluster = min_cluster
def gen_center(self, data, k=2):
"""
cluster `data` into k groups with KMeans and return the centers, the labels,
and each cluster's radius (the maximum distance from its center to its members)
"""
kmeans = KMeans(n_clusters=k)
kmeans.fit(data)
centers = kmeans.cluster_centers_
labels = kmeans.labels_
radii = []
for i in range(k):
cluster_data = data[labels == i]
if len(cluster_data) > 0:
# calculate each sample's distance to its cluster center
distances = np.sqrt(((cluster_data - centers[i]) ** 2).sum(axis=1))
radii.append(np.max(distances))
else:
radii.append(0)
return centers,labels,radii
def if_need_split(self, data):
if len(data) < self.min_cluster:
return False
kmeans = KMeans(n_clusters=2)
kmeans.fit(data)
labels = kmeans.labels_
dunn_index = self.dunns_index(data, labels)
print("dunn_index", dunn_index)
return dunn_index < self.threshold
def dunns_index(self, X, labels):
"""Dunn's index: the minimal inter-cluster distance divided by the maximal
intra-cluster diameter; larger values indicate better-separated clusters."""
distance_matrix = euclidean_distances(X)
inter_cluster_distances = []
intra_cluster_distances = []
unique_labels = np.unique(labels)
# Check if we have at least two clusters
if len(unique_labels) < 2:
return float('inf') # Ineligible for splitting
# Compute maximal intra-cluster distance
for label in unique_labels:
members = np.where(labels == label)[0]
if len(members) <= 1: # Skip clusters with only one member
continue
pairwise_distances = distance_matrix[np.ix_(members, members)]
intra_cluster_distances.append(np.max(pairwise_distances))
if not intra_cluster_distances: # No eligible clusters found
return float('inf')
max_intra_cluster_distance = max(intra_cluster_distances)
# Compute minimal inter-cluster distance
for i in range(len(unique_labels)):
for j in range(i+1, len(unique_labels)):
members_i = np.where(labels == unique_labels[i])[0]
members_j = np.where(labels == unique_labels[j])[0]
pairwise_distances = distance_matrix[np.ix_(members_i, members_j)]
inter_cluster_distances.append(np.min(pairwise_distances))
if not inter_cluster_distances: # No eligible clusters found
return float('inf')
return min(inter_cluster_distances) / max_intra_cluster_distance
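# Illustrative check of the split criterion (a sketch; `provider` is a hypothetical
# stand-in for the real data provider, and make_blobs is imported at the top):
#
#   X, _ = make_blobs(n_samples=1000, centers=2, n_features=16, random_state=0)
#   gen = CenterSkeletonGenerator(provider, epoch=0, threshold=0.5, min_cluster=500)
#   gen.if_need_split(X)   # True only when the Dunn's index of a 2-means split is below threshold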
# def if_need_split(self, data):
# if len(data) < self.min_cluster:
# return False
# kmeans = KMeans(n_clusters=1)
# kmeans.fit(data)
# centers = kmeans.cluster_centers_
# center = centers[0]
# train_data_distances = np.sqrt(((data - center)**2).sum(axis=1))
# pred = self.data_provider.get_pred(self.epoch,np.concatenate((data,centers),axis=0))
# distance_condition = np.any(train_data_distances > self.distance_condition_val)
# variance_condition = np.any(np.var(pred, axis=0) > self.variance_condition_val)
# return distance_condition or variance_condition
def recursive_clustering(self, data,k=2):
centers, labels, radii = self.gen_center(data, k=k)
all_centers = list(centers) # Save intermediate centers
all_radii = list(radii)
for label in set(labels):
cluster = data[labels == label]
if len(cluster):
if self.if_need_split(cluster):
# all_centers.extend(self.recursive_clustering(cluster, k=2))
sub_centers, sub_radii = self.recursive_clustering(cluster, k=2)
all_centers.extend(sub_centers)
all_radii.extend(sub_radii)
return all_centers, all_radii
def center_skeleton_genertaion(self):
# Initial centers
data = self.data_provider.train_representation(self.epoch)
centers_c, _, radii_c = self.gen_center(data, k=1)
centers_n, labels, radii_n = self.gen_center(data, k=10)
print("finished init")
# Recursive clustering, starting from the initial split into 10 clusters
all_centers = []
all_radii = [] # store the maximum radius of every cluster
for label in np.unique(labels):
cluster = data[labels == label]
if len(cluster):
# all_centers.extend(self.recursive_clustering(cluster, k=2))
sub_centers, sub_radii = self.recursive_clustering(cluster, k=2)
all_centers.extend(sub_centers)
all_radii.extend(sub_radii)
all_centers = np.array(all_centers)
all_radii = np.array(all_radii)
return np.concatenate((centers_c, centers_n, all_centers), axis=0), np.concatenate((radii_c, radii_n, all_radii), axis=0)
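# Minimal smoke test (illustrative only). `_ToyProvider` is a hypothetical stand-in
# for the real data provider; it only implements the single method these generators
# rely on, `train_representation(epoch)`, and returns synthetic blob data.
if __name__ == "__main__":
    class _ToyProvider:
        def train_representation(self, epoch):
            # synthetic clustered data standing in for learned representations
            data, _ = make_blobs(n_samples=2000, centers=4, n_features=16, random_state=0)
            return data

    generator = CenterSkeletonGenerator(_ToyProvider(), epoch=0, threshold=0.5, min_cluster=500)
    centers, radii = generator.center_skeleton_genertaion()
    print("skeleton centers:", centers.shape, "cluster radii:", radii.shape)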