import os
from itertools import compress
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from keras.layers import Dense, Dropout, LeakyReLU
from keras.models import Sequential
from keras.optimizers import Adam
from numpy.random import randn
from sklearn.decomposition import PCA
from tqdm import tqdm
class GAN(object):
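    """
    A simple fully connected GAN: a generator and a discriminator trained
    adversarially on min-max scaled tabular data.
    """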
def __init__(self, number_of_features, saved_models_path, learning_rate, alpha_relu, dropout, loss, activation):
"""
A constructor for the GAN class
:param number_of_features: number of features
:param saved_models_path: the output folder path
"""
self.saved_models_path = saved_models_path
self.number_of_features = number_of_features
self.generator_model = None
self.noise_dim = None
self.discriminator_model = None
self.learning_rate = learning_rate
self.gan_model = None
self.activation = activation
self.alpha_relu = alpha_relu
self.loss = loss
self.dropout = dropout
self.build_generator() # build the generator
self.build_discriminator() # build the discriminator
self.build_gan() # build the GAN
def build_generator(self):
"""
This function creates the generator model
:return:
"""
noise_size = int(self.number_of_features / 2)
self.noise_dim = (noise_size,) # size of the noise space
        self.generator_model = Sequential()
        self.generator_model.add(Dense(self.number_of_features * 2, input_shape=self.noise_dim))
        self.generator_model.add(LeakyReLU(alpha=self.alpha_relu))
        self.generator_model.add(Dense(self.number_of_features * 4))
        self.generator_model.add(LeakyReLU(alpha=self.alpha_relu))
        self.generator_model.add(Dropout(self.dropout))
        self.generator_model.add(Dense(self.number_of_features * 2))
        self.generator_model.add(LeakyReLU(alpha=self.alpha_relu))
        self.generator_model.add(Dropout(self.dropout))
        # Output layer: one unit per feature; the activation (e.g. 'sigmoid') should match the range of the scaled data.
        # Note that the generator is not compiled on its own; it is trained through the combined GAN model.
        self.generator_model.add(Dense(self.number_of_features, activation=self.activation))
        self.generator_model.summary()
def build_discriminator(self):
"""
Create discriminator model
:return:
"""
self.discriminator_model = Sequential()
self.discriminator_model.add(Dense(self.number_of_features * 2, input_shape=(self.number_of_features,)))
self.discriminator_model.add(LeakyReLU(alpha=self.alpha_relu))
self.discriminator_model.add(Dense(self.number_of_features * 4))
self.discriminator_model.add(LeakyReLU(alpha=self.alpha_relu))
self.discriminator_model.add(Dropout(self.dropout))
self.discriminator_model.add(Dense(self.number_of_features * 2))
self.discriminator_model.add(LeakyReLU(alpha=self.alpha_relu))
self.discriminator_model.add(Dropout(self.dropout))
        # Output layer: a single probability that the input sample is real
        self.discriminator_model.add(Dense(1, activation=self.activation))
        # Compile it
        optimizer = Adam(learning_rate=self.learning_rate)
        self.discriminator_model.compile(loss=self.loss, optimizer=optimizer)
        self.discriminator_model.summary()
def build_gan(self):
"""
Create the GAN network
:return: the GAN model object
"""
self.gan_model = Sequential()
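        # Freeze the discriminator's weights so that only the generator is updated when the combined model trains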
self.discriminator_model.trainable = False
        # Stack the generator and the frozen discriminator to form the combined GAN model
self.gan_model.add(self.generator_model)
self.gan_model.add(self.discriminator_model)
# Compile it
        optimizer = Adam(learning_rate=self.learning_rate)
self.gan_model.compile(loss=self.loss, optimizer=optimizer)
return self.gan_model
def train(self, scaled_data, epochs, batch_size, to_plot_losses, model_name):
"""
This function trains the generator and discriminator outputs
:param model_name:
:param to_plot_losses: whether or not to plot history
:param scaled_data: the data after min max scaling
:param epochs: number of epochs
:param batch_size: the batch size
:return: losses_list: returns the losses dictionary the generator or discriminator outputs
"""
        dis_output, gen_output, prev_output = self.check_for_existing_output(model_name)
        if prev_output:  # weights from a previous run were loaded; skip retraining
            return -1, -1
losses_output = os.path.join(self.saved_models_path, f'{model_name}_losses.png')
discriminator_loss = []
generator_loss = []
        # Half of each discriminator batch is real data and half is fake data
        half_batch_size = batch_size // 2
        iterations = len(scaled_data) // half_batch_size
        iterations = iterations + 1 if len(scaled_data) % half_batch_size != 0 else iterations
for epoch in range(1, epochs + 1): # iterates over the epochs
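            # Shuffle the training data in place so every epoch sees the batches in a different order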
np.random.shuffle(scaled_data)
p_bar = tqdm(range(iterations), ascii=True)
for iteration in p_bar:
dis_loss, gen_loss = self.train_models(batch_size=batch_size, half_batch_size=half_batch_size,
index=iteration, scaled_data=scaled_data)
discriminator_loss.append(dis_loss)
generator_loss.append(gen_loss)
p_bar.set_description(
f"Epoch ({epoch}/{epochs}) | DISCRIMINATOR LOSS: {dis_loss:.2f} | GENERATOR LOSS: {gen_loss:.2f} |")
# Save weights for future use
self.discriminator_model.save_weights(dis_output)
self.generator_model.save_weights(gen_output)
# Plot losses
if to_plot_losses:
self.plot_losses(discriminator_loss=discriminator_loss, generator_loss=generator_loss,
losses_output=losses_output)
return generator_loss[-1], discriminator_loss[-1]
    def check_for_existing_output(self, model_name):
        """
        This function checks whether output from a previous run exists and, if so, loads the saved weights
        :param model_name: the model's name
        :return: the discriminator weights path, the generator weights path, and whether previous weights were loaded
        """
        prev_output = False
dis_output = os.path.join(self.saved_models_path, f'{model_name}_dis_weights.h5')
gen_output = os.path.join(self.saved_models_path, f'{model_name}_gen_weights.h5')
if os.path.exists(dis_output) and os.path.exists(gen_output):
print("The model was trained in the past")
self.discriminator_model.load_weights(dis_output)
self.generator_model.load_weights(gen_output)
prev_output = True
return dis_output, gen_output, prev_output
def train_models(self, batch_size, half_batch_size, index, scaled_data):
"""
This function trains the discriminator and the generator
:param batch_size: batch size
:param half_batch_size: half of the batch size
:param index:
:param scaled_data:
:return:
"""
        self.discriminator_model.trainable = True  # unfreeze the discriminator for its own training step
# Create a batch of real data and train the model
x_real, y_real = self.get_real_samples(data=scaled_data, batch_size=half_batch_size, index=index)
d_real_loss = self.discriminator_model.train_on_batch(x_real, y_real)
# Create a batch of fake data and train the model
x_fake, y_fake = self.create_fake_samples(batch_size=half_batch_size)
d_fake_loss = self.discriminator_model.train_on_batch(x_fake, y_fake)
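        # Report the discriminator loss as the average of the real-batch and fake-batch losses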
avg_dis_loss = 0.5 * (d_real_loss + d_fake_loss)
# Create noise for the generator model
noise = randn(self.noise_dim[0] * batch_size).reshape((batch_size, self.noise_dim[0]))
self.discriminator_model.trainable = False
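        # Train the generator through the frozen discriminator: the fake samples are labeled
        # as real (1) so that the generator's loss rewards fooling the discriminator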
gen_loss = self.gan_model.train_on_batch(noise, np.ones((batch_size, 1)))
return avg_dis_loss, gen_loss
@staticmethod
def get_real_samples(data, batch_size, index):
"""
Generate batch_size of real samples with class labels
:param data: the original data
:param batch_size: batch size
:param index: the index of the batch
:return: x: real samples, y: labels
"""
start_index = batch_size * index
end_index = start_index + batch_size
x = data[start_index: end_index]
return x, np.ones((len(x), 1))
def create_fake_samples(self, batch_size):
"""
Use the generator to generate n fake examples, with class labels
:param batch_size: batch size
        :return: x: fake samples, y: labels (all zeros)
"""
noise = randn(self.noise_dim[0] * batch_size).reshape((batch_size, self.noise_dim[0]))
x = self.generator_model.predict(noise) # create fake samples using the generator
return x, np.zeros((len(x), 1))
@staticmethod
def plot_losses(discriminator_loss, generator_loss, losses_output):
"""
Plot training loss values
:param generator_loss:
:param discriminator_loss:
:param losses_output:
:return:
"""
        plt.figure()  # start a fresh figure so earlier plots do not leak into this one
        plt.plot(discriminator_loss)
        plt.plot(generator_loss)
        plt.xlabel('Iteration')
        plt.ylabel('Loss')
        plt.title('Discriminator and Generator Losses')
        plt.legend(['Discriminator Loss', 'Generator Loss'])
        plt.savefig(losses_output)
        plt.close()
@staticmethod
def return_minimum_euclidean_distance(scaled_data, x):
"""
        This function returns the minimum Euclidean distance between x and the records of scaled_data
        :param scaled_data: the original data
        :param x: a record we want to compare with
        :return: the minimum distance and the index of the closest record
"""
        s = np.sqrt(np.power(scaled_data - np.array(x), 2).sum(1))
return pd.Series([s[s.argmin()], s.argmin()])
def test(self, scaled_data, sample_num, pca_output):
"""
This function tests the model
:param scaled_data: the original scaled data
:param sample_num: number of samples to generate
        :param pca_output: the output path for the PCA figure
:return:
"""
x_fake, y_fake = self.create_fake_samples(batch_size=sample_num)
        fake_pred = self.discriminator_model.predict(x_fake).flatten()
        # Split the fake samples by whether they fooled the discriminator (predicted as real, i.e. > 0.5)
        dis_fooled_scaled = np.asarray(list(compress(x_fake, fake_pred > 0.5)))
        dis_not_fooled_scaled = np.asarray(list(compress(x_fake, fake_pred <= 0.5)))
# ------------- Euclidean -------------
mean_min_distance_fooled, mean_min_distance_not_fooled = (-1, -1)
if len(dis_fooled_scaled) > 0 and len(dis_not_fooled_scaled) > 0:
mean_min_distance_fooled = self.get_mean_distance_score(scaled_data, dis_fooled_scaled)
print(f'The mean minimum distance for fooled samples is {mean_min_distance_fooled}')
mean_min_distance_not_fooled = self.get_mean_distance_score(scaled_data, dis_not_fooled_scaled)
print(f'The mean minimum distance for not fooled samples is {mean_min_distance_not_fooled}')
else:
            print('Either the fooled or the not-fooled set of samples is empty; skipping the distance score')
        # ------------- PCA --------------
        # Fit PCA once on the original data so that all three sets share the same projection
        pca = PCA(n_components=2).fit(scaled_data)
        data_pca_df = self.get_pca_df(pca, scaled_data, 'original')
        dis_fooled_pca_df = self.get_pca_df(pca, dis_fooled_scaled, 'fooled')
        dis_not_fooled_pca_df = self.get_pca_df(pca, dis_not_fooled_scaled, 'not fooled')
pca_frames = [data_pca_df, dis_fooled_pca_df, dis_not_fooled_pca_df]
pca_result = pd.concat(pca_frames)
self.plot_pca(pca_result, pca_output)
return dis_fooled_scaled, dis_not_fooled_scaled, mean_min_distance_fooled, mean_min_distance_not_fooled
    def get_mean_distance_score(self, scaled_data, dis_scaled):
        """
        This function returns the mean of the minimum Euclidean distances for the given samples
        :param scaled_data: the original data
        :param dis_scaled: the (fooled or not fooled) generated samples
        :return: the mean minimum distance
        """
        dis_scaled_euc = pd.DataFrame(dis_scaled)
        dis_scaled_euc[['min_distance', 'similar_i']] = dis_scaled_euc.apply(
            lambda x: self.return_minimum_euclidean_distance(scaled_data, x), axis=1)
        return dis_scaled_euc['min_distance'].mean()
@staticmethod
def plot_pca(pca_result, pca_output):
"""
This function plots the PCA figure
:param pca_result: dataframe with all the results
:param pca_output: output path
:return:
"""
fig = plt.figure(figsize=(8, 8))
ax = fig.add_subplot(1, 1, 1)
ax.set_xlabel('Principal Component 1', fontsize=15)
ax.set_ylabel('Principal Component 2', fontsize=15)
ax.set_title('PCA With Two Components', fontsize=20)
targets = ['original', 'fooled', 'not fooled']
colors = ['r', 'g', 'b']
for target, color in zip(targets, colors):
indices_to_keep = pca_result['name'] == target
ax.scatter(pca_result.loc[indices_to_keep, 'comp1'], pca_result.loc[indices_to_keep, 'comp2'],
c=color, s=50)
ax.legend(targets)
ax.grid()
        plt.savefig(pca_output)
        plt.close(fig)
    @staticmethod
    def get_pca_df(pca, scaled_data, data_name):
        """
        This function creates the two-component PCA dataframe for the given data
        :param pca: a PCA object fitted on the original data
        :param scaled_data: the data to project
        :param data_name: the value for the 'name' column
        :return: a dataframe with columns 'comp1', 'comp2' and 'name'
        """
        if len(scaled_data) == 0:  # guard against an empty set of generated samples
            return pd.DataFrame(columns=['comp1', 'comp2', 'name'])
        principal_components = pca.transform(scaled_data)
        principal_df = pd.DataFrame(data=principal_components, columns=['comp1', 'comp2'])
        principal_df['name'] = data_name
        return principal_df
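

# A minimal usage sketch (not part of the original module), assuming min-max scaled input
# data; the hyperparameter values below are illustrative assumptions, not tuned settings.
if __name__ == '__main__':
    rng = np.random.default_rng(42)
    demo_data = rng.random((1000, 10)).astype('float32')  # stand-in for real min-max scaled data
    gan = GAN(number_of_features=10, saved_models_path='.', learning_rate=0.0002,
              alpha_relu=0.2, dropout=0.3, loss='binary_crossentropy', activation='sigmoid')
    last_gen_loss, last_dis_loss = gan.train(scaled_data=demo_data, epochs=5, batch_size=64,
                                             to_plot_losses=True, model_name='demo')
    gan.test(scaled_data=demo_data, sample_num=200, pca_output='demo_pca.png')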