"""The Sampeling class serve as a helper module for retriving subject model data"""
from abc import ABC, abstractmethod
import os
import gc
import time
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from singleVis.utils import *
# from sklearn.neighbors import NearestNeighbors
# from scipy.special import gamma
# import math
# from pynndescent import NNDescent
# from sklearn.cluster import KMeans
from scipy.special import softmax
from torch.nn import functional as F


class VAE(nn.Module):
    """A simple VAE over 512-d representations: an fc encoder to (mu, logvar) and an fc decoder back to input space."""
    def __init__(self, input_dim, hidden_dim, latent_dim):
        super(VAE, self).__init__()
        self.fc1 = nn.Linear(input_dim, hidden_dim)
        self.fc21 = nn.Linear(hidden_dim, latent_dim)  # mu head
        self.fc22 = nn.Linear(hidden_dim, latent_dim)  # logvar head
        self.fc3 = nn.Linear(latent_dim, hidden_dim)
        self.fc4 = nn.Linear(hidden_dim, input_dim)

    def encode(self, x):
        h1 = F.relu(self.fc1(x))
        return self.fc21(h1), self.fc22(h1)

    def reparameterize(self, mu, logvar):
        std = torch.exp(0.5 * logvar)
        eps = torch.randn_like(std)
        return mu + eps * std

    def decode(self, z):
        h3 = F.relu(self.fc3(z))
        return torch.sigmoid(self.fc4(h3))

    def forward(self, x):
        mu, logvar = self.encode(x.view(-1, 512))
        z = self.reparameterize(mu, logvar)
        return self.decode(z), mu, logvar
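
# A minimal sanity-check sketch (the dimensions are illustrative; nothing in
# this module requires them beyond the 512-d view in forward()):
#   vae = VAE(input_dim=512, hidden_dim=256, latent_dim=2)
#   recon, mu, logvar = vae(torch.randn(8, 512))
#   assert recon.shape == (8, 512) and mu.shape == (8, 2)
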
"""
DataContainder module
1. calculate information entropy for singel sample and subset
2. sample informative subset
"""
class DataGenerationAbstractClass(ABC):
    def __init__(self, data_provider, epoch):
        self.mode = "abstract"
        self.data_provider = data_provider
        self.epoch = epoch
    # @abstractmethod
    # def info_calculator(self):
    #     pass


class DataGeneration(DataGenerationAbstractClass):
    def __init__(self, model, data_provider, epoch, device):
        super().__init__(data_provider, epoch)
        self.model = model
        self.DEVICE = device
    def generate_adversarial_example(self, input_data, target, epsilon):
        self.model.to(self.DEVICE)
        self.model.eval()
        # track gradients on the input data
        input_data.requires_grad = True
        target = target.to(self.DEVICE)
        # forward pass to compute the model's output
        output = self.model(input_data)
        loss_function = nn.CrossEntropyLoss()
        target = target.expand(input_data.size(0))
        loss = loss_function(output, target)
        # compute the gradient of the loss with respect to the input data
        self.model.zero_grad()
        loss.backward()
        gradient = input_data.grad.data
        # generate adversarial samples (FGSM-style sign perturbation)
        adversarial_example = input_data + epsilon * gradient.sign()
        return adversarial_example
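
    # A minimal usage sketch (shapes and values are hypothetical; assumes the
    # subject model accepts the same input format as the stored training data):
    #   x = training_data[:4].clone()      # a small batch of inputs
    #   target = torch.tensor([3])         # label to perturb against
    #   adv = self.generate_adversarial_example(x, target, epsilon=0.2)
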
    def gen(self, epsilon=0.2, sample_ratio=0.1):
        labels = self.data_provider.train_labels(self.epoch)
        # training_data = self.data_provider.train_representation(self.epoch)
        training_data_path = os.path.join(self.data_provider.content_path, "Training_data")
        training_data = torch.load(os.path.join(training_data_path, "training_dataset_data.pth"),
                                   map_location="cpu")
        training_data = training_data.to(self.DEVICE)
        adversarial_samples = []
        # epsilon is the perturbation magnitude
        for label in range(10):
            indices = np.where(labels == label)[0]  # indices of data in the current class
            sample_size = int(len(indices) * sample_ratio)  # number of samples to select
            sampled_indices = np.random.choice(indices, size=sample_size, replace=False)  # sample without replacement
            sampled_data = torch.Tensor(training_data[sampled_indices])
            print("sampled data: {}".format(len(sampled_data)))
            for target_label in range(10):
                if target_label == label:
                    continue
                target = torch.tensor([target_label])  # target label
                adversarial_example = self.generate_adversarial_example(sampled_data, target, epsilon)
                print("generating class {}'s adversarial samples for target {}, num of adv {}".format(label, target_label, len(adversarial_example)))
                adversarial_samples.extend(adversarial_example)
        repr_model = self.feature_function(self.epoch)
        adversarial_samples_torch = torch.stack(adversarial_samples)
        print("adversarial_samples_torch", adversarial_samples_torch.shape)
        data_representation = batch_run(repr_model, adversarial_samples_torch)
        np.save(os.path.join(self.data_provider.content_path, "Model", "Epoch_{}".format(self.epoch), "adv_representation.npy"), data_representation)
        return adversarial_samples, data_representation
    def gen_specific_class_adv(self, epsilon=0.2, sample_ratio=0.1, from_label=1, target_label=2):
        labels = self.data_provider.train_labels(self.epoch)
        # training_data = self.data_provider.train_representation(self.epoch)
        training_data_path = os.path.join(self.data_provider.content_path, "Training_data")
        training_data = torch.load(os.path.join(training_data_path, "training_dataset_data.pth"),
                                   map_location="cpu")
        training_data = training_data.to(self.DEVICE)
        adversarial_samples = []
        # epsilon is the perturbation magnitude
        indices = np.where(labels == from_label)[0]  # indices of data in the source class
        sample_size = int(len(indices) * sample_ratio)  # number of samples to select
        sampled_indices = np.random.choice(indices, size=sample_size, replace=False)  # sample without replacement
        sampled_data = torch.Tensor(training_data[sampled_indices])
        print("sampled data: {}".format(len(sampled_data)))
        target = torch.tensor([target_label])  # target label
        adversarial_example = self.generate_adversarial_example(sampled_data, target, epsilon)
        print("generating class {}'s adversarial samples for target {}, num of adv {}".format(from_label, target_label, len(adversarial_example)))
        adversarial_samples.extend(adversarial_example)
        repr_model = self.feature_function(self.epoch)
        adversarial_samples_torch = torch.stack(adversarial_samples)
        print("adversarial_samples_torch", adversarial_samples_torch.shape)
        data_representation = batch_run(repr_model, adversarial_samples_torch)
        return adversarial_samples, data_representation
    def feature_function(self, epoch):
        model_path = os.path.join(self.data_provider.content_path, "Model")
        model_location = os.path.join(model_path, "{}_{:d}".format("Epoch", epoch), "subject_model.pth")
        self.model.load_state_dict(torch.load(model_location, map_location=torch.device("cpu")))
        self.model = self.model.to(self.DEVICE)
        self.model.eval()
        fea_fn = self.model.feature
        return fea_fn
    def vae_loss(self, recon_x, x, mu, logvar):
        BCE = F.binary_cross_entropy(recon_x, x.view(-1, 512), reduction='sum')
        KLD = -0.5 * torch.sum(1 + logvar - mu.pow(2) - logvar.exp())
        return BCE + KLD
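
    # Note: this is the standard negative ELBO, i.e. reconstruction BCE plus the
    # closed-form KL divergence KL(N(mu, sigma^2) || N(0, I))
    #   = -1/2 * sum(1 + log(sigma^2) - mu^2 - sigma^2),  with logvar = log(sigma^2).
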
    def generate_by_VAE(self):
        train_data = self.data_provider.train_representation(self.epoch)
        data_loader = torch.utils.data.DataLoader(train_data, batch_size=32, shuffle=True)
        vae = VAE(512, 256, 2).to(self.data_provider.DEVICE)  # example dimensions
        optimizer = optim.Adam(vae.parameters())
        vae.train()
        num_epochs = 20  # example
        for epoch in range(num_epochs):
            for i, data in enumerate(data_loader):
                data = data.to(self.data_provider.DEVICE)
                optimizer.zero_grad()
                recon_batch, mu, logvar = vae(data)
                loss = self.vae_loss(recon_batch, data, mu, logvar)
                loss.backward()
                optimizer.step()
            print(f'Epoch {epoch}, Loss: {loss.item()}')
        # embed the training data and compute the extent of the latent embedding
        with torch.no_grad():
            mu, _ = vae.encode(torch.Tensor(train_data).to(self.data_provider.DEVICE))
        mu = mu.cpu().numpy()  # convert to a numpy array for easier manipulation
        ebd_min = np.min(mu, axis=0)
        ebd_max = np.max(mu, axis=0)
        ebd_extent = ebd_max - ebd_min
        x_min, y_min = ebd_min - 0.02 * ebd_extent
        x_max, y_max = ebd_max + 0.02 * ebd_extent
        # make the bounding box square
        x_min = y_min = min(x_min, y_min)
        x_max = y_max = max(x_max, y_max)
        num_points = 100  # for example
        x_values = np.linspace(x_min, x_max, num_points)
        y_values = np.linspace(y_min, y_max, num_points)
        x_grid, y_grid = np.meshgrid(x_values, y_values)
        z_grid = np.column_stack([x_grid.flat, y_grid.flat])  # 2D array of shape (num_points**2, 2)
        # decode every grid point back to the representation space
        with torch.no_grad():
            z = torch.tensor(z_grid).to(self.data_provider.DEVICE).float()
            samples = vae.decode(z)
        # np.save(os.path.join(self.data_provider.content_path, "Model", "Epoch_{}".format(20), "VAE_GEN.npy"), samples)
        return samples
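
    # Usage sketch (assumes 512-d train representations exist for self.epoch):
    #   samples = self.generate_by_VAE()   # (num_points**2, 512) decoded grid
    # The grid covers a square latent region slightly larger than the data's
    # embedding, so the decoded samples tile the learned 2-d latent space.
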
    # =============================== interpolated samples ===============================
    def interpolate_samples(self, sample1, sample2, t):
        # linear interpolation between two samples with parameter t in [0, 1]
        return t * sample1 + (1 - t) * sample2

    def select_samples_from_different_classes(self, X, labels):
        classes = np.unique(labels)
        selected_samples = []
        for i in range(len(classes) - 1):
            for j in range(i + 1, len(classes)):
                samples_class_i = X[labels == classes[i]]
                samples_class_j = X[labels == classes[j]]
                sample1 = samples_class_i[np.random.choice(samples_class_i.shape[0])]
                sample2 = samples_class_j[np.random.choice(samples_class_j.shape[0])]
                selected_samples.append((sample1, sample2))
        return selected_samples

    def get_conf(self, epoch, interpolated_X):
        predictions = self.data_provider.get_pred(epoch, interpolated_X)
        scores = np.amax(softmax(predictions, axis=1), axis=1)
        return scores
    def generate_interpolated_samples(self, X, labels, get_conf, num_interpolations_per_bin):
        selected_samples = self.select_samples_from_different_classes(X, labels)
        # confidence_bins = np.linspace(0, 1, 11)[1:-1]
        confidence_bins = np.linspace(0.5, 1, 6)[1:-1]  # confidence bins
        interpolated_X = {bin: [] for bin in confidence_bins}  # maps confidence bin -> interpolated samples
        # loop until every confidence bin has enough interpolated samples
        while min([len(samples) for samples in interpolated_X.values()]) < num_interpolations_per_bin:
            batch_samples = []
            for _ in range(100):
                # pick a pair of samples and generate an interpolated sample
                sample1, sample2 = selected_samples[np.random.choice(len(selected_samples))]
                t = np.random.rand()  # random interpolation parameter t
                interpolated_sample = self.interpolate_samples(sample1, sample2, t)
                batch_samples.append(interpolated_sample)
            # compute the confidence of each interpolated sample and assign it to its bin
            confidences = get_conf(self.epoch, np.array(batch_samples))
            for i, confidence in enumerate(confidences):
                for bin in confidence_bins:
                    if confidence < bin:
                        interpolated_X[bin].append(batch_samples[i])
                        # print("interpolated_X", len(interpolated_X[0.6]), len(interpolated_X[0.7]))
                        break
        return interpolated_X
    def inter_gen(self, num_pairs=2000):
        train_data = self.data_provider.train_representation(self.epoch)
        labels = self.data_provider.train_labels(self.epoch)
        interpolated_X_div = self.generate_interpolated_samples(train_data, labels, self.get_conf, num_pairs)
        confidence_bins = np.linspace(0.5, 1, 6)[1:-1]  # confidence bins
        interpolated_X = np.concatenate([np.array(interpolated_X_div[bin]) for bin in confidence_bins])
        np.save(os.path.join(self.data_provider.content_path, "Model", "Epoch_{}".format(self.epoch), "interpolated_X.npy"), interpolated_X)
        return interpolated_X
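
    # Usage sketch (assumes train representations and labels exist for self.epoch):
    #   interpolated_X = self.inter_gen(num_pairs=2000)
    # Each of the four confidence bins (0.6, 0.7, 0.8, 0.9) collects at least
    # num_pairs interpolated samples before the generation loop stops.
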
    def gen_more_boundary_mixed_up(self, l_bound=0.6, num_adv_eg=6000, name='border_centers_1.npy'):
        training_data_path = os.path.join(self.data_provider.content_path, "Training_data")
        training_data = torch.load(os.path.join(training_data_path, "training_dataset_data.pth"),
                                   map_location="cpu")
        training_data = training_data.to(self.DEVICE)
        self.model = self.model.to(self.DEVICE)
        confs = batch_run(self.model, training_data)
        preds = np.argmax(confs, axis=1).squeeze()
        repr_model = self.feature_function(self.epoch)
        print("border_points generating...")
        border_points, _, _ = get_border_points(model=self.model, input_x=training_data, confs=confs, predictions=preds, device=self.DEVICE, l_bound=l_bound, num_adv_eg=num_adv_eg, lambd=0.05, verbose=0)
        # get gap-layer data
        border_points = border_points.to(self.DEVICE)
        border_centers = batch_run(repr_model, border_points)
        model_path = os.path.join(self.data_provider.content_path, "Model")
        location = os.path.join(model_path, "Epoch_{:d}".format(self.epoch), name)
        print("border_points saving...")
        np.save(location, border_centers)
        return border_centers
    def get_near_epoch_border(self, n_epoch):
        model_path = os.path.join(self.data_provider.content_path, "Model")
        location = os.path.join(model_path, "Epoch_{:d}".format(n_epoch), "ori_border_centers.npy")
        border_points = np.load(location)
        border_points = torch.Tensor(border_points)
        border_points = border_points.to(self.DEVICE)
        repr_model = self.feature_function(self.epoch)
        border_centers = batch_run(repr_model, border_points)
        return border_centers
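

# A minimal end-to-end sketch, assuming a compatible subject model and
# data_provider (the names and epoch value below are illustrative, not part
# of this module's API):
#   generator = DataGeneration(model, data_provider, epoch=20, device=torch.device("cuda:0"))
#   adv_samples, adv_repr = generator.gen(epsilon=0.2, sample_ratio=0.1)
#   border_centers = generator.gen_more_boundary_mixed_up(l_bound=0.6, num_adv_eg=6000)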