SalazarPevelll
be
f291f4a
raw
history blame
48.8 kB
from abc import ABC, abstractmethod
import os
import json
import numpy as np
from scipy import stats
from sklearn.neighbors import NearestNeighbors
from scipy.spatial.distance import cosine
from singleVis.eval.evaluate import *
from singleVis.backend import *
from singleVis.utils import is_B, js_div
from singleVis.visualizer import visualizer
class EvaluatorAbstractClass(ABC):
    """Interface shared by visualization evaluators.

    The constructor stores the ``data_provider`` and ``projector`` it is
    given; any further positional or keyword arguments are accepted and
    ignored. Subclasses must implement every abstract method below.
    """

    def __init__(self, data_provider, projector, *args, **kwargs):
        self.projector = projector
        self.data_provider = data_provider

    @abstractmethod
    def eval_nn_train(self, epoch, n_neighbors):
        """Nearest-neighbor evaluation hook for training data at ``epoch``."""

    @abstractmethod
    def eval_nn_test(self, epoch, n_neighbors):
        """Nearest-neighbor evaluation hook for testing data at ``epoch``."""

    @abstractmethod
    def eval_inv_train(self, epoch):
        """Inverse-projection evaluation hook for training data at ``epoch``."""

    @abstractmethod
    def eval_inv_test(self, epoch):
        """Inverse-projection evaluation hook for testing data at ``epoch``."""

    @abstractmethod
    def save_epoch_eval(self, n_epoch, file_name="evaluation"):
        """Hook for persisting the evaluation of ``n_epoch`` under ``file_name``."""

    @abstractmethod
    def get_eval(self, file_name="evaluation"):
        """Hook for reading back the evaluation stored under ``file_name``."""
class Evaluator(EvaluatorAbstractClass):
def __init__(self, data_provider, projector, verbose=1):
self.data_provider = data_provider
self.projector = projector
self.verbose = verbose
####################################### ATOM #############################################
def eval_nn_train(self, epoch, n_neighbors):
    """Score neighbor preservation of the projection on training data.

    Args:
        epoch: epoch whose training representation is evaluated.
        n_neighbors: neighborhood size handed to the metric.

    Returns:
        The value produced by ``evaluate_proj_nn_perseverance_knn``
        (euclidean metric).
    """
    high = self.data_provider.train_representation(epoch)
    # one flat feature vector per sample
    high = high.reshape(len(high), -1)
    low = self.projector.batch_project(epoch, high)
    val = evaluate_proj_nn_perseverance_knn(
        high, low, n_neighbors=n_neighbors, metric="euclidean"
    )
    if self.verbose:
        print("#train# nn preserving: {:.2f}/{:d} in epoch {:d}".format(val, n_neighbors, epoch))
    return val
def eval_nn_test(self, epoch, n_neighbors):
    """Score neighbor preservation on training and testing data together.

    The flattened training and testing representations of ``epoch`` are
    stacked (training rows first), projected in one call and scored with
    ``evaluate_proj_nn_perseverance_knn`` (euclidean metric).

    Returns:
        The value produced by the metric.
    """
    provider = self.data_provider
    train_part = provider.train_representation(epoch)
    train_part = train_part.reshape(len(train_part), -1)
    test_part = provider.test_representation(epoch)
    test_part = test_part.reshape(len(test_part), -1)

    stacked = np.concatenate((train_part, test_part), axis=0)
    low = self.projector.batch_project(epoch, stacked)
    val = evaluate_proj_nn_perseverance_knn(
        stacked, low, n_neighbors=n_neighbors, metric="euclidean"
    )
    if self.verbose:
        print("#test# nn preserving : {:.2f}/{:d} in epoch {:d}".format(val, n_neighbors, epoch))
    return val
def eval_b_train(self, epoch, n_neighbors):
    """Score boundary preservation of the projection on training data.

    Border representations and training representations of ``epoch`` are
    flattened, projected (borders first) and handed to
    ``evaluate_proj_boundary_perseverance_knn``.

    Returns:
        The value produced by the metric.
    """
    provider, projector = self.data_provider, self.projector
    high_train = provider.train_representation(epoch)
    high_train = high_train.reshape(len(high_train), -1)
    high_border = provider.border_representation(epoch)
    high_border = high_border.reshape(len(high_border), -1)

    low_border = projector.batch_project(epoch, high_border)
    low_train = projector.batch_project(epoch, high_train)

    val = evaluate_proj_boundary_perseverance_knn(
        high_train, low_train, high_border, low_border, n_neighbors=n_neighbors
    )
    if self.verbose:
        print("#train# boundary preserving: {:.2f}/{:d} in epoch {:d}".format(val, n_neighbors, epoch))
    return val
def eval_b_test(self, epoch, n_neighbors):
    """Score boundary preservation of the projection on testing data.

    Uses ``test_representation`` and ``test_border_representation`` of the
    data provider; both are flattened, projected (borders first) and passed
    to ``evaluate_proj_boundary_perseverance_knn``.

    Returns:
        The value produced by the metric.
    """
    provider, projector = self.data_provider, self.projector
    high_test = provider.test_representation(epoch)
    high_test = high_test.reshape(len(high_test), -1)
    high_border = provider.test_border_representation(epoch)
    high_border = high_border.reshape(len(high_border), -1)

    low_border = projector.batch_project(epoch, high_border)
    low_test = projector.batch_project(epoch, high_test)

    val = evaluate_proj_boundary_perseverance_knn(
        high_test, low_test, high_border, low_border, n_neighbors=n_neighbors
    )
    if self.verbose:
        print("#test# boundary preserving: {:.2f}/{:d} in epoch {:d}".format(val, n_neighbors, epoch))
    return val
def eval_inv_train(self, epoch):
    """Compare predictions before and after a project/inverse round trip (train).

    Training data of ``epoch`` is projected and inverted; the argmax
    predictions on the original and on the reconstructed data are compared
    with ``evaluate_inv_accu``.

    Returns:
        The value produced by ``evaluate_inv_accu``.
    """
    provider, projector = self.data_provider, self.projector
    original = provider.train_representation(epoch)
    low = projector.batch_project(epoch, original)
    reconstructed = projector.batch_inverse(epoch, low)

    labels_before = provider.get_pred(epoch, original).argmax(axis=1)
    labels_after = provider.get_pred(epoch, reconstructed).argmax(axis=1)

    val = evaluate_inv_accu(labels_before, labels_after)
    if self.verbose:
        print("#train# PPR: {:.2f} in epoch {:d}".format(val, epoch))
    return val
def eval_inv_test(self, epoch):
    """Compare predictions before and after a project/inverse round trip (test).

    Testing data of ``epoch`` is projected and inverted; the argmax
    predictions on the original and on the reconstructed data are compared
    with ``evaluate_inv_accu``.

    Returns:
        The value produced by ``evaluate_inv_accu``.
    """
    provider, projector = self.data_provider, self.projector
    original = provider.test_representation(epoch)
    low = projector.batch_project(epoch, original)
    reconstructed = projector.batch_inverse(epoch, low)

    labels_before = provider.get_pred(epoch, original).argmax(axis=1)
    labels_after = provider.get_pred(epoch, reconstructed).argmax(axis=1)

    val = evaluate_inv_accu(labels_before, labels_after)
    if self.verbose:
        print("#test# PPR: {:.2f} in epoch {:d}".format(val, epoch))
    return val
def eval_inv_dist_train(self, epoch):
    """Mean reconstruction distance of the project/inverse round trip (train).

    Returns:
        float: mean over samples of the row-wise L2 norm between the
        training representation of ``epoch`` and its reconstruction.
    """
    original = self.data_provider.train_representation(epoch)
    low = self.projector.batch_project(epoch, original)
    reconstructed = self.projector.batch_inverse(epoch, low)

    residual = original - reconstructed
    dist = np.linalg.norm(residual, axis=1).mean()
    if self.verbose:
        print("#train# inverse projection distance: {:.2f} in epoch {:d}".format(dist, epoch))
    return float(dist)
def eval_inv_dist_test(self, epoch):
    """Mean reconstruction distance of the project/inverse round trip (test).

    Returns:
        float: mean over samples of the row-wise L2 norm between the
        testing representation of ``epoch`` and its reconstruction.
    """
    original = self.data_provider.test_representation(epoch)
    low = self.projector.batch_project(epoch, original)
    reconstructed = self.projector.batch_inverse(epoch, low)

    residual = original - reconstructed
    dist = np.linalg.norm(residual, axis=1).mean()
    if self.verbose:
        print("#test# inverse projection distance: {:.2f} in epoch {:d}".format(dist, epoch))
    return float(dist)
def eval_temporal_train(self, n_neighbors):
    """Temporal preservation between consecutive epochs on training data.

    For every pair of consecutive evaluated epochs the method records, per
    sample, the output of ``find_neighbor_preserving_rate`` on the two
    high-dimensional representations and the L2 displacement of the sample
    in the projection. Both tables are passed to
    ``evaluate_proj_temporal_perseverance_corr``.

    Returns:
        tuple: ``(val_corr, corr_std)`` as returned by that function.
    """
    provider, projector = self.data_provider, self.projector
    step_num = (provider.e - provider.s) // provider.p
    sample_num = provider.train_num
    alpha = np.zeros((step_num, sample_num))
    delta_x = np.zeros((step_num, sample_num))

    for t in range(step_num):
        prev_epoch = t * provider.p + provider.s
        prev_high = provider.train_representation(prev_epoch)
        prev_low = projector.batch_project(prev_epoch, prev_high)

        next_epoch = (t + 1) * provider.p + provider.s
        next_high = provider.train_representation(next_epoch)
        next_low = projector.batch_project(next_epoch, next_high)

        alpha[t] = find_neighbor_preserving_rate(prev_high, next_high, n_neighbors=n_neighbors)
        delta_x[t] = np.linalg.norm(prev_low - next_low, axis=1)

    val_corr, corr_std = evaluate_proj_temporal_perseverance_corr(alpha, delta_x)
    if self.verbose:
        print("Temporal preserving (train): {:.3f}\t std :{:.3f}".format(val_corr, corr_std))
    return val_corr, corr_std
def eval_temporal_test(self, n_neighbors):
    """Temporal preservation between consecutive epochs on train+test data.

    Same procedure as the training variant, but each epoch's data is the
    training representation followed by the testing representation.

    Returns:
        tuple: ``(val_corr, corr_std)`` as returned by
        ``evaluate_proj_temporal_perseverance_corr``.
    """
    provider, projector = self.data_provider, self.projector
    step_num = (provider.e - provider.s) // provider.p
    sample_num = provider.train_num + provider.test_num
    alpha = np.zeros((step_num, sample_num))
    delta_x = np.zeros((step_num, sample_num))

    for t in range(step_num):
        prev_epoch = t * provider.p + provider.s
        prev_test = provider.test_representation(prev_epoch)
        prev_train = provider.train_representation(prev_epoch)
        prev_high = np.concatenate((prev_train, prev_test), axis=0)
        prev_low = projector.batch_project(prev_epoch, prev_high)

        next_epoch = (t + 1) * provider.p + provider.s
        next_test = provider.test_representation(next_epoch)
        next_train = provider.train_representation(next_epoch)
        next_high = np.concatenate((next_train, next_test), axis=0)
        next_low = projector.batch_project(next_epoch, next_high)

        alpha[t] = find_neighbor_preserving_rate(prev_high, next_high, n_neighbors=n_neighbors)
        delta_x[t] = np.linalg.norm(prev_low - next_low, axis=1)

    val_corr, corr_std = evaluate_proj_temporal_perseverance_corr(alpha, delta_x)
    if self.verbose:
        print("Temporal preserving (test): {:.3f}\t std:{:.3f}".format(val_corr, corr_std))
    return val_corr, corr_std
def eval_temporal_nn_train(self, epoch, n_neighbors):
    """Temporal-neighbor preservation for training samples at ``epoch``.

    For every sample, the distance between its state at ``epoch`` and its
    state at each evaluated epoch is measured in the high-dimensional space
    and in the projection. Epochs are ranked by ascending distance, rank 0
    is skipped, and the next ``n_neighbors`` ranks are kept. The score of a
    sample is how many epochs appear in both selections.

    Returns:
        float: mean score over samples.
    """
    provider, projector = self.data_provider, self.projector
    epoch_num = (provider.e - provider.s) // provider.p + 1
    sample_num = provider.train_num
    high_dists = np.zeros((sample_num, epoch_num))
    low_dists = np.zeros((sample_num, epoch_num))

    curr_data = provider.train_representation(epoch)
    curr_embedding = projector.batch_project(epoch, curr_data)
    for t in range(epoch_num):
        other_epoch = t * provider.p + provider.s
        data = provider.train_representation(other_epoch)
        embedding = projector.batch_project(other_epoch, data)
        high_dists[:, t] = np.linalg.norm(curr_data - data, axis=1)
        low_dists[:, t] = np.linalg.norm(curr_embedding - embedding, axis=1)

    # argsort is ascending: column 0 is the closest epoch, which is skipped
    high_rankings = np.argsort(high_dists, axis=1)[:, 1:n_neighbors + 1]
    low_rankings = np.argsort(low_dists, axis=1)[:, 1:n_neighbors + 1]

    corr = np.zeros(len(high_dists))
    for i in range(len(data)):
        shared = np.intersect1d(high_rankings[i], low_rankings[i])
        corr[i] = len(shared)

    if self.verbose:
        print("Temporal temporal neighbor preserving (train) for {}-th epoch {}: {:.3f}\t std :{:.3f}".format(epoch, n_neighbors, corr.mean(), corr.std()))
    return float(corr.mean())
def eval_temporal_nn_test(self, epoch, n_neighbors):
    """Temporal-neighbor preservation for testing samples at ``epoch``.

    For every sample, the distance between its state at ``epoch`` and its
    state at each evaluated epoch is measured in the high-dimensional space
    and in the projection. Epochs are ranked by ascending distance, rank 0
    is skipped, and the next ``n_neighbors`` ranks are kept. The score of a
    sample is how many epochs appear in both selections.

    Returns:
        float: mean score over samples.
    """
    provider, projector = self.data_provider, self.projector
    epoch_num = (provider.e - provider.s) // provider.p + 1
    sample_num = provider.test_num
    high_dists = np.zeros((sample_num, epoch_num))
    low_dists = np.zeros((sample_num, epoch_num))

    curr_data = provider.test_representation(epoch)
    curr_embedding = projector.batch_project(epoch, curr_data)
    for t in range(epoch_num):
        other_epoch = t * provider.p + provider.s
        data = provider.test_representation(other_epoch)
        embedding = projector.batch_project(other_epoch, data)
        high_dists[:, t] = np.linalg.norm(curr_data - data, axis=1)
        low_dists[:, t] = np.linalg.norm(curr_embedding - embedding, axis=1)

    # argsort is ascending: column 0 is the closest epoch, which is skipped
    high_rankings = np.argsort(high_dists, axis=1)[:, 1:n_neighbors + 1]
    low_rankings = np.argsort(low_dists, axis=1)[:, 1:n_neighbors + 1]

    corr = np.zeros(len(high_dists))
    for i in range(len(data)):
        shared = np.intersect1d(high_rankings[i], low_rankings[i])
        corr[i] = len(shared)

    if self.verbose:
        print("Temporal nn preserving (test) for {}-th epoch {}: {:.3f}\t std:{:.3f}".format(epoch, n_neighbors, corr.mean(), corr.std()))
    return float(corr.mean())
def eval_spatial_temporal_nn_train(self, n_neighbors, feature_dim):
    """
    Evaluate whether the vis model preserves the ranking of close spatial
    and temporal neighbors (training data).

    Training representations of every evaluated epoch and their projections
    are stacked epoch after epoch into two tables, which are scored with
    ``evaluate_proj_nn_perseverance_knn``.

    Args:
        n_neighbors: neighborhood size handed to the metric.
        feature_dim: column count of the high-dimensional table.
    """
    # TODO: scale up to 100 epochs, need to speed up the process...
    provider = self.data_provider
    epoch_num = (provider.e - provider.s) // provider.p + 1
    train_num = provider.train_num
    high_features = np.zeros((epoch_num * train_num, feature_dim))
    low_features = np.zeros((epoch_num * train_num, 2))

    for t in range(epoch_num):
        curr_epoch = t * provider.p + provider.s
        rows = slice(t * train_num, (t + 1) * train_num)
        data = provider.train_representation(curr_epoch)
        high_features[rows] = np.copy(data)
        low_features[rows] = self.projector.batch_project(curr_epoch, data)

    val = evaluate_proj_nn_perseverance_knn(high_features, low_features, n_neighbors)
    if self.verbose:
        print("Spatial/Temporal nn preserving (train):\t{:.3f}/{:d}".format(val, n_neighbors))
    return val
def eval_spatial_temporal_nn_test(self, n_neighbors, feature_dim):
    """Spatial/temporal neighbor preservation on train+test data.

    For every evaluated epoch the training and testing representations are
    concatenated (training rows first); the data and its projection are
    stacked epoch after epoch and scored with
    ``evaluate_proj_nn_perseverance_knn``.

    Args:
        n_neighbors: neighborhood size handed to the metric.
        feature_dim: column count of the high-dimensional table.
    """
    provider = self.data_provider
    epoch_num = (provider.e - provider.s) // provider.p + 1
    train_num = provider.train_num
    test_num = provider.test_num
    num = train_num + test_num
    high_features = np.zeros((epoch_num * num, feature_dim))
    low_features = np.zeros((epoch_num * num, 2))

    for t in range(epoch_num):
        curr_epoch = t * provider.p + provider.s
        rows = slice(t * num, (t + 1) * num)
        train_data = provider.train_representation(curr_epoch)
        test_data = provider.test_representation(curr_epoch)
        data = np.concatenate((train_data, test_data), axis=0)
        low_features[rows] = self.projector.batch_project(curr_epoch, data)
        high_features[rows] = np.copy(data)

    val = evaluate_proj_nn_perseverance_knn(high_features, low_features, n_neighbors)
    if self.verbose:
        print("Spatial/Temporal nn preserving (test):\t{:.3f}/{:d}".format(val, n_neighbors))
    return val
def eval_temporal_global_corr_train(self, epoch, start=None, end=None, period=None):
    """Spearman correlation of temporal distances over all epochs (train).

    For each training sample, the distance between its state at ``epoch``
    and its state at every evaluated epoch is computed both in the
    high-dimensional space and in the projection; the two distance
    sequences are compared with ``scipy.stats.spearmanr``.

    Args:
        epoch: reference epoch; expected to lie on the evaluated timeline.
        start, end, period: evaluated timeline (``end`` inclusive). Each
            one independently falls back to the data provider's ``s`` /
            ``e`` / ``p`` when ``None``.

    Returns:
        Mean correlation over samples (``nan`` if any sample's correlation
        is undefined).
    """
    # Resolve each bound on its own: a partially specified range must not
    # crash on a None bound nor silently discard an explicit value.
    if start is None:
        start = self.data_provider.s
    if end is None:
        end = self.data_provider.e
    if period is None:
        period = self.data_provider.p

    sample_num = self.data_provider.train_num
    epoch_num = (end - start) // period + 1
    repr_dim = self.data_provider.representation_dim
    all_train_repr = np.zeros((epoch_num, sample_num, repr_dim))
    low_repr = np.zeros((epoch_num, sample_num, 2))

    # collect the representation and projection of every evaluated epoch
    for i in range(start, end + 1, period):
        index = (i - start) // period
        all_train_repr[index] = self.data_provider.train_representation(i)
        low_repr[index] = self.projector.batch_project(i, all_train_repr[index])

    ref = (epoch - start) // period
    corrs = np.zeros(sample_num)
    for i in range(sample_num):
        # The slices are already 2-D (epochs x dim). Squeezing them would
        # drop an axis when either size is 1 and break norm(axis=1).
        high_embeddings = all_train_repr[:, i, :]
        low_embeddings = low_repr[:, i, :]
        high_dists = np.linalg.norm(high_embeddings - high_embeddings[ref], axis=1)
        low_dists = np.linalg.norm(low_embeddings - low_embeddings[ref], axis=1)
        corrs[i], _ = stats.spearmanr(high_dists, low_dists)
    return corrs.mean()
def eval_temporal_global_corr_test(self, epoch, start=None, end=None, period=None):
    """Spearman correlation of temporal distances over all epochs (test).

    For each testing sample, the distance between its state at ``epoch``
    and its state at every evaluated epoch is computed both in the
    high-dimensional space and in the projection; the two distance
    sequences are compared with ``scipy.stats.spearmanr``.

    Args:
        epoch: reference epoch; expected to lie on the evaluated timeline.
        start, end, period: evaluated timeline (``end`` inclusive). Each
            one independently falls back to the data provider's ``s`` /
            ``e`` / ``p`` when ``None``.

    Returns:
        Mean correlation over samples (``nan`` if any sample's correlation
        is undefined).
    """
    # Resolve each bound on its own: a partially specified range must not
    # crash on a None bound nor silently discard an explicit value.
    if start is None:
        start = self.data_provider.s
    if end is None:
        end = self.data_provider.e
    if period is None:
        period = self.data_provider.p

    sample_num = self.data_provider.test_num
    epoch_num = (end - start) // period + 1
    repr_dim = self.data_provider.representation_dim
    all_test_repr = np.zeros((epoch_num, sample_num, repr_dim))
    low_repr = np.zeros((epoch_num, sample_num, 2))

    # collect the representation and projection of every evaluated epoch
    for i in range(start, end + 1, period):
        index = (i - start) // period
        all_test_repr[index] = self.data_provider.test_representation(i)
        low_repr[index] = self.projector.batch_project(i, all_test_repr[index])

    ref = (epoch - start) // period
    corrs = np.zeros(sample_num)
    for i in range(sample_num):
        # The slices are already 2-D (epochs x dim). Squeezing them would
        # drop an axis when either size is 1 and break norm(axis=1).
        high_embeddings = all_test_repr[:, i, :]
        low_embeddings = low_repr[:, i, :]
        high_dists = np.linalg.norm(high_embeddings - high_embeddings[ref], axis=1)
        low_dists = np.linalg.norm(low_embeddings - low_embeddings[ref], axis=1)
        corrs[i], _ = stats.spearmanr(high_dists, low_dists)
    return corrs.mean()
def eval_temporal_weighted_global_corr_train(self, epoch, start=None, end=None, period=None):
    """Weighted ranking correlation of temporal distances (train).

    For each training sample, evaluated epochs are ordered by ascending
    distance from the sample's state at ``epoch``, once in the
    high-dimensional space and once in the projection. The two orderings
    are scored with ``evaluate_proj_temporal_weighted_global_corr``.

    Args:
        epoch: reference epoch; expected to lie on the evaluated timeline.
        start, end, period: evaluated timeline (``end`` inclusive). Each
            one independently falls back to the data provider's ``s`` /
            ``e`` / ``p`` when ``None``.

    Returns:
        Mean score over samples.
    """
    # Resolve each bound on its own: a partially specified range must not
    # crash on a None bound nor silently discard an explicit value.
    if start is None:
        start = self.data_provider.s
    if end is None:
        end = self.data_provider.e
    if period is None:
        period = self.data_provider.p

    sample_num = self.data_provider.train_num
    epoch_num = (end - start) // period + 1
    repr_dim = self.data_provider.representation_dim
    all_train_repr = np.zeros((epoch_num, sample_num, repr_dim))
    low_repr = np.zeros((epoch_num, sample_num, 2))

    # collect the representation and projection of every evaluated epoch
    for i in range(start, end + 1, period):
        index = (i - start) // period
        all_train_repr[index] = self.data_provider.train_representation(i)
        low_repr[index] = self.projector.batch_project(i, all_train_repr[index])

    ref = (epoch - start) // period
    corrs = np.zeros(sample_num)
    for i in range(sample_num):
        # The slices are already 2-D (epochs x dim). Squeezing them would
        # drop an axis when either size is 1 and break norm(axis=1).
        high_embeddings = all_train_repr[:, i, :]
        low_embeddings = low_repr[:, i, :]
        high_dists = np.linalg.norm(high_embeddings - high_embeddings[ref], axis=1)
        low_dists = np.linalg.norm(low_embeddings - low_embeddings[ref], axis=1)
        high_ranking = np.argsort(high_dists)
        low_ranking = np.argsort(low_dists)
        corrs[i] = evaluate_proj_temporal_weighted_global_corr(high_ranking, low_ranking)
    return corrs.mean()
def eval_temporal_weighted_global_corr_test(self, epoch, start=None, end=None, period=None):
    """Weighted ranking correlation of temporal distances (test).

    For each testing sample, evaluated epochs are ordered by ascending
    distance from the sample's state at ``epoch``, once in the
    high-dimensional space and once in the projection. The two orderings
    are scored with ``evaluate_proj_temporal_weighted_global_corr``.

    Args:
        epoch: reference epoch; expected to lie on the evaluated timeline.
        start, end, period: evaluated timeline (``end`` inclusive). Each
            one independently falls back to the data provider's ``s`` /
            ``e`` / ``p`` when ``None``.

    Returns:
        Mean score over samples.
    """
    # Resolve each bound on its own: a partially specified range must not
    # crash on a None bound nor silently discard an explicit value.
    if start is None:
        start = self.data_provider.s
    if end is None:
        end = self.data_provider.e
    if period is None:
        period = self.data_provider.p

    sample_num = self.data_provider.test_num
    epoch_num = (end - start) // period + 1
    repr_dim = self.data_provider.representation_dim
    all_test_repr = np.zeros((epoch_num, sample_num, repr_dim))
    low_repr = np.zeros((epoch_num, sample_num, 2))

    # collect the representation and projection of every evaluated epoch
    for i in range(start, end + 1, period):
        index = (i - start) // period
        all_test_repr[index] = self.data_provider.test_representation(i)
        low_repr[index] = self.projector.batch_project(i, all_test_repr[index])

    ref = (epoch - start) // period
    corrs = np.zeros(sample_num)
    for i in range(sample_num):
        # The slices are already 2-D (epochs x dim). Squeezing them would
        # drop an axis when either size is 1 and break norm(axis=1).
        high_embeddings = all_test_repr[:, i, :]
        low_embeddings = low_repr[:, i, :]
        high_dists = np.linalg.norm(high_embeddings - high_embeddings[ref], axis=1)
        low_dists = np.linalg.norm(low_embeddings - low_embeddings[ref], axis=1)
        high_ranking = np.argsort(high_dists)
        low_ranking = np.argsort(low_dists)
        corrs[i] = evaluate_proj_temporal_weighted_global_corr(high_ranking, low_ranking)
    return corrs.mean()
def eval_temporal_local_corr_train(self, epoch, stage, start=None, end=None, period=None):
    """Spearman correlation of temporal distances within one stage (train).

    The evaluated timeline is split into ``stage`` contiguous chunks with
    ``np.array_split``; only the chunk containing ``epoch`` is used. For
    each training sample, distances from its state at ``epoch`` to its
    state at every epoch of that chunk are computed in both spaces and
    compared with ``scipy.stats.spearmanr``.

    Args:
        epoch: reference epoch; must be on the timeline (otherwise the
            chunk lookup raises ``IndexError``).
        stage: number of chunks the timeline is split into.
        start, end, period: evaluated timeline (``end`` inclusive). Each
            one independently falls back to the data provider's ``s`` /
            ``e`` / ``p`` when ``None``.

    Returns:
        Mean correlation over samples.
    """
    # Resolve each bound on its own: a partially specified range must not
    # crash on a None bound nor silently discard an explicit value.
    if start is None:
        start = self.data_provider.s
    if end is None:
        end = self.data_provider.e
    if period is None:
        period = self.data_provider.p

    # `end + 1` keeps `end` inclusive without running past it when
    # (end - start) is not a multiple of `period`.
    timeline = np.arange(start, end + 1, period)
    # divide into several stages
    stage_idxs = np.array_split(timeline, stage)
    selected_stage = stage_idxs[np.where([epoch in chunk for chunk in stage_idxs])[0][0]]

    sample_num = self.data_provider.train_num
    epoch_num = len(selected_stage)
    repr_dim = self.data_provider.representation_dim
    all_train_repr = np.zeros((epoch_num, sample_num, repr_dim))
    low_repr = np.zeros((epoch_num, sample_num, 2))

    s = selected_stage[0]
    # collect the representation and projection of every epoch in the stage
    for i in selected_stage:
        index = (i - s) // period
        all_train_repr[index] = self.data_provider.train_representation(i)
        low_repr[index] = self.projector.batch_project(i, all_train_repr[index])

    ref = (epoch - s) // period
    corrs = np.zeros(sample_num)
    for i in range(sample_num):
        high_embeddings = all_train_repr[:, i, :]
        low_embeddings = low_repr[:, i, :]
        high_dists = np.linalg.norm(high_embeddings - high_embeddings[ref], axis=1)
        low_dists = np.linalg.norm(low_embeddings - low_embeddings[ref], axis=1)
        corrs[i], _ = stats.spearmanr(high_dists, low_dists)
    return corrs.mean()
def eval_temporal_local_corr_test(self, epoch, stage, start=None, end=None, period=None):
    """Spearman correlation of temporal distances within one stage (test).

    The evaluated timeline is split into ``stage`` contiguous chunks with
    ``np.array_split``; only the chunk containing ``epoch`` is used. For
    each testing sample, distances from its state at ``epoch`` to its
    state at every epoch of that chunk are computed in both spaces and
    compared with ``scipy.stats.spearmanr``.

    Args:
        epoch: reference epoch; must be on the timeline (otherwise the
            chunk lookup raises ``IndexError``).
        stage: number of chunks the timeline is split into.
        start, end, period: evaluated timeline (``end`` inclusive). Each
            one independently falls back to the data provider's ``s`` /
            ``e`` / ``p`` when ``None``.

    Returns:
        Mean correlation over samples.
    """
    # Resolve each bound on its own: a partially specified range must not
    # crash on a None bound nor silently discard an explicit value.
    if start is None:
        start = self.data_provider.s
    if end is None:
        end = self.data_provider.e
    if period is None:
        period = self.data_provider.p

    # `end + 1` keeps `end` inclusive without running past it when
    # (end - start) is not a multiple of `period`.
    timeline = np.arange(start, end + 1, period)
    # divide into several stages
    stage_idxs = np.array_split(timeline, stage)
    selected_stage = stage_idxs[np.where([epoch in chunk for chunk in stage_idxs])[0][0]]
    s = selected_stage[0]

    sample_num = self.data_provider.test_num
    epoch_num = len(selected_stage)
    repr_dim = self.data_provider.representation_dim
    all_test_repr = np.zeros((epoch_num, sample_num, repr_dim))
    low_repr = np.zeros((epoch_num, sample_num, 2))

    # collect the representation and projection of every epoch in the stage
    for i in selected_stage:
        index = (i - s) // period
        all_test_repr[index] = self.data_provider.test_representation(i)
        low_repr[index] = self.projector.batch_project(i, all_test_repr[index])

    ref = (epoch - s) // period
    corrs = np.zeros(sample_num)
    for i in range(sample_num):
        high_embeddings = all_test_repr[:, i, :]
        low_embeddings = low_repr[:, i, :]
        high_dists = np.linalg.norm(high_embeddings - high_embeddings[ref], axis=1)
        low_dists = np.linalg.norm(low_embeddings - low_embeddings[ref], axis=1)
        corrs[i], _ = stats.spearmanr(high_dists, low_dists)
    return corrs.mean()
def eval_moving_invariants_train(self, e_s, e_t, resolution=500):
    """Count moving training samples whose move is reflected in the projection.

    A sample is "moving" when its argmax prediction differs between epochs
    ``e_s`` and ``e_t`` and ``is_B`` flags it at neither epoch. For both
    epochs a ``resolution`` x ``resolution`` decision view is inverted and
    classified; each projected sample is matched to its nearest grid point.
    A moving sample counts as preserved when, at both epochs, that grid
    point has the same argmax prediction and the same ``is_B`` flag as the
    sample itself.

    Returns:
        tuple: ``(true_num, moving_sample_num)``.
    """
    provider, projector = self.data_provider, self.projector

    data_s = provider.train_representation(e_s)
    data_t = provider.train_representation(e_t)
    pred_s = provider.get_pred(e_s, data_s)
    pred_t = provider.get_pred(e_t, data_t)
    low_s = projector.batch_project(e_s, data_s)
    low_t = projector.batch_project(e_t, data_t)
    s_B = is_B(pred_s)
    t_B = is_B(pred_t)
    labels_s = pred_s.argmax(1)
    labels_t = pred_t.argmax(1)

    # TODO implement more case where loss is not cross entropy
    # keep samples that are unflagged at both epochs and change prediction
    confident = np.logical_and(np.logical_not(s_B), np.logical_not(t_B))
    changed = labels_s != labels_t
    selected = np.logical_and(changed, confident)

    # background (decision view) of both epochs
    vis = visualizer(provider, projector, resolution, cmap='tab10')
    grid_s, _ = vis.get_epoch_decision_view(e_s, resolution)
    grid_t, _ = vis.get_epoch_decision_view(e_t, resolution)
    grid_s = grid_s.reshape(resolution * resolution, -1)
    grid_t = grid_t.reshape(resolution * resolution, -1)
    grid_high_s = projector.batch_inverse(e_s, grid_s)
    grid_high_t = projector.batch_inverse(e_t, grid_t)
    grid_pred_s = provider.get_pred(e_s, grid_high_s) + 1e-8
    grid_pred_t = provider.get_pred(e_t, grid_high_t) + 1e-8
    grid_s_B = is_B(grid_pred_s)
    grid_t_B = is_B(grid_pred_t)
    grid_labels_s = grid_pred_s.argmax(1)
    grid_labels_t = grid_pred_t.argmax(1)

    # nearest grid point of every projected sample, epoch e_s
    finder = NearestNeighbors(n_neighbors=1, radius=0.4)
    finder.fit(grid_s)
    _, nearest = finder.kneighbors(low_s, n_neighbors=1, return_distance=True)
    near_label_s = grid_labels_s[nearest].squeeze()
    near_B_s = grid_s_B[nearest].squeeze()
    s_true = np.logical_and(near_label_s == labels_s, near_B_s == s_B)

    # same lookup for epoch e_t
    finder = NearestNeighbors(n_neighbors=1, radius=0.4)
    finder.fit(grid_t)
    _, nearest = finder.kneighbors(low_t, n_neighbors=1, return_distance=True)
    near_label_t = grid_labels_t[nearest].squeeze()
    near_B_t = grid_t_B[nearest].squeeze()
    t_true = np.logical_and(near_label_t == labels_t, near_B_t == t_B)

    moving_sample_num = np.sum(selected)
    true_num = np.sum(np.logical_and(s_true[selected], t_true[selected]))
    print(f'moving invariant Low/High:\t{true_num}/{moving_sample_num}')
    return true_num, moving_sample_num
def eval_moving_invariants_test(self, e_s, e_t, resolution=500):
    """Count moving testing samples whose move is reflected in the projection.

    A sample is "moving" when its argmax prediction differs between epochs
    ``e_s`` and ``e_t`` and ``is_B`` flags it at neither epoch. For both
    epochs a ``resolution`` x ``resolution`` decision view is inverted and
    classified; each projected sample is matched to its nearest grid point.
    A moving sample counts as preserved when, at both epochs, that grid
    point has the same argmax prediction and the same ``is_B`` flag as the
    sample itself.

    Returns:
        tuple: ``(true_num, moving_sample_num)``.
    """
    provider, projector = self.data_provider, self.projector

    data_s = provider.test_representation(e_s)
    data_t = provider.test_representation(e_t)
    pred_s = provider.get_pred(e_s, data_s)
    pred_t = provider.get_pred(e_t, data_t)
    low_s = projector.batch_project(e_s, data_s)
    low_t = projector.batch_project(e_t, data_t)
    s_B = is_B(pred_s)
    t_B = is_B(pred_t)
    labels_s = pred_s.argmax(1)
    labels_t = pred_t.argmax(1)

    # keep samples that are unflagged at both epochs and change prediction
    confident = np.logical_and(np.logical_not(s_B), np.logical_not(t_B))
    changed = labels_s != labels_t
    selected = np.logical_and(changed, confident)

    # background (decision view) of both epochs
    vis = visualizer(provider, projector, resolution, cmap='tab10')
    grid_s, _ = vis.get_epoch_decision_view(e_s, resolution)
    grid_t, _ = vis.get_epoch_decision_view(e_t, resolution)
    grid_s = grid_s.reshape(resolution * resolution, -1)
    grid_t = grid_t.reshape(resolution * resolution, -1)
    grid_high_s = projector.batch_inverse(e_s, grid_s)
    grid_high_t = projector.batch_inverse(e_t, grid_t)
    grid_pred_s = provider.get_pred(e_s, grid_high_s) + 1e-8
    grid_pred_t = provider.get_pred(e_t, grid_high_t) + 1e-8
    grid_s_B = is_B(grid_pred_s)
    grid_t_B = is_B(grid_pred_t)
    grid_labels_s = grid_pred_s.argmax(1)
    grid_labels_t = grid_pred_t.argmax(1)

    # nearest grid point of every projected sample, epoch e_s
    finder = NearestNeighbors(n_neighbors=1, radius=0.4)
    finder.fit(grid_s)
    _, nearest = finder.kneighbors(low_s, n_neighbors=1, return_distance=True)
    near_label_s = grid_labels_s[nearest].squeeze()
    near_B_s = grid_s_B[nearest].squeeze()
    s_true = np.logical_and(near_label_s == labels_s, near_B_s == s_B)

    # same lookup for epoch e_t
    finder = NearestNeighbors(n_neighbors=1, radius=0.4)
    finder.fit(grid_t)
    _, nearest = finder.kneighbors(low_t, n_neighbors=1, return_distance=True)
    near_label_t = grid_labels_t[nearest].squeeze()
    near_B_t = grid_t_B[nearest].squeeze()
    t_true = np.logical_and(near_label_t == labels_t, near_B_t == t_B)

    moving_sample_num = np.sum(selected)
    true_num = np.sum(np.logical_and(s_true[selected], t_true[selected]))
    print(f'moving invariant Low/High:\t{true_num}/{moving_sample_num}')
    return true_num, moving_sample_num
def eval_fixing_invariants_train(self, e_s, e_t, high_threshold, low_threshold, metric="euclidean"):
    """Count training samples that stay put in both spaces between two epochs.

    A sample is "fixed" when its high-dimensional distance between epochs
    ``e_s`` and ``e_t`` is at most ``high_threshold``. Both projections are
    multiplied by one common factor so that the larger of the joint x and
    y extents becomes 100; a fixed sample is preserved when its scaled
    low-dimensional displacement is at most ``low_threshold``.

    Args:
        e_s, e_t: the two epochs compared.
        high_threshold: upper bound on the high-dimensional distance.
        low_threshold: upper bound on the scaled projected displacement.
        metric: ``"euclidean"``, ``"cosine"`` or ``"softmax"``.

    Returns:
        tuple: ``(preserved_count, fixed_count)``.

    Raises:
        ValueError: if ``metric`` is not one of the supported names.
    """
    if metric not in ("euclidean", "cosine", "softmax"):
        raise ValueError("unsupported metric: {!r}".format(metric))

    train_data_s = self.data_provider.train_representation(e_s)
    train_data_t = self.data_provider.train_representation(e_t)

    low_s = self.projector.batch_project(e_s, train_data_s)
    low_t = self.projector.batch_project(e_t, train_data_t)

    # Normalize both projections with one scale taken from their joint
    # bounding box; the lower bounds must be minima, not maxima.
    y_max = max(low_s[:, 1].max(), low_t[:, 1].max())
    y_min = min(low_s[:, 1].min(), low_t[:, 1].min())
    x_max = max(low_s[:, 0].max(), low_t[:, 0].max())
    x_min = min(low_s[:, 0].min(), low_t[:, 0].min())
    scale = min(100 / (x_max - x_min), 100 / (y_max - y_min))
    low_t = low_t * scale
    low_s = low_s * scale

    if metric == "euclidean":
        high_dists = np.linalg.norm(train_data_s - train_data_t, axis=1)
    elif metric == "cosine":
        # NOTE(review): this compares the scaled low-dimensional embeddings,
        # not the high-dimensional data. Kept as in the original; confirm
        # whether train_data_s/train_data_t were intended.
        high_dists = np.array([cosine(low_t[i], low_s[i]) for i in range(len(low_s))])
    else:
        # predictions are only needed for the softmax-based distance
        pred_s = self.data_provider.get_pred(e_s, train_data_s)
        pred_t = self.data_provider.get_pred(e_t, train_data_t)
        softmax_s = softmax(pred_s, axis=1)
        softmax_t = softmax(pred_t, axis=1)
        high_dists = np.array([js_div(softmax_s[i], softmax_t[i]) for i in range(len(softmax_t))])

    low_dists = np.linalg.norm(low_s - low_t, axis=1)
    selected = high_dists <= high_threshold
    return np.sum(np.logical_and(selected, low_dists <= low_threshold)), np.sum(selected)
def eval_fixing_invariants_test(self, e_s, e_t, high_threshold, low_threshold, metric="euclidean"):
    """Count testing samples that stay put in both spaces between two epochs.

    A sample is "fixed" when its high-dimensional distance between epochs
    ``e_s`` and ``e_t`` is at most ``high_threshold``. Both projections are
    multiplied by one common factor so that the larger of the joint x and
    y extents becomes 100; a fixed sample is preserved when its scaled
    low-dimensional displacement is at most ``low_threshold``.

    Args:
        e_s, e_t: the two epochs compared.
        high_threshold: upper bound on the high-dimensional distance.
        low_threshold: upper bound on the scaled projected displacement.
        metric: ``"euclidean"``, ``"cosine"`` or ``"softmax"``.

    Returns:
        tuple: ``(preserved_count, fixed_count)``.

    Raises:
        ValueError: if ``metric`` is not one of the supported names.
    """
    if metric not in ("euclidean", "cosine", "softmax"):
        raise ValueError("unsupported metric: {!r}".format(metric))

    test_data_s = self.data_provider.test_representation(e_s)
    test_data_t = self.data_provider.test_representation(e_t)

    low_s = self.projector.batch_project(e_s, test_data_s)
    low_t = self.projector.batch_project(e_t, test_data_t)

    # Normalize both projections with one scale taken from their joint
    # bounding box; the lower bounds must be minima, not maxima.
    y_max = max(low_s[:, 1].max(), low_t[:, 1].max())
    y_min = min(low_s[:, 1].min(), low_t[:, 1].min())
    x_max = max(low_s[:, 0].max(), low_t[:, 0].max())
    x_min = min(low_s[:, 0].min(), low_t[:, 0].min())
    scale = min(100 / (x_max - x_min), 100 / (y_max - y_min))
    low_t = low_t * scale
    low_s = low_s * scale

    if metric == "euclidean":
        high_dists = np.linalg.norm(test_data_s - test_data_t, axis=1)
    elif metric == "cosine":
        # NOTE(review): this compares the scaled low-dimensional embeddings,
        # not the high-dimensional data. Kept as in the original; confirm
        # whether test_data_s/test_data_t were intended.
        high_dists = np.array([cosine(low_t[i], low_s[i]) for i in range(len(low_s))])
    else:
        # predictions are only needed for the softmax-based distance
        pred_s = self.data_provider.get_pred(e_s, test_data_s)
        pred_t = self.data_provider.get_pred(e_t, test_data_t)
        softmax_s = softmax(pred_s, axis=1)
        softmax_t = softmax(pred_t, axis=1)
        high_dists = np.array([js_div(softmax_s[i], softmax_t[i]) for i in range(len(softmax_t))])

    low_dists = np.linalg.norm(low_s - low_t, axis=1)
    selected = high_dists <= high_threshold
    return np.sum(np.logical_and(selected, low_dists <= low_threshold)), np.sum(selected)
def eval_proj_invariants_train(self, e, resolution=500):
    """Check projected training samples against the decision view of epoch ``e``.

    Every projected sample is matched to its nearest point of the
    ``resolution`` x ``resolution`` decision view, whose inverse projection
    is classified by the data provider. Samples flagged by ``is_B`` count
    as preserved when the grid point is flagged too; unflagged samples
    count as preserved when the grid point has the same argmax prediction.

    Returns:
        tuple: ``(border_preserved, prediction_preserved, sample_count)``.
    """
    provider, projector = self.data_provider, self.projector

    train_data = provider.train_representation(e)
    pred = provider.get_pred(e, train_data)
    low = projector.batch_project(e, train_data)
    s_B = is_B(pred)
    labels = pred.argmax(1)

    # background (decision view)
    vis = visualizer(provider, projector, resolution, cmap='tab10')
    grid, _ = vis.get_epoch_decision_view(e, resolution)
    grid = grid.reshape(resolution * resolution, -1)
    grid_high = projector.batch_inverse(e, grid)
    grid_pred = provider.get_pred(e, grid_high) + 1e-8
    grid_B = is_B(grid_pred)
    grid_labels = grid_pred.argmax(1)

    # nearest grid point of every projected sample
    finder = NearestNeighbors(n_neighbors=1, radius=0.4)
    finder.fit(grid)
    _, nearest = finder.kneighbors(low, n_neighbors=1, return_distance=True)
    near_labels = grid_labels[nearest].squeeze()
    near_B = grid_B[nearest].squeeze()

    not_B = np.logical_not(s_B)
    border_true = np.logical_and(s_B, near_B)
    pred_true = np.logical_and(near_labels == labels, not_B)

    border_hits = np.sum(border_true)
    pred_hits = np.sum(pred_true)
    print("border fixing invariants:\t{}/{}".format(border_hits, np.sum(s_B)))
    print("prediction fixing invariants:\t{}/{}".format(pred_hits, np.sum(not_B)))
    print("invariants:\t{}/{}".format(border_hits + pred_hits, len(train_data)))
    return border_hits, pred_hits, len(train_data)
def eval_proj_invariants_test(self, e, resolution=500):
    """Check projected testing samples against the decision view of epoch ``e``.

    Every projected sample is matched to its nearest point of the
    ``resolution`` x ``resolution`` decision view, whose inverse projection
    is classified by the data provider. Samples flagged by ``is_B`` count
    as preserved when the grid point is flagged too; unflagged samples
    count as preserved when the grid point has the same argmax prediction.

    Returns:
        tuple: ``(border_preserved, prediction_preserved, sample_count)``.
    """
    provider, projector = self.data_provider, self.projector

    test_data = provider.test_representation(e)
    pred = provider.get_pred(e, test_data)
    low = projector.batch_project(e, test_data)
    s_B = is_B(pred)
    labels = pred.argmax(1)

    # background (decision view)
    vis = visualizer(provider, projector, resolution, cmap='tab10')
    grid, _ = vis.get_epoch_decision_view(e, resolution)
    grid = grid.reshape(resolution * resolution, -1)
    grid_high = projector.batch_inverse(e, grid)
    grid_pred = provider.get_pred(e, grid_high) + 1e-8
    grid_B = is_B(grid_pred)
    grid_labels = grid_pred.argmax(1)

    # nearest grid point of every projected sample
    finder = NearestNeighbors(n_neighbors=1, radius=0.4)
    finder.fit(grid)
    _, nearest = finder.kneighbors(low, n_neighbors=1, return_distance=True)
    near_labels = grid_labels[nearest].squeeze()
    near_B = grid_B[nearest].squeeze()

    not_B = np.logical_not(s_B)
    border_true = np.logical_and(s_B, near_B)
    pred_true = np.logical_and(near_labels == labels, not_B)

    border_hits = np.sum(border_true)
    pred_hits = np.sum(pred_true)
    print("border fixing invariants:\t{}/{}".format(border_hits, np.sum(s_B)))
    print("prediction fixing invariants:\t{}/{}".format(pred_hits, np.sum(not_B)))
    print("invariants:\t{}/{}".format(border_hits + pred_hits, len(test_data)))
    return border_hits, pred_hits, len(test_data)
def train_acc(self, epoch):
    """Return the fraction of training samples whose argmax prediction
    at ``epoch`` equals the training label."""
    provider = self.data_provider
    data = provider.train_representation(epoch)
    labels = provider.train_labels(epoch)
    predicted = provider.get_pred(epoch, data).argmax(1)
    hits = np.sum(labels == predicted)
    return hits / len(labels)
def test_acc(self, epoch):
data = self.data_provider.test_representation(epoch)
labels = self.data_provider.test_labels(epoch)
pred = self.data_provider.get_pred(epoch, data).argmax(1)
return np.sum(labels==pred)/len(labels)
#################################### helper functions #############################################
def save_epoch_eval(self, n_epoch, n_neighbors, temporal_k=5, file_name="evaluation"):
# save result
save_dir = os.path.join(self.data_provider.model_path)
save_file = os.path.join(save_dir, file_name + ".json")
if not os.path.exists(save_file):
evaluation = dict()
else:
f = open(save_file, "r")
evaluation = json.load(f)
f.close()
n_key = str(n_neighbors)
if "train_acc" not in evaluation.keys():
evaluation["train_acc"] = dict()
if "test_acc" not in evaluation.keys():
evaluation["test_acc"] = dict()
if "nn_train" not in evaluation:
evaluation["nn_train"] = dict()
if "nn_test" not in evaluation:
evaluation["nn_test"] = dict()
if "b_train" not in evaluation:
evaluation["b_train"] = dict()
if "b_test" not in evaluation:
evaluation["b_test"] = dict()
if "ppr_train" not in evaluation.keys():
evaluation["ppr_train"] = dict()
if "ppr_test" not in evaluation.keys():
evaluation["ppr_test"] = dict()
if "ppr_dist_train" not in evaluation.keys():
evaluation["ppr_dist_train"] = dict()
if "ppr_dist_test" not in evaluation.keys():
evaluation["ppr_dist_test"] = dict()
if "tnn_train" not in evaluation.keys():
evaluation["tnn_train"] = dict()
if "tnn_test" not in evaluation.keys():
evaluation["tnn_test"] = dict()
if "tr_train" not in evaluation.keys():
evaluation["tr_train"] = dict()
if "tr_test" not in evaluation.keys():
evaluation["tr_test"] = dict()
if "wtr_train" not in evaluation.keys():
evaluation["wtr_train"] = dict()
if "wtr_test" not in evaluation.keys():
evaluation["wtr_test"] = dict()
if "tlr_train" not in evaluation.keys():
evaluation["tlr_train"] = dict()
if "tlr_test" not in evaluation.keys():
evaluation["tlr_test"] = dict()
if "temporal_train_mean" not in evaluation.keys():
evaluation["temporal_train_mean"] = dict()
if "temporal_test_mean" not in evaluation.keys():
evaluation["temporal_test_mean"] = dict()
epoch_key = str(n_epoch)
if epoch_key not in evaluation["nn_train"]:
evaluation["nn_train"][epoch_key] = dict()
evaluation["nn_train"][epoch_key][n_key] = self.eval_nn_train(n_epoch, n_neighbors)
if epoch_key not in evaluation["nn_test"]:
evaluation["nn_test"][epoch_key] = dict()
evaluation["nn_test"][epoch_key][n_key] = self.eval_nn_test(n_epoch, n_neighbors)
# if epoch_key not in evaluation["b_train"]:
# evaluation["b_train"][epoch_key] = dict()
# evaluation["b_train"][epoch_key][n_key] = self.eval_b_train(n_epoch, n_neighbors)
# if epoch_key not in evaluation["b_test"]:
# evaluation["b_test"][epoch_key] = dict()
# evaluation["b_test"][epoch_key][n_key] = self.eval_b_test(n_epoch, n_neighbors)
if epoch_key not in evaluation["ppr_train"]:
evaluation["ppr_train"][epoch_key] = dict()
evaluation["ppr_train"][epoch_key] = self.eval_inv_train(n_epoch)
if epoch_key not in evaluation["ppr_test"]:
evaluation["ppr_test"][epoch_key] = dict()
evaluation["ppr_test"][epoch_key] = self.eval_inv_test(n_epoch)
# evaluation["ppr_dist_train"][epoch_key] = self.eval_inv_dist_train(n_epoch)
# evaluation["ppr_dist_test"][epoch_key] = self.eval_inv_dist_test(n_epoch)
evaluation["train_acc"][epoch_key] = self.train_acc(n_epoch)
evaluation["test_acc"][epoch_key] = self.test_acc(n_epoch)
# # local temporal
# if epoch_key not in evaluation["tnn_train"].keys():
# evaluation["tnn_train"][epoch_key] = dict()
# if epoch_key not in evaluation["tnn_test"].keys():
# evaluation["tnn_test"][epoch_key] = dict()
# evaluation["tnn_train"][epoch_key][str(temporal_k)] = self.eval_temporal_nn_train(n_epoch, temporal_k)
# evaluation["tnn_test"][epoch_key][str(temporal_k)] = self.eval_temporal_nn_test(n_epoch, temporal_k)
# # global temporal ranking
# evaluation["tr_train"][epoch_key] = self.eval_temporal_global_corr_train(n_epoch)
# evaluation["tr_test"][epoch_key] = self.eval_temporal_global_corr_test(n_epoch)
# weighted global temporal ranking
# evaluation["wtr_train"][epoch_key] = self.eval_temporal_weighted_global_corr_train(n_epoch)
# evaluation["wtr_test"][epoch_key] = self.eval_temporal_weighted_global_corr_test(n_epoch)
# # local temporal ranking
# evaluation["tlr_train"][epoch_key] = self.eval_temporal_local_corr_train(n_epoch, 3)
# evaluation["tlr_test"][epoch_key] = self.eval_temporal_local_corr_test(n_epoch,3)
# # temporal
# t_train_val, _ = self.eval_temporal_train(n_neighbors)
# evaluation["temporal_train_mean"][n_key] = t_train_val
# t_test_val, _ = self.eval_temporal_test(n_neighbors)
# evaluation["temporal_test_mean"][n_key] = t_test_val
with open(save_file, "w") as f:
json.dump(evaluation, f)
if self.verbose:
print("Successfully save evaluation with {:d} neighbors...".format(n_neighbors))
def get_eval(self, file_name="evaluation"):
    """Load and return the evaluation results stored as JSON.

    Args:
        file_name: base name (without ``.json``) of the evaluation file
            inside ``data_provider.model_path``.

    Returns:
        The decoded content of ``<model_path>/<file_name>.json``.

    Raises:
        FileNotFoundError: if the evaluation file does not exist.
    """
    eval_path = os.path.join(self.data_provider.model_path, file_name + ".json")
    # The context manager closes the handle even when decoding fails.
    with open(eval_path, "r") as f:
        return json.load(f)
class SegEvaluator(Evaluator):
    """Evaluator that keeps its results in a per-experiment subdirectory.

    Results are read from and written to
    ``<model_path>/<exp>/<file_name>.json`` instead of directly under
    ``model_path``.
    """

    def __init__(self, data_provider, projector, exp, verbose=1):
        """Store ``exp``; its string form names the result subdirectory."""
        super().__init__(data_provider, projector, verbose)
        self.exp = exp

    def save_epoch_eval(self, n_epoch, n_neighbors, temporal_k=5, file_name="evaluation"):
        """Evaluate one epoch and merge the results into the experiment's JSON file.

        An existing file is loaded first, so results stored for other epochs
        or neighbor counts are kept.  The following entries are then
        (re)computed and written back:

        * ``nn_train`` / ``nn_test``   -> ``[epoch][n_neighbors]``
        * ``ppr_train`` / ``ppr_test`` -> ``[epoch]``
        * ``tlr_train`` / ``tlr_test`` -> ``[epoch]`` (local temporal
          ranking, called with a fixed second argument of 3)

        Args:
            n_epoch: epoch to evaluate; its string form is the JSON key.
            n_neighbors: neighborhood size handed to the nn-preserving
                metrics; its string form is their inner JSON key.
            temporal_k: currently unused because the metrics it would
                parameterize are disabled; kept so existing calls keep
                working.
            file_name: base name (without ``.json``) of the evaluation file.

        Raises:
            TypeError: if a metric value cannot be encoded as JSON.  The
                file on disk is left untouched in that case.
        """
        save_dir = os.path.join(self.data_provider.model_path, "{}".format(self.exp))
        save_file = os.path.join(save_dir, file_name + ".json")

        if os.path.exists(save_file):
            with open(save_file, "r") as f:
                evaluation = json.load(f)
        else:
            evaluation = dict()

        for section in ("nn_train", "nn_test", "ppr_train", "ppr_test", "tlr_train", "tlr_test"):
            evaluation.setdefault(section, dict())

        epoch_key = str(n_epoch)
        n_key = str(n_neighbors)

        # neighbor preservation, stored per epoch and per neighborhood size
        evaluation["nn_train"].setdefault(epoch_key, dict())[n_key] = self.eval_nn_train(n_epoch, n_neighbors)
        evaluation["nn_test"].setdefault(epoch_key, dict())[n_key] = self.eval_nn_test(n_epoch, n_neighbors)

        # inverse-projection metrics, stored per epoch
        evaluation["ppr_train"][epoch_key] = self.eval_inv_train(n_epoch)
        evaluation["ppr_test"][epoch_key] = self.eval_inv_test(n_epoch)

        # local ranking temporal
        evaluation["tlr_train"][epoch_key] = self.eval_temporal_local_corr_train(n_epoch, 3)
        evaluation["tlr_test"][epoch_key] = self.eval_temporal_local_corr_test(n_epoch, 3)

        # Encode first: opening the file in "w" mode truncates it, so an
        # encoding error after that point would wipe all earlier results.
        serialized = json.dumps(evaluation)
        # The experiment directory may not exist yet on the first save.
        os.makedirs(save_dir, exist_ok=True)
        with open(save_file, "w") as f:
            f.write(serialized)

        if self.verbose:
            print("Successfully save evaluation with {:d} neighbors...".format(n_neighbors))

    def get_eval(self, file_name="evaluation"):
        """Load and return the experiment's stored evaluation results.

        Args:
            file_name: base name (without ``.json``) of the evaluation file.

        Returns:
            The decoded content of ``<model_path>/<exp>/<file_name>.json``.

        Raises:
            FileNotFoundError: if the evaluation file does not exist.
        """
        eval_path = os.path.join(self.data_provider.model_path, "{}".format(self.exp), file_name + ".json")
        # The context manager closes the handle even when decoding fails.
        with open(eval_path, "r") as f:
            return json.load(f)
class ALEvaluator(Evaluator):
    """Evaluator that records only train and test accuracy per iteration."""

    def __init__(self, data_provider, projector, verbose=1):
        """Forward all arguments unchanged to ``Evaluator.__init__``."""
        super().__init__(data_provider, projector, verbose)

    def train_acc(self, epoch):
        """Return the model's accuracy on the training data of ``epoch``.

        The predicted class of a sample is the argmax (axis 1) of
        ``data_provider.get_pred``; the result is the number of samples
        whose predicted class equals the training label divided by the
        number of labels.
        """
        data = self.data_provider.train_representation(epoch)
        labels = self.data_provider.train_labels(epoch)
        pred = self.data_provider.get_pred(epoch, data).argmax(1)
        return np.sum(labels == pred) / len(labels)

    #################################### helper functions #############################################
    def save_epoch_eval(self, n_epoch, file_name="evaluation"):
        """Compute train/test accuracy for ``n_epoch`` and merge them into the JSON file.

        ``<model_path>/<file_name>.json`` is loaded when it already exists,
        so accuracies stored for other iterations are kept.

        Args:
            n_epoch: iteration to evaluate; its string form is the JSON key.
            file_name: base name (without ``.json``) of the evaluation file.

        Raises:
            TypeError: if an accuracy value cannot be encoded as JSON.  The
                file on disk is left untouched in that case.
        """
        save_file = os.path.join(self.data_provider.model_path, file_name + ".json")

        if os.path.exists(save_file):
            with open(save_file, "r") as f:
                evaluation = json.load(f)
        else:
            evaluation = dict()

        for section in ("train_acc", "test_acc"):
            evaluation.setdefault(section, dict())

        epoch_key = str(n_epoch)
        evaluation["train_acc"][epoch_key] = self.train_acc(n_epoch)
        evaluation["test_acc"][epoch_key] = self.test_acc(n_epoch)

        # Encode first: opening the file in "w" mode truncates it, so an
        # encoding error after that point would wipe all earlier results.
        serialized = json.dumps(evaluation)
        with open(save_file, "w") as f:
            f.write(serialized)

        if self.verbose:
            print("Successfully save evaluation for Iteration {}".format(epoch_key))
class DenseALEvaluator(Evaluator):
    """Placeholder subclass of ``Evaluator``; its specialised behavior is still to be written."""
    # TODO
    def __init__(self, data_provider, projector, verbose=1):
        # Forwards all arguments unchanged to Evaluator.__init__.
        super().__init__(data_provider, projector, verbose)