import sys
from collections import OrderedDict, namedtuple
from os.path import exists as file_exists
from pathlib import Path

import gdown
import numpy as np
import torch
import torch.nn as nn
import torchvision.transforms as T

from yolov5.utils.general import LOGGER, check_version, check_requirements
from trackers.strong_sort.deep.reid_model_factory import (show_downloadeable_models, get_model_url,
                                                          get_model_name, load_pretrained_weights)
from trackers.strong_sort.deep.models import build_model

def check_suffix(file='yolov5s.pt', suffix=('.pt',), msg=''):
    # Check file(s) for acceptable suffix
    if file and suffix:
        if isinstance(suffix, str):
            suffix = [suffix]
        for f in file if isinstance(file, (list, tuple)) else [file]:
            s = Path(f).suffix.lower()  # file suffix
            if len(s):
                assert s in suffix, f"{msg}{f} acceptable suffix is {suffix}"
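# Illustrative calls (an assumption, not taken from the original call sites):
#   check_suffix('osnet_x0_25_msmt17.pt', ('.pt', '.onnx'))   # passes silently
#   check_suffix('osnet_x0_25_msmt17.engine', ('.pt',))       # raises AssertionError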

class ReIDDetectMultiBackend(nn.Module):
    # ReID models MultiBackend class for python inference on various backends
    def __init__(self, weights='osnet_x0_25_msmt17.pt', device=torch.device('cpu'), fp16=False):
        super().__init__()

        w = Path(weights[0] if isinstance(weights, list) else weights)  # Path needed for .suffix/.is_file below
        self.pt, self.jit, self.onnx, self.xml, self.engine, self.coreml, self.saved_model, \
            self.pb, self.tflite, self.edgetpu, self.tfjs, self.paddle = self.model_type(w)  # get backend
        self.fp16 = fp16
        self.fp16 &= self.pt or self.jit or self.engine  # FP16 only supported by these backends

        # Build transform functions
        self.device = device
        self.image_size = (256, 128)
        self.pixel_mean = [0.485, 0.456, 0.406]
        self.pixel_std = [0.229, 0.224, 0.225]
        self.transforms = []
        self.transforms += [T.Resize(self.image_size)]
        self.transforms += [T.ToTensor()]
        self.transforms += [T.Normalize(mean=self.pixel_mean, std=self.pixel_std)]
        self.preprocess = T.Compose(self.transforms)
        self.to_pil = T.ToPILImage()
        model_name = get_model_name(w)

        if w.suffix == '.pt':
            model_url = get_model_url(w)
            if not file_exists(w) and model_url is not None:
                gdown.download(model_url, str(w), quiet=False)
            elif not file_exists(w):
                print(f'No URL associated with the chosen StrongSORT weights ({w}). Choose between:')
                show_downloadeable_models()
                sys.exit()
        # Build model
        self.model = build_model(
            model_name,
            num_classes=1,
            pretrained=not (w and w.is_file()),
            use_gpu=device.type != 'cpu'  # pass a bool, not the device object (always truthy)
        )

        if self.pt:  # PyTorch
            # populate model arch with weights
            if w and w.is_file() and w.suffix == '.pt':
                load_pretrained_weights(self.model, w)
            self.model.to(device).eval()
            self.model.half() if self.fp16 else self.model.float()
        elif self.jit:  # TorchScript
            LOGGER.info(f'Loading {w} for TorchScript inference...')
            self.model = torch.jit.load(w)
            self.model.half() if self.fp16 else self.model.float()
        elif self.onnx:  # ONNX Runtime
            LOGGER.info(f'Loading {w} for ONNX Runtime inference...')
            cuda = torch.cuda.is_available() and device.type != 'cpu'
            # check_requirements(('onnx', 'onnxruntime-gpu' if cuda else 'onnxruntime'))
            import onnxruntime
            providers = ['CUDAExecutionProvider', 'CPUExecutionProvider'] if cuda else ['CPUExecutionProvider']
            self.session = onnxruntime.InferenceSession(str(w), providers=providers)
        elif self.engine:  # TensorRT
            LOGGER.info(f'Loading {w} for TensorRT inference...')
            import tensorrt as trt  # https://developer.nvidia.com/nvidia-tensorrt-download
            check_version(trt.__version__, '7.0.0', hard=True)  # require tensorrt>=7.0.0
            if device.type == 'cpu':
                device = torch.device('cuda:0')
            Binding = namedtuple('Binding', ('name', 'dtype', 'shape', 'data', 'ptr'))
            logger = trt.Logger(trt.Logger.INFO)
            with open(w, 'rb') as f, trt.Runtime(logger) as runtime:
                self.model_ = runtime.deserialize_cuda_engine(f.read())
            self.context = self.model_.create_execution_context()
            self.bindings = OrderedDict()
            self.fp16 = False  # default updated below
            self.dynamic = False  # stored on self: forward() uses it to reshape dynamic inputs
            for index in range(self.model_.num_bindings):
                name = self.model_.get_binding_name(index)
                dtype = trt.nptype(self.model_.get_binding_dtype(index))
                if self.model_.binding_is_input(index):
                    if -1 in tuple(self.model_.get_binding_shape(index)):  # dynamic
                        self.dynamic = True
                        self.context.set_binding_shape(index, tuple(self.model_.get_profile_shape(0, index)[2]))
                    if dtype == np.float16:
                        self.fp16 = True
                shape = tuple(self.context.get_binding_shape(index))
                im = torch.from_numpy(np.empty(shape, dtype=dtype)).to(device)
                self.bindings[name] = Binding(name, dtype, shape, im, int(im.data_ptr()))
            self.binding_addrs = OrderedDict((n, d.ptr) for n, d in self.bindings.items())
            batch_size = self.bindings['images'].shape[0]  # if dynamic, this is instead max batch size
        elif self.xml:  # OpenVINO
            LOGGER.info(f'Loading {w} for OpenVINO inference...')
            check_requirements(('openvino',))  # requires openvino-dev: https://pypi.org/project/openvino-dev/
            from openvino.runtime import Core, Layout, get_batch
            ie = Core()
            if not Path(w).is_file():  # if w is not an *.xml file
                w = next(Path(w).glob('*.xml'))  # get *.xml file from *_openvino_model dir
            network = ie.read_model(model=w, weights=Path(w).with_suffix('.bin'))
            if network.get_parameters()[0].get_layout().empty:
                network.get_parameters()[0].set_layout(Layout("NCHW"))
            batch_dim = get_batch(network)
            if batch_dim.is_static:
                batch_size = batch_dim.get_length()
            self.executable_network = ie.compile_model(network, device_name="CPU")  # device_name="MYRIAD" for Intel NCS2
            self.output_layer = next(iter(self.executable_network.outputs))
        elif self.tflite:  # TensorFlow Lite
            LOGGER.info(f'Loading {w} for TensorFlow Lite inference...')
            try:  # https://coral.ai/docs/edgetpu/tflite-python/#update-existing-tf-lite-code-for-the-edge-tpu
                from tflite_runtime.interpreter import Interpreter, load_delegate
            except ImportError:
                import tensorflow as tf
                Interpreter, load_delegate = tf.lite.Interpreter, tf.lite.experimental.load_delegate
            self.interpreter = Interpreter(model_path=str(w))  # use whichever Interpreter was imported above
            self.interpreter.allocate_tensors()
            # Get input and output tensors
            self.input_details = self.interpreter.get_input_details()
            self.output_details = self.interpreter.get_output_details()

            # Test model on random input data (NHWC, as TFLite expects)
            input_data = np.array(np.random.random_sample((1, 256, 128, 3)), dtype=np.float32)
            self.interpreter.set_tensor(self.input_details[0]['index'], input_data)
            self.interpreter.invoke()
            # get_tensor() returns a copy of the tensor data
            output_data = self.interpreter.get_tensor(self.output_details[0]['index'])
        else:
            print('This model framework is not supported yet!')
            sys.exit()

    @staticmethod
    def model_type(p='path/to/model.pt'):
        # Return model type from model path, i.e. path='path/to/model.onnx' -> type=onnx
        from export import export_formats
        sf = list(export_formats().Suffix)  # export suffixes
        check_suffix(p, sf)  # checks
        types = [s in Path(p).name for s in sf]
        types[8] &= not types[9]  # tflite &= not edgetpu
        return types

    def _preprocess(self, im_batch):
        images = []
        for element in im_batch:
            image = self.to_pil(element)
            image = self.preprocess(image)
            images.append(image)

        images = torch.stack(images, dim=0)
        images = images.to(self.device)

        return images

    def forward(self, im_batch):
        # preprocess batch
        im_batch = self._preprocess(im_batch)

        # batch to half
        if self.fp16 and im_batch.dtype != torch.float16:
            im_batch = im_batch.half()

        # batch processing
        features = []
        if self.pt:  # PyTorch
            features = self.model(im_batch)
        elif self.jit:  # TorchScript
            features = self.model(im_batch)
        elif self.onnx:  # ONNX Runtime
            im_batch = im_batch.cpu().numpy()  # torch to numpy
            features = self.session.run([self.session.get_outputs()[0].name], {self.session.get_inputs()[0].name: im_batch})[0]
        elif self.engine:  # TensorRT
            if self.dynamic and im_batch.shape != self.bindings['images'].shape:
                i_in, i_out = (self.model_.get_binding_index(x) for x in ('images', 'output'))
                self.context.set_binding_shape(i_in, im_batch.shape)  # reshape if dynamic
                self.bindings['images'] = self.bindings['images']._replace(shape=im_batch.shape)
                self.bindings['output'].data.resize_(tuple(self.context.get_binding_shape(i_out)))
            s = self.bindings['images'].shape
            assert im_batch.shape == s, f"input size {im_batch.shape} {'>' if self.dynamic else 'not equal to'} max model size {s}"
            self.binding_addrs['images'] = int(im_batch.data_ptr())
            self.context.execute_v2(list(self.binding_addrs.values()))
            features = self.bindings['output'].data
        elif self.xml:  # OpenVINO
            im_batch = im_batch.cpu().numpy()  # FP32
            features = self.executable_network([im_batch])[self.output_layer]
        else:
            print('Framework not supported at the moment, we are working on it...')
            sys.exit()

        if isinstance(features, (list, tuple)):
            return self.from_numpy(features[0]) if len(features) == 1 else [self.from_numpy(x) for x in features]
        else:
            return self.from_numpy(features)

    def from_numpy(self, x):
        return torch.from_numpy(x).to(self.device) if isinstance(x, np.ndarray) else x

    def warmup(self, imgsz=[(256, 128, 3)]):
        # Warmup model by running inference once
        warmup_types = self.pt, self.jit, self.onnx, self.engine, self.saved_model, self.pb
        if any(warmup_types) and self.device.type != 'cpu':
            im = [np.empty(*imgsz).astype(np.uint8)]  # dummy HWC uint8 input
            for _ in range(2 if self.jit else 1):
                self.forward(im)  # warmup
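
# ---------------------------------------------------------------------------
# Minimal usage sketch (an illustration, not part of the original module).
# The weights filename is one of the downloadable OSNet checkpoints; the
# random uint8 crops below stand in for person crops produced by a detector.
if __name__ == '__main__':
    device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
    extractor = ReIDDetectMultiBackend(weights=Path('osnet_x0_25_msmt17.pt'), device=device)
    extractor.warmup()  # no-op on CPU
    crops = [np.random.randint(0, 256, (256, 128, 3), dtype=np.uint8) for _ in range(2)]
    features = extractor(crops)  # nn.Module.__call__ dispatches to forward()
    print(features.shape)  # (2, feature_dim), e.g. (2, 512) for OSNet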