wenruifan's picture
Upload 115 files
a256709 verified
import argparse
import os
import ruamel_yaml as yaml
import numpy as np
import json
import torch
import torch.nn as nn
from torch.utils.data import DataLoader
import torch.nn.functional as F
from dataset.dataset_RSNA import RSNA2018_Dataset
from models.model_MeDSLIP import MeDSLIP
from models.tokenization_bert import BertTokenizer
from sklearn.metrics import roc_auc_score, precision_recall_curve, accuracy_score
from tqdm import tqdm
original_class = [
"normal",
"clear",
"sharp",
"sharply",
"unremarkable",
"intact",
"stable",
"free",
"effusion",
"opacity",
"pneumothorax",
"edema",
"atelectasis",
"tube",
"consolidation",
"process",
"abnormality",
"enlarge",
"tip",
"low",
"pneumonia",
"line",
"congestion",
"catheter",
"cardiomegaly",
"fracture",
"air",
"tortuous",
"lead",
"disease",
"calcification",
"prominence",
"device",
"engorgement",
"picc",
"clip",
"elevation",
"expand",
"nodule",
"wire",
"fluid",
"degenerative",
"pacemaker",
"thicken",
"marking",
"scar",
"hyperinflate",
"blunt",
"loss",
"widen",
"collapse",
"density",
"emphysema",
"aerate",
"mass",
"crowd",
"infiltrate",
"obscure",
"deformity",
"hernia",
"drainage",
"distention",
"shift",
"stent",
"pressure",
"lesion",
"finding",
"borderline",
"hardware",
"dilation",
"chf",
"redistribution",
"aspiration",
"tail_abnorm_obs",
"excluded_obs",
]
def get_tokenizer(tokenizer, target_text):
target_tokenizer = tokenizer(
list(target_text),
padding="max_length",
truncation=True,
max_length=64,
return_tensors="pt",
)
return target_tokenizer
def compute_AUCs(gt, pred, n_class):
"""Computes Area Under the Curve (AUC) from prediction scores.
Args:
gt: Pytorch tensor on GPU, shape = [n_samples, n_classes]
true binary labels.
pred: Pytorch tensor on GPU, shape = [n_samples, n_classes]
can either be probability estimates of the positive class,
confidence values, or binary decisions.
Returns:
List of AUROCs of all classes.
"""
AUROCs = []
gt_np = gt.cpu().numpy()
pred_np = pred.cpu().numpy()
for i in range(n_class):
AUROCs.append(roc_auc_score(gt_np[:, i], pred_np[:, i]))
return AUROCs
def main(args, config):
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Total CUDA devices: ", torch.cuda.device_count())
torch.set_default_tensor_type("torch.FloatTensor")
#### Dataset ####
print("Creating dataset")
test_dataset = RSNA2018_Dataset(config["test_file"])
test_dataloader = DataLoader(
test_dataset,
batch_size=config["test_batch_size"],
num_workers=8,
pin_memory=True,
sampler=None,
shuffle=False,
collate_fn=None,
drop_last=False,
)
json_book = json.load(open(config["disease_book"], "r"))
disease_book = [json_book[i] for i in json_book]
ana_book = [
"It is located at " + i
for i in [
"trachea",
"left_hilar",
"right_hilar",
"hilar_unspec",
"left_pleural",
"right_pleural",
"pleural_unspec",
"heart_size",
"heart_border",
"left_diaphragm",
"right_diaphragm",
"diaphragm_unspec",
"retrocardiac",
"lower_left_lobe",
"upper_left_lobe",
"lower_right_lobe",
"middle_right_lobe",
"upper_right_lobe",
"left_lower_lung",
"left_mid_lung",
"left_upper_lung",
"left_apical_lung",
"left_lung_unspec",
"right_lower_lung",
"right_mid_lung",
"right_upper_lung",
"right_apical_lung",
"right_lung_unspec",
"lung_apices",
"lung_bases",
"left_costophrenic",
"right_costophrenic",
"costophrenic_unspec",
"cardiophrenic_sulcus",
"mediastinal",
"spine",
"clavicle",
"rib",
"stomach",
"right_atrium",
"right_ventricle",
"aorta",
"svc",
"interstitium",
"parenchymal",
"cavoatrial_junction",
"cardiopulmonary",
"pulmonary",
"lung_volumes",
"unspecified",
"other",
]
]
tokenizer = BertTokenizer.from_pretrained(config["text_encoder"])
ana_book_tokenizer = get_tokenizer(tokenizer, ana_book).to(device)
disease_book_tokenizer = get_tokenizer(tokenizer, disease_book).to(device)
print("Creating model")
model = MeDSLIP(config, disease_book_tokenizer)
if args.ddp:
model = nn.DataParallel(
model, device_ids=[i for i in range(torch.cuda.device_count())]
)
model = model.to(device)
checkpoint = torch.load(args.checkpoint, map_location="cpu")
state_dict = checkpoint["model"]
model.load_state_dict(state_dict, strict=False)
print("load checkpoint from %s" % args.checkpoint)
print("Start testing")
model.eval()
gt = torch.FloatTensor()
gt = gt.to(device)
pred = torch.FloatTensor()
pred = pred.to(device)
loop = tqdm(test_dataloader)
for i, sample in enumerate(loop):
loop.set_description(f"Testing: {i+1}/{len(test_dataloader)}")
images = sample["image"].to(device)
labels = sample["label"].to(device)
gt = torch.cat((gt, labels), 0)
with torch.no_grad():
pred_class = model(images)
pred_class = pred_class[:, original_class.index("pneumonia"), :]
pred_class = 1 - F.softmax(pred_class)
pred = torch.cat((pred, pred_class), 0)
AUROC = compute_AUCs(gt, pred, 1)
print("The AUROC of {} is {}".format("pneumonia", AUROC[0]))
max_f1s = []
accs = []
gt_np = gt[:, 0].cpu().numpy()
pred_np = pred[:, 0].cpu().numpy()
precision, recall, thresholds = precision_recall_curve(gt_np, pred_np)
numerator = 2 * recall * precision
denom = recall + precision
f1_scores = np.divide(
numerator, denom, out=np.zeros_like(denom), where=(denom != 0)
)
max_f1 = np.max(f1_scores)
max_f1_thresh = thresholds[np.argmax(f1_scores)]
max_f1s.append(max_f1)
accs.append(accuracy_score(gt_np, pred_np > max_f1_thresh))
f1_avg = np.array(max_f1s).mean()
acc_avg = np.array(accs).mean()
print("The average f1 is {F1_avg:.4f}".format(F1_avg=f1_avg))
print("The average ACC is {ACC_avg:.4f}".format(ACC_avg=acc_avg))
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument(
"--config",
default="Sample_zero-Shot_Classification_RSNA/configs/MeDSLIP_config.yaml",
)
parser.add_argument("--checkpoint", default="MeDSLIP_resnet50.pth")
parser.add_argument("--device", default="cuda")
parser.add_argument("--gpu", type=str, default="0", help="gpu")
parser.add_argument("--ddp", action="store_true", help="use ddp")
args = parser.parse_args()
config = yaml.load(open(args.config, "r"), Loader=yaml.Loader)
os.environ["CUDA_VISIBLE_DEVICES"] = args.gpu
if args.gpu != "-1":
torch.cuda.current_device()
torch.cuda._initialized = True
main(args, config)