|
from torch import nn |
|
|
|
|
|
class SingleVisualizationModel(nn.Module):
    """Fully connected autoencoder whose hidden layers all share one width.

    Both the encoder and the decoder have the layout::

        Linear -> ReLU -> (Linear -> ReLU) * hidden_layer -> Linear

    The encoder maps ``input_dims`` features to ``output_dims`` features and
    the decoder maps them back; every intermediate layer is ``units`` wide.
    Sub-modules are named by position ("0", "1", ...), which determines the
    keys of the ``state_dict``.
    """

    def __init__(self, input_dims, output_dims, units, hidden_layer=3):
        """Store the architecture settings and build both networks.

        Parameters
        ----------
        input_dims : int
            Number of input features of the encoder (and output features of
            the decoder).
        output_dims : int
            Number of output features of the encoder (and input features of
            the decoder).
        units : int
            Width of every intermediate layer.
        hidden_layer : int
            Number of extra ``Linear -> ReLU`` pairs placed between the first
            and the last linear layer of each network.
        """
        super().__init__()
        self.input_dims = input_dims
        self.output_dims = output_dims
        self.units = units
        self.hidden_layer = hidden_layer
        self._init_autoencoder()

    def _init_autoencoder(self):
        """Create ``self.encoder`` and then ``self.decoder``."""
        # The encoder is built first so parameters are initialised in the
        # same order regardless of how the stack is assembled.
        self.encoder = self._dense_stack(self.input_dims, self.output_dims)
        self.decoder = self._dense_stack(self.output_dims, self.input_dims)

    def _dense_stack(self, in_features, out_features):
        """Return one ``nn.Sequential`` going from in_features to out_features."""
        layers = [nn.Linear(in_features, self.units), nn.ReLU(True)]
        for _ in range(self.hidden_layer):
            layers.append(nn.Linear(self.units, self.units))
            layers.append(nn.ReLU(True))
        layers.append(nn.Linear(self.units, out_features))
        # Positional construction names the children "0", "1", ... in order.
        return nn.Sequential(*layers)

    def forward(self, edge_to, edge_from):
        """Encode and reconstruct both inputs.

        Returns
        -------
        dict
            ``"umap"``  -> ``(encoder(edge_to), encoder(edge_from))``
            ``"recon"`` -> the decoder applied to each of those embeddings.
        """
        latent_to = self.encoder(edge_to)
        latent_from = self.encoder(edge_from)
        rebuilt_to = self.decoder(latent_to)
        rebuilt_from = self.decoder(latent_from)
        return {
            "umap": (latent_to, latent_from),
            "recon": (rebuilt_to, rebuilt_from),
        }
|
|
|
class VisModel(nn.Module):
    """Autoencoder whose layer widths are given explicitly by the caller.

    Each of the encoder and decoder is a chain of ``nn.Linear`` layers with
    an in-place ``nn.ReLU`` after every layer except the last one.
    """

    def __init__(self, encoder_dims, decoder_dims):
        """Define your own visualization model by specifying its structure.

        Parameters
        ----------
        encoder_dims : list of int
            Layer widths of the encoder.  For example ``[100, 50, 2]``
            denotes two fully connected layers with shapes (100, 50) and
            (50, 2).  Must contain at least two entries.
        decoder_dims : list of int
            Layer widths of the decoder, in the same format as
            ``encoder_dims``.
        """
        super().__init__()
        assert len(encoder_dims) > 1
        assert len(decoder_dims) > 1
        self.encoder_dims = encoder_dims
        self.decoder_dims = decoder_dims
        self._init_autoencoder()

    @staticmethod
    def _dense_chain(dims):
        """Return an ``nn.Sequential`` of Linear layers for consecutive dims."""
        last = len(dims) - 2  # index of the final (activation-free) layer
        layers = []
        for idx in range(last + 1):
            layers.append(nn.Linear(dims[idx], dims[idx + 1]))
            if idx < last:
                layers.append(nn.ReLU(True))
        # Positional construction names the children "0", "1", ... in order.
        return nn.Sequential(*layers)

    def _init_autoencoder(self):
        """Create ``self.encoder`` and then ``self.decoder``."""
        self.encoder = self._dense_chain(self.encoder_dims)
        self.decoder = self._dense_chain(self.decoder_dims)

    def forward(self, edge_to, edge_from):
        """Encode and reconstruct both inputs.

        Returns
        -------
        dict
            ``"umap"``  -> ``(encoder(edge_to), encoder(edge_from))``
            ``"recon"`` -> the decoder applied to each of those embeddings.
        """
        latent_to = self.encoder(edge_to)
        latent_from = self.encoder(edge_from)
        rebuilt_to = self.decoder(latent_to)
        rebuilt_from = self.decoder(latent_from)
        return {
            "umap": (latent_to, latent_from),
            "recon": (rebuilt_to, rebuilt_from),
        }
|
|
|
|
|
''' |
|
The visualization model definition class |
|
''' |
|
import tensorflow as tf |
|
from tensorflow import keras |
|
class tfModel(keras.Model):
    """Keras autoencoder trained with a custom, three-term ``train_step``.

    The total loss is the weighted sum of a reconstruction term, a "umap"
    term computed on the concatenated embeddings, and a regularization term
    that compares the current trainable variables with a previous set.
    """

    def __init__(self, optimizer, loss, loss_weights, encoder_dims, decoder_dims, batch_size, withoutB=True, attention=True, prev_trainable_variables=None):
        """Build the encoder/decoder and store the training configuration.

        Parameters
        ----------
        optimizer :
            Object whose ``apply_gradients`` is called in ``train_step``.
        loss : dict
            Callables under the keys ``"umap"`` and ``"regularization"``,
            plus ``"reconstruction"`` (only read when ``attention`` is true).
        loss_weights : dict
            Scalar weights under the keys ``"reconstruction"``, ``"umap"``
            and ``"regularization"``.
        encoder_dims, decoder_dims : sequence of int
            Layer widths; the first entry is the input width, the last entry
            the (activation-free) output width, the entries in between are
            ReLU layers.
        batch_size :
            Stored on the instance; not used by the code in this class.
        withoutB : bool
            Stored on the instance; not used by the code in this class.
        attention : bool
            If true, ``loss["reconstruction"]`` is called with the inputs,
            reconstructions and both alpha tensors; otherwise a plain mean
            squared error is used.
        prev_trainable_variables : list or None
            Reference weights for the regularization loss.  When ``None``,
            gradient-stopped copies of the current variables are used.
        """
        super(tfModel, self).__init__()
        self._init_autoencoder(encoder_dims, decoder_dims)
        self.optimizer = optimizer
        self.withoutB = withoutB
        self.attention = attention

        self.loss = loss
        self.loss_weights = loss_weights

        self.prev_trainable_variables = prev_trainable_variables
        self.batch_size = batch_size

    def _init_autoencoder(self, encoder_dims, decoder_dims):
        """Create ``self.encoder`` / ``self.decoder`` and print their summaries."""
        self.encoder = tf.keras.Sequential([
            tf.keras.layers.InputLayer(input_shape=(encoder_dims[0],)),
            tf.keras.layers.Flatten(),
        ])
        for i in range(1, len(encoder_dims) - 1):
            self.encoder.add(tf.keras.layers.Dense(units=encoder_dims[i], activation="relu"))
        self.encoder.add(tf.keras.layers.Dense(units=encoder_dims[-1]))

        self.decoder = tf.keras.Sequential([
            tf.keras.layers.InputLayer(input_shape=(decoder_dims[0],)),
        ])
        for i in range(1, len(decoder_dims) - 1):
            self.decoder.add(tf.keras.layers.Dense(units=decoder_dims[i], activation="relu"))
        self.decoder.add(tf.keras.layers.Dense(units=decoder_dims[-1]))

        # summary() prints the table itself and returns None, so it must not
        # be wrapped in print() (that would emit a stray "None" line).
        self.encoder.summary()
        self.decoder.summary()

    def train_step(self, x):
        """Run one optimisation step.

        Parameters
        ----------
        x :
            ``x[0]`` must unpack into six tensors
            ``(to_x, from_x, to_alpha, from_alpha, n_rate, weight)``; each is
            cast to ``float32``.
            NOTE(review): ``weight`` is concatenated with the embeddings
            along axis 1, so it is presumably 2-D with one row per sample;
            confirm against the data pipeline.

        Returns
        -------
        dict
            The total loss under ``"loss"`` and the individual terms under
            ``"umap"``, ``"reconstruction"`` and ``"regularization"``.
        """
        to_x, from_x, to_alpha, from_alpha, n_rate, weight = x[0]
        to_x = tf.cast(to_x, dtype=tf.float32)
        from_x = tf.cast(from_x, dtype=tf.float32)
        to_alpha = tf.cast(to_alpha, dtype=tf.float32)
        from_alpha = tf.cast(from_alpha, dtype=tf.float32)
        n_rate = tf.cast(n_rate, dtype=tf.float32)
        weight = tf.cast(weight, dtype=tf.float32)

        # A single gradient is taken below, so a non-persistent tape is
        # enough and releases its resources right after that call.
        with tf.GradientTape() as tape:
            # Embed both edge endpoints and reconstruct them.
            embedding_to = self.encoder(to_x)
            embedding_from = self.encoder(from_x)
            embedding_to_recon = self.decoder(embedding_to)
            embedding_from_recon = self.decoder(embedding_from)

            # Input expected by the umap loss.
            embedding_to_from = tf.concat((embedding_to, embedding_from, weight),
                                          axis=1)

            if self.attention:
                reconstruct_loss = self.loss["reconstruction"](to_x, from_x, embedding_to_recon, embedding_from_recon, to_alpha, from_alpha)
            else:
                # Use a local MSE instead of overwriting
                # self.loss["reconstruction"], so the caller's loss mapping
                # is never mutated by a training step.
                mse = tf.keras.losses.MeanSquaredError()
                reconstruct_loss = mse(y_true=to_x, y_pred=embedding_to_recon) / 2 + mse(y_true=from_x, y_pred=embedding_from_recon) / 2

            umap_loss = self.loss["umap"](None, embed_to_from=embedding_to_from)

            # Mean of n_rate, excluded from differentiation.
            alpha_mean = tf.cast(tf.reduce_mean(tf.stop_gradient(n_rate)), dtype=tf.float32)

            # Without stored previous weights, compare against
            # gradient-stopped copies of the current ones.
            if self.prev_trainable_variables is None:
                prev_trainable_variables = [tf.stop_gradient(var) for var in self.trainable_variables]
            else:
                prev_trainable_variables = self.prev_trainable_variables
            regularization_loss = self.loss["regularization"](w_prev=prev_trainable_variables, w_current=self.trainable_variables, to_alpha=alpha_mean)

            # Weighted sum of the three terms.
            loss = tf.add(tf.add(tf.math.multiply(tf.constant(self.loss_weights["reconstruction"]), reconstruct_loss),
                                 tf.math.multiply(tf.constant(self.loss_weights["umap"]), umap_loss)),
                          tf.math.multiply(tf.constant(self.loss_weights["regularization"]), regularization_loss))

        trainable_vars = self.trainable_variables
        grads = tape.gradient(loss, trainable_vars)

        self.optimizer.apply_gradients(zip(grads, trainable_vars))

        return {"loss": loss, "umap": umap_loss, "reconstruction": reconstruct_loss,
                "regularization": regularization_loss}
|
|
|
|
|
|