File size: 6,674 Bytes
b213d84 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 |
# Copyright (c) Facebook, Inc. and its affiliates.
import logging
import numpy as np
from itertools import count
import torch
from caffe2.proto import caffe2_pb2
from caffe2.python import core
from .caffe2_modeling import META_ARCH_CAFFE2_EXPORT_TYPE_MAP, convert_batched_inputs_to_c2_format
from .shared import ScopedWS, get_pb_arg_vali, get_pb_arg_vals, infer_device_type
logger = logging.getLogger(__name__)
# ===== ref: mobile-vision predictor's 'Caffe2Wrapper' class ======
class ProtobufModel(torch.nn.Module):
"""
Wrapper of a caffe2's protobuf model.
It works just like nn.Module, but running caffe2 under the hood.
Input/Output are tuple[tensor] that match the caffe2 net's external_input/output.
"""
_ids = count(0)
def __init__(self, predict_net, init_net):
logger.info(f"Initializing ProtobufModel for: {predict_net.name} ...")
super().__init__()
assert isinstance(predict_net, caffe2_pb2.NetDef)
assert isinstance(init_net, caffe2_pb2.NetDef)
# create unique temporary workspace for each instance
self.ws_name = "__tmp_ProtobufModel_{}__".format(next(self._ids))
self.net = core.Net(predict_net)
logger.info("Running init_net once to fill the parameters ...")
with ScopedWS(self.ws_name, is_reset=True, is_cleanup=False) as ws:
ws.RunNetOnce(init_net)
uninitialized_external_input = []
for blob in self.net.Proto().external_input:
if blob not in ws.Blobs():
uninitialized_external_input.append(blob)
ws.CreateBlob(blob)
ws.CreateNet(self.net)
self._error_msgs = set()
self._input_blobs = uninitialized_external_input
def _infer_output_devices(self, inputs):
"""
Returns:
list[str]: list of device for each external output
"""
def _get_device_type(torch_tensor):
assert torch_tensor.device.type in ["cpu", "cuda"]
assert torch_tensor.device.index == 0
return torch_tensor.device.type
predict_net = self.net.Proto()
input_device_types = {
(name, 0): _get_device_type(tensor) for name, tensor in zip(self._input_blobs, inputs)
}
device_type_map = infer_device_type(
predict_net, known_status=input_device_types, device_name_style="pytorch"
)
ssa, versions = core.get_ssa(predict_net)
versioned_outputs = [(name, versions[name]) for name in predict_net.external_output]
output_devices = [device_type_map[outp] for outp in versioned_outputs]
return output_devices
def forward(self, inputs):
"""
Args:
inputs (tuple[torch.Tensor])
Returns:
tuple[torch.Tensor]
"""
assert len(inputs) == len(self._input_blobs), (
f"Length of inputs ({len(inputs)}) "
f"doesn't match the required input blobs: {self._input_blobs}"
)
with ScopedWS(self.ws_name, is_reset=False, is_cleanup=False) as ws:
for b, tensor in zip(self._input_blobs, inputs):
ws.FeedBlob(b, tensor)
try:
ws.RunNet(self.net.Proto().name)
except RuntimeError as e:
if not str(e) in self._error_msgs:
self._error_msgs.add(str(e))
logger.warning("Encountered new RuntimeError: \n{}".format(str(e)))
logger.warning("Catch the error and use partial results.")
c2_outputs = [ws.FetchBlob(b) for b in self.net.Proto().external_output]
# Remove outputs of current run, this is necessary in order to
# prevent fetching the result from previous run if the model fails
# in the middle.
for b in self.net.Proto().external_output:
# Needs to create uninitialized blob to make the net runable.
# This is "equivalent" to: ws.RemoveBlob(b) then ws.CreateBlob(b),
# but there'no such API.
ws.FeedBlob(b, f"{b}, a C++ native class of type nullptr (uninitialized).")
# Cast output to torch.Tensor on the desired device
output_devices = (
self._infer_output_devices(inputs)
if any(t.device.type != "cpu" for t in inputs)
else ["cpu" for _ in self.net.Proto().external_output]
)
outputs = []
for name, c2_output, device in zip(
self.net.Proto().external_output, c2_outputs, output_devices
):
if not isinstance(c2_output, np.ndarray):
raise RuntimeError(
"Invalid output for blob {}, received: {}".format(name, c2_output)
)
outputs.append(torch.tensor(c2_output).to(device=device))
return tuple(outputs)
class ProtobufDetectionModel(torch.nn.Module):
"""
A class works just like a pytorch meta arch in terms of inference, but running
caffe2 model under the hood.
"""
def __init__(self, predict_net, init_net, *, convert_outputs=None):
"""
Args:
predict_net, init_net (core.Net): caffe2 nets
convert_outptus (callable): a function that converts caffe2
outputs to the same format of the original pytorch model.
By default, use the one defined in the caffe2 meta_arch.
"""
super().__init__()
self.protobuf_model = ProtobufModel(predict_net, init_net)
self.size_divisibility = get_pb_arg_vali(predict_net, "size_divisibility", 0)
self.device = get_pb_arg_vals(predict_net, "device", b"cpu").decode("ascii")
if convert_outputs is None:
meta_arch = get_pb_arg_vals(predict_net, "meta_architecture", b"GeneralizedRCNN")
meta_arch = META_ARCH_CAFFE2_EXPORT_TYPE_MAP[meta_arch.decode("ascii")]
self._convert_outputs = meta_arch.get_outputs_converter(predict_net, init_net)
else:
self._convert_outputs = convert_outputs
def _convert_inputs(self, batched_inputs):
# currently all models convert inputs in the same way
return convert_batched_inputs_to_c2_format(
batched_inputs, self.size_divisibility, self.device
)
def forward(self, batched_inputs):
c2_inputs = self._convert_inputs(batched_inputs)
c2_results = self.protobuf_model(c2_inputs)
c2_results = dict(zip(self.protobuf_model.net.Proto().external_output, c2_results))
return self._convert_outputs(batched_inputs, c2_inputs, c2_results)
|