""" Help functions to evaluate our visualization system """ import numpy as np from pynndescent import NNDescent from sklearn.neighbors import NearestNeighbors from sklearn.manifold import trustworthiness from scipy.stats import kendalltau, spearmanr, pearsonr, rankdata def evaluate_isAlign(embeddingLeft, embeddingRight, align_metric=1): lens = len(embeddingLeft) align_indices = [] for i in range(lens): dist = np.linalg.norm( embeddingLeft[i]-embeddingRight[i]) if dist < align_metric: align_indices.append(i) return align_indices def evaluate_isAlign_single(embeddingLeft, embeddingRight, selected_left, selected_right,align_metric=1): lens = len(embeddingLeft) align_indices_left = [] align_indices_right = [] if selected_left != -1: for i in range(lens): dist = np.linalg.norm( embeddingLeft[selected_left]-embeddingRight[i]) if dist < align_metric: align_indices_right.append(i) if selected_right != -1: for i in range(lens): dist = np.linalg.norm( embeddingLeft[i]-embeddingRight[selected_right]) if dist < align_metric: align_indices_left.append(i) return align_indices_left, align_indices_right def evaluate_isNearestNeighbour(embeddingLeft, embeddingRight, n_neighbors=15, metric="euclidean"): """ Find indices where none of the nearest neighbors in embeddingLeft are preserved in embeddingRight. :param embeddingLeft: ndarray, first set of low dimensional representations :param embeddingRight: ndarray, second set of low dimensional representations :param n_neighbors: int, number of nearest neighbors to consider :param metric: str, metric for nearest neighbor calculation, default "euclidean" :return: list of indices where none of the neighbors are preserved """ n_trees = 5 + int(round((embeddingLeft.shape[0]) ** 0.5 / 20.0)) n_iters = max(5, int(round(np.log2(embeddingLeft.shape[0])))) nnd_left = NNDescent( embeddingLeft, n_neighbors=n_neighbors, metric=metric, n_trees=n_trees, n_iters=n_iters, max_candidates=60, verbose=True ) left_neighbors, _ = nnd_left.neighbor_graph nnd_right = NNDescent( embeddingRight, n_neighbors=n_neighbors, metric=metric, n_trees=n_trees, n_iters=n_iters, max_candidates=60, verbose=True ) right_neighbors, _ = nnd_right.neighbor_graph non_preserved_indices = [] for i in range(len(embeddingLeft)): if len(np.intersect1d(left_neighbors[i], right_neighbors[i])) == 0: non_preserved_indices.append(i) return non_preserved_indices def evaluate_isNearestNeighbour_single(embeddingLeft, embeddingRight, selected_left, selected_right, n_neighbors=15, metric="euclidean"): n_trees = 5 + int(round((embeddingLeft.shape[0]) ** 0.5 / 20.0)) n_iters = max(5, int(round(np.log2(embeddingLeft.shape[0])))) nnd_left = NNDescent( embeddingLeft, n_neighbors=n_neighbors, metric=metric, n_trees=n_trees, n_iters=n_iters, max_candidates=60, verbose=True ) left_neighbors, _ = nnd_left.neighbor_graph nnd_right = NNDescent( embeddingRight, n_neighbors=n_neighbors, metric=metric, n_trees=n_trees, n_iters=n_iters, max_candidates=60, verbose=True ) right_neighbors, _ = nnd_right.neighbor_graph nearest_neighbors_leftE_rightS = [] nearest_neighbors_rightE_rightS = [] nearest_neighbors_leftE_leftS = [] nearest_neighbors_rightE_leftS = [] if selected_left != -1: nearest_neighbors_leftE_leftS = left_neighbors[selected_left].tolist() nearest_neighbors_rightE_leftS = right_neighbors[selected_left].tolist() if selected_right != -1: nearest_neighbors_leftE_rightS = left_neighbors[selected_right].tolist() nearest_neighbors_rightE_rightS = right_neighbors[selected_right].tolist() return nearest_neighbors_leftE_leftS, nearest_neighbors_leftE_rightS, nearest_neighbors_rightE_leftS, nearest_neighbors_rightE_rightS def evaluate_proj_nn_perseverance_knn(data, embedding, n_neighbors, metric="euclidean", selected_indices=None): """ evaluate projection function, nn preserving property using knn algorithm :param data: ndarray, high dimensional representations :param embedding: ndarray, low dimensional representations :param n_neighbors: int, the number of neighbors :param metric: str, by default "euclidean" :return nn property: float, nn preserving property """ n_trees = 5 + int(round((data.shape[0]) ** 0.5 / 20.0)) n_iters = max(5, int(round(np.log2(data.shape[0])))) # get nearest neighbors nnd = NNDescent( data, n_neighbors=n_neighbors, metric=metric, n_trees=n_trees, n_iters=n_iters, max_candidates=60, verbose=True ) high_ind, _ = nnd.neighbor_graph nnd = NNDescent( embedding, n_neighbors=n_neighbors, metric=metric, n_trees=n_trees, n_iters=n_iters, max_candidates=60, verbose=True ) low_ind, _ = nnd.neighbor_graph lens_data = len(data) if selected_indices: lens_data = len(selected_indices) high_ind = high_ind[selected_indices] low_ind = low_ind[selected_indices] border_pres = np.zeros(lens_data) for i in range(lens_data): border_pres[i] = len(np.intersect1d(high_ind[i], low_ind[i])) # return border_pres.mean(), border_pres.max(), border_pres.min() return border_pres.mean() from sklearn.metrics import pairwise_distances import numpy as np def subset_trustworthiness(X, X_subset, X_embedded, X_subset_embedded, n_neighbors=5): # Compute pairwise distances in the original and embedded spaces dist_X = pairwise_distances(X) dist_X_embedded = pairwise_distances(X_embedded) # Find the indices of the subset in the original dataset subset_indices = np.where(np.isin(X, X_subset))[0] # Neighbors in the original space for the subset data neighbors_X = np.argsort(dist_X)[:, 1:n_neighbors+1] subset_neighbors_X = neighbors_X[subset_indices] # Neighbors in the embedded space for the subset embedded data neighbors_X_embedded = np.argsort(dist_X_embedded)[:, 1:n_neighbors+1] subset_neighbors_X_embedded = neighbors_X_embedded[subset_indices] # Compute trustworthiness trustworthiness_val = 0 for i, subset_index in enumerate(subset_indices): for j, neighbor in enumerate(subset_neighbors_X_embedded[i]): rank_in_X = np.where(neighbors_X[subset_index] == neighbor)[0][0] if rank_in_X >= n_neighbors: trustworthiness_val += rank_in_X - n_neighbors + 1 trustworthiness_val = trustworthiness_val / (X_subset.shape[0] * n_neighbors) trustworthiness_val = 1 - (2. / n_neighbors) * trustworthiness_val return trustworthiness_val def evaluate_proj_nn_perseverance_trustworthiness(data, embedding, n_neighbors, metric="euclidean"): """ evaluate projection function, nn preserving property using trustworthiness formula :param data: ndarray, high dimensional representations :param embedding: ndarray, low dimensional representations :param n_neighbors: int, the number of neighbors :param metric: str, by default "euclidean" :return nn property: float, nn preserving property """ t = trustworthiness(data, embedding, n_neighbors=n_neighbors, metric=metric) return t def evaluate_proj_boundary_perseverance_knn(data, embedding, high_centers, low_centers, n_neighbors): """ evaluate projection function, boundary preserving property :param data: ndarray, high dimensional representations :param embedding: ndarray, low dimensional representations :param high_centers: ndarray, border points high dimensional representations :param low_centers: ndarray, border points low dimensional representations :param n_neighbors: int, the number of neighbors :return boundary preserving property: float,boundary preserving property """ high_neigh = NearestNeighbors(n_neighbors=n_neighbors, radius=0.4) high_neigh.fit(high_centers) high_ind = high_neigh.kneighbors(data, n_neighbors=n_neighbors, return_distance=False) low_neigh = NearestNeighbors(n_neighbors=n_neighbors, radius=0.4) low_neigh.fit(low_centers) low_ind = low_neigh.kneighbors(embedding, n_neighbors=n_neighbors, return_distance=False) border_pres = np.zeros(len(data)) for i in range(len(data)): border_pres[i] = len(np.intersect1d(high_ind[i], low_ind[i])) # return border_pres.mean(), border_pres.max(), border_pres.min() return border_pres.mean() def evaluate_proj_temporal_perseverance_corr(alpha, delta_x): """ Evaluate temporal preserving property, calculate the correlation between neighbor preserving rate and moving distance in low dim in a time sequence :param alpha: ndarray, shape(N,) neighbor preserving rate :param delta_x: ndarray, shape(N,), moved distance in low dim for each point :return corr: ndarray, shape(N,), correlation for each point from temporal point of view """ alpha = alpha.T delta_x = delta_x.T shape = alpha.shape data_num = shape[0] corr = np.zeros(data_num) for i in range(data_num): # correlation, pvalue = spearmanr(alpha[:, i], delta_x[:, i]) correlation, pvalue = pearsonr(alpha[i], delta_x[i]) if np.isnan(correlation): correlation = 0.0 corr[i] = correlation return corr.mean(), corr.std() def evaluate_inv_distance(data, inv_data): """ The distance between original data and reconstruction data :param data: ndarray, high dimensional data :param inv_data: ndarray, reconstruction data :return err: mse, reconstruction error """ return np.linalg.norm(data-inv_data, axis=1).mean() def evaluate_inv_accu(labels, pred): """ prediction accuracy of reconstruction data :param labels: ndarray, shape(N,), label for each point :param pred: ndarray, shape(N,), prediction for each point :return accu: float, the reconstruction accuracy """ return np.sum(labels == pred) / len(labels) def evaluate_inv_conf(labels, ori_pred, new_pred): """ the confidence difference between original data and reconstruction data :param labels: ndarray, shape(N,), the original prediction for each point :param ori_pred: ndarray, shape(N,10), the prediction of original data :param new_pred: ndarray, shape(N,10), the prediction of reconstruction data :return diff: float, the mean of confidence difference for each point """ old_conf = [ori_pred[i, labels[i]] for i in range(len(labels))] new_conf = [new_pred[i, labels[i]] for i in range(len(labels))] old_conf = np.array(old_conf) new_conf = np.array(new_conf) diff = np.abs(old_conf - new_conf) # return diff.mean(), diff.max(), diff.min() return diff.mean() def evaluate_proj_temporal_perseverance_entropy(alpha, delta_x): """ (discard) calculate the temporal preserving property based on the correlation between the entropy of moved distance and neighbor preserving rate(alpha) :param alpha: ndarray, shape(N,), neighbor preserving rate for each point :param delta_x: ndarray, shape(N,), the moved distance in low dim for each point :return corr: float, the mean of all correlation """ alpha = alpha.T delta_x = delta_x.T shape = alpha.shape data_num = shape[0] # normalize # delta_x_norm = delta_x.max(-1) # delta_x_norm = (delta_x.T/delta_x_norm).T delta_x_norm = delta_x.max() delta_x_norm = delta_x / delta_x_norm alpha = np.floor(alpha*10) delta_x_norm = np.floor(delta_x_norm*10) corr = np.zeros(len(alpha)) # samples for i in range(len(alpha)): # alpha0-alpha9 index = list() entropy = list() for j in range(11): dx = delta_x_norm[i][np.where(alpha[i] == j)] entropy_x = np.zeros(11) for k in range(11): entropy_x[k] = np.sum(dx == k) if np.sum(entropy_x) == 0: continue else: entropy_x = entropy_x / np.sum(entropy_x+10e-8) entropy_x = np.sum(entropy_x*np.log(entropy_x+10e-8)) entropy.append(-entropy_x) index.append(j) if len(index) < 2: print("no enough data to form a correlation, setting correlation to be 0") corr[i] = 0 else: correlation, _ = pearsonr(index, entropy) corr[i] = correlation return corr.mean() def evaluate_proj_temporal_global_corr(high_rank, low_rank): l = len(high_rank) tau_l = np.zeros(l) p_l = np.zeros(l) for i in range(l): r1 = high_rank[i] r2 = low_rank[i] tau, p = spearmanr(r1, r2) tau_l[i] = tau p_l[i] = p return tau_l, p_l def _wcov(x, y, w, ms): return np.sum(w * (x - ms[0]) * (y - ms[1])) def _wpearson(x, y, w): mx, my = (np.sum(i * w) / np.sum(w) for i in [x, y]) return _wcov(x, y, w, [mx, my]) / np.sqrt(_wcov(x, x, w, [mx, mx]) * _wcov(y, y, w, [my, my])) def evaluate_proj_temporal_weighted_global_corr(high_rank, low_rank): k = len(high_rank) r = rankdata(high_rank).astype("int")-1 tau = _wpearson(high_rank[r], low_rank[r], 1/np.arange(1, k+1)) return tau def evaluate_keep_B(low_B, grid_view, decision_view, threshold=0.8): """ evaluate whether high dimensional boundary points still lying on Boundary in low-dimensional space or not find the nearest grid point of boundary points, and check whether the color of corresponding grid point is white or not :param low_B: ndarray, (n, 2), low dimension position of boundary points :param grid_view: ndarray, (resolution^2, 2), the position array of grid points :param decision_view: ndarray, (resolution^2, 3), the RGB color of grid points :param threshold: :return: """ if len(low_B) == 0 or low_B is None: return .0 # reshape grid and decision view grid_view = grid_view.reshape(-1, 2) decision_view = decision_view.reshape(-1, 3) # find the color of nearest grid view nbs = NearestNeighbors(n_neighbors=1, algorithm="ball_tree").fit(grid_view) _, indices = nbs.kneighbors(low_B) indices = indices.squeeze() sample_colors = decision_view[indices] # check whether 3 channel are above a predefined threshold c1 = np.zeros(indices.shape[0], dtype=np.bool) c1[sample_colors[:, 0] > threshold] = 1 c2 = np.zeros(indices.shape[0], dtype=np.bool) c2[sample_colors[:, 1] > threshold] = 1 c3 = np.zeros(indices.shape[0], dtype=np.bool) c3[sample_colors[:, 2] > threshold] = 1 c = np.logical_and(c1, c2) c = np.logical_and(c, c3) # return the ratio of boundary points that still lie on boundary after dimension reduction return np.sum(c)/len(c)