import numpy as np from ._base_metric import _BaseMetric from .. import _timing from functools import partial from .. import utils from ..utils import TrackEvalException class TrackMAP(_BaseMetric): """Class which implements the TrackMAP metrics""" @staticmethod def get_default_metric_config(): """Default class config values""" default_config = { 'USE_AREA_RANGES': True, # whether to evaluate for certain area ranges 'AREA_RANGES': [[0 ** 2, 32 ** 2], # additional area range sets for which TrackMAP is evaluated [32 ** 2, 96 ** 2], # (all area range always included), default values for TAO [96 ** 2, 1e5 ** 2]], # evaluation 'AREA_RANGE_LABELS': ["area_s", "area_m", "area_l"], # the labels for the area ranges 'USE_TIME_RANGES': True, # whether to evaluate for certain time ranges (length of tracks) 'TIME_RANGES': [[0, 3], [3, 10], [10, 1e5]], # additional time range sets for which TrackMAP is evaluated # (all time range always included) , default values for TAO evaluation 'TIME_RANGE_LABELS': ["time_s", "time_m", "time_l"], # the labels for the time ranges 'IOU_THRESHOLDS': np.arange(0.5, 0.96, 0.05), # the IoU thresholds 'RECALL_THRESHOLDS': np.linspace(0.0, 1.00, int(np.round((1.00 - 0.0) / 0.01) + 1), endpoint=True), # recall thresholds at which precision is evaluated 'MAX_DETECTIONS': 0, # limit the maximum number of considered tracks per sequence (0 for unlimited) 'PRINT_CONFIG': True } return default_config def __init__(self, config=None): super().__init__() self.config = utils.init_config(config, self.get_default_metric_config(), self.get_name()) self.num_ig_masks = 1 self.lbls = ['all'] self.use_area_rngs = self.config['USE_AREA_RANGES'] if self.use_area_rngs: self.area_rngs = self.config['AREA_RANGES'] self.area_rng_lbls = self.config['AREA_RANGE_LABELS'] self.num_ig_masks += len(self.area_rng_lbls) self.lbls += self.area_rng_lbls self.use_time_rngs = self.config['USE_TIME_RANGES'] if self.use_time_rngs: self.time_rngs = self.config['TIME_RANGES'] self.time_rng_lbls = self.config['TIME_RANGE_LABELS'] self.num_ig_masks += len(self.time_rng_lbls) self.lbls += self.time_rng_lbls self.array_labels = self.config['IOU_THRESHOLDS'] self.rec_thrs = self.config['RECALL_THRESHOLDS'] self.maxDet = self.config['MAX_DETECTIONS'] self.float_array_fields = ['AP_' + lbl for lbl in self.lbls] + ['AR_' + lbl for lbl in self.lbls] self.fields = self.float_array_fields self.summary_fields = self.float_array_fields @_timing.time def eval_sequence(self, data): """Calculates GT and Tracker matches for one sequence for TrackMAP metrics. Adapted from https://github.com/TAO-Dataset/""" # Initialise results to zero for each sequence as the fields are only defined over the set of all sequences res = {} for field in self.fields: res[field] = [0 for _ in self.array_labels] gt_ids, dt_ids = data['gt_track_ids'], data['dt_track_ids'] if len(gt_ids) == 0 and len(dt_ids) == 0: for idx in range(self.num_ig_masks): res[idx] = None return res # get track data gt_tr_areas = data.get('gt_track_areas', None) if self.use_area_rngs else None gt_tr_lengths = data.get('gt_track_lengths', None) if self.use_time_rngs else None gt_tr_iscrowd = data.get('gt_track_iscrowd', None) dt_tr_areas = data.get('dt_track_areas', None) if self.use_area_rngs else None dt_tr_lengths = data.get('dt_track_lengths', None) if self.use_time_rngs else None is_nel = data.get('not_exhaustively_labeled', False) # compute ignore masks for different track sets to eval gt_ig_masks = self._compute_track_ig_masks(len(gt_ids), track_lengths=gt_tr_lengths, track_areas=gt_tr_areas, iscrowd=gt_tr_iscrowd) dt_ig_masks = self._compute_track_ig_masks(len(dt_ids), track_lengths=dt_tr_lengths, track_areas=dt_tr_areas, is_not_exhaustively_labeled=is_nel, is_gt=False) boxformat = data.get('boxformat', 'xywh') ious = self._compute_track_ious(data['dt_tracks'], data['gt_tracks'], iou_function=data['iou_type'], boxformat=boxformat) for mask_idx in range(self.num_ig_masks): gt_ig_mask = gt_ig_masks[mask_idx] # Sort gt ignore last gt_idx = np.argsort([g for g in gt_ig_mask], kind="mergesort") gt_ids = [gt_ids[i] for i in gt_idx] ious_sorted = ious[:, gt_idx] if len(ious) > 0 else ious num_thrs = len(self.array_labels) num_gt = len(gt_ids) num_dt = len(dt_ids) # Array to store the "id" of the matched dt/gt gt_m = np.zeros((num_thrs, num_gt)) - 1 dt_m = np.zeros((num_thrs, num_dt)) - 1 gt_ig = np.array([gt_ig_mask[idx] for idx in gt_idx]) dt_ig = np.zeros((num_thrs, num_dt)) for iou_thr_idx, iou_thr in enumerate(self.array_labels): if len(ious_sorted) == 0: break for dt_idx, _dt in enumerate(dt_ids): iou = min([iou_thr, 1 - 1e-10]) # information about best match so far (m=-1 -> unmatched) # store the gt_idx which matched for _dt m = -1 for gt_idx, _ in enumerate(gt_ids): # if this gt already matched continue if gt_m[iou_thr_idx, gt_idx] > 0: continue # if _dt matched to reg gt, and on ignore gt, stop if m > -1 and gt_ig[m] == 0 and gt_ig[gt_idx] == 1: break # continue to next gt unless better match made if ious_sorted[dt_idx, gt_idx] < iou - np.finfo('float').eps: continue # if match successful and best so far, store appropriately iou = ious_sorted[dt_idx, gt_idx] m = gt_idx # No match found for _dt, go to next _dt if m == -1: continue # if gt to ignore for some reason update dt_ig. # Should not be used in evaluation. dt_ig[iou_thr_idx, dt_idx] = gt_ig[m] # _dt match found, update gt_m, and dt_m with "id" dt_m[iou_thr_idx, dt_idx] = gt_ids[m] gt_m[iou_thr_idx, m] = _dt dt_ig_mask = dt_ig_masks[mask_idx] dt_ig_mask = np.array(dt_ig_mask).reshape((1, num_dt)) # 1 X num_dt dt_ig_mask = np.repeat(dt_ig_mask, num_thrs, 0) # num_thrs X num_dt # Based on dt_ig_mask ignore any unmatched detection by updating dt_ig dt_ig = np.logical_or(dt_ig, np.logical_and(dt_m == -1, dt_ig_mask)) # store results for given video and category res[mask_idx] = { "dt_ids": dt_ids, "gt_ids": gt_ids, "dt_matches": dt_m, "gt_matches": gt_m, "dt_scores": data['dt_track_scores'], "gt_ignore": gt_ig, "dt_ignore": dt_ig, } return res def combine_sequences(self, all_res): """Combines metrics across all sequences. Computes precision and recall values based on track matches. Adapted from https://github.com/TAO-Dataset/ """ num_thrs = len(self.array_labels) num_recalls = len(self.rec_thrs) # -1 for absent categories precision = -np.ones( (num_thrs, num_recalls, self.num_ig_masks) ) recall = -np.ones((num_thrs, self.num_ig_masks)) for ig_idx in range(self.num_ig_masks): ig_idx_results = [res[ig_idx] for res in all_res.values() if res[ig_idx] is not None] # Remove elements which are None if len(ig_idx_results) == 0: continue # Append all scores: shape (N,) # limit considered tracks for each sequence if maxDet > 0 if self.maxDet == 0: dt_scores = np.concatenate([res["dt_scores"] for res in ig_idx_results], axis=0) dt_idx = np.argsort(-dt_scores, kind="mergesort") dt_m = np.concatenate([e["dt_matches"] for e in ig_idx_results], axis=1)[:, dt_idx] dt_ig = np.concatenate([e["dt_ignore"] for e in ig_idx_results], axis=1)[:, dt_idx] elif self.maxDet > 0: dt_scores = np.concatenate([res["dt_scores"][0:self.maxDet] for res in ig_idx_results], axis=0) dt_idx = np.argsort(-dt_scores, kind="mergesort") dt_m = np.concatenate([e["dt_matches"][:, 0:self.maxDet] for e in ig_idx_results], axis=1)[:, dt_idx] dt_ig = np.concatenate([e["dt_ignore"][:, 0:self.maxDet] for e in ig_idx_results], axis=1)[:, dt_idx] else: raise Exception("Number of maximum detections must be >= 0, but is set to %i" % self.maxDet) gt_ig = np.concatenate([res["gt_ignore"] for res in ig_idx_results]) # num gt anns to consider num_gt = np.count_nonzero(gt_ig == 0) if num_gt == 0: continue tps = np.logical_and(dt_m != -1, np.logical_not(dt_ig)) fps = np.logical_and(dt_m == -1, np.logical_not(dt_ig)) tp_sum = np.cumsum(tps, axis=1).astype(dtype=np.float) fp_sum = np.cumsum(fps, axis=1).astype(dtype=np.float) for iou_thr_idx, (tp, fp) in enumerate(zip(tp_sum, fp_sum)): tp = np.array(tp) fp = np.array(fp) num_tp = len(tp) rc = tp / num_gt if num_tp: recall[iou_thr_idx, ig_idx] = rc[-1] else: recall[iou_thr_idx, ig_idx] = 0 # np.spacing(1) ~= eps pr = tp / (fp + tp + np.spacing(1)) pr = pr.tolist() # Ensure precision values are monotonically decreasing for i in range(num_tp - 1, 0, -1): if pr[i] > pr[i - 1]: pr[i - 1] = pr[i] # find indices at the predefined recall values rec_thrs_insert_idx = np.searchsorted(rc, self.rec_thrs, side="left") pr_at_recall = [0.0] * num_recalls try: for _idx, pr_idx in enumerate(rec_thrs_insert_idx): pr_at_recall[_idx] = pr[pr_idx] except IndexError: pass precision[iou_thr_idx, :, ig_idx] = (np.array(pr_at_recall)) res = {'precision': precision, 'recall': recall} # compute the precision and recall averages for the respective alpha thresholds and ignore masks for lbl in self.lbls: res['AP_' + lbl] = np.zeros((len(self.array_labels)), dtype=np.float) res['AR_' + lbl] = np.zeros((len(self.array_labels)), dtype=np.float) for a_id, alpha in enumerate(self.array_labels): for lbl_idx, lbl in enumerate(self.lbls): p = precision[a_id, :, lbl_idx] if len(p[p > -1]) == 0: mean_p = -1 else: mean_p = np.mean(p[p > -1]) res['AP_' + lbl][a_id] = mean_p res['AR_' + lbl][a_id] = recall[a_id, lbl_idx] return res def combine_classes_class_averaged(self, all_res, ignore_empty_classes=True): """Combines metrics across all classes by averaging over the class values Note mAP is not well defined for 'empty classes' so 'ignore empty classes' is always true here. """ res = {} for field in self.fields: res[field] = np.zeros((len(self.array_labels)), dtype=np.float) field_stacked = np.array([res[field] for res in all_res.values()]) for a_id, alpha in enumerate(self.array_labels): values = field_stacked[:, a_id] if len(values[values > -1]) == 0: mean = -1 else: mean = np.mean(values[values > -1]) res[field][a_id] = mean return res def combine_classes_det_averaged(self, all_res): """Combines metrics across all classes by averaging over the detection values""" res = {} for field in self.fields: res[field] = np.zeros((len(self.array_labels)), dtype=np.float) field_stacked = np.array([res[field] for res in all_res.values()]) for a_id, alpha in enumerate(self.array_labels): values = field_stacked[:, a_id] if len(values[values > -1]) == 0: mean = -1 else: mean = np.mean(values[values > -1]) res[field][a_id] = mean return res def _compute_track_ig_masks(self, num_ids, track_lengths=None, track_areas=None, iscrowd=None, is_not_exhaustively_labeled=False, is_gt=True): """ Computes ignore masks for different track sets to evaluate :param num_ids: the number of track IDs :param track_lengths: the lengths of the tracks (number of timesteps) :param track_areas: the average area of a track :param iscrowd: whether a track is marked as crowd :param is_not_exhaustively_labeled: whether the track category is not exhaustively labeled :param is_gt: whether it is gt :return: the track ignore masks """ # for TAO tracks for classes which are not exhaustively labeled are not evaluated if not is_gt and is_not_exhaustively_labeled: track_ig_masks = [[1 for _ in range(num_ids)] for i in range(self.num_ig_masks)] else: # consider all tracks track_ig_masks = [[0 for _ in range(num_ids)]] # consider tracks with certain area if self.use_area_rngs: for rng in self.area_rngs: track_ig_masks.append([0 if rng[0] - np.finfo('float').eps <= area <= rng[1] + np.finfo('float').eps else 1 for area in track_areas]) # consider tracks with certain duration if self.use_time_rngs: for rng in self.time_rngs: track_ig_masks.append([0 if rng[0] - np.finfo('float').eps <= length <= rng[1] + np.finfo('float').eps else 1 for length in track_lengths]) # for YouTubeVIS evaluation tracks with crowd tag are not evaluated if is_gt and iscrowd: track_ig_masks = [np.logical_or(mask, iscrowd) for mask in track_ig_masks] return track_ig_masks @staticmethod def _compute_bb_track_iou(dt_track, gt_track, boxformat='xywh'): """ Calculates the track IoU for one detected track and one ground truth track for bounding boxes :param dt_track: the detected track (format: dictionary with frame index as keys and numpy arrays as values) :param gt_track: the ground truth track (format: dictionary with frame index as keys and numpy array as values) :param boxformat: the format of the boxes :return: the track IoU """ intersect = 0 union = 0 image_ids = set(gt_track.keys()) | set(dt_track.keys()) for image in image_ids: g = gt_track.get(image, None) d = dt_track.get(image, None) if boxformat == 'xywh': if d is not None and g is not None: dx, dy, dw, dh = d gx, gy, gw, gh = g w = max(min(dx + dw, gx + gw) - max(dx, gx), 0) h = max(min(dy + dh, gy + gh) - max(dy, gy), 0) i = w * h u = dw * dh + gw * gh - i intersect += i union += u elif d is None and g is not None: union += g[2] * g[3] elif d is not None and g is None: union += d[2] * d[3] elif boxformat == 'x0y0x1y1': if d is not None and g is not None: dx0, dy0, dx1, dy1 = d gx0, gy0, gx1, gy1 = g w = max(min(dx1, gx1) - max(dx0, gx0), 0) h = max(min(dy1, gy1) - max(dy0, gy0), 0) i = w * h u = (dx1 - dx0) * (dy1 - dy0) + (gx1 - gx0) * (gy1 - gy0) - i intersect += i union += u elif d is None and g is not None: union += (g[2] - g[0]) * (g[3] - g[1]) elif d is not None and g is None: union += (d[2] - d[0]) * (d[3] - d[1]) else: raise TrackEvalException('BoxFormat not implemented') if intersect > union: raise TrackEvalException("Intersection value > union value. Are the box values corrupted?") return intersect / union if union > 0 else 0 @staticmethod def _compute_mask_track_iou(dt_track, gt_track): """ Calculates the track IoU for one detected track and one ground truth track for segmentation masks :param dt_track: the detected track (format: dictionary with frame index as keys and pycocotools rle encoded masks as values) :param gt_track: the ground truth track (format: dictionary with frame index as keys and pycocotools rle encoded masks as values) :return: the track IoU """ # only loaded when needed to reduce minimum requirements from pycocotools import mask as mask_utils intersect = .0 union = .0 image_ids = set(gt_track.keys()) | set(dt_track.keys()) for image in image_ids: g = gt_track.get(image, None) d = dt_track.get(image, None) if d and g: intersect += mask_utils.area(mask_utils.merge([d, g], True)) union += mask_utils.area(mask_utils.merge([d, g], False)) elif not d and g: union += mask_utils.area(g) elif d and not g: union += mask_utils.area(d) if union < 0.0 - np.finfo('float').eps: raise TrackEvalException("Union value < 0. Are the segmentaions corrupted?") if intersect > union: raise TrackEvalException("Intersection value > union value. Are the segmentations corrupted?") iou = intersect / union if union > 0.0 + np.finfo('float').eps else 0.0 return iou @staticmethod def _compute_track_ious(dt, gt, iou_function='bbox', boxformat='xywh'): """ Calculate track IoUs for a set of ground truth tracks and a set of detected tracks """ if len(gt) == 0 and len(dt) == 0: return [] if iou_function == 'bbox': track_iou_function = partial(TrackMAP._compute_bb_track_iou, boxformat=boxformat) elif iou_function == 'mask': track_iou_function = partial(TrackMAP._compute_mask_track_iou) else: raise Exception('IoU function not implemented') ious = np.zeros([len(dt), len(gt)]) for i, j in np.ndindex(ious.shape): ious[i, j] = track_iou_function(dt[i], gt[j]) return ious @staticmethod def _row_print(*argv): """Prints results in an evenly spaced rows, with more space in first row""" if len(argv) == 1: argv = argv[0] to_print = '%-40s' % argv[0] for v in argv[1:]: to_print += '%-12s' % str(v) print(to_print)