""" |
Help functions to evaluate our visualization system |
""" |
import numpy as np |
from pynndescent import NNDescent |
from sklearn.neighbors import NearestNeighbors |
from sklearn.manifold import trustworthiness |
from scipy.stats import kendalltau, spearmanr, pearsonr, rankdata |
def evaluate_isAlign(embeddingLeft, embeddingRight, align_metric=1): |
lens = len(embeddingLeft) |
align_indices = [] |
for i in range(lens): |
dist = np.linalg.norm( embeddingLeft[i]-embeddingRight[i]) |
if dist < align_metric: |
align_indices.append(i) |
return align_indices |
def evaluate_isAlign_single(embeddingLeft, embeddingRight, selected_left, selected_right,align_metric=1): |
lens = len(embeddingLeft) |
align_indices_left = [] |
align_indices_right = [] |
if selected_left != -1: |
for i in range(lens): |
dist = np.linalg.norm( embeddingLeft[selected_left]-embeddingRight[i]) |
if dist < align_metric: |
align_indices_right.append(i) |
if selected_right != -1: |
for i in range(lens): |
dist = np.linalg.norm( embeddingLeft[i]-embeddingRight[selected_right]) |
if dist < align_metric: |
align_indices_left.append(i) |
return align_indices_left, align_indices_right |
def evaluate_isNearestNeighbour(embeddingLeft, embeddingRight, n_neighbors=15, metric="euclidean"): |
""" |
Find indices where none of the nearest neighbors in embeddingLeft are preserved in embeddingRight. |
:param embeddingLeft: ndarray, first set of low dimensional representations |
:param embeddingRight: ndarray, second set of low dimensional representations |
:param n_neighbors: int, number of nearest neighbors to consider |
:param metric: str, metric for nearest neighbor calculation, default "euclidean" |
:return: list of indices where none of the neighbors are preserved |
""" |
n_trees = 5 + int(round((embeddingLeft.shape[0]) ** 0.5 / 20.0)) |
n_iters = max(5, int(round(np.log2(embeddingLeft.shape[0])))) |
nnd_left = NNDescent( |
embeddingLeft, |
n_neighbors=n_neighbors, |
metric=metric, |
n_trees=n_trees, |
n_iters=n_iters, |
max_candidates=60, |
verbose=True |
) |
left_neighbors, _ = nnd_left.neighbor_graph |
nnd_right = NNDescent( |
embeddingRight, |
n_neighbors=n_neighbors, |
metric=metric, |
n_trees=n_trees, |
n_iters=n_iters, |
max_candidates=60, |
verbose=True |
) |
right_neighbors, _ = nnd_right.neighbor_graph |
non_preserved_indices = [] |
for i in range(len(embeddingLeft)): |
if len(np.intersect1d(left_neighbors[i], right_neighbors[i])) == 0: |
non_preserved_indices.append(i) |
return non_preserved_indices |
def evaluate_isNearestNeighbour_single(embeddingLeft, embeddingRight, selected_left, selected_right, n_neighbors=15, metric="euclidean"): |
n_trees = 5 + int(round((embeddingLeft.shape[0]) ** 0.5 / 20.0)) |
n_iters = max(5, int(round(np.log2(embeddingLeft.shape[0])))) |
nnd_left = NNDescent( |
embeddingLeft, |
n_neighbors=n_neighbors, |
metric=metric, |
n_trees=n_trees, |
n_iters=n_iters, |
max_candidates=60, |
verbose=True |
) |
left_neighbors, _ = nnd_left.neighbor_graph |
nnd_right = NNDescent( |
embeddingRight, |
n_neighbors=n_neighbors, |
metric=metric, |
n_trees=n_trees, |
n_iters=n_iters, |
max_candidates=60, |
verbose=True |
) |
right_neighbors, _ = nnd_right.neighbor_graph |
nearest_neighbors_leftE_rightS = [] |
nearest_neighbors_rightE_rightS = [] |
nearest_neighbors_leftE_leftS = [] |
nearest_neighbors_rightE_leftS = [] |
if selected_left != -1: |
nearest_neighbors_leftE_leftS = left_neighbors[selected_left].tolist() |
nearest_neighbors_rightE_leftS = right_neighbors[selected_left].tolist() |
if selected_right != -1: |
nearest_neighbors_leftE_rightS = left_neighbors[selected_right].tolist() |
nearest_neighbors_rightE_rightS = right_neighbors[selected_right].tolist() |
return nearest_neighbors_leftE_leftS, nearest_neighbors_leftE_rightS, nearest_neighbors_rightE_leftS, nearest_neighbors_rightE_rightS |
def evaluate_proj_nn_perseverance_knn(data, embedding, n_neighbors, metric="euclidean", selected_indices=None): |
""" |
evaluate projection function, nn preserving property using knn algorithm |
:param data: ndarray, high dimensional representations |
:param embedding: ndarray, low dimensional representations |
:param n_neighbors: int, the number of neighbors |
:param metric: str, by default "euclidean" |
:return nn property: float, nn preserving property |
""" |
n_trees = 5 + int(round((data.shape[0]) ** 0.5 / 20.0)) |
n_iters = max(5, int(round(np.log2(data.shape[0])))) |
nnd = NNDescent( |
data, |
n_neighbors=n_neighbors, |
metric=metric, |
n_trees=n_trees, |
n_iters=n_iters, |
max_candidates=60, |
verbose=True |
) |
high_ind, _ = nnd.neighbor_graph |
nnd = NNDescent( |
embedding, |
n_neighbors=n_neighbors, |
metric=metric, |
n_trees=n_trees, |
n_iters=n_iters, |
max_candidates=60, |
verbose=True |
) |
low_ind, _ = nnd.neighbor_graph |
lens_data = len(data) |
if selected_indices: |
lens_data = len(selected_indices) |
high_ind = high_ind[selected_indices] |
low_ind = low_ind[selected_indices] |
border_pres = np.zeros(lens_data) |
for i in range(lens_data): |
border_pres[i] = len(np.intersect1d(high_ind[i], low_ind[i])) |
return border_pres.mean() |
from sklearn.metrics import pairwise_distances |
import numpy as np |
def subset_trustworthiness(X, X_subset, X_embedded, X_subset_embedded, n_neighbors=5): |
dist_X = pairwise_distances(X) |
dist_X_embedded = pairwise_distances(X_embedded) |
subset_indices = np.where(np.isin(X, X_subset))[0] |
neighbors_X = np.argsort(dist_X)[:, 1:n_neighbors+1] |
subset_neighbors_X = neighbors_X[subset_indices] |
neighbors_X_embedded = np.argsort(dist_X_embedded)[:, 1:n_neighbors+1] |
subset_neighbors_X_embedded = neighbors_X_embedded[subset_indices] |
trustworthiness_val = 0 |
for i, subset_index in enumerate(subset_indices): |
for j, neighbor in enumerate(subset_neighbors_X_embedded[i]): |
rank_in_X = np.where(neighbors_X[subset_index] == neighbor)[0][0] |
if rank_in_X >= n_neighbors: |
trustworthiness_val += rank_in_X - n_neighbors + 1 |
trustworthiness_val = trustworthiness_val / (X_subset.shape[0] * n_neighbors) |
trustworthiness_val = 1 - (2. / n_neighbors) * trustworthiness_val |
return trustworthiness_val |
def evaluate_proj_nn_perseverance_trustworthiness(data, embedding, n_neighbors, metric="euclidean"): |
""" |
evaluate projection function, nn preserving property using trustworthiness formula |
:param data: ndarray, high dimensional representations |
:param embedding: ndarray, low dimensional representations |
:param n_neighbors: int, the number of neighbors |
:param metric: str, by default "euclidean" |
:return nn property: float, nn preserving property |
""" |
t = trustworthiness(data, embedding, n_neighbors=n_neighbors, metric=metric) |
return t |
def evaluate_proj_boundary_perseverance_knn(data, embedding, high_centers, low_centers, n_neighbors): |
""" |
evaluate projection function, boundary preserving property |
:param data: ndarray, high dimensional representations |
:param embedding: ndarray, low dimensional representations |
:param high_centers: ndarray, border points high dimensional representations |
:param low_centers: ndarray, border points low dimensional representations |
:param n_neighbors: int, the number of neighbors |
:return boundary preserving property: float,boundary preserving property |
""" |
high_neigh = NearestNeighbors(n_neighbors=n_neighbors, radius=0.4) |
high_neigh.fit(high_centers) |
high_ind = high_neigh.kneighbors(data, n_neighbors=n_neighbors, return_distance=False) |
low_neigh = NearestNeighbors(n_neighbors=n_neighbors, radius=0.4) |
low_neigh.fit(low_centers) |
low_ind = low_neigh.kneighbors(embedding, n_neighbors=n_neighbors, return_distance=False) |
border_pres = np.zeros(len(data)) |
for i in range(len(data)): |
border_pres[i] = len(np.intersect1d(high_ind[i], low_ind[i])) |
return border_pres.mean() |
def evaluate_proj_temporal_perseverance_corr(alpha, delta_x): |
""" |
Evaluate temporal preserving property, |
calculate the correlation between neighbor preserving rate and moving distance in low dim in a time sequence |
:param alpha: ndarray, shape(N,) neighbor preserving rate |
:param delta_x: ndarray, shape(N,), moved distance in low dim for each point |
:return corr: ndarray, shape(N,), correlation for each point from temporal point of view |
""" |
alpha = alpha.T |
delta_x = delta_x.T |
shape = alpha.shape |
data_num = shape[0] |
corr = np.zeros(data_num) |
for i in range(data_num): |
correlation, pvalue = pearsonr(alpha[i], delta_x[i]) |
if np.isnan(correlation): |
correlation = 0.0 |
corr[i] = correlation |
return corr.mean(), corr.std() |
def evaluate_inv_distance(data, inv_data): |
""" |
The distance between original data and reconstruction data |
:param data: ndarray, high dimensional data |
:param inv_data: ndarray, reconstruction data |
:return err: mse, reconstruction error |
""" |
return np.linalg.norm(data-inv_data, axis=1).mean() |
def evaluate_inv_accu(labels, pred): |
""" |
prediction accuracy of reconstruction data |
:param labels: ndarray, shape(N,), label for each point |
:param pred: ndarray, shape(N,), prediction for each point |
:return accu: float, the reconstruction accuracy |
""" |
return np.sum(labels == pred) / len(labels) |
def evaluate_inv_conf(labels, ori_pred, new_pred): |
""" |
the confidence difference between original data and reconstruction data |
:param labels: ndarray, shape(N,), the original prediction for each point |
:param ori_pred: ndarray, shape(N,10), the prediction of original data |
:param new_pred: ndarray, shape(N,10), the prediction of reconstruction data |
:return diff: float, the mean of confidence difference for each point |
""" |
old_conf = [ori_pred[i, labels[i]] for i in range(len(labels))] |
new_conf = [new_pred[i, labels[i]] for i in range(len(labels))] |
old_conf = np.array(old_conf) |
new_conf = np.array(new_conf) |
diff = np.abs(old_conf - new_conf) |
return diff.mean() |
def evaluate_proj_temporal_perseverance_entropy(alpha, delta_x): |
""" |
(discard) |
calculate the temporal preserving property |
based on the correlation between the entropy of moved distance and neighbor preserving rate(alpha) |
:param alpha: ndarray, shape(N,), neighbor preserving rate for each point |
:param delta_x: ndarray, shape(N,), the moved distance in low dim for each point |
:return corr: float, the mean of all correlation |
""" |
alpha = alpha.T |
delta_x = delta_x.T |
shape = alpha.shape |
data_num = shape[0] |
delta_x_norm = delta_x.max() |
delta_x_norm = delta_x / delta_x_norm |
alpha = np.floor(alpha*10) |
delta_x_norm = np.floor(delta_x_norm*10) |
corr = np.zeros(len(alpha)) |
for i in range(len(alpha)): |
index = list() |
entropy = list() |
for j in range(11): |
dx = delta_x_norm[i][np.where(alpha[i] == j)] |
entropy_x = np.zeros(11) |
for k in range(11): |
entropy_x[k] = np.sum(dx == k) |
if np.sum(entropy_x) == 0: |
continue |
else: |
entropy_x = entropy_x / np.sum(entropy_x+10e-8) |
entropy_x = np.sum(entropy_x*np.log(entropy_x+10e-8)) |
entropy.append(-entropy_x) |
index.append(j) |
if len(index) < 2: |
print("no enough data to form a correlation, setting correlation to be 0") |
corr[i] = 0 |
else: |
correlation, _ = pearsonr(index, entropy) |
corr[i] = correlation |
return corr.mean() |
def evaluate_proj_temporal_global_corr(high_rank, low_rank): |
l = len(high_rank) |
tau_l = np.zeros(l) |
p_l = np.zeros(l) |
for i in range(l): |
r1 = high_rank[i] |
r2 = low_rank[i] |
tau, p = spearmanr(r1, r2) |
tau_l[i] = tau |
p_l[i] = p |
return tau_l, p_l |
def _wcov(x, y, w, ms): |
return np.sum(w * (x - ms[0]) * (y - ms[1])) |
def _wpearson(x, y, w): |
mx, my = (np.sum(i * w) / np.sum(w) for i in [x, y]) |
return _wcov(x, y, w, [mx, my]) / np.sqrt(_wcov(x, x, w, [mx, mx]) * _wcov(y, y, w, [my, my])) |
def evaluate_proj_temporal_weighted_global_corr(high_rank, low_rank): |
k = len(high_rank) |
r = rankdata(high_rank).astype("int")-1 |
tau = _wpearson(high_rank[r], low_rank[r], 1/np.arange(1, k+1)) |
return tau |
def evaluate_keep_B(low_B, grid_view, decision_view, threshold=0.8): |
""" |
evaluate whether high dimensional boundary points still lying on Boundary in low-dimensional space or not |
find the nearest grid point of boundary points, and check whether the color of corresponding grid point is white or not |
:param low_B: ndarray, (n, 2), low dimension position of boundary points |
:param grid_view: ndarray, (resolution^2, 2), the position array of grid points |
:param decision_view: ndarray, (resolution^2, 3), the RGB color of grid points |
:param threshold: |
:return: |
""" |
if len(low_B) == 0 or low_B is None: |
return .0 |
grid_view = grid_view.reshape(-1, 2) |
decision_view = decision_view.reshape(-1, 3) |
nbs = NearestNeighbors(n_neighbors=1, algorithm="ball_tree").fit(grid_view) |
_, indices = nbs.kneighbors(low_B) |
indices = indices.squeeze() |
sample_colors = decision_view[indices] |
c1 = np.zeros(indices.shape[0], dtype=np.bool) |
c1[sample_colors[:, 0] > threshold] = 1 |
c2 = np.zeros(indices.shape[0], dtype=np.bool) |
c2[sample_colors[:, 1] > threshold] = 1 |
c3 = np.zeros(indices.shape[0], dtype=np.bool) |
c3[sample_colors[:, 2] > threshold] = 1 |
c = np.logical_and(c1, c2) |
c = np.logical_and(c, c3) |
return np.sum(c)/len(c) |