|
"""The Projector class for visualization, serve as a helper module for evaluator and visualizer""" |
|
from abc import ABC, abstractmethod |
|
import os |
|
import json |
|
import numpy as np |
|
import torch |
|
|
|
class ProjectorAbstractClass(ABC):
    """Interface for projectors that map high-dimensional samples to a
    low-dimensional visualization space and back via a visualization model."""

    @abstractmethod
    def __init__(self, vis_model, content_path, *args, **kwargs):
        """Bind a visualization model and the content path holding its checkpoints."""

    @abstractmethod
    def load(self, *args, **kwargs):
        """Load the visualization-model weights for a given training stage."""

    @abstractmethod
    def batch_project(self, *args, **kwargs):
        """Project a batch of samples into the low-dimensional space."""

    @abstractmethod
    def individual_project(self, *args, **kwargs):
        """Project a single sample into the low-dimensional space."""

    @abstractmethod
    def batch_inverse(self, *args, **kwargs):
        """Map a batch of low-dimensional embeddings back to the original space."""

    @abstractmethod
    def individual_inverse(self, *args, **kwargs):
        """Map a single low-dimensional embedding back to the original space."""
|
|
|
class Projector(ProjectorAbstractClass):
    """Base torch projector: wraps an autoencoder-style ``vis_model`` with
    ``.encoder``/``.decoder`` and converts between numpy arrays and torch
    tensors.

    Subclasses implement :meth:`load` to restore model weights for a given
    training iteration; every projection helper calls it first.
    """

    def __init__(self, vis_model, content_path, vis_model_name, device):
        self.vis_model = vis_model          # autoencoder with .encoder/.decoder
        self.content_path = content_path    # root folder holding "Model" checkpoints
        self.vis_model_name = vis_model_name
        self.DEVICE = device                # torch device used for inference

    def load(self, iteration):
        """Restore model weights for *iteration*; subclasses must override."""
        raise NotImplementedError

    def batch_project(self, iteration, data):
        """Project a (n, d) numpy batch into the low-dimensional space."""
        self.load(iteration)
        inputs = torch.from_numpy(data).to(dtype=torch.float32, device=self.DEVICE)
        return self.vis_model.encoder(inputs).cpu().detach().numpy()

    def individual_project(self, iteration, data):
        """Project a single (d,) sample; the batch axis is stripped again."""
        self.load(iteration)
        inputs = torch.from_numpy(np.expand_dims(data, axis=0)).to(dtype=torch.float32, device=self.DEVICE)
        embedding = self.vis_model.encoder(inputs).cpu().detach().numpy()
        return embedding.squeeze(axis=0)

    def batch_inverse(self, iteration, embedding):
        """Map a batch of embeddings back to the high-dimensional space."""
        self.load(iteration)
        inputs = torch.from_numpy(embedding).to(dtype=torch.float32, device=self.DEVICE)
        return self.vis_model.decoder(inputs).cpu().detach().numpy()

    def individual_inverse(self, iteration, embedding):
        """Map a single embedding back; the batch axis is stripped again."""
        self.load(iteration)
        # BUGFIX: device was hard-coded to "cpu", unlike every sibling method;
        # that fails once the model has been moved to a CUDA device by load().
        inputs = torch.from_numpy(np.expand_dims(embedding, axis=0)).to(dtype=torch.float32, device=self.DEVICE)
        data = self.vis_model.decoder(inputs).cpu().detach().numpy()
        return data.squeeze(axis=0)
|
|
|
class DeepDebuggerProjector(Projector):
    """Projector whose training run is split into epoch segments, each with
    its own checkpoint stored as Model/{vis_model_name}_{idx}.pth."""

    def __init__(self, vis_model, content_path, vis_model_name, segments, device):
        super().__init__(vis_model, content_path, vis_model_name, device)
        # BUGFIX: segments was assigned twice in a row; keep one assignment.
        self.segments = segments        # list of (start, end) epoch ranges
        self.current_range = (-1, -1)   # range covered by the loaded weights

    def load(self, iteration):
        """Load the checkpoint whose segment covers *iteration*.

        Segments are half-open [s, e), except that the final segment also
        includes its end epoch. No-op when the currently loaded segment
        already covers *iteration*.

        :raises ValueError: if no segment covers *iteration*.
        """
        init_e = self.segments[-1][1]
        if (iteration >= self.current_range[0] and iteration < self.current_range[1]) or (iteration == init_e and self.current_range[1] == init_e):
            print("Same range as current visualization model...")
            return

        idx = None
        for i, (s, e) in enumerate(self.segments):
            if (iteration >= s and iteration < e) or (iteration == init_e and e == init_e):
                idx = i
                break
        if idx is None:
            # Previously fell through to an UnboundLocalError; fail explicitly.
            raise ValueError("Iteration {} is not covered by any segment".format(iteration))

        file_path = os.path.join(self.content_path, "Model", "{}_{}.pth".format(self.vis_model_name, idx))
        save_model = torch.load(file_path, map_location="cpu")
        self.vis_model.load_state_dict(save_model["state_dict"])
        self.vis_model.to(self.DEVICE)
        self.vis_model.eval()
        self.current_range = (s, e)
        print("Successfully load the visualization model for range ({},{})...".format(s,e))
|
|
|
|
|
class ALProjector(Projector):
    """Projector for active-learning runs: one checkpoint per AL iteration,
    stored under Model/Iteration_{i}/{vis_model_name}.pth."""

    def __init__(self, vis_model, content_path, vis_model_name, device) -> None:
        super().__init__(vis_model, content_path, vis_model_name, device)
        self.current_range = None  # kept for interface parity; never consulted here

    def load(self, iteration):
        """Restore the checkpoint saved for *iteration* and move it to DEVICE."""
        model_dir = os.path.join(self.content_path, "Model", "Iteration_{}".format(iteration))
        file_path = os.path.join(model_dir, self.vis_model_name + ".pth")

        checkpoint = torch.load(file_path, map_location=torch.device("cpu"))
        self.vis_model.load_state_dict(checkpoint["state_dict"])
        self.vis_model.to(self.DEVICE)
        self.vis_model.eval()
        print("Successfully load the visualization model for Iteration {}...".format(iteration))
|
|
|
|
|
class DenseALProjector(DeepDebuggerProjector):
    """Projector for dense active-learning runs: each AL iteration carries
    its own segment list (segments.json) plus per-segment checkpoints."""

    def __init__(self, vis_model, content_path, vis_model_name, device) -> None:
        super().__init__(vis_model, content_path, vis_model_name, None, device)
        # (iteration, segment_start, segment_end) of the loaded checkpoint
        self.current_range = [-1, -1, -1]

    def load(self, iteration, epoch):
        """Load the checkpoint of *iteration* whose segment covers *epoch*.

        Segments are half-open [s, e) except the last, which also includes
        its end epoch. Skips reloading when the active segment still covers
        the request.
        """
        curr_iteration, curr_s, curr_e = self.current_range

        segment_path = os.path.join(self.content_path, "Model", "Iteration_{}".format(iteration), "segments.json")
        with open(segment_path, "r") as f:
            segments = json.load(f)
        init_e = segments[-1][1]

        if iteration == curr_iteration:
            on_final_edge = curr_e == init_e and epoch == curr_e
            inside = curr_s <= epoch < curr_e
            if on_final_edge or inside:
                print("Same range as current visualization model...")
                return

        for i in range(len(segments)):
            s, e = segments[i]
            if (s <= epoch < e) or (e == init_e and epoch == e):
                idx = i
                break

        file_path = os.path.join(self.content_path, "Model", "Iteration_{}".format(iteration), "{}_{}.pth".format(self.vis_model_name, idx))
        checkpoint = torch.load(file_path, map_location=self.DEVICE)
        self.vis_model.load_state_dict(checkpoint["state_dict"])
        self.vis_model.to(self.DEVICE)
        self.vis_model.eval()
        self.current_range = (iteration, s, e)
        print("Successfully load the visualization model in iteration {} for range ({},{}]...".format(iteration, s,e))

    def batch_project(self, iteration, epoch, data):
        """Project a (n, d) numpy batch into the low-dimensional space."""
        self.load(iteration, epoch)
        inputs = torch.from_numpy(data).to(dtype=torch.float32, device=self.DEVICE)
        return self.vis_model.encoder(inputs).cpu().detach().numpy()

    def individual_project(self, iteration, epoch, data):
        """Project one sample; the batch axis is stripped again."""
        self.load(iteration, epoch)
        batched = np.expand_dims(data, axis=0)
        inputs = torch.from_numpy(batched).to(dtype=torch.float32, device=self.DEVICE)
        return self.vis_model.encoder(inputs).cpu().detach().numpy().squeeze(axis=0)

    def batch_inverse(self, iteration, epoch, embedding):
        """Map a batch of embeddings back to the high-dimensional space."""
        self.load(iteration, epoch)
        inputs = torch.from_numpy(embedding).to(dtype=torch.float32, device=self.DEVICE)
        return self.vis_model.decoder(inputs).cpu().detach().numpy()

    def individual_inverse(self, iteration, epoch, embedding):
        """Map one embedding back; the batch axis is stripped again."""
        self.load(iteration, epoch)
        batched = np.expand_dims(embedding, axis=0)
        inputs = torch.from_numpy(batched).to(dtype=torch.float32, device=self.DEVICE)
        return self.vis_model.decoder(inputs).cpu().detach().numpy().squeeze(axis=0)
|
|
|
|
|
class EvalProjector(DeepDebuggerProjector):
    """Projector for evaluation experiments: segments and checkpoints live
    under Model/{exp}. Unlike the parent, segments here use (s, e]
    semantics, with the very first start epoch also included."""

    def __init__(self, vis_model, content_path, vis_model_name, device, exp) -> None:
        super().__init__(vis_model, content_path, vis_model_name, None, device)
        self.exp = exp
        file_path = os.path.join(content_path, "Model", "{}".format(exp), "segments.json")
        with open(file_path, "r") as f:
            self.segments = json.load(f)

    def load(self, iteration):
        """Load the checkpoint whose (s, e] segment covers *iteration*;
        no-op when the active segment already covers it."""
        init_s = self.segments[0][0]
        curr_s, curr_e = self.current_range
        if (curr_s < iteration <= curr_e) or (iteration == init_s and curr_s == init_s):
            print("Same range as current visualization model...")
            return

        for i in range(len(self.segments)):
            s, e = self.segments[i]
            if (s < iteration <= e) or (iteration == init_s and s == init_s):
                idx = i
                break

        file_path = os.path.join(self.content_path, "Model", "{}".format(self.exp), "tnn_hybrid_{}.pth".format(idx))
        checkpoint = torch.load(file_path, map_location="cpu")
        self.vis_model.load_state_dict(checkpoint["state_dict"])
        self.vis_model.to(self.DEVICE)
        self.vis_model.eval()
        self.current_range = (s, e)
        print("Successfully load the visualization model for range ({},{})...".format(s,e))
|
|
|
|
|
class DVIProjector(Projector):
    """Projector for DVI: one checkpoint per epoch under Model/Epoch_{i}."""

    def __init__(self, vis_model, content_path, vis_model_name, device) -> None:
        super().__init__(vis_model, content_path, vis_model_name, device)

    def load(self, iteration):
        """Restore the DVI checkpoint saved for *iteration*."""
        model_dir = os.path.join(self.content_path, "Model", "Epoch_{}".format(iteration))
        checkpoint = torch.load(os.path.join(model_dir, "{}.pth".format(self.vis_model_name)), map_location="cpu")
        self.vis_model.load_state_dict(checkpoint["state_dict"])
        self.vis_model.to(self.DEVICE)
        self.vis_model.eval()
        print("Successfully load the DVI visualization model for iteration {}".format(iteration))
|
|
|
|
|
class TimeVisProjector(Projector):
    """Projector for TimeVis: a single checkpoint serves every iteration."""

    def __init__(self, vis_model, content_path, vis_model_name, device, verbose=0) -> None:
        super().__init__(vis_model, content_path, vis_model_name, device)
        self.verbose = verbose  # >0 enables load-progress printing

    def load(self, iteration):
        """Load the (iteration-independent) TimeVis checkpoint."""
        file_path = os.path.join(self.content_path, "Model", "{}.pth".format(self.vis_model_name))
        checkpoint = torch.load(file_path, map_location="cpu")
        self.vis_model.load_state_dict(checkpoint["state_dict"])
        self.vis_model.to(self.DEVICE)
        self.vis_model.eval()
        if self.verbose > 0:
            print("Successfully load the TimeVis visualization model for iteration {}".format(iteration))
|
|
|
|
|
class TimeVisDenseALProjector(Projector):
    """TimeVis projector for dense AL runs: one checkpoint per AL iteration.
    The epoch argument is accepted for interface parity but not used to
    select weights."""

    def __init__(self, vis_model, content_path, vis_model_name, device, verbose=0) -> None:
        super().__init__(vis_model, content_path, vis_model_name, device)
        self.verbose = verbose       # >0 enables load-progress printing
        self.curr_iteration = -1     # iteration whose weights are loaded

    def load(self, iteration, epoch):
        """Load the checkpoint for *iteration* (cached; *epoch* is ignored)."""
        if iteration == self.curr_iteration:
            return
        file_path = os.path.join(self.content_path, "Model", f'Iteration_{iteration}', "{}.pth".format(self.vis_model_name))
        checkpoint = torch.load(file_path, map_location="cpu")
        self.vis_model.load_state_dict(checkpoint["state_dict"])
        self.vis_model.to(self.DEVICE)
        self.vis_model.eval()
        if self.verbose > 0:
            print("Successfully load the TimeVis visualization model for iteration {}".format(iteration))
        self.curr_iteration = iteration

    def batch_project(self, iteration, epoch, data):
        """Project a (n, d) numpy batch into the low-dimensional space."""
        self.load(iteration, epoch)
        inputs = torch.from_numpy(data).to(dtype=torch.float32, device=self.DEVICE)
        return self.vis_model.encoder(inputs).cpu().detach().numpy()

    def individual_project(self, iteration, epoch, data):
        """Project one sample; the batch axis is stripped again."""
        self.load(iteration, epoch)
        batched = np.expand_dims(data, axis=0)
        inputs = torch.from_numpy(batched).to(dtype=torch.float32, device=self.DEVICE)
        return self.vis_model.encoder(inputs).cpu().detach().numpy().squeeze(axis=0)

    def batch_inverse(self, iteration, epoch, embedding):
        """Map a batch of embeddings back to the high-dimensional space."""
        self.load(iteration, epoch)
        inputs = torch.from_numpy(embedding).to(dtype=torch.float32, device=self.DEVICE)
        return self.vis_model.decoder(inputs).cpu().detach().numpy()

    def individual_inverse(self, iteration, epoch, embedding):
        """Map one embedding back; the batch axis is stripped again."""
        self.load(iteration, epoch)
        batched = np.expand_dims(embedding, axis=0)
        inputs = torch.from_numpy(batched).to(dtype=torch.float32, device=self.DEVICE)
        return self.vis_model.decoder(inputs).cpu().detach().numpy().squeeze(axis=0)
|
|
|
|
|
import tensorflow as tf |
|
class tfDVIProjector(ProjectorAbstractClass):
    """Projector backed by a pair of Keras SavedModels (encoder/decoder),
    one pair per epoch under Model/Epoch_{e}/{encoder|decoder}{flag}."""

    def __init__(self, content_path, flag, verbose=0):
        self.content_path = content_path
        self.model_path = os.path.join(self.content_path, "Model")
        self.flag = flag                # suffix distinguishing model variants
        self.curr_iteration = -1        # epoch currently loaded (-1 = none)
        self.encoder = None
        self.decoder = None
        self.verbose = verbose          # >0 enables load-progress printing

    def load(self, epoch):
        """Load the encoder/decoder SavedModels for *epoch* (cached)."""
        if self.curr_iteration == epoch:
            print("Current autocoder model loaded from Epoch {}".format(epoch))
            return
        encoder_location = os.path.join(self.model_path, "Epoch_{:d}".format(epoch),"encoder" + self.flag)
        decoder_location = os.path.join(self.model_path, "Epoch_{:d}".format(epoch),"decoder" + self.flag)
        try:
            self.encoder = tf.keras.models.load_model(encoder_location)
            self.decoder = tf.keras.models.load_model(decoder_location)
            if self.verbose > 0:
                print("Keras autocoder model loaded from Epoch {}".format(epoch))
            self.curr_iteration = epoch
        # BUGFIX: tf.keras.models.load_model reports a missing SavedModel as
        # IOError/OSError, not necessarily FileNotFoundError, so the narrower
        # handler could let the error escape. OSError covers both (it is
        # FileNotFoundError's superclass, so existing behavior is preserved).
        except OSError:
            print("Error! Projection function has not been initialized! Pls first visualize all.")

    def batch_project(self, epoch, data):
        '''
        batch project data to 2D space
        :param data: numpy.ndarray
        :param epoch: int
        :return: embedding numpy.ndarray
        '''
        self.load(epoch)
        # NOTE(review): relies on EagerTensor.cpu() before .numpy(); assumes
        # eager execution -- confirm against the TF version in use.
        embedding = self.encoder(data).cpu().numpy()
        return embedding

    def individual_project(self, epoch, data):
        '''
        project a data to 2D space
        :param data: numpy.ndarray
        :param epoch: int
        :return: embedding numpy.ndarray
        '''
        self.load(epoch)
        data = np.expand_dims(data, axis=0)
        embedding = self.encoder(data).cpu().numpy()
        return embedding.squeeze(0)

    def batch_inverse(self, epoch, data):
        """
        map 2D points back into high dimensional space
        :param data: ndarray, (n, 2)
        :param epoch: num of epoch
        :return: high dim representation, numpy.ndarray
        """
        self.load(epoch)
        representation_data = self.decoder(data).cpu().numpy()
        return representation_data

    def individual_inverse(self, epoch, data):
        """
        map a 2D point back into high dimensional space
        :param data: ndarray, (1, 2)
        :param epoch: num of epoch
        :return: high dim representation, numpy.ndarray
        """
        self.load(epoch)
        data = np.expand_dims(data, axis=0)
        representation_data = self.decoder(data).cpu().numpy()
        return representation_data.squeeze(0)
|
|
|
|
|
class tfDVIDenseALProjector(ProjectorAbstractClass):
    """Keras SavedModel projector for dense AL runs: one encoder/decoder pair
    per (iteration, epoch) under Model/Iteration_{i}/Epoch_{e}/."""

    def __init__(self, content_path, flag, verbose=0):
        self.content_path = content_path
        self.model_path = os.path.join(self.content_path, "Model")
        self.flag = flag                # suffix distinguishing model variants
        self.curr_iteration = -1        # iteration currently loaded (-1 = none)
        self.curr_epoch = -1            # epoch currently loaded (-1 = none)
        self.encoder = None
        self.decoder = None
        self.verbose = verbose          # >0 enables load-progress printing

    def load(self, iteration, epoch):
        """Load the encoder/decoder SavedModels for (*iteration*, *epoch*), cached."""
        if self.curr_iteration == iteration and self.curr_epoch == epoch:
            print("Current autocoder model loaded from Iteration {}/Epoch {}".format(iteration, epoch))
            return
        encoder_location = os.path.join(self.model_path, "Iteration_{:d}".format(iteration), "Epoch_{:d}".format(epoch),"encoder" + self.flag)
        decoder_location = os.path.join(self.model_path, "Iteration_{:d}".format(iteration), "Epoch_{:d}".format(epoch),"decoder" + self.flag)
        try:
            self.encoder = tf.keras.models.load_model(encoder_location)
            self.decoder = tf.keras.models.load_model(decoder_location)
            if self.verbose > 0:
                print("Keras autocoder model loaded from Epoch {}".format(epoch))
            self.curr_iteration = iteration
            self.curr_epoch = epoch
        # BUGFIX: tf.keras.models.load_model reports a missing SavedModel as
        # IOError/OSError, not necessarily FileNotFoundError, so the narrower
        # handler could let the error escape. OSError covers both (it is
        # FileNotFoundError's superclass, so existing behavior is preserved).
        except OSError:
            print("Error! Projection function has not been initialized! Pls first visualize all.")

    def batch_project(self, iteration, epoch, data):
        '''
        batch project data to 2D space
        :param data: numpy.ndarray
        :param epoch: int
        :return: embedding numpy.ndarray
        '''
        self.load(iteration, epoch)
        # NOTE(review): relies on EagerTensor.cpu() before .numpy(); assumes
        # eager execution -- confirm against the TF version in use.
        embedding = self.encoder(data).cpu().numpy()
        return embedding

    def individual_project(self, iteration, epoch, data):
        '''
        project a data to 2D space
        :param data: numpy.ndarray
        :param epoch: int
        :return: embedding numpy.ndarray
        '''
        self.load(iteration, epoch)
        data = np.expand_dims(data, axis=0)
        embedding = self.encoder(data).cpu().numpy()
        return embedding.squeeze(0)

    def batch_inverse(self, iteration, epoch, data):
        """
        map 2D points back into high dimensional space
        :param data: ndarray, (n, 2)
        :param epoch: num of epoch
        :return: high dim representation, numpy.ndarray
        """
        self.load(iteration, epoch)
        representation_data = self.decoder(data).cpu().numpy()
        return representation_data

    def individual_inverse(self, iteration, epoch, data):
        """
        map a 2D point back into high dimensional space
        :param data: ndarray, (1, 2)
        :param epoch: num of epoch
        :return: high dim representation, numpy.ndarray
        """
        self.load(iteration, epoch)
        data = np.expand_dims(data, axis=0)
        representation_data = self.decoder(data).cpu().numpy()
        return representation_data.squeeze(0)