# coding=utf-8 | |
# Copyright 2021 The Deeplab2 Authors. | |
# | |
# Licensed under the Apache License, Version 2.0 (the "License"); | |
# you may not use this file except in compliance with the License. | |
# You may obtain a copy of the License at | |
# | |
# http://www.apache.org/licenses/LICENSE-2.0 | |
# | |
# Unless required by applicable law or agreed to in writing, software | |
# distributed under the License is distributed on an "AS IS" BASIS, | |
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
# See the License for the specific language governing permissions and | |
# limitations under the License. | |
r"""This file contains code to track based on IoU overlaps. | |
The IoUTracker takes frame-by-frame panoptic segmentation prediction and | |
generates video panoptic segmentation with re-ordered identities based on IoU | |
overlaps within consecutive frames. | |
To run this script, you need to install scipy. | |
For example, install it via pip: | |
$pip install scipy | |
""" | |
import collections | |
import os | |
import pprint | |
from typing import List, Text, Tuple, Optional | |
from absl import app | |
from absl import flags | |
from absl import logging | |
import numpy as np | |
from scipy import optimize | |
import tensorflow as tf | |
from deeplab2.data import dataset | |
from deeplab2.evaluation import segmentation_and_tracking_quality as stq | |
from deeplab2.tracker import optical_flow_utils | |
from deeplab2.trainer import vis_utils | |
# Command-line interface: input/output folders, dataset choice, and optional
# optical-flow predictions used to warp instance masks between frames.
FLAGS = flags.FLAGS
flags.DEFINE_string('gt', None, 'The path to the gt video frames. This folder '
                    'should contain one folder per sequence.')
flags.DEFINE_string('pred', None, 'The path to the prediction video frames. '
                    'This folder should contain one folder per sequence.')
flags.DEFINE_string('output', '', 'The path to store the tracked video frames.'
                    'This folder should contain one folder per sequence.')
flags.DEFINE_string('sequence', '', 'The sequence ID to evaluate on.')
flags.DEFINE_string(
    'dataset', 'kitti_step', 'The specified dataset is used'
    ' to interpret the labels. Supported options are: ' +
    ', '.join(dataset.MAP_NAMES))
flags.DEFINE_string('optical_flow', None,
                    'The path to the optical flow predictions. This folder '
                    'should contain one folder per sequence.')
# Panoptic labels are encoded as `semantic_class * _LABEL_DIVISOR + id`;
# `update()` below splits them with // and % on this divisor.
_LABEL_DIVISOR = 10000
# File extensions of the forward occlusion / optical-flow prediction files.
_OCCLUSION_EXT = '.occ_forward'
_FLOW_EXT = '.flow_forward'
def _format_output(output, indent=4): | |
"""Formats `output`, either on one line, or indented across multiple lines.""" | |
formatted = pprint.pformat(output) | |
lines = formatted.splitlines() | |
if len(lines) == 1: | |
return formatted | |
lines = [' ' * indent + line for line in lines] | |
return '\n' + '\n'.join(lines) | |
def _compute_mask_iou(instance_a: np.ndarray, instance_b: np.ndarray) -> int: | |
"""Computes the IoU of two binary masks.""" | |
intersection = np.count_nonzero( | |
np.logical_and(instance_a > 0, instance_b > 0).astype(np.uint8)) | |
non_intersection_a = np.count_nonzero(instance_a > 0) - intersection | |
non_intersection_b = np.count_nonzero(instance_b > 0) - intersection | |
return intersection / ( | |
intersection + non_intersection_a + non_intersection_b) | |
class IoUTracker(object):
  """Assigns track IDs to per-frame instances based on IoU overlap.

  For each tracked class, the tracker keeps the last seen mask of every
  active track. New frames are matched against these masks (optionally
  after warping with optical flow) via Hungarian matching on IoU.
  """

  def __init__(self,
               classes_to_track: List[int],
               label_divisor: int,
               sigma=10,
               iou_threshold=0.3):
    """Initializes the tracker.

    Args:
      classes_to_track: A list of class IDs that should be tracked.
      label_divisor: The divisor to split the label map into semantic classes
        and instance IDs.
      sigma: An integer specifying the number of frames that tracks should be
        kept active while being discontinued.
      iou_threshold: A float specifying the minimum IoU value for a match.
    """
    self._sigma = sigma
    self._iou_threshold = iou_threshold
    self._classes_to_track = classes_to_track
    self._label_divisor = label_divisor
    self.reset_states()

  def reset_states(self):
    """Resets all tracking states."""
    # Per class: OrderedDict mapping track ID -> last seen binary mask.
    self._last_mask_per_track = {
        i: collections.OrderedDict() for i in self._classes_to_track
    }
    # Per class: OrderedDict mapping track ID -> frames since last match.
    self._frames_since_last_update = {
        i: collections.OrderedDict() for i in self._classes_to_track
    }
    # `0` is reserved for `crowd`.
    self._next_track_id = 1

  def _add_track(self, object_mask: np.ndarray, class_index: int):
    """Adds a new track."""
    track_id = self._next_track_id
    self._last_mask_per_track[class_index][track_id] = object_mask
    self._frames_since_last_update[class_index][track_id] = 0
    self._next_track_id += 1

  def _remove_track(self, track_id: int, class_index: int):
    """Removes a track."""
    del self._last_mask_per_track[class_index][track_id]
    del self._frames_since_last_update[class_index][track_id]

  def _increase_inactivity_of_track(self, track_id: int, class_index: int):
    """Increases inactivity of a track and potentially removes it."""
    self._frames_since_last_update[class_index][track_id] += 1
    if (self._frames_since_last_update[class_index][track_id] >
        self._sigma):
      self._remove_track(track_id, class_index)

  def _match_instances_to_tracks(
      self, instances: List[np.ndarray], class_index: int,
      instances_with_track_id: np.ndarray,
      warped_instances: List[np.ndarray]) -> np.ndarray:
    """Matches instances to tracks and updates tracks accordingly.

    Args:
      instances: A list of binary instance masks of class `class_index`.
      class_index: An integer specifying the semantic class.
      instances_with_track_id: The output map that receives track IDs.
      warped_instances: Optional list of flow-warped instance masks; when
        non-empty, the association is computed on the warped masks instead.

    Returns:
      The updated `instances_with_track_id` map.
    """
    track_ids = list(self._last_mask_per_track[class_index].keys())
    # Match instances to tracks based on IoU overlap.
    if warped_instances:
      matches, unmatched_instances, unmatched_tracks = (
          self._associate_instances_to_tracks(warped_instances, class_index))
    else:
      matches, unmatched_instances, unmatched_tracks = (
          self._associate_instances_to_tracks(instances, class_index))
    # Extend existing tracks: store the (unwarped) current mask and reset
    # the inactivity counter.
    for instance_index, track_id_index in matches:
      track_id = track_ids[track_id_index]
      instance_mask = instances[instance_index]
      self._last_mask_per_track[class_index][track_id] = instance_mask
      self._frames_since_last_update[class_index][track_id] = 0
      instances_with_track_id[instance_mask] = track_id
    # Add new tracks.
    for instance_index in unmatched_instances:
      instance_mask = instances[instance_index]
      self._add_track(instance_mask, class_index)
      instances_with_track_id[instance_mask] = self._next_track_id - 1
    # Remove tracks that are inactive for more than `sigma` frames.
    for track_id_index in unmatched_tracks:
      track_id = track_ids[track_id_index]
      self._increase_inactivity_of_track(track_id, class_index)
    return instances_with_track_id

  def update(self, predicted_frame: np.ndarray,
             predicted_flow: Optional[np.ndarray],
             predicted_occlusion: Optional[np.ndarray]) -> np.ndarray:
    """Updates the tracking states and computes the track IDs.

    Args:
      predicted_frame: The panoptic label map for a particular video frame.
      predicted_flow: An optional np.array containing the optical flow.
      predicted_occlusion: An optional np.array containing the predicted
        occlusion map.

    Returns:
      The updated panoptic label map for the input frame containing track IDs.

    Raises:
      ValueError: If the number of assigned tracks reaches `label_divisor`.
    """
    predicted_classes = predicted_frame // self._label_divisor
    predicted_instances = predicted_frame % self._label_divisor
    instances_with_track_id = np.zeros_like(predicted_instances)
    for class_index in self._classes_to_track:
      instances_mask = np.logical_and(predicted_classes == class_index,
                                      predicted_instances > 0)
      instance_ids = np.unique(predicted_instances[instances_mask])
      instances = [
          np.logical_and(instances_mask, predicted_instances == i)
          for i in instance_ids
      ]
      # If current class has no instances, check if tracks need to be removed,
      # because they are inactive for more than `sigma` frames.
      if not instances:
        # Copy keys, since _increase_inactivity_of_track may delete entries.
        immutable_key_list = list(self._frames_since_last_update[class_index])
        for track_id in immutable_key_list:
          self._increase_inactivity_of_track(track_id, class_index)
        continue
      # If there are no tracks recorded yet, add all instances as new tracks.
      if not self._last_mask_per_track[class_index]:
        for instance_mask in instances:
          self._add_track(instance_mask, class_index)
          instances_with_track_id[instance_mask] = self._next_track_id - 1
      else:
        # If optical flow is used, warp all instances.
        warped_instances = []
        if predicted_occlusion is not None and predicted_flow is not None:
          for instance in instances:
            warped_instance = optical_flow_utils.warp_flow(
                instance.astype(np.float32), predicted_flow)
            warped_instances.append(
                optical_flow_utils.remove_occlusions(warped_instance,
                                                     predicted_occlusion))
        instances_with_track_id = self._match_instances_to_tracks(
            instances, class_index, instances_with_track_id, warped_instances)
    if self._next_track_id >= self._label_divisor:
      # Fixed typo in the original message ('To many' -> 'Too many').
      raise ValueError('Too many tracks were detected for the given '
                       'label_divisor. Please increase the label_divisor to '
                       'make sure that the track Ids are less than the '
                       'label_divisor.')
    return predicted_classes * self._label_divisor + instances_with_track_id

  def _associate_instances_to_tracks(
      self, instances: List[np.ndarray],
      class_index: int) -> Tuple[List[Tuple[int, int]], List[int], List[int]]:
    """Matches the instances to existing tracks.

    Args:
      instances: A list of numpy arrays specifying the instance masks.
      class_index: An integer specifying the class index.

    Returns:
      A tuple of Lists:
      - Containing all indices of matches between instances and tracks.
      - Containing all indices of unmatched instances.
      - Containing all indices of unmatched tracks.
    """
    number_of_instances = len(instances)
    number_of_tracks = len(self._last_mask_per_track[class_index])
    iou_matrix = np.zeros((number_of_instances, number_of_tracks))
    for i, instance_mask in enumerate(instances):
      for j, last_mask in enumerate(
          self._last_mask_per_track[class_index].values()):
        iou_matrix[i, j] = _compute_mask_iou(instance_mask, last_mask)
    # Hungarian matching on negated IoU (maximize total overlap).
    matches_indices = np.stack(
        list(optimize.linear_sum_assignment(-iou_matrix)), axis=1)
    unmatched_instances = [
        inst_id for inst_id in range(number_of_instances)
        if inst_id not in matches_indices[:, 0]
    ]
    unmatched_tracks = [
        inst_id for inst_id in range(number_of_tracks)
        if inst_id not in matches_indices[:, 1]
    ]
    # Reject assignments whose IoU is below the threshold.
    list_of_matches = []
    for m in matches_indices:
      if iou_matrix[m[0], m[1]] > self._iou_threshold:
        list_of_matches.append(m)
      else:
        unmatched_instances.append(m[0])
        unmatched_tracks.append(m[1])
    return list_of_matches, unmatched_instances, unmatched_tracks
def read_panoptic_image(path: Text, label_divisor: int) -> np.ndarray:
  """Decodes a 2-channel panoptic image into a single panoptic label map.

  Args:
    path: Path to the image; channel 0 holds the semantic class and channel 1
      the instance ID.
    label_divisor: Multiplier used to merge both channels into one label.

  Returns:
    A numpy array with `semantic * label_divisor + instance` per pixel.
  """
  with tf.io.gfile.GFile(path, 'rb') as f:
    raw_bytes = f.read()
  decoded = tf.cast(tf.io.decode_image(raw_bytes), tf.int32).numpy()
  semantic = decoded[..., 0]
  instance = decoded[..., 1]
  return semantic * label_divisor + instance
def read_numpy_tensor(path: Text) -> np.ndarray:
  """Loads and returns the numpy array stored at `path`."""
  with tf.io.gfile.GFile(path, 'rb') as f:
    array = np.load(f)
  return array
def main(unused_args):
  """Tracks all sequences under FLAGS.gt/FLAGS.pred and reports STQ.

  For every sequence folder, runs the IoUTracker frame by frame (optionally
  warping masks with precomputed optical flow), optionally writes tracked
  visualizations to FLAGS.output, and accumulates the STQ metric.

  Raises:
    ValueError: If FLAGS.dataset is not a known dataset name.
  """
  if FLAGS.dataset not in dataset.MAP_NAME_TO_DATASET_INFO:
    raise ValueError('Given dataset option is not a valid dataset. Please use '
                     '--help to see available options.')
  dataset_info = dataset.MAP_NAME_TO_DATASET_INFO[FLAGS.dataset]
  thing_classes = dataset_info.class_has_instances_list
  ignore_label = dataset_info.ignore_label
  num_classes = dataset_info.num_classes
  colormap_name = dataset_info.colormap
  use_optical_flow = FLAGS.optical_flow is not None
  # Create Tracker and metric.
  tracker = IoUTracker(thing_classes, _LABEL_DIVISOR)
  metric = stq.STQuality(num_classes, thing_classes, ignore_label,
                         _LABEL_DIVISOR, 256 * 256 * 256)
  # Get ground-truth files; one sub-folder per sequence.
  for gt_sequence_folder in tf.io.gfile.glob(os.path.join(FLAGS.gt, '*')):
    # Track IDs must not leak across sequences.
    tracker.reset_states()
    color_map = dict()
    sequence = os.path.basename(gt_sequence_folder)
    if FLAGS.sequence and FLAGS.sequence != sequence:
      continue
    pred_sequence_folder = os.path.join(FLAGS.pred, sequence)
    if use_optical_flow:
      optical_flow_sequence_folder = os.path.join(FLAGS.optical_flow, sequence)
    for gt_frame_path in sorted(tf.io.gfile.glob(
        os.path.join(gt_sequence_folder, '*.png'))):
      gt_frame_name = os.path.basename(gt_frame_path)
      pred_frame_name = os.path.join(pred_sequence_folder, gt_frame_name)
      flow = None
      occlusion = None
      logging.info('Processing sequence %s: frame %s.', sequence, gt_frame_name)
      gt_frame = read_panoptic_image(gt_frame_path, _LABEL_DIVISOR)
      pred_frame = read_panoptic_image(pred_frame_name, _LABEL_DIVISOR)
      if use_optical_flow:
        # Flow/occlusion files are named after the *previous* frame index.
        frame_id = int(os.path.splitext(gt_frame_name)[0])
        flow_path = os.path.join(optical_flow_sequence_folder,
                                 '%06d%s' % (frame_id - 1, _FLOW_EXT))
        occlusion_path = os.path.join(optical_flow_sequence_folder,
                                      '%06d%s' % (frame_id - 1, _OCCLUSION_EXT))
        if tf.io.gfile.exists(flow_path):
          flow = read_numpy_tensor(flow_path)
          occlusion = read_numpy_tensor(occlusion_path)[0, ..., 0]
        else:
          logging.info('Could not find optical flow for current frame.')
          h, w = gt_frame.shape
          # Bug fix: the original used np.zeros_like((h, w, 2), ...), which
          # builds an array shaped like the shape *tuple* itself (shape (3,)),
          # not a zero (h, w, 2) flow field. Same for the (h, w) occlusion.
          flow = np.zeros((h, w, 2), np.float32)
          occlusion = np.zeros((h, w), np.float32)
      pred_frame = tracker.update(pred_frame, flow, occlusion)
      if FLAGS.output:
        output_folder = os.path.join(FLAGS.output, sequence)
        tf.io.gfile.makedirs(output_folder)
        color_map = vis_utils.save_parsing_result(pred_frame, _LABEL_DIVISOR,
                                                  thing_classes, output_folder,
                                                  os.path.splitext(
                                                      gt_frame_name)[0],
                                                  color_map,
                                                  colormap_name=colormap_name)
      metric.update_state(
          tf.convert_to_tensor(gt_frame), tf.convert_to_tensor(pred_frame),
          sequence)
  logging.info('Final results:')
  logging.info(_format_output(metric.result()))
if __name__ == '__main__':
  # `gt` and `pred` have no defaults; fail fast when they are missing.
  flags.mark_flags_as_required(['gt', 'pred'])
  app.run(main)