import csv
import io
import zipfile
import os
import traceback
import numpy as np
from copy import deepcopy
from abc import ABC, abstractmethod
from .. import _timing
from ..utils import TrackEvalException


class _BaseDataset(ABC):
    @abstractmethod
    def __init__(self):
        self.tracker_list = None
        self.seq_list = None
        self.class_list = None
        self.output_fol = None
        self.output_sub_fol = None
        self.should_classes_combine = True
        self.use_super_categories = False

    # Functions to implement:

    @staticmethod
    @abstractmethod
    def get_default_dataset_config():
        ...

    @abstractmethod
    def _load_raw_file(self, tracker, seq, is_gt):
        ...

    @_timing.time
    @abstractmethod
    def get_preprocessed_seq_data(self, raw_data, cls):
        ...

    @abstractmethod
    def _calculate_similarities(self, gt_dets_t, tracker_dets_t):
        ...
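
    # Minimal subclass sketch (illustration only; 'MyDataset' and its config keys are hypothetical and not part
    # of this module). A concrete dataset fills in the four abstract methods above:
    #
    #   class MyDataset(_BaseDataset):
    #       @staticmethod
    #       def get_default_dataset_config():
    #           return {'GT_FOLDER': 'data/gt', 'TRACKERS_FOLDER': 'data/trackers'}
    #
    #       def _load_raw_file(self, tracker, seq, is_gt):
    #           ...  # parse per-sequence files into the raw_data fields described below
    #
    #       def get_preprocessed_seq_data(self, raw_data, cls):
    #           ...  # filter raw_data down to a single class and apply dataset-specific preprocessing
    #
    #       def _calculate_similarities(self, gt_dets_t, tracker_dets_t):
    #           return self._calculate_box_ious(gt_dets_t, tracker_dets_t)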

    # Helper functions for all datasets:

    @classmethod
    def get_class_name(cls):
        return cls.__name__

    def get_name(self):
        return self.get_class_name()

    def get_output_fol(self, tracker):
        return os.path.join(self.output_fol, tracker, self.output_sub_fol)

    def get_display_name(self, tracker):
        """ Can be overwritten if the tracker's name (in files) is different to how it should be displayed.
        By default this method just returns the tracker's name as is.
        """
        return tracker

    def get_eval_info(self):
        """Return info about the dataset needed for the Evaluator."""
        return self.tracker_list, self.seq_list, self.class_list

    @_timing.time
    def get_raw_seq_data(self, tracker, seq):
        """ Loads raw data (tracker and ground-truth) for a single tracker on a single sequence.
        Raw data includes all of the information needed for both preprocessing and evaluation, for all classes.
        A later function (get_preprocessed_seq_data) will perform such preprocessing and extract the relevant
        information for the evaluation of each class.
        This returns a dict which contains the fields:
        [num_timesteps]: integer
        [gt_ids, tracker_ids, gt_classes, tracker_classes, tracker_confidences]:
                                    list (for each timestep) of 1D NDArrays (for each det).
        [gt_dets, tracker_dets, gt_crowd_ignore_regions]: list (for each timestep) of lists of detections.
        [similarity_scores]: list (for each timestep) of 2D NDArrays.
        [gt_extras]: dict (for each extra) of lists (for each timestep) of 1D NDArrays (for each det).
        gt_extras contains dataset-specific information used for preprocessing, such as occlusion and truncation
        levels.
        Note that similarities are extracted as part of the dataset and not the metric, because almost all metrics
        are independent of the exact method of calculating the similarity. However, datasets are not (e.g.
        segmentation masks vs 2D boxes vs 3D boxes).
        We calculate the similarity before preprocessing because often both preprocessing and evaluation require
        it, and we don't wish to calculate this twice.
        We calculate similarity between all gt and tracker classes (not just each class individually) to allow for
        calculation of metrics such as class confusion matrices. Typically the impact of this on performance is
        low.
        """
        # Load raw data.
        raw_gt_data = self._load_raw_file(tracker, seq, is_gt=True)
        raw_tracker_data = self._load_raw_file(tracker, seq, is_gt=False)
        raw_data = {**raw_tracker_data, **raw_gt_data}  # Merges dictionaries

        # Calculate similarities for each timestep.
        similarity_scores = []
        for t, (gt_dets_t, tracker_dets_t) in enumerate(zip(raw_data['gt_dets'], raw_data['tracker_dets'])):
            ious = self._calculate_similarities(gt_dets_t, tracker_dets_t)
            similarity_scores.append(ious)
        raw_data['similarity_scores'] = similarity_scores
        return raw_data
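
    # Illustrative shape of the dict returned above, for a hypothetical 2-timestep sequence (values are made up
    # for illustration and only a subset of the fields is shown):
    #
    #   raw_data = {
    #       'num_timesteps': 2,
    #       'gt_ids': [np.array([0, 1]), np.array([1])],                  # one 1D array per timestep
    #       'tracker_ids': [np.array([5]), np.array([5, 6])],
    #       'gt_dets': [[gt_det_a, gt_det_b], [gt_det_b]],                # per-timestep lists of detections
    #       'tracker_dets': [[trk_det_x], [trk_det_x, trk_det_y]],
    #       'similarity_scores': [np.zeros((2, 1)), np.zeros((1, 2))],    # (num_gt x num_tracker) per timestep
    #   }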

    @staticmethod
    def _load_simple_text_file(file, time_col=0, id_col=None, remove_negative_ids=False, valid_filter=None,
                               crowd_ignore_filter=None, convert_filter=None, is_zipped=False, zip_file=None,
                               force_delimiters=None):
        """ Function that loads data which is in a commonly used text file format.
        Assumes each det is given by one row of a text file.
        There is no limit to the number or meaning of each column,
        however one column needs to give the timestep of each det (time_col), which defaults to column 0.
        The file dialect (delimiter, number of columns, etc.) is determined automatically.
        This function automatically separates dets by timestep,
        and is much faster than alternatives such as np.loadtxt or pandas.
        If remove_negative_ids is True and id_col is not None, dets with negative values in id_col are excluded.
        These are not excluded from ignore data.
        valid_filter can be used to only include certain classes.
        It is a dict with ints as keys and lists as values,
        such that a row is included if "row[key].lower() in value" for all key/value pairs in the dict.
        If None, all classes are included.
        crowd_ignore_filter can be used to read crowd_ignore regions separately. It has the same format as
        valid_filter.
        convert_filter can be used to convert a value that is read to another format.
        This is most commonly used to convert classes given as strings to class ids.
        It is a dict such that the key is the column to convert, and the value is another dict giving the mapping.
        Optionally, input files can be a zip of multiple text files for storage efficiency.
        Returns read_data and ignore_data.
        Each is a dict (with timesteps as string keys) of lists (over dets) of lists (over column values).
        Note that all data is returned as strings, and must be converted to float/int later if needed.
        Note that timesteps will not be present in the returned dict keys if there are no dets for them.
        """
        if remove_negative_ids and id_col is None:
            raise TrackEvalException('remove_negative_ids is True, but id_col is not given.')
        if crowd_ignore_filter is None:
            crowd_ignore_filter = {}
        if convert_filter is None:
            convert_filter = {}
        try:
            if is_zipped:  # Either open file directly or within a zip.
                if zip_file is None:
                    raise TrackEvalException('is_zipped set to True, but no zip_file is given.')
                archive = zipfile.ZipFile(os.path.join(zip_file), 'r')
                fp = io.TextIOWrapper(archive.open(file, 'r'))
            else:
                fp = open(file)
            read_data = {}
            crowd_ignore_data = {}
            fp.seek(0, os.SEEK_END)
            # Check if file is empty.
            if fp.tell():
                fp.seek(0)
                dialect = csv.Sniffer().sniff(fp.readline(), delimiters=force_delimiters)  # Auto determine structure.
                dialect.skipinitialspace = True  # Deal with extra spaces between columns.
                fp.seek(0)
                reader = csv.reader(fp, dialect)
                for row in reader:
                    try:
                        # Deal with extra trailing spaces at the end of rows.
                        if row[-1] == '':
                            row = row[:-1]
                        timestep = str(int(float(row[time_col])))
                        # Read ignore regions separately.
                        is_ignored = False
                        for ignore_key, ignore_value in crowd_ignore_filter.items():
                            if row[ignore_key].lower() in ignore_value:
                                # Convert values in one column (e.g. string to id).
                                for convert_key, convert_value in convert_filter.items():
                                    row[convert_key] = convert_value[row[convert_key].lower()]
                                # Save data separated by timestep.
                                if timestep in crowd_ignore_data.keys():
                                    crowd_ignore_data[timestep].append(row)
                                else:
                                    crowd_ignore_data[timestep] = [row]
                                is_ignored = True
                        if is_ignored:  # If det is an ignore region, it cannot be a normal det.
                            continue
                        # Exclude some dets if not valid. All valid_filter conditions must hold, and checking them
                        # together ensures the whole row is skipped (a 'continue' inside an inner loop would not).
                        if valid_filter is not None:
                            if not all(row[key].lower() in value for key, value in valid_filter.items()):
                                continue
                        if remove_negative_ids:
                            if int(float(row[id_col])) < 0:
                                continue
                        # Convert values in one column (e.g. string to id).
                        for convert_key, convert_value in convert_filter.items():
                            row[convert_key] = convert_value[row[convert_key].lower()]
                        # Save data separated by timestep.
                        if timestep in read_data.keys():
                            read_data[timestep].append(row)
                        else:
                            read_data[timestep] = [row]
                    except Exception:
                        exc_str_init = 'In file %s the following line cannot be read correctly: \n' \
                                       % os.path.basename(file)
                        exc_str = ' '.join([exc_str_init] + row)
                        raise TrackEvalException(exc_str)
            fp.close()
        except Exception:
            print('Error loading file: %s, printing traceback.' % file)
            traceback.print_exc()
            raise TrackEvalException(
                'File %s cannot be read because it is either not present or invalidly formatted'
                % os.path.basename(file))
        return read_data, crowd_ignore_data
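
    # Usage sketch (hypothetical MOT-style file; the path, column indices and filter values below are
    # illustrative, not defined by this module):
    #
    #   read_data, ignore_data = _BaseDataset._load_simple_text_file(
    #       'gt.txt', time_col=0, id_col=1, remove_negative_ids=True,
    #       valid_filter={7: ['1']},            # keep only rows whose class column (col 7) equals '1'
    #       crowd_ignore_filter={7: ['13']})    # read rows with class '13' as crowd ignore regions
    #   dets_at_t1 = read_data.get('1', [])     # list of rows (lists of strings) for timestep 1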

    @staticmethod
    def _calculate_mask_ious(masks1, masks2, is_encoded=False, do_ioa=False):
        """ Calculates the IOU (intersection over union) between two arrays of segmentation masks.
        If is_encoded, a run-length encoding with pycocotools is assumed as the input format, otherwise an input
        of numpy arrays of shape (num_masks, height, width) is assumed and the encoding is performed.
        If do_ioa (intersection over area), then calculates the intersection over the area of masks1 - this is
        commonly used to determine if detections are within a crowd ignore region.
        :param masks1: first set of masks (numpy array of shape (num_masks, height, width) if not encoded,
                       else pycocotools rle encoded format)
        :param masks2: second set of masks (numpy array of shape (num_masks, height, width) if not encoded,
                       else pycocotools rle encoded format)
        :param is_encoded: whether the input is in pycocotools rle encoded format
        :param do_ioa: whether to perform IoA computation
        :return: the IoU/IoA scores
        """
        # Only loaded when run to reduce minimum requirements.
        from pycocotools import mask as mask_utils

        # Use pycocotools for run-length encoding of masks.
        if not is_encoded:
            masks1 = mask_utils.encode(np.array(np.transpose(masks1, (1, 2, 0)), order='F'))
            masks2 = mask_utils.encode(np.array(np.transpose(masks2, (1, 2, 0)), order='F'))

        # Use pycocotools for IoU computation of rle-encoded masks.
        ious = mask_utils.iou(masks1, masks2, [do_ioa]*len(masks2))
        if len(masks1) == 0 or len(masks2) == 0:
            ious = np.asarray(ious).reshape(len(masks1), len(masks2))
        assert (ious >= 0 - np.finfo('float').eps).all()
        assert (ious <= 1 + np.finfo('float').eps).all()
        return ious
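
    # Worked example (illustrative masks; requires pycocotools and uint8 binary arrays):
    #
    #   m1 = np.zeros((1, 4, 4), dtype=np.uint8); m1[0, :2, :] = 1   # top half of a 4x4 grid (8 pixels)
    #   m2 = np.zeros((1, 4, 4), dtype=np.uint8); m2[0, :, :2] = 1   # left half of the grid (8 pixels)
    #   _BaseDataset._calculate_mask_ious(m1, m2)
    #   # intersection = 4, union = 8 + 8 - 4 = 12  ->  IoU = 1/3 ~= 0.333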

    @staticmethod
    def _calculate_box_ious(bboxes1, bboxes2, box_format='xywh', do_ioa=False):
        """ Calculates the IOU (intersection over union) between two arrays of boxes.
        Allows variable box formats ('xywh' and 'x0y0x1y1').
        If do_ioa (intersection over area), then calculates the intersection over the area of boxes1 - this is
        commonly used to determine if detections are within a crowd ignore region.
        """
        if box_format == 'xywh':
            # layout: (x0, y0, w, h) - convert to (x0, y0, x1, y1)
            bboxes1 = deepcopy(bboxes1)
            bboxes2 = deepcopy(bboxes2)
            bboxes1[:, 2] = bboxes1[:, 0] + bboxes1[:, 2]
            bboxes1[:, 3] = bboxes1[:, 1] + bboxes1[:, 3]
            bboxes2[:, 2] = bboxes2[:, 0] + bboxes2[:, 2]
            bboxes2[:, 3] = bboxes2[:, 1] + bboxes2[:, 3]
        elif box_format != 'x0y0x1y1':
            raise TrackEvalException('box_format %s is not implemented' % box_format)

        # layout: (x0, y0, x1, y1)
        min_ = np.minimum(bboxes1[:, np.newaxis, :], bboxes2[np.newaxis, :, :])
        max_ = np.maximum(bboxes1[:, np.newaxis, :], bboxes2[np.newaxis, :, :])
        intersection = np.maximum(min_[..., 2] - max_[..., 0], 0) * np.maximum(min_[..., 3] - max_[..., 1], 0)
        area1 = (bboxes1[..., 2] - bboxes1[..., 0]) * (bboxes1[..., 3] - bboxes1[..., 1])
        if do_ioa:
            ioas = np.zeros_like(intersection)
            valid_mask = area1 > 0 + np.finfo('float').eps
            ioas[valid_mask, :] = intersection[valid_mask, :] / area1[valid_mask][:, np.newaxis]
            return ioas
        else:
            area2 = (bboxes2[..., 2] - bboxes2[..., 0]) * (bboxes2[..., 3] - bboxes2[..., 1])
            union = area1[:, np.newaxis] + area2[np.newaxis, :] - intersection
            intersection[area1 <= 0 + np.finfo('float').eps, :] = 0
            intersection[:, area2 <= 0 + np.finfo('float').eps] = 0
            intersection[union <= 0 + np.finfo('float').eps] = 0
            union[union <= 0 + np.finfo('float').eps] = 1
            ious = intersection / union
            return ious
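
    # Worked example (illustrative values):
    #
    #   b1 = np.array([[0., 0., 10., 10.]])          # xywh: 10x10 box at the origin
    #   b2 = np.array([[5., 5., 10., 10.]])          # xywh: 10x10 box shifted by (5, 5)
    #   _BaseDataset._calculate_box_ious(b1, b2)
    #   # intersection = 5*5 = 25, union = 100 + 100 - 25 = 175  ->  IoU = 25/175 ~= 0.143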

    @staticmethod
    def _calculate_euclidean_similarity(dets1, dets2, zero_distance=2.0):
        """ Calculates the Euclidean distance between two sets of detections, and then converts this into a
        similarity measure with values between 0 and 1 using the following formula:
        sim = max(0, 1 - dist/zero_distance).
        The default zero_distance of 2.0 corresponds to the default used in MOT15_3D, such that a 0.5 similarity
        threshold corresponds to a 1m distance threshold for TPs.
        """
        dist = np.linalg.norm(dets1[:, np.newaxis] - dets2[np.newaxis, :], axis=2)
        sim = np.maximum(0, 1 - dist/zero_distance)
        return sim
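
    # Worked example (illustrative values): two 3D points 1m apart give sim = max(0, 1 - 1/2) = 0.5:
    #
    #   d1 = np.array([[0., 0., 0.]])
    #   d2 = np.array([[1., 0., 0.]])
    #   _BaseDataset._calculate_euclidean_similarity(d1, d2)   # -> array([[0.5]])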

    @staticmethod
    def _check_unique_ids(data, after_preproc=False):
        """Check the requirement that the tracker_ids and gt_ids are unique per timestep."""
        gt_ids = data['gt_ids']
        tracker_ids = data['tracker_ids']
        for t, (gt_ids_t, tracker_ids_t) in enumerate(zip(gt_ids, tracker_ids)):
            if len(tracker_ids_t) > 0:
                unique_ids, counts = np.unique(tracker_ids_t, return_counts=True)
                if np.max(counts) != 1:
                    duplicate_ids = unique_ids[counts > 1]
                    exc_str_init = 'Tracker predicts the same ID more than once in a single timestep ' \
                                   '(seq: %s, frame: %i, ids:' % (data['seq'], t+1)
                    exc_str = ' '.join([exc_str_init] + [str(d) for d in duplicate_ids]) + ')'
                    if after_preproc:
                        # Append the note to the final message (appending to exc_str_init here would be lost).
                        exc_str += '\n Note that this error occurred after preprocessing (but not before), ' \
                                   'so ids may not be as in file, and something seems wrong with preproc.'
                    raise TrackEvalException(exc_str)
            if len(gt_ids_t) > 0:
                unique_ids, counts = np.unique(gt_ids_t, return_counts=True)
                if np.max(counts) != 1:
                    duplicate_ids = unique_ids[counts > 1]
                    exc_str_init = 'Ground-truth has the same ID more than once in a single timestep ' \
                                   '(seq: %s, frame: %i, ids:' % (data['seq'], t+1)
                    exc_str = ' '.join([exc_str_init] + [str(d) for d in duplicate_ids]) + ')'
                    if after_preproc:
                        # Append the note to the final message (appending to exc_str_init here would be lost).
                        exc_str += '\n Note that this error occurred after preprocessing (but not before), ' \
                                   'so ids may not be as in file, and something seems wrong with preproc.'
                    raise TrackEvalException(exc_str)
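
    # Usage sketch (hypothetical data): tracker id 7 appearing twice at the first timestep raises
    # TrackEvalException:
    #
    #   data = {'seq': 'seq01', 'gt_ids': [np.array([1, 2])], 'tracker_ids': [np.array([7, 7])]}
    #   _BaseDataset._check_unique_ids(data)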