|
""" |
|
Help functions to evaluate our visualization system |
|
""" |
|
|
|
import numpy as np |
|
from pynndescent import NNDescent |
|
from sklearn.neighbors import NearestNeighbors |
|
from sklearn.manifold import trustworthiness |
|
from scipy.stats import kendalltau, spearmanr, pearsonr, rankdata |
|
|
|
|
|
def evaluate_isAlign(embeddingLeft, embeddingRight, align_metric=1):
    """
    Find the points whose two embeddings lie close to each other.

    Row i of embeddingLeft is compared with row i of embeddingRight using
    np.linalg.norm of their difference (the Euclidean distance for 1-D rows).
    :param embeddingLeft: ndarray, first set of representations; its length sets how many indices are checked
    :param embeddingRight: ndarray, second set of representations, indexed at the same positions
    :param align_metric: number, a point is aligned when its distance is strictly below this value
    :return: list of int, the aligned indices in ascending order
    """
    return [
        idx
        for idx, left_point in enumerate(embeddingLeft)
        if np.linalg.norm(left_point - embeddingRight[idx]) < align_metric
    ]
|
|
|
|
|
def evaluate_isAlign_single(embeddingLeft, embeddingRight, selected_left, selected_right, align_metric=1):
    """
    Find the points of the opposite embedding that lie close to a selected point.

    A selection of -1 means "nothing selected" and yields an empty list for
    that side. Distances are np.linalg.norm of the row difference, and the
    candidate indices always range over len(embeddingLeft).
    :param embeddingLeft: ndarray, first set of representations
    :param embeddingRight: ndarray, second set of representations
    :param selected_left: int, index into embeddingLeft, or -1
    :param selected_right: int, index into embeddingRight, or -1
    :param align_metric: number, a point matches when its distance is strictly below this value
    :return: (align_indices_left, align_indices_right); the first holds the
        indices of embeddingLeft close to the selected right point, the second
        the indices of embeddingRight close to the selected left point
    """
    candidates = range(len(embeddingLeft))

    align_indices_right = []
    if selected_left != -1:
        align_indices_right = [
            idx
            for idx in candidates
            if np.linalg.norm(embeddingLeft[selected_left] - embeddingRight[idx]) < align_metric
        ]

    align_indices_left = []
    if selected_right != -1:
        align_indices_left = [
            idx
            for idx in candidates
            if np.linalg.norm(embeddingLeft[idx] - embeddingRight[selected_right]) < align_metric
        ]

    return align_indices_left, align_indices_right
|
|
|
def evaluate_isNearestNeighbour(embeddingLeft, embeddingRight, n_neighbors=15, metric="euclidean"):
    """
    Find indices where none of the nearest neighbors in embeddingLeft are preserved in embeddingRight.

    Approximate neighbor graphs are built for both embeddings with NNDescent.
    A point's own index is ignored when the two neighbor lists are compared.
    :param embeddingLeft: ndarray, first set of low dimensional representations
    :param embeddingRight: ndarray, second set of low dimensional representations
    :param n_neighbors: int, number of nearest neighbors to consider
    :param metric: str, metric for nearest neighbor calculation, default "euclidean"
    :return: list of indices where none of the neighbors are preserved
    """
    n_samples = embeddingLeft.shape[0]
    # Index-building parameters are derived from the number of points.
    n_trees = 5 + int(round(n_samples ** 0.5 / 20.0))
    n_iters = max(5, int(round(np.log2(n_samples))))

    nnd_params = dict(
        n_neighbors=n_neighbors,
        metric=metric,
        n_trees=n_trees,
        n_iters=n_iters,
        max_candidates=60,
        verbose=True,
    )
    left_neighbors, _ = NNDescent(embeddingLeft, **nnd_params).neighbor_graph
    right_neighbors, _ = NNDescent(embeddingRight, **nnd_params).neighbor_graph

    non_preserved_indices = []
    for i in range(len(embeddingLeft)):
        shared = np.intersect1d(left_neighbors[i], right_neighbors[i])
        # The neighbor graph of the indexed data normally lists every point as
        # its own nearest neighbor, so `i` would show up in both lists and the
        # intersection could never be empty. Only count neighbors other than
        # the point itself.
        if not np.any(shared != i):
            non_preserved_indices.append(i)

    return non_preserved_indices
|
|
|
def evaluate_isNearestNeighbour_single(embeddingLeft, embeddingRight, selected_left, selected_right, n_neighbors=15, metric="euclidean"):
    """
    Look up the approximate nearest neighbors of the selected points in both embeddings.

    Both neighbor graphs are always built with NNDescent, whatever the selection.
    A selection of -1 means "nothing selected" and yields empty lists.
    :param embeddingLeft: ndarray, first set of low dimensional representations
    :param embeddingRight: ndarray, second set of low dimensional representations
    :param selected_left: int, index of the point selected on the left side, or -1
    :param selected_right: int, index of the point selected on the right side, or -1
    :param n_neighbors: int, number of nearest neighbors to consider
    :param metric: str, metric for nearest neighbor calculation, default "euclidean"
    :return: four lists of neighbor indices, in this order:
        left embedding / left selection, left embedding / right selection,
        right embedding / left selection, right embedding / right selection
    """
    sample_count = embeddingLeft.shape[0]
    n_trees = 5 + int(round(sample_count ** 0.5 / 20.0))
    n_iters = max(5, int(round(np.log2(sample_count))))

    # Build the left graph first, then the right one, with identical settings.
    graphs = []
    for points in (embeddingLeft, embeddingRight):
        index = NNDescent(
            points,
            n_neighbors=n_neighbors,
            metric=metric,
            n_trees=n_trees,
            n_iters=n_iters,
            max_candidates=60,
            verbose=True
        )
        neighbor_ids, _ = index.neighbor_graph
        graphs.append(neighbor_ids)
    left_neighbors, right_neighbors = graphs

    def lookup(selected):
        # Returns (neighbors in left embedding, neighbors in right embedding).
        if selected == -1:
            return [], []
        return left_neighbors[selected].tolist(), right_neighbors[selected].tolist()

    nearest_neighbors_leftE_leftS, nearest_neighbors_rightE_leftS = lookup(selected_left)
    nearest_neighbors_leftE_rightS, nearest_neighbors_rightE_rightS = lookup(selected_right)

    return (
        nearest_neighbors_leftE_leftS,
        nearest_neighbors_leftE_rightS,
        nearest_neighbors_rightE_leftS,
        nearest_neighbors_rightE_rightS,
    )
|
|
|
def evaluate_proj_nn_perseverance_knn(data, embedding, n_neighbors, metric="euclidean", selected_indices=None):
    """
    evaluate projection function, nn preserving property using knn algorithm
    :param data: ndarray, high dimensional representations
    :param embedding: ndarray, low dimensional representations
    :param n_neighbors: int, the number of neighbors
    :param metric: str, by default "euclidean"
    :param selected_indices: optional list or ndarray of row indices; when given
        and non-empty, only these rows contribute to the average. None or an
        empty sequence means "use every row".
    :return nn property: float, mean number of neighbor indices shared between
        the high and low dimensional neighbor lists
    """
    n_samples = data.shape[0]
    # Index-building parameters are derived from the number of points.
    n_trees = 5 + int(round(n_samples ** 0.5 / 20.0))
    n_iters = max(5, int(round(np.log2(n_samples))))

    nnd_params = dict(
        n_neighbors=n_neighbors,
        metric=metric,
        n_trees=n_trees,
        n_iters=n_iters,
        max_candidates=60,
        verbose=True,
    )
    high_ind, _ = NNDescent(data, **nnd_params).neighbor_graph
    low_ind, _ = NNDescent(embedding, **nnd_params).neighbor_graph

    lens_data = len(data)
    # Explicit None/length check: plain truthiness raises ValueError for an
    # ndarray with more than one element and is False for np.array([0]).
    if selected_indices is not None and len(selected_indices) > 0:
        lens_data = len(selected_indices)
        high_ind = high_ind[selected_indices]
        low_ind = low_ind[selected_indices]

    border_pres = np.zeros(lens_data)
    for i in range(lens_data):
        border_pres[i] = len(np.intersect1d(high_ind[i], low_ind[i]))

    return border_pres.mean()
|
|
|
from sklearn.metrics import pairwise_distances |
|
import numpy as np |
|
|
|
def subset_trustworthiness(X, X_subset, X_embedded, X_subset_embedded, n_neighbors=5):
    """
    Trustworthiness of an embedding, evaluated only for a subset of the points.

    For every row of X that also occurs in X_subset, the n_neighbors nearest
    neighbors in the embedding are searched among all rows of X_embedded. Each
    such neighbor is penalised by how far beyond n_neighbors it ranks in the
    original space. The penalties use the usual trustworthiness normalisation,
    averaged over the subset points only:

        T = 1 - 2 / (m * k * (2n - 3k - 1)) * sum_i sum_j max(0, r(i, j) - k)

    with n = len(X), m = number of rows of X found in X_subset, k = n_neighbors
    and r(i, j) the 1-based rank of j among the neighbors of i in X.
    :param X: ndarray, shape (n, d), high dimensional representations
    :param X_subset: ndarray, shape (m, d), rows of X to evaluate; matched by exact row equality
    :param X_embedded: ndarray, low dimensional representations, row-aligned with X
    :param X_subset_embedded: unused; kept so existing callers keep working
    :param n_neighbors: int, the number of neighbors, must satisfy 0 < n_neighbors < n / 2
    :return: float, subset trustworthiness (1.0 when no embedded neighbor is an intruder)
    :raises ValueError: if n_neighbors is out of range or no row of X_subset occurs in X
    """
    X = np.asarray(X)
    X_embedded = np.asarray(X_embedded)
    X_subset = np.atleast_2d(np.asarray(X_subset, dtype=X.dtype))
    n_samples = X.shape[0]

    # Same constraint as the standard score: keeps (2n - 3k - 1) positive.
    if not 0 < n_neighbors < n_samples / 2:
        raise ValueError(
            f"n_neighbors ({n_neighbors}) should be positive and less than "
            f"n_samples / 2 ({n_samples / 2})"
        )

    # Row-wise membership test. np.isin compares individual elements, which
    # both mismatches rows and repeats a row index once per matching element.
    wanted_rows = {row.tobytes() for row in X_subset}
    subset_indices = np.array(
        [i for i, row in enumerate(X) if row.tobytes() in wanted_rows], dtype=int
    )
    if subset_indices.size == 0:
        raise ValueError("no row of X_subset was found in X")
    n_subset = subset_indices.size
    rows = np.arange(n_subset)

    # 1-based rank of every point among the neighbors of each subset point in
    # the original space; a point is never its own neighbor.
    dist_X = pairwise_distances(X[subset_indices], X)
    dist_X[rows, subset_indices] = np.inf
    order_X = np.argsort(dist_X, axis=1)
    rank_X = np.empty_like(order_X)
    rank_X[rows[:, None], order_X] = np.arange(1, n_samples + 1)

    # n_neighbors nearest neighbors of each subset point in the embedding.
    dist_embedded = pairwise_distances(X_embedded[subset_indices], X_embedded)
    dist_embedded[rows, subset_indices] = np.inf
    neighbors_embedded = np.argsort(dist_embedded, axis=1)[:, :n_neighbors]

    # Only embedded neighbors ranked beyond k in the original space count.
    excess = rank_X[rows[:, None], neighbors_embedded] - n_neighbors
    penalty = excess[excess > 0].sum()

    normaliser = n_subset * n_neighbors * (2.0 * n_samples - 3.0 * n_neighbors - 1.0)
    return float(1.0 - 2.0 * penalty / normaliser)
|
|
|
|
|
|
|
|
|
|
|
|
|
def evaluate_proj_nn_perseverance_trustworthiness(data, embedding, n_neighbors, metric="euclidean"):
    """
    evaluate projection function, nn preserving property using trustworthiness formula

    Thin wrapper that forwards its arguments to sklearn's trustworthiness.
    :param data: ndarray, high dimensional representations
    :param embedding: ndarray, low dimensional representations
    :param n_neighbors: int, the number of neighbors
    :param metric: str, by default "euclidean"
    :return nn property: float, nn preserving property
    """
    return trustworthiness(data, embedding, n_neighbors=n_neighbors, metric=metric)
|
|
|
|
|
def evaluate_proj_boundary_perseverance_knn(data, embedding, high_centers, low_centers, n_neighbors):
    """
    evaluate projection function, boundary preserving property

    For every point, the n_neighbors nearest border points are looked up in the
    high dimensional space and in the low dimensional space. The score is the
    mean number of border point indices the two lookups have in common.
    :param data: ndarray, high dimensional representations
    :param embedding: ndarray, low dimensional representations
    :param high_centers: ndarray, border points high dimensional representations
    :param low_centers: ndarray, border points low dimensional representations
    :param n_neighbors: int, the number of neighbors
    :return boundary preserving property: float,boundary preserving property
    """
    def nearest_centers(centers, queries):
        # radius is passed to the constructor as before; only k-neighbor
        # queries are issued here.
        finder = NearestNeighbors(n_neighbors=n_neighbors, radius=0.4)
        finder.fit(centers)
        return finder.kneighbors(queries, n_neighbors=n_neighbors, return_distance=False)

    high_ind = nearest_centers(high_centers, data)
    low_ind = nearest_centers(low_centers, embedding)

    shared_counts = np.zeros(len(data))
    for row in range(len(data)):
        shared_counts[row] = len(np.intersect1d(high_ind[row], low_ind[row]))

    return shared_counts.mean()
|
|
|
|
|
def evaluate_proj_temporal_perseverance_corr(alpha, delta_x):
    """
    Evaluate temporal preserving property,
    calculate the correlation between neighbor preserving rate and moving distance in low dim in a time sequence

    Both inputs are transposed first. Each row of the transposed arrays is then
    treated as one series, and the Pearson correlation between matching rows is
    computed. A NaN correlation is replaced by 0.
    :param alpha: ndarray, 2-D, neighbor preserving rate (presumably one column per point; verify against caller)
    :param delta_x: ndarray, 2-D, moved distance in low dim, same layout as alpha
    :return: (mean, std) of the per-row correlations
    """
    alpha_rows = alpha.T
    delta_rows = delta_x.T

    corr = np.zeros(alpha_rows.shape[0])
    for idx, alpha_row in enumerate(alpha_rows):
        coefficient, _ = pearsonr(alpha_row, delta_rows[idx])
        corr[idx] = 0.0 if np.isnan(coefficient) else coefficient

    return corr.mean(), corr.std()
|
|
|
|
|
def evaluate_inv_distance(data, inv_data):
    """
    The distance between original data and reconstruction data
    :param data: ndarray, high dimensional data
    :param inv_data: ndarray, reconstruction data
    :return err: mean over rows of the L2 norm of (data - inv_data), taken along axis 1
    """
    residual = data - inv_data
    per_point_error = np.linalg.norm(residual, axis=1)
    return per_point_error.mean()
|
|
|
|
|
def evaluate_inv_accu(labels, pred):
    """
    prediction accuracy of reconstruction data
    :param labels: ndarray, shape(N,), label for each point
    :param pred: ndarray, shape(N,), prediction for each point
    :return accu: float, fraction of positions where labels and pred are equal
    """
    matches = labels == pred
    hit_count = np.sum(matches)
    return hit_count / len(labels)
|
|
|
|
|
def evaluate_inv_conf(labels, ori_pred, new_pred):
    """
    the confidence difference between original data and reconstruction data

    For every point, the entry in column labels[i] is read from both prediction
    arrays, and the absolute differences are averaged.
    :param labels: ndarray, shape(N,), the original prediction for each point (used as column index)
    :param ori_pred: ndarray, shape(N,10), the prediction of original data
    :param new_pred: ndarray, shape(N,10), the prediction of reconstruction data
    :return diff: float, the mean of confidence difference for each point
    """
    rows = np.arange(len(labels))
    # Select (i, labels[i]) for every row in one indexing operation.
    old_conf = np.array(ori_pred[rows, labels])
    new_conf = np.array(new_pred[rows, labels])

    return np.abs(old_conf - new_conf).mean()
|
|
|
|
|
def evaluate_proj_temporal_perseverance_entropy(alpha, delta_x):
    """
    (discard)
    calculate the temporal preserving property
    based on the correlation between the entropy of moved distance and neighbor preserving rate(alpha)

    Both inputs are transposed first. alpha is bucketed with floor(alpha * 10).
    delta_x is divided by its global maximum and bucketed the same way. For
    every row and every alpha bucket 0..10 that holds samples, the entropy of
    the delta_x bucket histogram is computed. The Pearson correlation between
    bucket number and entropy is then taken. Rows with fewer than two usable
    buckets get a correlation of 0.
    :param alpha: ndarray, 2-D, neighbor preserving rate
    :param delta_x: ndarray, 2-D, the moved distance in low dim, same layout as alpha
    :return corr: float, the mean of all correlation
    """
    alpha_bins = np.floor(alpha.T * 10)
    moved = delta_x.T
    delta_bins = np.floor(moved / moved.max() * 10)

    corr = np.zeros(len(alpha_bins))
    for row, alpha_row in enumerate(alpha_bins):
        used_buckets = []
        entropies = []
        for bucket in range(11):
            # Moved-distance buckets of the samples that fall in this alpha bucket.
            dx = delta_bins[row][alpha_row == bucket]
            counts = np.array([np.sum(dx == level) for level in range(11)], dtype=float)
            if np.sum(counts) == 0:
                continue
            # The small constant keeps log() away from zero.
            probs = counts / np.sum(counts + 10e-8)
            entropies.append(-np.sum(probs * np.log(probs + 10e-8)))
            used_buckets.append(bucket)

        if len(used_buckets) < 2:
            print("no enough data to form a correlation, setting correlation to be 0")
            corr[row] = 0
        else:
            corr[row], _ = pearsonr(used_buckets, entropies)

    return corr.mean()
|
|
|
|
|
def evaluate_proj_temporal_global_corr(high_rank, low_rank):
    """
    Spearman rank correlation between matching rows of two rankings.
    :param high_rank: sequence of 1-D rankings (one per row)
    :param low_rank: sequence of 1-D rankings, indexed at the same positions as high_rank
    :return: (tau_l, p_l), ndarrays with the correlation and the p-value of every row
    """
    row_count = len(high_rank)
    tau_l = np.zeros(row_count)
    p_l = np.zeros(row_count)

    for idx, high_row in enumerate(high_rank):
        tau_l[idx], p_l[idx] = spearmanr(high_row, low_rank[idx])

    return tau_l, p_l
|
|
|
|
|
def _wcov(x, y, w, ms): |
|
return np.sum(w * (x - ms[0]) * (y - ms[1])) |
|
def _wpearson(x, y, w):
    """
    Weighted Pearson correlation of x and y under the weights w.
    :param x: ndarray, first variable
    :param y: ndarray, second variable
    :param w: ndarray, weights
    :return: weighted covariance of x and y divided by the square root of the
        product of their weighted variances
    """
    weight_total = np.sum(w)
    mean_x = np.sum(x * w) / weight_total
    mean_y = np.sum(y * w) / weight_total

    cov_xy = _wcov(x, y, w, [mean_x, mean_y])
    var_x = _wcov(x, x, w, [mean_x, mean_x])
    var_y = _wcov(y, y, w, [mean_y, mean_y])
    return cov_xy / np.sqrt(var_x * var_y)
|
def evaluate_proj_temporal_weighted_global_corr(high_rank, low_rank):
    """
    Weighted Pearson correlation between two rankings, with weights 1, 1/2, ..., 1/k.

    Both rankings are re-indexed with the 0-based integer ranks of high_rank
    before the weights are applied.
    :param high_rank: ndarray, shape (k,), ranking in the high dimensional space
    :param low_rank: ndarray, shape (k,), ranking in the low dimensional space
    :return: weighted correlation
    """
    # NOTE(review): indexing by rank is the same as sorting by rank only for
    # special permutations; np.argsort(high_rank) would sort. Tied ranks are
    # also truncated by the int cast. Kept as-is because this defines the
    # existing metric; confirm the intent before changing.
    positions = rankdata(high_rank).astype("int") - 1
    weights = 1 / np.arange(1, len(high_rank) + 1)
    return _wpearson(high_rank[positions], low_rank[positions], weights)
|
|
|
|
|
def evaluate_keep_B(low_B, grid_view, decision_view, threshold=0.8):
    """
    evaluate whether high dimensional boundary points still lying on Boundary in low-dimensional space or not
    find the nearest grid point of boundary points, and check whether the color of corresponding grid point is white or not

    :param low_B: ndarray, (n, 2), low dimension position of boundary points
    :param grid_view: ndarray, (resolution^2, 2), the position array of grid points
    :param decision_view: ndarray, (resolution^2, 3), the RGB color of grid points
    :param threshold: float, a grid point counts as white when all three color
        channels are strictly above this value
    :return: fraction of boundary points whose nearest grid point is white;
        0.0 when low_B is None or empty
    """
    # Check for None first: len(None) would raise before the None test runs.
    if low_B is None or len(low_B) == 0:
        return .0

    grid_view = grid_view.reshape(-1, 2)
    decision_view = decision_view.reshape(-1, 3)

    nbs = NearestNeighbors(n_neighbors=1, algorithm="ball_tree").fit(grid_view)
    _, indices = nbs.kneighbors(low_B)
    # kneighbors returns shape (n, 1). Take the column rather than squeeze(),
    # which collapses a single boundary point to a 0-d array.
    indices = indices[:, 0]
    sample_colors = decision_view[indices]

    # White means every channel is above the threshold. This is a plain
    # boolean reduction, so the removed np.bool alias is not needed.
    is_white = np.all(sample_colors > threshold, axis=1)

    return np.sum(is_white) / len(is_white)