# coding=utf-8
# Copyright 2021 The Deeplab2 Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""This file contains utility functions for the model code.""" | |
from typing import Any, List, MutableMapping, MutableSequence, Optional, Set | |
import tensorflow as tf | |
from deeplab2 import common | |
from deeplab2 import config_pb2 | |
layers = tf.keras.layers | |
_PREDICTION_WITH_NEAREST_UPSAMPLING = (
    common.PRED_INSTANCE_KEY,
    common.PRED_INSTANCE_CENTER_KEY,
    common.PRED_INSTANCE_SCORES_KEY,
    common.PRED_PANOPTIC_KEY,
    common.PRED_SEMANTIC_KEY,
    common.PRED_NEXT_PANOPTIC_KEY,
    common.PRED_CONCAT_NEXT_PANOPTIC_KEY,
    common.PRED_CENTER_HEATMAP_KEY,
)

_PREDICTION_WITH_BILINEAR_UPSAMPLING = (
    common.PRED_SEMANTIC_PROBS_KEY,
    common.PRED_OFFSET_MAP_KEY,
)

_INPUT_WITH_NEAREST_UPSAMPLING = (
    common.GT_INSTANCE_CENTER_KEY,
)

_INPUT_WITH_BILINEAR_UPSAMPLING = (
    common.IMAGE,
    common.GT_INSTANCE_REGRESSION_KEY
)


def _scale_helper(value, scale):
  """Scales the input value with align-corners semantics."""
  if isinstance(value, tf.Tensor):
    return tf.cast(
        (tf.cast(value, dtype=tf.float32) - 1.0) * scale + 1.0,
        dtype=tf.int32)
  else:
    return int((float(value) - 1.0) * scale + 1.0)


def scale_mutable_sequence(input_sequence: MutableSequence[int],
                           scale: float) -> MutableSequence[int]:
  """Scales each element of the sequence with align-corners semantics."""
  return [_scale_helper(x, scale) for x in input_sequence]
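
# Example usage (illustrative sketch): the helpers above scale spatial sizes
# with align-corners semantics, i.e. new = (old - 1) * scale + 1, so a 33x65
# feature map scaled by 0.25 maps to 9x17:
#
#   assert scale_mutable_sequence([33, 65], 0.25) == [9, 17]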


def scale_int_list(int_list, scale):
  """Scales each integer in the list and truncates the result to int."""
  return [int(x * scale) for x in int_list]


def undo_image_preprocessing(image_in: tf.Tensor, method: str,
                             perform_crop: bool,
                             regions_to_crop: List[int],
                             output_shape: List[int]) -> tf.Tensor:
  """Undoes the image preprocessing.

  In particular, this function slices out the valid regions (determined by
  `regions_to_crop`) in the input when `perform_crop` is True. After that, we
  resize the results to the desired `output_shape`.

  Args:
    image_in: Input image Tensor with shape [batch, height, width, n_channels].
    method: Image resize method.
    perform_crop: Boolean, performing crop or not.
    regions_to_crop: The regions to crop [height, width]. Will only apply
      cropping at the bottom right.
    output_shape: Desired shape after resizing [height, width].

  Returns:
    Outputs after cropping (if `perform_crop` is True) and resizing.
  """
  if perform_crop:
    image_out = image_in[
        :, :regions_to_crop[0], :regions_to_crop[1], :]
  else:
    image_out = image_in
  return resize_align_corners(image_out, output_shape, method=method)


def undo_preprocessing(input_or_prediction_dict: MutableMapping[str, Any],
                       regions_to_crop: List[int],
                       output_shape: List[int]) -> MutableMapping[str, Any]:
  """Undoes preprocessing for inputs or predictions.

  Args:
    input_or_prediction_dict: A dictionary storing different types of inputs
      or predictions.
    regions_to_crop: The regions to crop [height, width]. Will only apply
      cropping at the bottom right.
    output_shape: Desired shape after resizing [height, width].

  Returns:
    Inputs or predictions after cropping and resizing.
  """
  for key in input_or_prediction_dict.keys():
    if (key in _PREDICTION_WITH_NEAREST_UPSAMPLING or
        key in _INPUT_WITH_NEAREST_UPSAMPLING):
      input_or_prediction_dict[key] = tf.squeeze(
          undo_image_preprocessing(
              tf.expand_dims(input_or_prediction_dict[key], 3),
              'nearest',
              perform_crop=True,
              regions_to_crop=regions_to_crop,
              output_shape=output_shape),
          axis=3)
    elif (key in _PREDICTION_WITH_BILINEAR_UPSAMPLING or
          key in _INPUT_WITH_BILINEAR_UPSAMPLING):
      input_or_prediction_dict[key] = undo_image_preprocessing(
          input_or_prediction_dict[key],
          'bilinear',
          perform_crop=True,
          regions_to_crop=regions_to_crop,
          output_shape=output_shape)
    else:
      # We only undo preprocessing for keys defined in
      # _{PREDICTION,INPUT}_WITH_{NEAREST,BILINEAR}_UPSAMPLING.
      # Other intermediate results are skipped.
      continue
  return input_or_prediction_dict
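
# Example usage (illustrative sketch): undoing padding and resizing for a
# semantic prediction. Keys in the nearest-neighbor tuples (e.g.,
# common.PRED_SEMANTIC_KEY) hold rank-3 [batch, height, width] tensors, which
# are temporarily expanded to rank 4 for resizing.
#
#   predictions = {common.PRED_SEMANTIC_KEY: tf.zeros([1, 65, 65], tf.int32)}
#   predictions = undo_preprocessing(
#       predictions, regions_to_crop=[61, 61], output_shape=[121, 121])
#   # predictions[common.PRED_SEMANTIC_KEY].shape == (1, 121, 121)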


def add_zero_padding(input_tensor: tf.Tensor, kernel_size: int,
                     rank: int) -> tf.Tensor:
  """Adds zero-padding to the input_tensor."""
  pad_total = kernel_size - 1
  pad_begin = pad_total // 2
  pad_end = pad_total - pad_begin
  if rank == 3:
    return tf.pad(
        input_tensor,
        paddings=[[pad_begin, pad_end], [pad_begin, pad_end], [0, 0]])
  else:
    return tf.pad(
        input_tensor,
        paddings=[[0, 0], [pad_begin, pad_end], [pad_begin, pad_end], [0, 0]])
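
# Example usage (illustrative sketch): padding for a 3x3 convolution adds one
# pixel on each spatial side, so a 'VALID' convolution on the padded tensor
# matches a 'SAME' convolution on the original.
#
#   features = tf.random.uniform([2, 65, 65, 32])
#   padded = add_zero_padding(features, kernel_size=3, rank=4)
#   # padded.shape == (2, 67, 67, 32)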


def resize_and_rescale_offsets(input_tensor: tf.Tensor, target_size):
  """Bilinearly resizes and rescales the offsets.

  Args:
    input_tensor: A tf.Tensor of shape [batch, height, width, 2].
    target_size: A list or tuple or 1D tf.Tensor that specifies the height and
      width after resizing.

  Returns:
    The input_tensor resized to shape `[batch, target_height, target_width,
    2]`. Moreover, the offsets along the y-axis are rescaled by a factor equal
    to (target_height - 1) / (reference_height - 1) and the offsets along the
    x-axis are rescaled by a factor equal to
    (target_width - 1) / (reference_width - 1).
  """
  input_size_y = tf.shape(input_tensor)[1]
  input_size_x = tf.shape(input_tensor)[2]

  scale_y = tf.cast(target_size[0] - 1, tf.float32) / tf.cast(
      input_size_y - 1, tf.float32)
  scale_x = tf.cast(target_size[1] - 1, tf.float32) / tf.cast(
      input_size_x - 1, tf.float32)

  target_y, target_x = tf.split(
      value=input_tensor, num_or_size_splits=2, axis=3)
  target_y *= scale_y
  target_x *= scale_x
  target = tf.concat([target_y, target_x], 3)
  return resize_bilinear(target, target_size)
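
# Example usage (illustrative sketch): upsampling center-offset maps by 2x.
# The offset values are rescaled by (target - 1) / (input - 1) per axis so
# that they still point at the same relative locations at the new resolution.
#
#   offsets = tf.random.uniform([1, 17, 17, 2])
#   upsampled = resize_and_rescale_offsets(offsets, [33, 33])
#   # upsampled.shape == (1, 33, 33, 2); values are scaled by 32 / 16 = 2.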


def resize_align_corners(input_tensor, target_size, method='bilinear'):
  """Resizes the input_tensor to target_size.

  This returns the same output as
  tf.compat.v1.image.resize(input_tensor, target_size, align_corners=True).

  Args:
    input_tensor: A tf.Tensor of shape [batch, height, width, channels].
    target_size: A list or tuple or 1D tf.Tensor that specifies the height and
      width after resizing.
    method: An optional string specifying the method used for resizing.
      Supported options are 'nearest' and 'bilinear'.

  Returns:
    The resized tensor.

  Raises:
    ValueError: An error occurs if 1) the input tensor's rank is not 4, or 2)
      the resizing method is not supported.
  """
  if method == 'bilinear':
    tf_method = tf.compat.v1.image.ResizeMethod.BILINEAR
  elif method == 'nearest':
    tf_method = tf.compat.v1.image.ResizeMethod.NEAREST_NEIGHBOR
  else:
    raise ValueError('The given method %s is not supported. Please use '
                     'bilinear or nearest.' % method)

  tf.debugging.assert_rank(
      input_tensor, 4,
      message='Input tensor to resize method should have rank of 4.')

  return tf.compat.v1.image.resize(
      input_tensor,
      target_size,
      method=tf_method,
      align_corners=True,
      name='resize_align_corners')
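
# Example usage (illustrative sketch): this mirrors
# tf.compat.v1.image.resize(..., align_corners=True), so the corner pixels of
# the input and the output are exactly aligned.
#
#   logits = tf.random.uniform([1, 33, 33, 19])
#   logits = resize_align_corners(logits, [129, 129], method='bilinear')
#   # logits.shape == (1, 129, 129, 19)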


def resize_bilinear(images,
                    size,
                    align_corners=True,
                    name=None):
  """TPU memory-efficient version of tf.compat.v1.image.resize_bilinear.

  ResizeBilinear on TPU requires padded batch and channel dimensions. On a
  TPUv3, the worst case could lead to 256x memory consumption if the input is,
  for example, [1, 257, 513, 1]. In this function, we replace the default
  resize_bilinear by two resize_bilinear operations, which put one image axis
  on the channel axis. This reduces TPU padding when batch * channel is small
  and height * width is large.

  Args:
    images: Input image of shape [B, H, W, C].
    size: A list of two elements: [height, width]. The new size for the
      images.
    align_corners: Whether to align corners of the image.
    name: Name of the operation.

  Returns:
    Resized image.
  """
  _, height, width, channel = images.get_shape().as_list()
  if height == size[0] and width == size[1]:
    return images
  dtype = images.dtype
  images = tf.cast(images, tf.float32)
  # We check the channel axis only since the batch size is similar (usually 1
  # or 2). In this way, this if-else easily supports dynamic batch size
  # without using tf.cond().
  if channel > 32 or not align_corners:
    images = tf.compat.v1.image.resize_bilinear(
        images, size,
        align_corners=align_corners,
        name=name)
  else:
    images = tf.transpose(images, [0, 3, 1, 2])
    images = tf.compat.v1.image.resize_bilinear(
        images, [channel, size[0]],
        align_corners=align_corners,
        name=name + '_height' if name else None)
    images = tf.transpose(images, [0, 1, 3, 2])
    images = tf.compat.v1.image.resize_bilinear(
        images, [channel, size[1]],
        align_corners=align_corners,
        name=name + '_width' if name else None)
    images = tf.transpose(images, [0, 3, 2, 1])
  return tf.cast(images, dtype)
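
# Illustrative note (a sketch, not part of the original file): the two-step
# path above first resizes the height axis (with width sitting on the channel
# axis of the op), then resizes the width axis (with the resized height on
# the channel axis). Because bilinear interpolation is separable, this yields
# the same values as a single 2D resize:
#
#   images = tf.random.uniform([1, 257, 513, 1])
#   resized = resize_bilinear(images, [129, 257])
#   # resized.shape == (1, 129, 257, 1)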


def make_divisible(value: float,
                   divisor: int,
                   min_value: Optional[float] = None) -> int:
  """Ensures all layers have channels that are divisible by the divisor.

  Args:
    value: A `float` of original value.
    divisor: An `int` of the divisor that needs to be checked upon.
    min_value: A `float` of minimum value threshold.

  Returns:
    The adjusted value in `int` that is divisible by divisor.

  Raises:
    ValueError: If `min_value` is not divisible by `divisor`.
  """
  if min_value is None:
    min_value = divisor
  elif min_value % divisor != 0:
    raise ValueError('Minimum value should be divisible by divisor.')
  new_value = max(min_value, int(value + divisor / 2) // divisor * divisor)
  # Make sure that round down does not go down by more than 10%.
  if new_value < 0.9 * value:
    new_value += divisor
  return int(new_value)
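
# Example usage (illustrative sketch): rounding channel counts to a
# hardware-friendly multiple. 54 rounds to the nearest multiple of 8 (56);
# 10 first rounds down to 8, but since 8 < 0.9 * 10 the divisor is added
# back, yielding 16.
#
#   assert make_divisible(54, 8) == 56
#   assert make_divisible(10, 8) == 16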


def transpose_and_reshape_for_attention_operation(inputs):
  """Sequentially transposes and reshapes the tensor.

  Args:
    inputs: An input [batch, num_heads, length, channel] tensor.

  Returns:
    output: An output [batch, length, num_heads * channel] tensor.
  """
  _, num_heads, length, channel = inputs.get_shape().as_list()
  transposed_inputs = tf.transpose(inputs, [0, 2, 1, 3])
  return tf.reshape(transposed_inputs, [-1, length, num_heads * channel])


def reshape_and_transpose_for_attention_operation(inputs, num_heads):
  """Sequentially reshapes and transposes the tensor.

  Args:
    inputs: An input [batch, length, num_heads * channel] tensor.
    num_heads: An integer, the number of attention heads.

  Returns:
    output: An output [batch, num_heads, length, channel] tensor.
  """
  _, length, channels = inputs.get_shape().as_list()
  inputs = tf.reshape(inputs, [-1, length, num_heads, channels // num_heads])
  return tf.transpose(inputs, [0, 2, 1, 3])
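
# Example usage (illustrative sketch): the two functions above are inverses
# of each other, converting between the multi-head layout
# [batch, num_heads, length, channel] and the merged layout
# [batch, length, num_heads * channel].
#
#   x = tf.random.uniform([2, 8, 100, 32])
#   merged = transpose_and_reshape_for_attention_operation(x)
#   # merged.shape == (2, 100, 256)
#   restored = reshape_and_transpose_for_attention_operation(merged, 8)
#   # restored.shape == (2, 8, 100, 32), and restored == x elementwise.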


def get_layer_name(private_attribute_name):
  """Converts a private attribute name to the corresponding layer name."""
  if private_attribute_name[0] != '_':
    raise ValueError('Private attribute name should start with a \'_\'.')
  return private_attribute_name[1:]


def get_stem_current_name(index):
  """Returns the name of the stem basic block at the given index."""
  return '_basic_block{}'.format(index + 1)


def get_low_level_conv_fusion_conv_current_names(index):
  """Returns the low-level conv and fusion conv names at the given index."""
  return ('_low_level_conv{}'.format(index + 1),
          '_fusion_conv{}'.format(index + 1))


def get_conv_bn_act_current_name(index, use_bn, activation):
  """Returns the conv-bn-activation block name at the given index."""
  name = '_conv{}'.format(index + 1)
  if use_bn:
    name += '_bn'
  if (activation is not None and
      activation.lower() != 'none' and
      activation.lower() != 'linear'):
    name += '_act'
  return name


def safe_setattr(obj, name, value):
  """A conflict-safe version of setattr().

  Different from setattr(), this function raises ValueError if the object
  already has an attribute with the same name.

  Args:
    obj: An object whose attribute has to be set.
    name: A string, the name of the attribute.
    value: Any type, the value given to the attribute.

  Raises:
    ValueError: If the object already has an attribute with the same name.
  """
  if hasattr(obj, name):
    raise ValueError('The object already has an attribute with the same name.')
  setattr(obj, name, value)


def pad_sequence_with_none(sequence, target_length):
  """Pads the sequence to the target length by appending `None`s."""
  return list(sequence) + [None] * (target_length - len(sequence))


def strided_downsample(input_tensor, target_size):
  """Strided downsamples a tensor to the target size.

  The stride_height and stride_width are computed by (height - 1) //
  (target_height - 1) and (width - 1) // (target_width - 1). We raise an error
  if stride_height != stride_width, since this is not intended in our current
  use cases. But this check can be removed if different strides are desired.
  This function supports static shape only.

  Args:
    input_tensor: A [batch, height, width] tf.Tensor to be downsampled.
    target_size: A list of two integers, [target_height, target_width], the
      target size after downsampling.

  Returns:
    output_tensor: A [batch, target_height, target_width] tf.Tensor, the
      downsampled result.

  Raises:
    ValueError: If the input cannot be downsampled with integer stride, i.e.,
      (height - 1) % (target_height - 1) != 0, or (width - 1) %
      (target_width - 1) != 0.
    ValueError: If the height axis stride does not equal the width axis
      stride.
  """
  input_height, input_width = input_tensor.get_shape().as_list()[1:3]
  target_height, target_width = target_size

  if ((input_height - 1) % (target_height - 1) or
      (input_width - 1) % (target_width - 1)):
    raise ValueError('The input cannot be downsampled with integer striding. '
                     'Please ensure (height - 1) % (target_height - 1) == 0 '
                     'and (width - 1) % (target_width - 1) == 0.')
  stride_height = (input_height - 1) // (target_height - 1)
  stride_width = (input_width - 1) // (target_width - 1)
  if stride_height != stride_width:
    raise ValueError('The height axis stride does not equal the width axis '
                     'stride.')
  if stride_height > 1 or stride_width > 1:
    return input_tensor[:, ::stride_height, ::stride_width]
  return input_tensor
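
# Example usage (illustrative sketch): downsampling a label map from 65x65 to
# 17x17 uses stride (65 - 1) // (17 - 1) = 4 on both axes.
#
#   labels = tf.zeros([1, 65, 65], tf.int32)
#   downsampled = strided_downsample(labels, [17, 17])
#   # downsampled.shape == (1, 17, 17)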


def get_stuff_class_ids(num_thing_stuff_classes: int,
                        thing_class_ids: List[int],
                        void_label: int) -> List[int]:
  """Computes stuff_class_ids.

  The stuff_class_ids are computed from the num_thing_stuff_classes, the
  thing_class_ids and the void_label.

  Args:
    num_thing_stuff_classes: An integer specifying the number of stuff and
      thing classes, not including `void` class.
    thing_class_ids: A List of integers of length [num_thing_classes]
      containing thing class indices.
    void_label: An integer specifying the void label.

  Returns:
    stuff_class_ids: A sorted List of integers of length [num_stuff_classes]
      containing stuff class indices.
  """
  if void_label >= num_thing_stuff_classes:
    thing_stuff_class_ids = list(range(num_thing_stuff_classes))
  else:
    thing_stuff_class_ids = [i for i in range(num_thing_stuff_classes + 1)
                             if i != void_label]
  return sorted(set(thing_stuff_class_ids) - set(thing_class_ids))
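
# Example usage (illustrative sketch): with a Cityscapes-like setup of 19
# classes where classes 11-18 are things and the void label is 255, the stuff
# classes are 0-10.
#
#   stuff_ids = get_stuff_class_ids(
#       num_thing_stuff_classes=19,
#       thing_class_ids=list(range(11, 19)),
#       void_label=255)
#   # stuff_ids == list(range(11))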


def get_supported_tasks(
    config: config_pb2.ExperimentOptions) -> Set[str]:
  """Gets currently supported tasks for each meta_architecture.

  Args:
    config: A config_pb2.ExperimentOptions configuration.

  Returns:
    supported_tasks: A set of strings (see common.py), optionally
      - common.TASK_PANOPTIC_SEGMENTATION,
      - common.TASK_INSTANCE_SEGMENTATION,
      - common.TASK_VIDEO_PANOPTIC_SEGMENTATION,
      - common.TASK_DEPTH_AWARE_VIDEO_PANOPTIC_SEGMENTATION.
  """
  supported_tasks = set()
  meta_architecture = config.model_options.WhichOneof('meta_architecture')
  is_max_deeplab = meta_architecture == 'max_deeplab'
  is_motion_deeplab = meta_architecture == 'motion_deeplab'
  is_panoptic_deeplab = meta_architecture == 'panoptic_deeplab'
  is_vip_deeplab = meta_architecture == 'vip_deeplab'
  is_panoptic = (
      (config.model_options.panoptic_deeplab.instance.enable and
       is_panoptic_deeplab) or
      is_motion_deeplab or is_max_deeplab or is_vip_deeplab)
  if is_panoptic:
    supported_tasks.add(common.TASK_PANOPTIC_SEGMENTATION)
    # MaX-DeepLab does not support evaluating instance segmentation mask AP
    # yet.
    if not is_max_deeplab:
      supported_tasks.add(common.TASK_INSTANCE_SEGMENTATION)
  if is_motion_deeplab or is_vip_deeplab:
    supported_tasks.add(common.TASK_VIDEO_PANOPTIC_SEGMENTATION)
  if is_vip_deeplab:
    supported_tasks.add(common.TASK_DEPTH_AWARE_VIDEO_PANOPTIC_SEGMENTATION)
  return supported_tasks