"""The Projector class for visualization, serve as a helper module for evaluator and visualizer"""
from abc import ABC, abstractmethod
import os
import json

import numpy as np
import torch

try:
    import tensorflow as tf
except ImportError:  # TensorFlow is only needed by the tf* projectors below.
    tf = None


class ProjectorAbstractClass(ABC):
    """Interface shared by every projector in this module.

    A projector loads a visualization model and maps data to embeddings
    (``*_project``) and embeddings back to data (``*_inverse``).
    """

    @abstractmethod
    def __init__(self, vis_model, content_path, *args, **kwargs):
        pass

    @abstractmethod
    def load(self, *args, **kwargs):
        pass

    @abstractmethod
    def batch_project(self, *args, **kwargs):
        pass

    @abstractmethod
    def individual_project(self, *args, **kwargs):
        pass

    @abstractmethod
    def batch_inverse(self, *args, **kwargs):
        pass

    @abstractmethod
    def individual_inverse(self, *args, **kwargs):
        pass


class Projector(ProjectorAbstractClass):
    """Base class for the torch projectors.

    Subclasses implement :meth:`load`; the project/inverse methods call it
    and then run ``vis_model.encoder`` / ``vis_model.decoder``.
    """

    def __init__(self, vis_model, content_path, vis_model_name, device):
        """
        :param vis_model: model exposing ``encoder`` and ``decoder`` callables
        :param content_path: root directory that contains the ``Model`` folder
        :param vis_model_name: base name used to build checkpoint file names
        :param device: device the model and the input tensors are moved to
        """
        self.vis_model = vis_model
        self.content_path = content_path
        self.vis_model_name = vis_model_name
        self.DEVICE = device

    def load(self, iteration):
        """Load the weights for ``iteration``; must be provided by subclasses."""
        raise NotImplementedError

    def _load_checkpoint(self, file_path):
        """Load ``state_dict`` from ``file_path`` into the model, move it to
        ``self.DEVICE`` and switch it to eval mode."""
        save_model = torch.load(file_path, map_location="cpu")
        self.vis_model.load_state_dict(save_model["state_dict"])
        self.vis_model.to(self.DEVICE)
        self.vis_model.eval()

    def _run(self, module, array):
        """Apply ``module`` to a numpy array and return a numpy array."""
        tensor = torch.from_numpy(array).to(dtype=torch.float32, device=self.DEVICE)
        # Inference only: no autograd graph is needed for the result.
        with torch.no_grad():
            return module(tensor).cpu().detach().numpy()

    def _encode(self, data):
        """Run the encoder of the currently loaded model on ``data``."""
        return self._run(self.vis_model.encoder, data)

    def _decode(self, embedding):
        """Run the decoder of the currently loaded model on ``embedding``."""
        return self._run(self.vis_model.decoder, embedding)

    def batch_project(self, iteration, data):
        """
        :param iteration: passed to :meth:`load`
        :param data: numpy.ndarray, a batch of samples
        :return: encoder output as numpy.ndarray
        """
        self.load(iteration)
        return self._encode(data)

    def individual_project(self, iteration, data):
        """
        :param iteration: passed to :meth:`load`
        :param data: numpy.ndarray, one sample (a batch axis is added and removed)
        :return: encoder output for that sample as numpy.ndarray
        """
        self.load(iteration)
        return self._encode(np.expand_dims(data, axis=0)).squeeze(axis=0)

    def batch_inverse(self, iteration, embedding):
        """
        :param iteration: passed to :meth:`load`
        :param embedding: numpy.ndarray, a batch of embeddings
        :return: decoder output as numpy.ndarray
        """
        self.load(iteration)
        return self._decode(embedding)

    def individual_inverse(self, iteration, embedding):
        """
        :param iteration: passed to :meth:`load`
        :param embedding: numpy.ndarray, one embedding (a batch axis is added and removed)
        :return: decoder output for that embedding as numpy.ndarray
        """
        self.load(iteration)
        # The input goes to self.DEVICE like in the sibling methods; a
        # hard-coded "cpu" here mismatches a model living on another device.
        return self._decode(np.expand_dims(embedding, axis=0)).squeeze(axis=0)


class DeepDebuggerProjector(Projector):
    """Projector with one checkpoint per segment ``[s, e)`` of iterations."""

    def __init__(self, vis_model, content_path, vis_model_name, segments, device):
        """
        :param segments: sequence of ``(s, e)`` pairs, e.g. [(1,6),(6,15),(15,42),(42,200)]
        """
        super().__init__(vis_model, content_path, vis_model_name, device)
        self.segments = segments
        self.current_range = (-1, -1)

    @staticmethod
    def _find_segment(segments, value):
        """Return ``(idx, s, e)`` of the first segment ``[s, e)`` holding ``value``.

        The end point of the last segment is treated as inclusive.

        :raises ValueError: if no segment contains ``value``
        """
        init_e = segments[-1][1]
        for idx, segment in enumerate(segments):
            s, e = segment[0], segment[1]
            if (s <= value < e) or (value == init_e and e == init_e):
                return idx, s, e
        raise ValueError("{} is not covered by any segment in {}".format(value, segments))

    def load(self, iteration):
        """Load the checkpoint of the segment containing ``iteration``.

        Nothing is reloaded when ``iteration`` falls into the range of the
        model that is already loaded.

        :raises ValueError: if ``iteration`` lies outside every segment
        """
        # [s,e)
        init_e = self.segments[-1][1]
        cur_s, cur_e = self.current_range
        if (cur_s <= iteration < cur_e) or (iteration == init_e and cur_e == init_e):
            print("Same range as current visualization model...")
            return
        idx, s, e = self._find_segment(self.segments, iteration)
        file_path = os.path.join(self.content_path, "Model", "{}_{}.pth".format(self.vis_model_name, idx))
        self._load_checkpoint(file_path)
        self.current_range = (s, e)
        print("Successfully load the visualization model for range ({},{})...".format(s,e))


class ALProjector(Projector):
    """Projector with one checkpoint per ``Iteration_<n>`` folder."""

    def __init__(self, vis_model, content_path, vis_model_name, device) -> None:
        super().__init__(vis_model, content_path, vis_model_name, device)
        self.current_range = None

    def load(self, iteration):
        """Load ``Model/Iteration_<iteration>/<vis_model_name>.pth``."""
        file_path = os.path.join(self.content_path, "Model", "Iteration_{}".format(iteration), self.vis_model_name+".pth")
        self._load_checkpoint(file_path)
        print("Successfully load the visualization model for Iteration {}...".format(iteration))


class DenseALProjector(DeepDebuggerProjector):
    """Projector keyed by ``(iteration, epoch)``; every iteration folder has
    its own ``segments.json`` describing epoch ranges ``[s, e)``."""

    def __init__(self, vis_model, content_path, vis_model_name, device) -> None:
        super().__init__(vis_model, content_path, vis_model_name, None, device)
        self.current_range = [-1,-1,-1]  # iteration, e_s, e_e

    def load(self, iteration, epoch):
        """Load the checkpoint of the epoch segment containing ``epoch``.

        Segments are read from ``Model/Iteration_<iteration>/segments.json``
        on every call. Nothing is reloaded when the loaded model already
        covers ``(iteration, epoch)``.

        :raises ValueError: if ``epoch`` lies outside every segment
        """
        # [s,e)
        curr_iteration, curr_s, curr_e = self.current_range
        iteration_dir = os.path.join(self.content_path, "Model", "Iteration_{}".format(iteration))
        with open(os.path.join(iteration_dir, "segments.json"), "r") as f:
            segments = json.load(f)
        init_e = segments[-1][1]
        if iteration == curr_iteration:
            if (curr_e == init_e and epoch == curr_e) or (curr_s <= epoch < curr_e):
                print("Same range as current visualization model...")
                return
        idx, s, e = self._find_segment(segments, epoch)
        file_path = os.path.join(iteration_dir, "{}_{}.pth".format(self.vis_model_name, idx))
        self._load_checkpoint(file_path)
        self.current_range = (iteration, s, e)
        print("Successfully load the visualization model in iteration {} for range ({},{}]...".format(iteration, s,e))

    def batch_project(self, iteration, epoch, data):
        """Load the model for ``(iteration, epoch)`` and encode a batch."""
        self.load(iteration, epoch)
        return self._encode(data)

    def individual_project(self, iteration, epoch, data):
        """Load the model for ``(iteration, epoch)`` and encode one sample."""
        self.load(iteration, epoch)
        return self._encode(np.expand_dims(data, axis=0)).squeeze(axis=0)

    def batch_inverse(self, iteration, epoch, embedding):
        """Load the model for ``(iteration, epoch)`` and decode a batch."""
        self.load(iteration, epoch)
        return self._decode(embedding)

    def individual_inverse(self, iteration, epoch, embedding):
        """Load the model for ``(iteration, epoch)`` and decode one embedding."""
        self.load(iteration, epoch)
        return self._decode(np.expand_dims(embedding, axis=0)).squeeze(axis=0)


class EvalProjector(DeepDebuggerProjector):
    """Projector for an experiment folder; segments are ``(s, e]`` ranges read
    from ``Model/<exp>/segments.json``."""

    def __init__(self, vis_model, content_path, vis_model_name, device, exp) -> None:
        super().__init__(vis_model, content_path, vis_model_name, None, device)
        self.exp = exp
        file_path = os.path.join(content_path, "Model", "{}".format(exp), "segments.json")
        with open(file_path, "r") as f:
            self.segments = json.load(f)

    def load(self, iteration):
        """Load the checkpoint of the segment ``(s, e]`` containing ``iteration``.

        The start point of the first segment is treated as inclusive.

        :raises ValueError: if ``iteration`` lies outside every segment
        """
        # (s,e]
        init_s = self.segments[0][0]
        cur_s, cur_e = self.current_range
        if (cur_s < iteration <= cur_e) or (iteration == init_s and cur_s == init_s):
            print("Same range as current visualization model...")
            return
        for idx, segment in enumerate(self.segments):
            s, e = segment[0], segment[1]
            if (s < iteration <= e) or (iteration == init_s and s == init_s):
                break
        else:
            raise ValueError("{} is not covered by any segment in {}".format(iteration, self.segments))
        file_path = os.path.join(self.content_path, "Model", "{}".format(self.exp), "tnn_hybrid_{}.pth".format(idx))
        self._load_checkpoint(file_path)
        self.current_range = (s, e)
        print("Successfully load the visualization model for range ({},{})...".format(s,e))


class DVIProjector(Projector):
    """Projector with one checkpoint per ``Epoch_<n>`` folder."""

    def __init__(self, vis_model, content_path, vis_model_name, device) -> None:
        super().__init__(vis_model, content_path, vis_model_name, device)

    def load(self, iteration):
        """Load ``Model/Epoch_<iteration>/<vis_model_name>.pth``."""
        file_path = os.path.join(self.content_path, "Model", "Epoch_{}".format(iteration), "{}.pth".format(self.vis_model_name))
        self._load_checkpoint(file_path)
        print("Successfully load the DVI visualization model for iteration {}".format(iteration))


class TimeVisProjector(Projector):
    """Projector with a single checkpoint used for every iteration."""

    def __init__(self, vis_model, content_path, vis_model_name, device, verbose=0) -> None:
        super().__init__(vis_model, content_path, vis_model_name, device)
        self.verbose = verbose

    def load(self, iteration):
        """Load ``Model/<vis_model_name>.pth``; ``iteration`` is only used
        in the log message."""
        file_path = os.path.join(self.content_path, "Model", "{}.pth".format(self.vis_model_name))
        self._load_checkpoint(file_path)
        if self.verbose > 0:
            print("Successfully load the TimeVis visualization model for iteration {}".format(iteration))


class TimeVisDenseALProjector(Projector):
    """Projector with one checkpoint per ``Iteration_<n>`` folder, addressed
    by ``(iteration, epoch)``."""

    def __init__(self, vis_model, content_path, vis_model_name, device, verbose=0) -> None:
        super().__init__(vis_model, content_path, vis_model_name, device)
        self.verbose = verbose
        self.curr_iteration = -1

    def load(self, iteration, epoch):
        """Load ``Model/Iteration_<iteration>/<vis_model_name>.pth`` unless
        that iteration is already loaded; ``epoch`` does not select a file."""
        if iteration == self.curr_iteration:
            return
        file_path = os.path.join(self.content_path, "Model", f'Iteration_{iteration}', "{}.pth".format(self.vis_model_name))
        self._load_checkpoint(file_path)
        if self.verbose > 0:
            print("Successfully load the TimeVis visualization model for iteration {}".format(iteration))
        self.curr_iteration = iteration

    def batch_project(self, iteration, epoch, data):
        """Load the model for ``iteration`` and encode a batch."""
        self.load(iteration, epoch)
        return self._encode(data)

    def individual_project(self, iteration, epoch, data):
        """Load the model for ``iteration`` and encode one sample."""
        self.load(iteration, epoch)
        return self._encode(np.expand_dims(data, axis=0)).squeeze(axis=0)

    def batch_inverse(self, iteration, epoch, embedding):
        """Load the model for ``iteration`` and decode a batch."""
        self.load(iteration, epoch)
        return self._decode(embedding)

    def individual_inverse(self, iteration, epoch, embedding):
        """Load the model for ``iteration`` and decode one embedding."""
        self.load(iteration, epoch)
        return self._decode(np.expand_dims(embedding, axis=0)).squeeze(axis=0)


def _load_keras_autoencoder(encoder_location, decoder_location):
    """Load and return ``(encoder, decoder)`` Keras models from disk.

    :raises ImportError: if TensorFlow could not be imported
    """
    if tf is None:
        raise ImportError("TensorFlow is required to use the tf* projectors")
    encoder = tf.keras.models.load_model(encoder_location)
    decoder = tf.keras.models.load_model(decoder_location)
    return encoder, decoder


class tfDVIProjector(ProjectorAbstractClass):
    """Keras projector with one encoder/decoder pair per ``Epoch_<n>`` folder."""

    def __init__(self, content_path, flag, verbose=0):
        """
        :param content_path: root directory that contains the ``Model`` folder
        :param flag: suffix appended to the ``encoder`` / ``decoder`` names
        :param verbose: print a message after a successful load when > 0
        """
        self.content_path = content_path
        self.model_path = os.path.join(self.content_path, "Model")
        self.flag = flag
        self.curr_iteration = -1
        self.encoder = None
        self.decoder = None
        self.verbose = verbose

    def load(self, epoch):
        """Load the encoder/decoder saved for ``epoch`` unless already loaded.

        A ``FileNotFoundError`` is reported with a message and leaves the
        previously loaded pair (if any) untouched.

        :param epoch: int
        """
        if self.curr_iteration == epoch:
            print("Current autocoder model loaded from Epoch {}".format(epoch))
            return

        epoch_dir = os.path.join(self.model_path, "Epoch_{:d}".format(epoch))
        encoder_location = os.path.join(epoch_dir, "encoder" + self.flag)
        decoder_location = os.path.join(epoch_dir, "decoder" + self.flag)
        try:
            encoder, decoder = _load_keras_autoencoder(encoder_location, decoder_location)
        except FileNotFoundError:
            print("Error! Projection function has not been initialized! Pls first visualize all.")
            return
        # Commit both models together so a failed decoder load can never
        # leave an encoder and a decoder from different epochs.
        self.encoder, self.decoder = encoder, decoder
        if self.verbose > 0:
            print("Keras autocoder model loaded from Epoch {}".format(epoch))
        self.curr_iteration = epoch

    def batch_project(self, epoch, data):
        '''
        batch project data to 2D space
        :param data: numpy.ndarray
        :param epoch: int
        :return: embedding numpy.ndarray
        '''
        self.load(epoch)
        embedding = self.encoder(data).cpu().numpy()
        return embedding

    def individual_project(self, epoch, data):
        '''
        project a data to 2D space
        :param data: numpy.ndarray
        :param epoch: int
        :return: embedding numpy.ndarray
        '''
        self.load(epoch)
        data = np.expand_dims(data, axis=0)
        embedding = self.encoder(data).cpu().numpy()
        return embedding.squeeze(0)

    def batch_inverse(self, epoch, data):
        """
        map 2D points back into high dimensional space
        :param data: ndarray, (n, 2)
        :param epoch: num of epoch
        :return: high dim representation, numpy.ndarray
        """
        self.load(epoch)
        representation_data = self.decoder(data).cpu().numpy()
        return representation_data

    def individual_inverse(self, epoch, data):
        """
        map a 2D point back into high dimensional space
        :param data: ndarray, (1, 2)
        :param epoch: num of epoch
        :return: high dim representation, numpy.ndarray
        """
        self.load(epoch)
        data = np.expand_dims(data, axis=0)
        representation_data = self.decoder(data).cpu().numpy()
        return representation_data.squeeze(0)


class tfDVIDenseALProjector(ProjectorAbstractClass):
    """Keras projector with one encoder/decoder pair per
    ``Iteration_<i>/Epoch_<n>`` folder."""

    def __init__(self, content_path, flag, verbose=0):
        """
        :param content_path: root directory that contains the ``Model`` folder
        :param flag: suffix appended to the ``encoder`` / ``decoder`` names
        :param verbose: print a message after a successful load when > 0
        """
        self.content_path = content_path
        self.model_path = os.path.join(self.content_path, "Model")
        self.flag = flag
        self.curr_iteration = -1
        self.curr_epoch = -1
        self.encoder = None
        self.decoder = None
        self.verbose = verbose

    def load(self, iteration, epoch):
        """Load the encoder/decoder saved for ``(iteration, epoch)`` unless
        already loaded.

        A ``FileNotFoundError`` is reported with a message and leaves the
        previously loaded pair (if any) untouched.

        :param iteration: int
        :param epoch: int
        """
        if self.curr_iteration == iteration and self.curr_epoch == epoch:
            print("Current autocoder model loaded from Iteration {}/Epoch {}".format(iteration, epoch))
            return

        epoch_dir = os.path.join(self.model_path, "Iteration_{:d}".format(iteration), "Epoch_{:d}".format(epoch))
        encoder_location = os.path.join(epoch_dir, "encoder" + self.flag)
        decoder_location = os.path.join(epoch_dir, "decoder" + self.flag)
        try:
            encoder, decoder = _load_keras_autoencoder(encoder_location, decoder_location)
        except FileNotFoundError:
            print("Error! Projection function has not been initialized! Pls first visualize all.")
            return
        # Commit both models together so a failed decoder load can never
        # leave an encoder and a decoder from different checkpoints.
        self.encoder, self.decoder = encoder, decoder
        if self.verbose > 0:
            print("Keras autocoder model loaded from Epoch {}".format(epoch))
        self.curr_iteration = iteration
        self.curr_epoch = epoch

    def batch_project(self, iteration, epoch, data):
        '''
        batch project data to 2D space
        :param data: numpy.ndarray
        :param epoch: int
        :return: embedding numpy.ndarray
        '''
        self.load(iteration, epoch)
        embedding = self.encoder(data).cpu().numpy()
        return embedding

    def individual_project(self, iteration, epoch, data):
        '''
        project a data to 2D space
        :param data: numpy.ndarray
        :param epoch: int
        :return: embedding numpy.ndarray
        '''
        self.load(iteration, epoch)
        data = np.expand_dims(data, axis=0)
        embedding = self.encoder(data).cpu().numpy()
        return embedding.squeeze(0)

    def batch_inverse(self, iteration, epoch, data):
        """
        map 2D points back into high dimensional space
        :param data: ndarray, (n, 2)
        :param epoch: num of epoch
        :return: high dim representation, numpy.ndarray
        """
        self.load(iteration, epoch)
        representation_data = self.decoder(data).cpu().numpy()
        return representation_data

    def individual_inverse(self, iteration, epoch, data):
        """
        map a 2D point back into high dimensional space
        :param data: ndarray, (1, 2)
        :param epoch: num of epoch
        :return: high dim representation, numpy.ndarray
        """
        self.load(iteration, epoch)
        data = np.expand_dims(data, axis=0)
        representation_data = self.decoder(data).cpu().numpy()
        return representation_data.squeeze(0)