# coding=utf-8
# Copyright 2021 The Deeplab2 Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""This file contains the ViP-DeepLab meta architecture.""" | |
import collections | |
import functools | |
from typing import Any, Dict, Text, Tuple | |
from absl import logging | |
import tensorflow as tf | |
from deeplab2 import common | |
from deeplab2 import config_pb2 | |
from deeplab2.data import dataset | |
from deeplab2.model import builder | |
from deeplab2.model import utils | |
from deeplab2.model.post_processor import post_processor_builder | |
from deeplab2.model.post_processor import vip_deeplab | |
_OFFSET_OUTPUT = 'offset' | |


class ViPDeepLab(tf.keras.Model):
  """This class represents the ViP-DeepLab meta architecture.

  ViP-DeepLab takes a pair of consecutive frames, concatenated along the
  channel dimension, and extends panoptic segmentation to video by
  additionally regressing next-frame instance offsets.
  """

  def __init__(self, config: config_pb2.ExperimentOptions,
               dataset_descriptor: dataset.DatasetDescriptor):
    """Initializes a ViP-DeepLab architecture.

    Args:
      config: A config_pb2.ExperimentOptions configuration.
      dataset_descriptor: A dataset.DatasetDescriptor.
    """
    super(ViPDeepLab, self).__init__(name='ViPDeepLab')
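    # Pick a batch-norm constructor with the configured momentum/epsilon; the
    # encoder/decoder builders call it wherever a normalization layer is
    # needed.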
    if config.trainer_options.solver_options.use_sync_batchnorm:
      logging.info('Synchronized Batchnorm is used.')
      bn_layer = functools.partial(
          tf.keras.layers.experimental.SyncBatchNormalization,
          momentum=config.trainer_options.solver_options.batchnorm_momentum,
          epsilon=config.trainer_options.solver_options.batchnorm_epsilon)
    else:
      logging.info('Standard (unsynchronized) Batchnorm is used.')
      bn_layer = functools.partial(
          tf.keras.layers.BatchNormalization,
          momentum=config.trainer_options.solver_options.batchnorm_momentum,
          epsilon=config.trainer_options.solver_options.batchnorm_epsilon)
    self._encoder = builder.create_encoder(
        config.model_options.backbone,
        bn_layer,
        conv_kernel_weight_decay=(
            config.trainer_options.solver_options.weight_decay / 2))
    self._decoder = builder.create_decoder(config.model_options, bn_layer,
                                           dataset_descriptor.ignore_label)
    self._post_processor = post_processor_builder.get_post_processor(
        config, dataset_descriptor)
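    # The ASPP pool size is the training crop size scaled down by the backbone
    # output stride (e.g., a 641x641 crop at output stride 16 gives a pooled
    # size of roughly 41x41).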
    pool_size = config.train_dataset_options.crop_size
    output_stride = float(config.model_options.backbone.output_stride)
    pool_size = tuple(
        utils.scale_mutable_sequence(pool_size, 1.0 / output_stride))
    logging.info('Setting pooling size to %s', pool_size)
    self.set_pool_size(pool_size)

    # Variables for multi-scale inference.
    self._add_flipped_images = config.evaluator_options.add_flipped_images
    if not config.evaluator_options.eval_scales:
      self._eval_scales = [1.0]
    else:
      self._eval_scales = config.evaluator_options.eval_scales
    self._label_divisor = dataset_descriptor.panoptic_label_divisor

  def _inference(self, input_tensor: tf.Tensor, next_input_tensor: tf.Tensor,
                 training: bool) -> Dict[Text, Any]:
    """Performs an inference pass and returns raw predictions.

    Args:
      input_tensor: An input tensor of shape [batch, height, width, 3] holding
        the current frame.
      next_input_tensor: An input tensor of the same shape holding the next
        frame.
      training: A boolean flag indicating whether training behavior is used.

    Returns:
      A dictionary of raw predictions, averaged over all evaluation scales
      (and horizontal flips, if enabled).
    """
    _, input_h, input_w, _ = input_tensor.get_shape().as_list()
    result_dict = collections.defaultdict(list)
    # Evaluation mode, where one could perform multi-scale inference.
    scale_1_pool_size = self.get_pool_size()
    logging.info('Eval with scales %s', self._eval_scales)
    for eval_scale in self._eval_scales:
      # Get the scaled images/pool_size for each scale.
      scaled_images, scaled_pool_size = (
          self._scale_images_and_pool_size(input_tensor,
                                           list(scale_1_pool_size),
                                           eval_scale))
      next_scaled_images, _ = (
          self._scale_images_and_pool_size(next_input_tensor,
                                           list(scale_1_pool_size),
                                           eval_scale))
      # Update the ASPP pool size for different eval scales.
      self.set_pool_size(tuple(scaled_pool_size))
      logging.info('Eval scale %s; setting pooling size to %s', eval_scale,
                   scaled_pool_size)
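      # The decoder consumes encoder features of both frames and predicts the
      # current-frame outputs together with the next-frame offsets.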
      pred_dict = self._decoder(
          self._encoder(scaled_images, training=training),
          self._encoder(next_scaled_images, training=training),
          training=training)
      pred_dict = self._resize_predictions(
          pred_dict, target_h=input_h, target_w=input_w)
      # Change the semantic logits to probabilities with softmax. Note that
      # one could remove the semantic logits for faster inference; we keep
      # them because they are used to compute the evaluation loss.
      pred_dict[common.PRED_SEMANTIC_PROBS_KEY] = tf.nn.softmax(
          pred_dict[common.PRED_SEMANTIC_LOGITS_KEY])
      # Store the predictions from each scale.
      for output_type, output_value in pred_dict.items():
        result_dict[output_type].append(output_value)
      if self._add_flipped_images:
        pred_dict_reverse = self._decoder(
            self._encoder(tf.reverse(scaled_images, [2]), training=training),
            self._encoder(
                tf.reverse(next_scaled_images, [2]), training=training),
            training=training)
        pred_dict_reverse = self._resize_predictions(
            pred_dict_reverse, target_h=input_h, target_w=input_w,
            reverse=True)
        # Change the semantic logits to probabilities with softmax.
        pred_dict_reverse[common.PRED_SEMANTIC_PROBS_KEY] = tf.nn.softmax(
            pred_dict_reverse[common.PRED_SEMANTIC_LOGITS_KEY])
        # Store the predictions from the flipped images.
        for output_type, output_value in pred_dict_reverse.items():
          result_dict[output_type].append(output_value)
    # Set back the pool_size for scale 1.0, the original setting.
    self.set_pool_size(tuple(scale_1_pool_size))
    # Average the results across scales (and flips).
    for output_type, output_value in result_dict.items():
      result_dict[output_type] = tf.reduce_mean(
          tf.stack(output_value, axis=0), axis=0)
    return result_dict

  def call(self,
           input_tensor: tf.Tensor,
           training: bool = False) -> Dict[Text, Any]:
    """Performs a forward pass.

    Args:
      input_tensor: An input tensor of type tf.Tensor with shape [batch,
        height, width, channels]. The input tensor should contain batches of
        RGB frame pairs: the channel dimension is expected to encode two RGB
        frames, concatenated to 6 channels.
      training: A boolean flag indicating whether training behavior should be
        used (default: False).

    Returns:
      A dictionary containing the results of the specified DeepLab
      architecture. The results are bilinearly upsampled to the input size
      before returning.
    """
    # Normalize the input in the same way as Inception. We normalize it
    # outside the encoder so that we can extend encoders to different
    # backbones without copying the normalization to each encoder. We
    # normalize it after data preprocessing because it is faster on TPUs than
    # on host CPUs. The normalization should not increase TPU memory
    # consumption because it does not require gradients.
    input_tensor = input_tensor / 127.5 - 1.0
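    # This maps pixel values from [0, 255] to [-1, 1]; e.g., 0 -> -1.0,
    # 127.5 -> 0.0, and 255 -> 1.0.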
    # Get the static spatial shape of the input tensor.
    _, input_h, input_w, _ = input_tensor.get_shape().as_list()
    # Split the input_tensor into the current and the next frame.
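    # With 6 input channels, this yields two [batch, height, width, 3]
    # tensors, one per frame.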
    input_tensor, next_input_tensor = tf.split(input_tensor, 2, axis=3)
    if training:
      encoder_features = self._encoder(input_tensor, training=training)
      next_encoder_features = self._encoder(
          next_input_tensor, training=training)
      result_dict = self._decoder(
          encoder_features, next_encoder_features, training=training)
      result_dict = self._resize_predictions(
          result_dict, target_h=input_h, target_w=input_w)
    else:
      result_dict = self._inference(input_tensor, next_input_tensor, training)
      # To get the panoptic predictions of the next frame, we reverse the
      # roles of input_tensor and next_input_tensor and run inference again.
      # The second input could be anything: in sequence evaluation, one could
      # instead wait for the results of the next pair, but here we need the
      # panoptic predictions of the next frame to do pair evaluation.
      # pylint: disable=arguments-out-of-order
      next_result_dict = self._inference(next_input_tensor, input_tensor,
                                         training)
      # Here, we horizontally concatenate the raw predictions of the current
      # frame and the next frame to perform two-frame panoptic
      # post-processing.
      concat_result_dict = collections.defaultdict(list)
      concat_result_dict[common.PRED_SEMANTIC_PROBS_KEY] = tf.concat([
          result_dict[common.PRED_SEMANTIC_PROBS_KEY],
          next_result_dict[common.PRED_SEMANTIC_PROBS_KEY]
      ], axis=2)
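      # The next-frame heatmap is zeroed out so that post-processing assigns
      # every pixel of the concatenated image to a center detected in the
      # current frame, which links instances across the two frames.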
      concat_result_dict[common.PRED_CENTER_HEATMAP_KEY] = tf.concat([
          result_dict[common.PRED_CENTER_HEATMAP_KEY],
          tf.zeros_like(next_result_dict[common.PRED_CENTER_HEATMAP_KEY])
      ], axis=2)
      next_regression_y, next_regression_x = tf.split(
          result_dict[common.PRED_NEXT_OFFSET_MAP_KEY],
          num_or_size_splits=2,
          axis=3)
      # The predicted horizontal offsets of the next frame need to subtract
      # the image width to point to the object centers in the current frame,
      # because the two frames are horizontally concatenated.
      next_regression_x -= tf.constant(input_w, dtype=tf.float32)
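      # For example, a next-frame pixel at column c sits at column input_w + c
      # after concatenation, so its x-offset to a current-frame center at
      # column c' shrinks from c' - c to c' - c - input_w.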
      next_regression = tf.concat([next_regression_y, next_regression_x],
                                  axis=3)
      concat_result_dict[common.PRED_OFFSET_MAP_KEY] = tf.concat(
          [result_dict[common.PRED_OFFSET_MAP_KEY], next_regression], axis=2)
      concat_result_dict.update(self._post_processor(concat_result_dict))
      next_result_dict.update(self._post_processor(next_result_dict))
      result_dict[common.PRED_NEXT_PANOPTIC_KEY] = next_result_dict[
          common.PRED_PANOPTIC_KEY]
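      # Split the post-processed two-frame predictions back into the
      # current-frame half and the next-frame half along the width axis.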
      for result_key in [
          common.PRED_PANOPTIC_KEY, common.PRED_SEMANTIC_KEY,
          common.PRED_INSTANCE_KEY, common.PRED_INSTANCE_CENTER_KEY,
          common.PRED_INSTANCE_SCORES_KEY
      ]:
        result_dict[result_key], next_result_dict[result_key] = tf.split(
            concat_result_dict[result_key], num_or_size_splits=2, axis=2)
      result_dict[common.PRED_CONCAT_NEXT_PANOPTIC_KEY] = next_result_dict[
          common.PRED_PANOPTIC_KEY]
      result_dict[common.PRED_NEXT_PANOPTIC_KEY] = tf.numpy_function(
          func=vip_deeplab.stitch_video_panoptic_prediction,
          inp=[
              result_dict[common.PRED_CONCAT_NEXT_PANOPTIC_KEY],
              result_dict[common.PRED_NEXT_PANOPTIC_KEY], self._label_divisor
          ],
          Tout=tf.int32)
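      # tf.numpy_function drops static shape information, so restore it from
      # the concatenated prediction, which has the same shape.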
      result_dict[common.PRED_NEXT_PANOPTIC_KEY].set_shape(
          result_dict[common.PRED_CONCAT_NEXT_PANOPTIC_KEY].get_shape())
    if common.PRED_CENTER_HEATMAP_KEY in result_dict:
      result_dict[common.PRED_CENTER_HEATMAP_KEY] = tf.squeeze(
          result_dict[common.PRED_CENTER_HEATMAP_KEY], axis=3)
    return result_dict

  def reset_pooling_layer(self):
    """Resets the ASPP pooling layer to global average pooling."""
    self._decoder.reset_pooling_layer()

  def set_pool_size(self, pool_size: Tuple[int, int]):
    """Sets the pooling size of the ASPP pooling layer.

    Args:
      pool_size: A tuple specifying the pooling size of the ASPP pooling
        layer.
    """
    self._decoder.set_pool_size(pool_size)

  def get_pool_size(self):
    """Returns the current pooling size of the ASPP pooling layer."""
    return self._decoder.get_pool_size()

  @property
  def checkpoint_items(self) -> Dict[Text, Any]:
    """Returns a dictionary of items to be checkpointed."""
    items = dict(encoder=self._encoder)
    items.update(self._decoder.checkpoint_items)
    return items

  def _resize_predictions(self, result_dict, target_h, target_w,
                          reverse=False):
    """Resizes predictions to the target height and width.

    This function resizes the items in result_dict to the target height and
    width. The items are optionally reversed w.r.t. width if `reverse` is
    True.

    Args:
      result_dict: A dictionary storing prediction results to be resized.
      target_h: An integer, the target height.
      target_w: An integer, the target width.
      reverse: A boolean, reversing the prediction results w.r.t. width.

    Returns:
      Resized (or optionally reversed) result_dict.
    """
    for key, value in result_dict.items():
      if reverse:
        value = tf.reverse(value, [2])
        # Special care for offsets: the x-offsets need to be flipped as well.
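        # A horizontal flip mirrors the image left-right, so an offset that
        # pointed in the +x direction must point in the -x direction
        # afterwards; the y-offsets are unaffected.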
        if _OFFSET_OUTPUT in key:
          offset_y, offset_x = tf.split(
              value=value, num_or_size_splits=2, axis=3)
          offset_x *= -1
          value = tf.concat([offset_y, offset_x], 3)
      if _OFFSET_OUTPUT in key:
        result_dict[key] = utils.resize_and_rescale_offsets(
            value, [target_h, target_w])
      else:
        result_dict[key] = utils.resize_bilinear(value, [target_h, target_w])
    return result_dict

  def _scale_images_and_pool_size(self, images, pool_size, scale):
    """Scales images and pool_size w.r.t. scale.

    Args:
      images: An input tensor with shape [batch, height, width, 3].
      pool_size: A list with two elements, specifying the pooling size of the
        ASPP pooling layer.
      scale: A float, used to scale the input images and pool_size.

    Returns:
      Scaled images, and pool_size.
    """
    if scale == 1.0:
      scaled_images = images
      scaled_pool_size = pool_size
    else:
      image_size = images.get_shape().as_list()[1:3]
      scaled_image_size = utils.scale_mutable_sequence(image_size, scale)
      scaled_images = utils.resize_bilinear(images, scaled_image_size)
      scaled_pool_size = [None, None]
      if pool_size != [None, None]:
        scaled_pool_size = utils.scale_mutable_sequence(pool_size, scale)
    return scaled_images, scaled_pool_size