|
|
|
|
|
|
|
|
|
import json |
|
import logging |
|
from typing import List |
|
|
|
import matplotlib.pyplot as plt |
|
import numpy as np |
|
import pandas as pd |
|
import plotly.express as px |
|
from imblearn.under_sampling import RandomUnderSampler |
|
from Levenshtein import distance |
|
from sklearn.linear_model import LogisticRegression |
|
from sklearn.model_selection import train_test_split |
|
|
|
from ..utils.file import load, save |
|
from ..utils.tcga_post_analysis_utils import Results_Handler |
|
from .utlis import compute_prc, compute_roc |
|
|
|
logger = logging.getLogger(__name__) |
|
|
|
def get_lev_dist(seqs_a_list: List[str], seqs_b_list: List[str]):
    '''
    Compute the Levenshtein distance between two lists of sequences,
    normalized by the length of the longer sequence in each pair.

    The distance is computed pairwise: seqs_a_list[i] vs seqs_b_list[i].

    :param seqs_a_list: first list of sequences
    :param seqs_b_list: second list of sequences (same length as seqs_a_list)
    :return: list of normalized distances in [0, 1]
    '''
    lev_dist = []
    for seq_a, seq_b in zip(seqs_a_list, seqs_b_list):
        dist = distance(seq_a, seq_b)
        # guard against division by zero when both sequences are empty;
        # two empty strings are identical, so the normalized distance is 0
        longest = max(len(seq_a), len(seq_b))
        lev_dist.append(dist / longest if longest else 0.0)
    return lev_dist
|
|
|
def get_closest_neighbors(results: "Results_Handler", query_embedds: np.ndarray, num_neighbors: int = 1):
    '''
    Get the closest neighbors of the query embeddings using the knn model in results.
    The closest neighbors are searched for in the training set (the set the knn
    model was fit on).

    :param results: results handler exposing knn_model, knn_seqs and knn_labels
    :param query_embedds: 2D array of query embeddings, one row per query
    :param num_neighbors: number of neighbors to keep per query
    :return: (top_n_seqs, top_n_labels, distances), each of length
             len(query_embedds) * num_neighbors, flattened query-major
    '''
    # L2-normalize the queries so neighbor distances are computed on the unit sphere.
    # NOTE(review): a zero-norm row would produce NaNs here — assumed not to occur.
    query_embedds = query_embedds / np.linalg.norm(query_embedds, axis=1)[:, None]

    distances, indices = results.knn_model.kneighbors(query_embedds)
    # keep only the requested number of neighbors per query
    distances = distances[:, :num_neighbors].flatten()
    indices = indices[:, :num_neighbors]

    # indices is already truncated to num_neighbors, so the original's extra
    # [:, :num_neighbors] slice after fancy indexing was a no-op and is removed.
    # knn_seqs / knn_labels are assumed to hold one-element sequences per entry
    # (hence the seq[0] / label[0] unwrapping) — TODO confirm upstream layout.
    top_n_seqs = np.array(results.knn_seqs)[indices]
    top_n_seqs = [seq[0] for sublist in top_n_seqs for seq in sublist]
    top_n_labels = np.array(results.knn_labels)[indices]
    top_n_labels = [label[0] for sublist in top_n_labels for label in sublist]

    return top_n_seqs, top_n_labels, distances
|
|
|
def get_closest_ngbr_per_split(results: "Results_Handler", split: str, num_neighbors: int = 1):
    '''
    Compute the Levenshtein distance between the sequences in a split and
    their closest neighbors in the training set.

    :param results: results handler holding the per-split dataframes
    :param split: split name without the '_df' suffix (e.g. 'test')
    :param num_neighbors: neighbors to retrieve per sequence
    :return: (split_seqs, split_labels, top_n_seqs, top_n_labels, distances, lev_dist);
             split_labels is None when the split carries no label column
    '''
    split_df = results.splits_df_dict[f'{split}_df']
    logger.debug(f'number of sequences in {split} is {split_df.shape[0]}')

    # seq/label columns may be stored 2D (multi-level) or flat 1D; a flat
    # array raises IndexError on the [:, 0] slice. The original used bare
    # excepts here — narrowed to the exceptions the fallbacks recover from.
    try:
        split_seqs = split_df[results.seq_col].values[:, 0]
    except IndexError:
        split_seqs = split_df[results.seq_col].values
    try:
        split_labels = split_df[results.label_col].values[:, 0]
    except (IndexError, KeyError):
        # some splits may carry no labels at all
        split_labels = None

    embedds = split_df[results.embedds_cols].values

    top_n_seqs, top_n_labels, distances = get_closest_neighbors(results, embedds, num_neighbors)

    # repeat each query sequence once per retrieved neighbor so both lists align
    split_seqs = [seq for seq in split_seqs for _ in range(num_neighbors)]
    lev_dist = get_lev_dist(split_seqs, top_n_seqs)
    return split_seqs, split_labels, top_n_seqs, top_n_labels, distances, lev_dist
|
|
|
|
|
def log_lev_params(threshold: float, analysis_path: str):
    '''
    Persist the novelty-classifier decision threshold to a yaml file.

    :param threshold: decision threshold of the levenshtein-distance classifier
    :param analysis_path: directory in which novelty_model_coef.yaml is written
    '''
    model_params = {"Threshold": threshold}
    # round-trip through json to coerce numpy scalars into plain python types
    # before serialization; json.loads replaces the original eval(), which was
    # both unsafe as a pattern and broken for non-finite floats (eval('NaN') raises)
    model_params = json.loads(json.dumps(model_params))
    save(data=model_params, path=analysis_path + "/novelty_model_coef.yaml")
|
|
|
def lev_clf(set_a, set_b, random_state):
    '''
    Fit a logistic-regression classifier separating two sets of
    levenshtein distances (set_a -> class 0, set_b -> class 1).

    :param set_a: distances labelled 0 (in-distribution set)
    :param set_b: distances labelled 1 (novel/ood set)
    :param random_state: seed for resampling and the train/test split
    :return: (y_test, lr_probs, yhat, model) — held-out labels, positive-class
             probabilities, hard predictions, and the fitted model
    '''
    # build labels and the single-feature matrix from the two distance sets
    labels = np.concatenate((np.zeros(len(set_a)), np.ones(len(set_b))))
    features = np.concatenate((set_a, set_b))

    # balance the classes by undersampling the majority class
    undersampler = RandomUnderSampler(sampling_strategy='majority', random_state=random_state)
    features, labels = undersampler.fit_resample(features.reshape(-1, 1), labels)
    features = features.reshape(-1, 1)

    X_train, X_test, y_train, y_test = train_test_split(
        features, labels, test_size=0.33, random_state=random_state)

    model = LogisticRegression(solver='lbfgs')
    model.fit(X_train, y_train)

    # probability of the positive (novel) class, plus hard predictions
    lr_probs = model.predict_proba(X_test)[:, 1]
    yhat = model.predict(X_test)
    return y_test, lr_probs, yhat, model
|
|
|
|
|
def compute_novelty_clf_metrics(results: "Results_Handler", lev_dist_id_set, lev_dist_ood_set):
    '''
    Train the novelty classifier over several random replicates and aggregate
    its metrics.

    Fits lev_clf on in-distribution vs ood levenshtein distances for
    `replicates` seeds, logs mean/std of auc-roc, auc-prc and f1, and — when
    results.save_results is set — saves the metrics and the mean decision
    threshold to yaml files under results.analysis_path.

    :param results: results handler (save flag, analysis path, plotting config)
    :param lev_dist_id_set: distances of the in-distribution (test) set
    :param lev_dist_ood_set: distances of the novel/ood set
    :return: mean decision threshold across replicates
    '''
    aucs_roc = []
    aucs_prc = []
    f1s_prc = []
    thresholds = []
    replicates = 10

    for random_state in range(replicates):
        # only render figures on the last replicate
        show_figure = random_state == replicates - 1

        test_labels, lr_probs, yhat, model = lev_clf(lev_dist_id_set, lev_dist_ood_set, random_state)
        # decision boundary of the single-feature logistic regression:
        # the x where w*x + b == 0, i.e. -b/w
        thresholds.append(-model.intercept_[0] / model.coef_[0][0])

        auc_roc = compute_roc(test_labels, lr_probs, results, show_figure)
        f1_prc, auc_prc = compute_prc(test_labels, lr_probs, yhat, results, show_figure)
        aucs_roc.append(auc_roc)
        aucs_prc.append(auc_prc)
        f1s_prc.append(f1_prc)

    mean_thresh = sum(thresholds) / len(thresholds)
    # hoisted out of the loop: the original rewrote the same yaml file on
    # every replicate; writing once with the final mean is equivalent
    if results.save_results:
        log_lev_params(mean_thresh, results.analysis_path)

    auc_roc_score = sum(aucs_roc) / len(aucs_roc)
    auc_roc_std = np.std(aucs_roc)
    auc_prc_score = sum(aucs_prc) / len(aucs_prc)
    auc_prc_std = np.std(aucs_prc)
    f1_prc_score = sum(f1s_prc) / len(f1s_prc)
    f1_prc_std = np.std(f1s_prc)

    logger.info(f"auc roc is {auc_roc_score} +- {auc_roc_std}")
    logger.info(f"auc prc is {auc_prc_score} +- {auc_prc_std}")
    logger.info(f"f1 prc is {f1_prc_score} +- {f1_prc_std}")

    novelty_clf_metrics = {
        "AUC ROC score": auc_roc_score,
        "auc_roc_std": auc_roc_std,
        "AUC PRC score": auc_prc_score,
        "auc_prc_std": auc_prc_std,
        "F1 PRC score": f1_prc_score,
        "f1_prc_std": f1_prc_std,
    }

    # json round-trip coerces numpy scalars to plain python types; json.loads
    # replaces the original eval() on the dumped string (unsafe pattern, and
    # eval breaks on non-finite floats such as NaN)
    novelty_clf_metrics = json.loads(json.dumps(novelty_clf_metrics))
    if results.save_results:
        save(data=novelty_clf_metrics, path=results.analysis_path + "/novelty_clf_metrics.yaml")

    return mean_thresh
|
|
|
|
|
def compute_nlds(embedds_path):
    '''
    Compute normalized levenshtein distance (NLD) metrics for all splits and
    the novelty-classifier metrics derived from them.

    For every non-empty split: find the closest training-set neighbor of each
    sequence, compute the normalized levenshtein distances, plot their
    distribution, persist the per-split distances to csv, then fit the novelty
    classifier on test vs ood (falling back to artificial-affix) distances.

    :param embedds_path: path to the saved embeddings consumed by Results_Handler
    '''
    logger.info("Computing NLD metrics")
    logger.info("Computing novelty clf metrics")

    splits = ['train', 'valid', 'test', 'ood', 'artificial', 'no_annotation']

    results: Results_Handler = Results_Handler(embedds_path=embedds_path, splits=splits,
                                               read_dataset=True, create_knn_graph=True,
                                               save_results=True)
    results.append_loco_variants()
    results.get_knn_model()

    # collect one dataframe per split and concatenate once at the end:
    # DataFrame.append was removed in pandas 2.0 (and was O(n^2) in a loop)
    lev_dist_frames = []
    for split in results.splits_df_dict.keys():
        if len(results.splits_df_dict[f'{split}']) == 0:
            continue
        # split keys carry a trailing '_df' suffix which the helper re-appends,
        # so strip it before the call
        split_seqs, split_labels, top_n_seqs, top_n_labels, distances, lev_dist = \
            get_closest_ngbr_per_split(results, '_'.join(split.split('_')[:-1]))

        lev_dist_frames.append(pd.DataFrame({'split': split,
                                             'lev_dist': lev_dist,
                                             'seqs': split_seqs,
                                             'labels': split_labels,
                                             'top_n_seqs': top_n_seqs,
                                             'top_n_labels': top_n_labels}))

    lev_dist_df = pd.concat(lev_dist_frames) if lev_dist_frames else pd.DataFrame()

    fig = px.box(lev_dist_df, x="split", y="lev_dist", points="all",
                 hover_data=['seqs', 'labels', 'top_n_seqs', 'top_n_labels'])
    fig.update_traces(marker=dict(size=2))
    fig.show()

    fig.write_html(f'{results.figures_path}/lev_distance_distribution.html')
    fig.write_image(f'{results.figures_path}/lev_distance_distribution.png')

    # prefer the real ood split as the novel set; fall back to artificial affixes
    if 'ood_df' in lev_dist_df['split'].values:
        novel_df = lev_dist_df[lev_dist_df['split'] == 'ood_df']
    else:
        novel_df = lev_dist_df[lev_dist_df['split'] == 'artificial_affix_df']
    test_df = lev_dist_df[lev_dist_df['split'] == 'test_df']

    lev_dist_df.to_csv(f'{results.analysis_path}/lev_dist_df.csv')

    compute_novelty_clf_metrics(results, test_df['lev_dist'].values, novel_df['lev_dist'].values)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|