Leffa / detectron2 /modeling /meta_arch /dense_detector.py
franciszzj's picture
init code
b213d84
raw
history blame
11.8 kB
import numpy as np
from typing import Dict, List, Optional, Tuple
import torch
from torch import Tensor, nn
from detectron2.data.detection_utils import convert_image_to_rgb
from detectron2.layers import move_device_like
from detectron2.modeling import Backbone
from detectron2.structures import Boxes, ImageList, Instances
from detectron2.utils.events import get_event_storage
from ..postprocessing import detector_postprocess
def permute_to_N_HWA_K(tensor, K: int):
"""
Transpose/reshape a tensor from (N, (Ai x K), H, W) to (N, (HxWxAi), K)
"""
assert tensor.dim() == 4, tensor.shape
N, _, H, W = tensor.shape
tensor = tensor.view(N, -1, K, H, W)
tensor = tensor.permute(0, 3, 4, 1, 2)
tensor = tensor.reshape(N, -1, K) # Size=(N,HWA,K)
return tensor
class DenseDetector(nn.Module):
"""
Base class for dense detector. We define a dense detector as a fully-convolutional model that
makes per-pixel (i.e. dense) predictions.
"""
def __init__(
self,
backbone: Backbone,
head: nn.Module,
head_in_features: Optional[List[str]] = None,
*,
pixel_mean,
pixel_std,
):
"""
Args:
backbone: backbone module
head: head module
head_in_features: backbone features to use in head. Default to all backbone features.
pixel_mean (Tuple[float]):
Values to be used for image normalization (BGR order).
To train on images of different number of channels, set different mean & std.
Default values are the mean pixel value from ImageNet: [103.53, 116.28, 123.675]
pixel_std (Tuple[float]):
When using pre-trained models in Detectron1 or any MSRA models,
std has been absorbed into its conv1 weights, so the std needs to be set 1.
Otherwise, you can use [57.375, 57.120, 58.395] (ImageNet std)
"""
super().__init__()
self.backbone = backbone
self.head = head
if head_in_features is None:
shapes = self.backbone.output_shape()
self.head_in_features = sorted(shapes.keys(), key=lambda x: shapes[x].stride)
else:
self.head_in_features = head_in_features
self.register_buffer("pixel_mean", torch.tensor(pixel_mean).view(-1, 1, 1), False)
self.register_buffer("pixel_std", torch.tensor(pixel_std).view(-1, 1, 1), False)
@property
def device(self):
return self.pixel_mean.device
def _move_to_current_device(self, x):
return move_device_like(x, self.pixel_mean)
def forward(self, batched_inputs: List[Dict[str, Tensor]]):
"""
Args:
batched_inputs: a list, batched outputs of :class:`DatasetMapper` .
Each item in the list contains the inputs for one image.
For now, each item in the list is a dict that contains:
* image: Tensor, image in (C, H, W) format.
* instances: Instances
Other information that's included in the original dicts, such as:
* "height", "width" (int): the output resolution of the model, used in inference.
See :meth:`postprocess` for details.
Returns:
In training, dict[str, Tensor]: mapping from a named loss to a tensor storing the
loss. Used during training only. In inference, the standard output format, described
in :doc:`/tutorials/models`.
"""
images = self.preprocess_image(batched_inputs)
features = self.backbone(images.tensor)
features = [features[f] for f in self.head_in_features]
predictions = self.head(features)
if self.training:
assert not torch.jit.is_scripting(), "Not supported"
assert "instances" in batched_inputs[0], "Instance annotations are missing in training!"
gt_instances = [x["instances"].to(self.device) for x in batched_inputs]
return self.forward_training(images, features, predictions, gt_instances)
else:
results = self.forward_inference(images, features, predictions)
if torch.jit.is_scripting():
return results
processed_results = []
for results_per_image, input_per_image, image_size in zip(
results, batched_inputs, images.image_sizes
):
height = input_per_image.get("height", image_size[0])
width = input_per_image.get("width", image_size[1])
r = detector_postprocess(results_per_image, height, width)
processed_results.append({"instances": r})
return processed_results
def forward_training(self, images, features, predictions, gt_instances):
raise NotImplementedError()
def preprocess_image(self, batched_inputs: List[Dict[str, Tensor]]):
"""
Normalize, pad and batch the input images.
"""
images = [self._move_to_current_device(x["image"]) for x in batched_inputs]
images = [(x - self.pixel_mean) / self.pixel_std for x in images]
images = ImageList.from_tensors(
images,
self.backbone.size_divisibility,
padding_constraints=self.backbone.padding_constraints,
)
return images
def _transpose_dense_predictions(
self, predictions: List[List[Tensor]], dims_per_anchor: List[int]
) -> List[List[Tensor]]:
"""
Transpose the dense per-level predictions.
Args:
predictions: a list of outputs, each is a list of per-level
predictions with shape (N, Ai x K, Hi, Wi), where N is the
number of images, Ai is the number of anchors per location on
level i, K is the dimension of predictions per anchor.
dims_per_anchor: the value of K for each predictions. e.g. 4 for
box prediction, #classes for classification prediction.
Returns:
List[List[Tensor]]: each prediction is transposed to (N, Hi x Wi x Ai, K).
"""
assert len(predictions) == len(dims_per_anchor)
res: List[List[Tensor]] = []
for pred, dim_per_anchor in zip(predictions, dims_per_anchor):
pred = [permute_to_N_HWA_K(x, dim_per_anchor) for x in pred]
res.append(pred)
return res
def _ema_update(self, name: str, value: float, initial_value: float, momentum: float = 0.9):
"""
Apply EMA update to `self.name` using `value`.
This is mainly used for loss normalizer. In Detectron1, loss is normalized by number
of foreground samples in the batch. When batch size is 1 per GPU, #foreground has a
large variance and using it lead to lower performance. Therefore we maintain an EMA of
#foreground to stabilize the normalizer.
Args:
name: name of the normalizer
value: the new value to update
initial_value: the initial value to start with
momentum: momentum of EMA
Returns:
float: the updated EMA value
"""
if hasattr(self, name):
old = getattr(self, name)
else:
old = initial_value
new = old * momentum + value * (1 - momentum)
setattr(self, name, new)
return new
def _decode_per_level_predictions(
self,
anchors: Boxes,
pred_scores: Tensor,
pred_deltas: Tensor,
score_thresh: float,
topk_candidates: int,
image_size: Tuple[int, int],
) -> Instances:
"""
Decode boxes and classification predictions of one featuer level, by
the following steps:
1. filter the predictions based on score threshold and top K scores.
2. transform the box regression outputs
3. return the predicted scores, classes and boxes
Args:
anchors: Boxes, anchor for this feature level
pred_scores: HxWxA,K
pred_deltas: HxWxA,4
Returns:
Instances: with field "scores", "pred_boxes", "pred_classes".
"""
# Apply two filtering to make NMS faster.
# 1. Keep boxes with confidence score higher than threshold
keep_idxs = pred_scores > score_thresh
pred_scores = pred_scores[keep_idxs]
topk_idxs = torch.nonzero(keep_idxs) # Kx2
# 2. Keep top k top scoring boxes only
topk_idxs_size = topk_idxs.shape[0]
if isinstance(topk_idxs_size, Tensor):
# It's a tensor in tracing
num_topk = torch.clamp(topk_idxs_size, max=topk_candidates)
else:
num_topk = min(topk_idxs_size, topk_candidates)
pred_scores, idxs = pred_scores.topk(num_topk)
topk_idxs = topk_idxs[idxs]
anchor_idxs, classes_idxs = topk_idxs.unbind(dim=1)
pred_boxes = self.box2box_transform.apply_deltas(
pred_deltas[anchor_idxs], anchors.tensor[anchor_idxs]
)
return Instances(
image_size, pred_boxes=Boxes(pred_boxes), scores=pred_scores, pred_classes=classes_idxs
)
def _decode_multi_level_predictions(
self,
anchors: List[Boxes],
pred_scores: List[Tensor],
pred_deltas: List[Tensor],
score_thresh: float,
topk_candidates: int,
image_size: Tuple[int, int],
) -> Instances:
"""
Run `_decode_per_level_predictions` for all feature levels and concat the results.
"""
predictions = [
self._decode_per_level_predictions(
anchors_i,
box_cls_i,
box_reg_i,
score_thresh,
topk_candidates,
image_size,
)
# Iterate over every feature level
for box_cls_i, box_reg_i, anchors_i in zip(pred_scores, pred_deltas, anchors)
]
return predictions[0].cat(predictions) # 'Instances.cat' is not scriptale but this is
def visualize_training(self, batched_inputs, results):
"""
A function used to visualize ground truth images and final network predictions.
It shows ground truth bounding boxes on the original image and up to 20
predicted object bounding boxes on the original image.
Args:
batched_inputs (list): a list that contains input to the model.
results (List[Instances]): a list of #images elements returned by forward_inference().
"""
from detectron2.utils.visualizer import Visualizer
assert len(batched_inputs) == len(
results
), "Cannot visualize inputs and results of different sizes"
storage = get_event_storage()
max_boxes = 20
image_index = 0 # only visualize a single image
img = batched_inputs[image_index]["image"]
img = convert_image_to_rgb(img.permute(1, 2, 0), self.input_format)
v_gt = Visualizer(img, None)
v_gt = v_gt.overlay_instances(boxes=batched_inputs[image_index]["instances"].gt_boxes)
anno_img = v_gt.get_image()
processed_results = detector_postprocess(results[image_index], img.shape[0], img.shape[1])
predicted_boxes = processed_results.pred_boxes.tensor.detach().cpu().numpy()
v_pred = Visualizer(img, None)
v_pred = v_pred.overlay_instances(boxes=predicted_boxes[0:max_boxes])
prop_img = v_pred.get_image()
vis_img = np.vstack((anno_img, prop_img))
vis_img = vis_img.transpose(2, 0, 1)
vis_name = f"Top: GT bounding boxes; Bottom: {max_boxes} Highest Scoring Results"
storage.put_image(vis_name, vis_img)