Roopansh's picture
Initial Commit
73c83cf
raw
history blame
7.5 kB
# Copyright (c) Facebook, Inc. and its affiliates.
import contextlib
import io
import itertools
import json
import logging
import numpy as np
import os
import tempfile
from collections import OrderedDict
from typing import Optional
from PIL import Image
from tabulate import tabulate
from detectron2.data import MetadataCatalog
from detectron2.utils import comm
from detectron2.utils.file_io import PathManager
from .evaluator import DatasetEvaluator
logger = logging.getLogger(__name__)
class COCOPanopticEvaluator(DatasetEvaluator):
"""
Evaluate Panoptic Quality metrics on COCO using PanopticAPI.
It saves panoptic segmentation prediction in `output_dir`
It contains a synchronize call and has to be called from all workers.
"""
def __init__(self, dataset_name: str, output_dir: Optional[str] = None):
"""
Args:
dataset_name: name of the dataset
output_dir: output directory to save results for evaluation.
"""
self._metadata = MetadataCatalog.get(dataset_name)
self._thing_contiguous_id_to_dataset_id = {
v: k for k, v in self._metadata.thing_dataset_id_to_contiguous_id.items()
}
self._stuff_contiguous_id_to_dataset_id = {
v: k for k, v in self._metadata.stuff_dataset_id_to_contiguous_id.items()
}
self._output_dir = output_dir
if self._output_dir is not None:
PathManager.mkdirs(self._output_dir)
def reset(self):
self._predictions = []
def _convert_category_id(self, segment_info):
isthing = segment_info.pop("isthing", None)
if isthing is None:
# the model produces panoptic category id directly. No more conversion needed
return segment_info
if isthing is True:
segment_info["category_id"] = self._thing_contiguous_id_to_dataset_id[
segment_info["category_id"]
]
else:
segment_info["category_id"] = self._stuff_contiguous_id_to_dataset_id[
segment_info["category_id"]
]
return segment_info
def process(self, inputs, outputs):
from panopticapi.utils import id2rgb
for input, output in zip(inputs, outputs):
panoptic_img, segments_info = output["panoptic_seg"]
panoptic_img = panoptic_img.cpu().numpy()
if segments_info is None:
# If "segments_info" is None, we assume "panoptic_img" is a
# H*W int32 image storing the panoptic_id in the format of
# category_id * label_divisor + instance_id. We reserve -1 for
# VOID label, and add 1 to panoptic_img since the official
# evaluation script uses 0 for VOID label.
label_divisor = self._metadata.label_divisor
segments_info = []
for panoptic_label in np.unique(panoptic_img):
if panoptic_label == -1:
# VOID region.
continue
pred_class = panoptic_label // label_divisor
isthing = (
pred_class in self._metadata.thing_dataset_id_to_contiguous_id.values()
)
segments_info.append(
{
"id": int(panoptic_label) + 1,
"category_id": int(pred_class),
"isthing": bool(isthing),
}
)
# Official evaluation script uses 0 for VOID label.
panoptic_img += 1
file_name = os.path.basename(input["file_name"])
file_name_png = os.path.splitext(file_name)[0] + ".png"
with io.BytesIO() as out:
Image.fromarray(id2rgb(panoptic_img)).save(out, format="PNG")
segments_info = [self._convert_category_id(x) for x in segments_info]
self._predictions.append(
{
"image_id": input["image_id"],
"file_name": file_name_png,
"png_string": out.getvalue(),
"segments_info": segments_info,
}
)
def evaluate(self):
comm.synchronize()
self._predictions = comm.gather(self._predictions)
self._predictions = list(itertools.chain(*self._predictions))
if not comm.is_main_process():
return
# PanopticApi requires local files
gt_json = PathManager.get_local_path(self._metadata.panoptic_json)
gt_folder = PathManager.get_local_path(self._metadata.panoptic_root)
with tempfile.TemporaryDirectory(prefix="panoptic_eval") as pred_dir:
logger.info("Writing all panoptic predictions to {} ...".format(pred_dir))
for p in self._predictions:
with open(os.path.join(pred_dir, p["file_name"]), "wb") as f:
f.write(p.pop("png_string"))
with open(gt_json, "r") as f:
json_data = json.load(f)
json_data["annotations"] = self._predictions
output_dir = self._output_dir or pred_dir
predictions_json = os.path.join(output_dir, "predictions.json")
with PathManager.open(predictions_json, "w") as f:
f.write(json.dumps(json_data))
from panopticapi.evaluation import pq_compute
with contextlib.redirect_stdout(io.StringIO()):
pq_res = pq_compute(
gt_json,
PathManager.get_local_path(predictions_json),
gt_folder=gt_folder,
pred_folder=pred_dir,
)
res = {}
res["PQ"] = 100 * pq_res["All"]["pq"]
res["SQ"] = 100 * pq_res["All"]["sq"]
res["RQ"] = 100 * pq_res["All"]["rq"]
res["PQ_th"] = 100 * pq_res["Things"]["pq"]
res["SQ_th"] = 100 * pq_res["Things"]["sq"]
res["RQ_th"] = 100 * pq_res["Things"]["rq"]
res["PQ_st"] = 100 * pq_res["Stuff"]["pq"]
res["SQ_st"] = 100 * pq_res["Stuff"]["sq"]
res["RQ_st"] = 100 * pq_res["Stuff"]["rq"]
results = OrderedDict({"panoptic_seg": res})
_print_panoptic_results(pq_res)
return results
def _print_panoptic_results(pq_res):
headers = ["", "PQ", "SQ", "RQ", "#categories"]
data = []
for name in ["All", "Things", "Stuff"]:
row = [name] + [pq_res[name][k] * 100 for k in ["pq", "sq", "rq"]] + [pq_res[name]["n"]]
data.append(row)
table = tabulate(
data, headers=headers, tablefmt="pipe", floatfmt=".3f", stralign="center", numalign="center"
)
logger.info("Panoptic Evaluation Results:\n" + table)
if __name__ == "__main__":
from detectron2.utils.logger import setup_logger
logger = setup_logger()
import argparse
parser = argparse.ArgumentParser()
parser.add_argument("--gt-json")
parser.add_argument("--gt-dir")
parser.add_argument("--pred-json")
parser.add_argument("--pred-dir")
args = parser.parse_args()
from panopticapi.evaluation import pq_compute
with contextlib.redirect_stdout(io.StringIO()):
pq_res = pq_compute(
args.gt_json, args.pred_json, gt_folder=args.gt_dir, pred_folder=args.pred_dir
)
_print_panoptic_results(pq_res)