File size: 15,303 Bytes
f291f4a |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 |
"""
Help functions to evaluate our visualization system
"""
import numpy as np
from pynndescent import NNDescent
from sklearn.neighbors import NearestNeighbors
from sklearn.manifold import trustworthiness
from scipy.stats import kendalltau, spearmanr, pearsonr, rankdata
def evaluate_isAlign(embeddingLeft, embeddingRight, align_metric=1):
lens = len(embeddingLeft)
align_indices = []
for i in range(lens):
dist = np.linalg.norm( embeddingLeft[i]-embeddingRight[i])
if dist < align_metric:
align_indices.append(i)
return align_indices
def evaluate_isAlign_single(embeddingLeft, embeddingRight, selected_left, selected_right,align_metric=1):
lens = len(embeddingLeft)
align_indices_left = []
align_indices_right = []
if selected_left != -1:
for i in range(lens):
dist = np.linalg.norm( embeddingLeft[selected_left]-embeddingRight[i])
if dist < align_metric:
align_indices_right.append(i)
if selected_right != -1:
for i in range(lens):
dist = np.linalg.norm( embeddingLeft[i]-embeddingRight[selected_right])
if dist < align_metric:
align_indices_left.append(i)
return align_indices_left, align_indices_right
def evaluate_isNearestNeighbour(embeddingLeft, embeddingRight, n_neighbors=15, metric="euclidean"):
"""
Find indices where none of the nearest neighbors in embeddingLeft are preserved in embeddingRight.
:param embeddingLeft: ndarray, first set of low dimensional representations
:param embeddingRight: ndarray, second set of low dimensional representations
:param n_neighbors: int, number of nearest neighbors to consider
:param metric: str, metric for nearest neighbor calculation, default "euclidean"
:return: list of indices where none of the neighbors are preserved
"""
n_trees = 5 + int(round((embeddingLeft.shape[0]) ** 0.5 / 20.0))
n_iters = max(5, int(round(np.log2(embeddingLeft.shape[0]))))
nnd_left = NNDescent(
embeddingLeft,
n_neighbors=n_neighbors,
metric=metric,
n_trees=n_trees,
n_iters=n_iters,
max_candidates=60,
verbose=True
)
left_neighbors, _ = nnd_left.neighbor_graph
nnd_right = NNDescent(
embeddingRight,
n_neighbors=n_neighbors,
metric=metric,
n_trees=n_trees,
n_iters=n_iters,
max_candidates=60,
verbose=True
)
right_neighbors, _ = nnd_right.neighbor_graph
non_preserved_indices = []
for i in range(len(embeddingLeft)):
if len(np.intersect1d(left_neighbors[i], right_neighbors[i])) == 0:
non_preserved_indices.append(i)
return non_preserved_indices
def evaluate_isNearestNeighbour_single(embeddingLeft, embeddingRight, selected_left, selected_right, n_neighbors=15, metric="euclidean"):
n_trees = 5 + int(round((embeddingLeft.shape[0]) ** 0.5 / 20.0))
n_iters = max(5, int(round(np.log2(embeddingLeft.shape[0]))))
nnd_left = NNDescent(
embeddingLeft,
n_neighbors=n_neighbors,
metric=metric,
n_trees=n_trees,
n_iters=n_iters,
max_candidates=60,
verbose=True
)
left_neighbors, _ = nnd_left.neighbor_graph
nnd_right = NNDescent(
embeddingRight,
n_neighbors=n_neighbors,
metric=metric,
n_trees=n_trees,
n_iters=n_iters,
max_candidates=60,
verbose=True
)
right_neighbors, _ = nnd_right.neighbor_graph
nearest_neighbors_leftE_rightS = []
nearest_neighbors_rightE_rightS = []
nearest_neighbors_leftE_leftS = []
nearest_neighbors_rightE_leftS = []
if selected_left != -1:
nearest_neighbors_leftE_leftS = left_neighbors[selected_left].tolist()
nearest_neighbors_rightE_leftS = right_neighbors[selected_left].tolist()
if selected_right != -1:
nearest_neighbors_leftE_rightS = left_neighbors[selected_right].tolist()
nearest_neighbors_rightE_rightS = right_neighbors[selected_right].tolist()
return nearest_neighbors_leftE_leftS, nearest_neighbors_leftE_rightS, nearest_neighbors_rightE_leftS, nearest_neighbors_rightE_rightS
def evaluate_proj_nn_perseverance_knn(data, embedding, n_neighbors, metric="euclidean", selected_indices=None):
"""
evaluate projection function, nn preserving property using knn algorithm
:param data: ndarray, high dimensional representations
:param embedding: ndarray, low dimensional representations
:param n_neighbors: int, the number of neighbors
:param metric: str, by default "euclidean"
:return nn property: float, nn preserving property
"""
n_trees = 5 + int(round((data.shape[0]) ** 0.5 / 20.0))
n_iters = max(5, int(round(np.log2(data.shape[0]))))
# get nearest neighbors
nnd = NNDescent(
data,
n_neighbors=n_neighbors,
metric=metric,
n_trees=n_trees,
n_iters=n_iters,
max_candidates=60,
verbose=True
)
high_ind, _ = nnd.neighbor_graph
nnd = NNDescent(
embedding,
n_neighbors=n_neighbors,
metric=metric,
n_trees=n_trees,
n_iters=n_iters,
max_candidates=60,
verbose=True
)
low_ind, _ = nnd.neighbor_graph
lens_data = len(data)
if selected_indices:
lens_data = len(selected_indices)
high_ind = high_ind[selected_indices]
low_ind = low_ind[selected_indices]
border_pres = np.zeros(lens_data)
for i in range(lens_data):
border_pres[i] = len(np.intersect1d(high_ind[i], low_ind[i]))
# return border_pres.mean(), border_pres.max(), border_pres.min()
return border_pres.mean()
from sklearn.metrics import pairwise_distances
import numpy as np
def subset_trustworthiness(X, X_subset, X_embedded, X_subset_embedded, n_neighbors=5):
# Compute pairwise distances in the original and embedded spaces
dist_X = pairwise_distances(X)
dist_X_embedded = pairwise_distances(X_embedded)
# Find the indices of the subset in the original dataset
subset_indices = np.where(np.isin(X, X_subset))[0]
# Neighbors in the original space for the subset data
neighbors_X = np.argsort(dist_X)[:, 1:n_neighbors+1]
subset_neighbors_X = neighbors_X[subset_indices]
# Neighbors in the embedded space for the subset embedded data
neighbors_X_embedded = np.argsort(dist_X_embedded)[:, 1:n_neighbors+1]
subset_neighbors_X_embedded = neighbors_X_embedded[subset_indices]
# Compute trustworthiness
trustworthiness_val = 0
for i, subset_index in enumerate(subset_indices):
for j, neighbor in enumerate(subset_neighbors_X_embedded[i]):
rank_in_X = np.where(neighbors_X[subset_index] == neighbor)[0][0]
if rank_in_X >= n_neighbors:
trustworthiness_val += rank_in_X - n_neighbors + 1
trustworthiness_val = trustworthiness_val / (X_subset.shape[0] * n_neighbors)
trustworthiness_val = 1 - (2. / n_neighbors) * trustworthiness_val
return trustworthiness_val
def evaluate_proj_nn_perseverance_trustworthiness(data, embedding, n_neighbors, metric="euclidean"):
"""
evaluate projection function, nn preserving property using trustworthiness formula
:param data: ndarray, high dimensional representations
:param embedding: ndarray, low dimensional representations
:param n_neighbors: int, the number of neighbors
:param metric: str, by default "euclidean"
:return nn property: float, nn preserving property
"""
t = trustworthiness(data, embedding, n_neighbors=n_neighbors, metric=metric)
return t
def evaluate_proj_boundary_perseverance_knn(data, embedding, high_centers, low_centers, n_neighbors):
"""
evaluate projection function, boundary preserving property
:param data: ndarray, high dimensional representations
:param embedding: ndarray, low dimensional representations
:param high_centers: ndarray, border points high dimensional representations
:param low_centers: ndarray, border points low dimensional representations
:param n_neighbors: int, the number of neighbors
:return boundary preserving property: float,boundary preserving property
"""
high_neigh = NearestNeighbors(n_neighbors=n_neighbors, radius=0.4)
high_neigh.fit(high_centers)
high_ind = high_neigh.kneighbors(data, n_neighbors=n_neighbors, return_distance=False)
low_neigh = NearestNeighbors(n_neighbors=n_neighbors, radius=0.4)
low_neigh.fit(low_centers)
low_ind = low_neigh.kneighbors(embedding, n_neighbors=n_neighbors, return_distance=False)
border_pres = np.zeros(len(data))
for i in range(len(data)):
border_pres[i] = len(np.intersect1d(high_ind[i], low_ind[i]))
# return border_pres.mean(), border_pres.max(), border_pres.min()
return border_pres.mean()
def evaluate_proj_temporal_perseverance_corr(alpha, delta_x):
"""
Evaluate temporal preserving property,
calculate the correlation between neighbor preserving rate and moving distance in low dim in a time sequence
:param alpha: ndarray, shape(N,) neighbor preserving rate
:param delta_x: ndarray, shape(N,), moved distance in low dim for each point
:return corr: ndarray, shape(N,), correlation for each point from temporal point of view
"""
alpha = alpha.T
delta_x = delta_x.T
shape = alpha.shape
data_num = shape[0]
corr = np.zeros(data_num)
for i in range(data_num):
# correlation, pvalue = spearmanr(alpha[:, i], delta_x[:, i])
correlation, pvalue = pearsonr(alpha[i], delta_x[i])
if np.isnan(correlation):
correlation = 0.0
corr[i] = correlation
return corr.mean(), corr.std()
def evaluate_inv_distance(data, inv_data):
"""
The distance between original data and reconstruction data
:param data: ndarray, high dimensional data
:param inv_data: ndarray, reconstruction data
:return err: mse, reconstruction error
"""
return np.linalg.norm(data-inv_data, axis=1).mean()
def evaluate_inv_accu(labels, pred):
"""
prediction accuracy of reconstruction data
:param labels: ndarray, shape(N,), label for each point
:param pred: ndarray, shape(N,), prediction for each point
:return accu: float, the reconstruction accuracy
"""
return np.sum(labels == pred) / len(labels)
def evaluate_inv_conf(labels, ori_pred, new_pred):
"""
the confidence difference between original data and reconstruction data
:param labels: ndarray, shape(N,), the original prediction for each point
:param ori_pred: ndarray, shape(N,10), the prediction of original data
:param new_pred: ndarray, shape(N,10), the prediction of reconstruction data
:return diff: float, the mean of confidence difference for each point
"""
old_conf = [ori_pred[i, labels[i]] for i in range(len(labels))]
new_conf = [new_pred[i, labels[i]] for i in range(len(labels))]
old_conf = np.array(old_conf)
new_conf = np.array(new_conf)
diff = np.abs(old_conf - new_conf)
# return diff.mean(), diff.max(), diff.min()
return diff.mean()
def evaluate_proj_temporal_perseverance_entropy(alpha, delta_x):
"""
(discard)
calculate the temporal preserving property
based on the correlation between the entropy of moved distance and neighbor preserving rate(alpha)
:param alpha: ndarray, shape(N,), neighbor preserving rate for each point
:param delta_x: ndarray, shape(N,), the moved distance in low dim for each point
:return corr: float, the mean of all correlation
"""
alpha = alpha.T
delta_x = delta_x.T
shape = alpha.shape
data_num = shape[0]
# normalize
# delta_x_norm = delta_x.max(-1)
# delta_x_norm = (delta_x.T/delta_x_norm).T
delta_x_norm = delta_x.max()
delta_x_norm = delta_x / delta_x_norm
alpha = np.floor(alpha*10)
delta_x_norm = np.floor(delta_x_norm*10)
corr = np.zeros(len(alpha))
# samples
for i in range(len(alpha)):
# alpha0-alpha9
index = list()
entropy = list()
for j in range(11):
dx = delta_x_norm[i][np.where(alpha[i] == j)]
entropy_x = np.zeros(11)
for k in range(11):
entropy_x[k] = np.sum(dx == k)
if np.sum(entropy_x) == 0:
continue
else:
entropy_x = entropy_x / np.sum(entropy_x+10e-8)
entropy_x = np.sum(entropy_x*np.log(entropy_x+10e-8))
entropy.append(-entropy_x)
index.append(j)
if len(index) < 2:
print("no enough data to form a correlation, setting correlation to be 0")
corr[i] = 0
else:
correlation, _ = pearsonr(index, entropy)
corr[i] = correlation
return corr.mean()
def evaluate_proj_temporal_global_corr(high_rank, low_rank):
l = len(high_rank)
tau_l = np.zeros(l)
p_l = np.zeros(l)
for i in range(l):
r1 = high_rank[i]
r2 = low_rank[i]
tau, p = spearmanr(r1, r2)
tau_l[i] = tau
p_l[i] = p
return tau_l, p_l
def _wcov(x, y, w, ms):
return np.sum(w * (x - ms[0]) * (y - ms[1]))
def _wpearson(x, y, w):
mx, my = (np.sum(i * w) / np.sum(w) for i in [x, y])
return _wcov(x, y, w, [mx, my]) / np.sqrt(_wcov(x, x, w, [mx, mx]) * _wcov(y, y, w, [my, my]))
def evaluate_proj_temporal_weighted_global_corr(high_rank, low_rank):
k = len(high_rank)
r = rankdata(high_rank).astype("int")-1
tau = _wpearson(high_rank[r], low_rank[r], 1/np.arange(1, k+1))
return tau
def evaluate_keep_B(low_B, grid_view, decision_view, threshold=0.8):
"""
evaluate whether high dimensional boundary points still lying on Boundary in low-dimensional space or not
find the nearest grid point of boundary points, and check whether the color of corresponding grid point is white or not
:param low_B: ndarray, (n, 2), low dimension position of boundary points
:param grid_view: ndarray, (resolution^2, 2), the position array of grid points
:param decision_view: ndarray, (resolution^2, 3), the RGB color of grid points
:param threshold:
:return:
"""
if len(low_B) == 0 or low_B is None:
return .0
# reshape grid and decision view
grid_view = grid_view.reshape(-1, 2)
decision_view = decision_view.reshape(-1, 3)
# find the color of nearest grid view
nbs = NearestNeighbors(n_neighbors=1, algorithm="ball_tree").fit(grid_view)
_, indices = nbs.kneighbors(low_B)
indices = indices.squeeze()
sample_colors = decision_view[indices]
# check whether 3 channel are above a predefined threshold
c1 = np.zeros(indices.shape[0], dtype=np.bool)
c1[sample_colors[:, 0] > threshold] = 1
c2 = np.zeros(indices.shape[0], dtype=np.bool)
c2[sample_colors[:, 1] > threshold] = 1
c3 = np.zeros(indices.shape[0], dtype=np.bool)
c3[sample_colors[:, 2] > threshold] = 1
c = np.logical_and(c1, c2)
c = np.logical_and(c, c3)
# return the ratio of boundary points that still lie on boundary after dimension reduction
return np.sum(c)/len(c) |