|
import numpy as np |
|
from cleverhans.future.tf2.attacks import fast_gradient_method |
|
import pandas as pd |
|
from sklearn.model_selection import KFold |
|
import sys |
|
import tensorflow |
|
import tensorflow as tf |
|
from multiprocessing import Pool |
|
|
|
from _utility import lrate, get_adversarial_examples, print_test, step_decay |
|
import hickle as hkl |
|
import pickle |
|
|
|
# Base name for the per-fold weight files saved by simulate_train
# ("<model_name>_<fold>.h5").
model_name = "ResNet_da"
|
|
|
|
|
class AdversarialTraining(object):
    """Adversarial training loop for a given model and per-sample epsilons.

    Each training batch is rebuilt so that its first half stays clean and
    its second half is replaced by adversarial examples generated with the
    fast gradient sign method (FGSM) from the CleverHans framework.
    """

    def __init__(self, parameter):
        """Store training hyper-parameters and build the image augmenter.

        Args:
            parameter (dict): must provide "epochs", "batch_size" and
                "optimizer".
        """
        self.epochs = parameter["epochs"]
        self.batch_size = parameter["batch_size"]
        self.optimizer = parameter["optimizer"]

        # Standard (non-adversarial) augmentation applied on top of the
        # clean/adversarial mix: small rotations and sub-pixel shifts.
        self.generator = tf.keras.preprocessing.image.ImageDataGenerator(
            rotation_range=10,
            width_shift_range=5.0 / 32,
            height_shift_range=5.0 / 32,
        )

    def train(self, model, train_dataset, val_dataset, epsilon_list):
        """Run adversarial training for ``self.epochs`` epochs.

        Args:
            model: compiled Keras model (its optimizer's learning rate is
                overwritten every epoch by ``step_decay``).
            train_dataset: batched tf.data.Dataset of (x, y) pairs.
            val_dataset: batched validation dataset (currently unused here).
            epsilon_list: per-sample FGSM budgets forwarded to
                ``data_augmentation``.
        """
        for epoch in range(self.epochs):
            # Apply the step-decay schedule directly on the compiled
            # optimizer's learning-rate variable.
            lr_rate = step_decay(epoch)
            tf.keras.backend.set_value(model.optimizer.learning_rate, lr_rate)

            for step, (x_train, y_train) in enumerate(train_dataset):
                print(step)
                # BUG FIX: data_augmentation returns (x_mix, y_mix); the
                # original code assigned the whole tuple to x_train and
                # kept the un-mixed labels.
                x_train, y_train = self.data_augmentation(
                    x_train, y_train, model, epsilon_list
                )
                # BUG FIX: batch_size must not be passed together with a
                # generator (Keras rejects it); flow() already fixes the
                # batch size. verbose is an int, not a float.
                model.fit(
                    self.generator.flow(x_train, y_train, self.batch_size),
                    verbose=0,
                )

    def data_augmentation(self, X_train, Y_train, pretrained_model, epsilon_list):
        """Replace the second half of a batch with adversarial examples.

        Args:
            X_train: batch of training inputs.
            Y_train: corresponding one-hot labels.
            pretrained_model: model used to craft the adversarial examples.
            epsilon_list: per-sample FGSM budgets for the adversarial half.

        Returns:
            tuple: (x_mix, y_mix) where x_mix is clean-first-half plus
            adversarial-second-half and y_mix keeps the original labels
            (adversarial examples retain their true labels).
        """
        first_half_end = int(len(X_train) / 2)
        second_half_end = int(len(X_train))
        x_clean = X_train[0:first_half_end, :, :, :]
        x_adv = self.get_adversarial(
            pretrained_model,
            X_train[first_half_end:second_half_end, :, :, :],
            Y_train[first_half_end:second_half_end],
            epsilon_list,
        )
        x_mix = self.merge_data(x_clean, x_adv)
        y_mix = Y_train[0:second_half_end]

        return x_mix, y_mix

    def merge_data(self, x_clean, x_adv):
        """Stack clean examples followed by adversarial examples.

        Args:
            x_clean: clean half of the batch (array-like, samples on axis 0).
            x_adv: adversarial half of the batch.

        Returns:
            np.ndarray: clean samples followed by adversarial samples.
        """
        # np.concatenate replaces the original per-sample append loop;
        # the result is identical (samples stacked along axis 0).
        return np.concatenate((np.asarray(x_clean), np.asarray(x_adv)), axis=0)

    def get_adversarial(self, logits_model, X_true, y_true, epsilon_list):
        """Thin wrapper kept for API compatibility; see adversarial_example."""
        return self.adversarial_example(logits_model, X_true, y_true, epsilon_list)

    def adversarial_example(self, logits_model, X_true, y_true, epsilon_list):
        """Craft one untargeted FGSM adversarial example per input sample.

        Args:
            logits_model: model returning logits, passed to CleverHans.
            X_true: inputs; NOTE(review): the 32x32 single-channel shape is
                hard-coded below — confirm against the data pipeline.
            y_true: one-hot labels aligned with X_true.
            epsilon_list: per-sample perturbation budgets; epsilon_list[i]
                is applied to X_true[i], so it needs >= len(X_true) entries.

        Returns:
            np.ndarray of shape (n, 32, 32, 1) with adversarial inputs.
        """
        X_adv = []

        for index, x_true in enumerate(X_true):
            epsilon = epsilon_list[index]

            original_image = tf.reshape(x_true, (1, 32, 32))
            # one-hot -> sparse class index of shape (1,), as CleverHans expects
            original_label = np.reshape(
                np.argmax(y_true[index]), (1,)
            ).astype("int64")
            adv_example = fast_gradient_method(
                logits_model,
                original_image,
                epsilon,
                np.inf,  # L-infinity norm constraint
                y=original_label,
                targeted=False,  # untargeted: push away from the true label
            )
            X_adv.append(np.array(adv_example).reshape(32, 32, 1))
        return np.array(X_adv)
|
|
|
|
|
def simulate_train(s):
    """Train and save the model for cross-validation fold ``s``.

    Relies on module-level names created in the ``__main__`` block:
    ``kfold``, ``X_train``, ``Y_train``, ``wideresnet``, ``sgd``, ``BS``,
    ``adversarial_training``, ``epsilons`` and ``model_name``.
    """
    for fold_index, (train_idx, val_idx) in enumerate(kfold.split(X_train)):
        # Only the fold matching this worker's index is trained; the other
        # splits are skipped (the enumeration itself is cheap).
        if fold_index != s:
            continue
        print(s)
        model = wideresnet.create_wide_residual_network()
        model.compile(
            loss="categorical_crossentropy", optimizer=sgd, metrics=["acc"]
        )
        print("Finished compiling")

        fold_train = tf.data.Dataset.from_tensor_slices(
            (X_train[train_idx], Y_train[train_idx])
        ).batch(BS)
        fold_val = tf.data.Dataset.from_tensor_slices(
            (X_train[val_idx], Y_train[val_idx])
        ).batch(BS)

        adversarial_training.train(model, fold_train, fold_val, epsilons)
        model.save_weights(model_name + "_" + str(fold_index) + ".h5")
|
|
|
|
|
if __name__ == "__main__":

    # Load the pre-split data set; hickle deserializes numpy arrays directly.
    data = hkl.load("data.hkl")
    X_train, X_test, Y_train, y_test = (
        data["xtrain"],
        data["xtest"],
        data["ytrain"],
        data["ytest"],
    )
    # Per-sample FGSM budgets 0.001 .. 0.032 — one per adversarial sample
    # in the 32-sample half of a 64-sample batch.
    epsilons = [i / 1000 for i in range(1, 33)]

    # BUG FIX: sklearn's KFold raises ValueError when random_state is set
    # while shuffle=False; shuffle=True keeps the folds reproducible.
    kfold = KFold(n_splits=10, random_state=42, shuffle=True)
    EPOCHS = 50
    BS = 64
    init = (32, 32, 1)
    # BUG FIX: SGD was referenced without any import; use the Keras
    # optimizer via the already-imported tensorflow module.
    sgd = tf.keras.optimizers.SGD(learning_rate=0.1, momentum=0.9)
    parameter = {"epochs": EPOCHS, "batch_size": BS, "optimizer": sgd}

    # BUG FIX: simulate_train uses a module-level ``adversarial_training``
    # instance that was never created.
    adversarial_training = AdversarialTraining(parameter)

    # NOTE(review): WideResidualNetwork is not imported anywhere in this
    # file — confirm which module provides it and import it at the top.
    wideresnet = WideResidualNetwork(
        init, 0.0001, 0.9, nb_classes=4, N=2, k=1, dropout=0.0
    )

    # BUG FIX: the original mapped an undefined ``f`` over the nonexistent
    # ``np.range``; map simulate_train over the 10 fold indices instead.
    # NOTE(review): fork-based start method assumed, so workers inherit the
    # globals defined above — verify on non-Unix platforms.
    with Pool(10) as p:
        print(p.map(simulate_train, range(10)))
|