Myogyi's picture
o
b36970b
raw
history blame
14.4 kB
#!/usr/bin/env python
import argparse
import numpy as np
import sys
import cv2
import tritonclient.grpc as grpcclient
from tritonclient.utils import InferenceServerException
from processing import preprocess, postprocess
from render import render_box, render_filled_box, get_text_size, render_text, RAND_COLORS
from labels import COCOLabels
# Tensor names exchanged with the served model.
# The single entry of INPUT_NAMES is the input tensor the client fills.
INPUT_NAMES = [
    "images",
]
# Order matters: the client reads the outputs positionally as
# (num_dets, det_boxes, det_scores, det_classes).
OUTPUT_NAMES = [
    "num_dets",
    "det_boxes",
    "det_scores",
    "det_classes",
]
def parse_args(argv=None):
    """Parse the command-line options of the client.

    Args:
        argv: Optional list of argument strings. When None, argparse reads
            the arguments from sys.argv.

    Returns:
        The argparse.Namespace holding the parsed options.
    """
    parser = argparse.ArgumentParser()
    # NOTE: 'mode' is a required positional, so the default below never takes
    # effect; it is kept only to leave the command-line interface untouched.
    parser.add_argument('mode',
                        choices=['dummy', 'image', 'video'],
                        default='dummy',
                        help='Run mode. \'dummy\' will send an emtpy buffer to the server to test if inference works. \'image\' will process an image. \'video\' will process a video.')
    parser.add_argument('input',
                        type=str,
                        nargs='?',
                        help='Input file to load from in image or video mode')
    parser.add_argument('-m',
                        '--model',
                        type=str,
                        default='yolov7',
                        help='Inference model name, default yolov7')
    parser.add_argument('--width',
                        type=int,
                        default=640,
                        help='Inference model input width, default 640')
    parser.add_argument('--height',
                        type=int,
                        default=640,
                        help='Inference model input height, default 640')
    parser.add_argument('-u',
                        '--url',
                        type=str,
                        default='localhost:8001',
                        help='Inference server URL, default localhost:8001')
    parser.add_argument('-o',
                        '--out',
                        type=str,
                        default='',
                        help='Write output into file instead of displaying it')
    parser.add_argument('-f',
                        '--fps',
                        type=float,
                        default=24.0,
                        help='Video output fps, default 24.0 FPS')
    parser.add_argument('-i',
                        '--model-info',
                        action="store_true",
                        default=False,
                        help='Print model status, configuration and statistics')
    parser.add_argument('-v',
                        '--verbose',
                        action="store_true",
                        default=False,
                        help='Enable verbose client output')
    parser.add_argument('-t',
                        '--client-timeout',
                        type=float,
                        default=None,
                        help='Client timeout in seconds, default no timeout')
    parser.add_argument('-s',
                        '--ssl',
                        action="store_true",
                        default=False,
                        help='Enable SSL encrypted channel to the server')
    parser.add_argument('-r',
                        '--root-certificates',
                        type=str,
                        default=None,
                        help='File holding PEM-encoded root certificates, default none')
    parser.add_argument('-p',
                        '--private-key',
                        type=str,
                        default=None,
                        help='File holding PEM-encoded private key, default is none')
    parser.add_argument('-x',
                        '--certificate-chain',
                        type=str,
                        default=None,
                        help='File holding PEM-encoded certicate chain default is none')
    return parser.parse_args(argv)


def _create_client(flags):
    """Create the Triton gRPC client, exiting with status 1 on failure."""
    try:
        return grpcclient.InferenceServerClient(
            url=flags.url,
            verbose=flags.verbose,
            ssl=flags.ssl,
            root_certificates=flags.root_certificates,
            private_key=flags.private_key,
            certificate_chain=flags.certificate_chain)
    except Exception as e:
        print("context creation failed: " + str(e))
        # A failed start-up must be reported with a non-zero exit status,
        # like every other failure path of this script.
        sys.exit(1)


def _check_health(client, model_name):
    """Exit with status 1 unless server is live and ready and the model is ready."""
    if not client.is_server_live():
        print("FAILED : is_server_live")
        sys.exit(1)
    if not client.is_server_ready():
        print("FAILED : is_server_ready")
        sys.exit(1)
    if not client.is_model_ready(model_name):
        print("FAILED : is_model_ready")
        sys.exit(1)


def _print_model_info(client, model_name):
    """Print model metadata and configuration; exit with status 1 on failure."""
    # Model metadata
    try:
        metadata = client.get_model_metadata(model_name)
        print(metadata)
    except InferenceServerException as ex:
        print("FAILED : get_model_metadata")
        # The server message is only echoed for errors other than the
        # "unknown model" one.
        if "Request for unknown model" not in ex.message():
            print("Got: {}".format(ex.message()))
        sys.exit(1)

    # Model configuration
    try:
        config = client.get_model_config(model_name)
    except InferenceServerException as ex:
        print("FAILED : get_model_config")
        print("Got: {}".format(ex.message()))
        sys.exit(1)
    if config.config.name != model_name:
        print("FAILED: get_model_config")
        sys.exit(1)
    print(config)


def _print_statistics(client, model_name):
    """Print the inference statistics of the model; exit with status 1 if malformed."""
    statistics = client.get_inference_statistics(model_name=model_name)
    if len(statistics.model_stats) != 1:
        print("FAILED: get_inference_statistics")
        sys.exit(1)
    print(statistics)


def _build_io(flags):
    """Return the (inputs, outputs) request descriptors shared by all modes."""
    # NOTE(review): the tensor is declared as [1, 3, width, height] and the
    # same [width, height] order is handed to preprocess/postprocess. Confirm
    # this matches the model and processing.py for non-square inputs.
    inputs = [grpcclient.InferInput(INPUT_NAMES[0], [1, 3, flags.width, flags.height], "FP32")]
    outputs = [grpcclient.InferRequestedOutput(name) for name in OUTPUT_NAMES]
    return inputs, outputs


def _infer(client, flags, inputs, outputs):
    """Run one inference request against the configured model."""
    return client.infer(model_name=flags.model,
                        inputs=inputs,
                        outputs=outputs,
                        client_timeout=flags.client_timeout)


def _print_result_buffers(results):
    """Print the shape and element sum of every requested output buffer."""
    for output in OUTPUT_NAMES:
        result = results.as_numpy(output)
        print(f"Received result buffer \"{output}\" of size {result.shape}")
        print(f"Naive buffer sum: {np.sum(result)}")


def _detect(results, image, flags):
    """Turn the raw output tensors into detections via postprocess()."""
    num_dets, det_boxes, det_scores, det_classes = (
        results.as_numpy(name) for name in OUTPUT_NAMES)
    return postprocess(num_dets, det_boxes, det_scores, det_classes,
                       image.shape[1], image.shape[0], [flags.width, flags.height])


def _draw_detections(image, detected_objects):
    """Print every detection and render its box and caption; return the image."""
    for box in detected_objects:
        label = COCOLabels(box.classID).name
        print(f"{label}: {box.confidence}")
        caption = f"{label}: {box.confidence:.2f}"
        image = render_box(image, box.box(), color=tuple(RAND_COLORS[box.classID % 64].tolist()))
        size = get_text_size(image, caption, normalised_scaling=0.6)
        image = render_filled_box(image, (box.x1 - 3, box.y1 - 3, box.x1 + size[0], box.y1 + size[1]), color=(220, 220, 220))
        image = render_text(image, caption, (box.x1, box.y1), color=(30, 30, 30), normalised_scaling=0.5)
    return image


def _run_dummy(client, flags):
    """Send a buffer of ones to the server to check that inference works."""
    print("Running in 'dummy' mode")
    print("Creating emtpy buffer filled with ones...")
    inputs, outputs = _build_io(flags)
    inputs[0].set_data_from_numpy(np.ones(shape=(1, 3, flags.width, flags.height), dtype=np.float32))

    print("Invoking inference...")
    results = _infer(client, flags, inputs, outputs)
    if flags.model_info:
        _print_statistics(client, flags.model)
    print("Done")

    _print_result_buffers(results)


def _run_image(client, flags):
    """Run detection on a single image and save or display the result."""
    print("Running in 'image' mode")
    if not flags.input:
        print("FAILED: no input image")
        sys.exit(1)

    inputs, outputs = _build_io(flags)

    print("Creating buffer from image file...")
    input_image = cv2.imread(str(flags.input))
    if input_image is None:
        print(f"FAILED: could not load input image {str(flags.input)}")
        sys.exit(1)
    input_image_buffer = preprocess(input_image, [flags.width, flags.height])
    input_image_buffer = np.expand_dims(input_image_buffer, axis=0)
    inputs[0].set_data_from_numpy(input_image_buffer)

    print("Invoking inference...")
    results = _infer(client, flags, inputs, outputs)
    if flags.model_info:
        _print_statistics(client, flags.model)
    print("Done")

    _print_result_buffers(results)

    detected_objects = _detect(results, input_image, flags)
    print(f"Detected objects: {len(detected_objects)}")
    input_image = _draw_detections(input_image, detected_objects)

    if flags.out:
        # cv2.imwrite signals failure through its return value; do not claim
        # success when nothing was written.
        if not cv2.imwrite(flags.out, input_image):
            print(f"FAILED: could not write output image {flags.out}")
            sys.exit(1)
        print(f"Saved result to {flags.out}")
    else:
        cv2.imshow('image', input_image)
        cv2.waitKey(0)
        cv2.destroyAllWindows()


def _run_video(client, flags):
    """Run detection on every frame of a video and save or display the frames."""
    print("Running in 'video' mode")
    if not flags.input:
        print("FAILED: no input video")
        sys.exit(1)

    inputs, outputs = _build_io(flags)

    print("Opening input video stream...")
    cap = cv2.VideoCapture(flags.input)
    if not cap.isOpened():
        print(f"FAILED: cannot open video {flags.input}")
        sys.exit(1)

    counter = 0
    out = None
    try:
        print("Invoking inference...")
        while True:
            ret, frame = cap.read()
            if not ret:
                print("failed to fetch next frame")
                break

            # The writer is opened lazily because the output size is only
            # known once the first frame has been read.
            if counter == 0 and flags.out:
                print("Opening output video stream...")
                fourcc = cv2.VideoWriter_fourcc('M', 'P', '4', 'V')
                out = cv2.VideoWriter(flags.out, fourcc, flags.fps, (frame.shape[1], frame.shape[0]))

            input_image_buffer = preprocess(frame, [flags.width, flags.height])
            input_image_buffer = np.expand_dims(input_image_buffer, axis=0)
            inputs[0].set_data_from_numpy(input_image_buffer)

            results = _infer(client, flags, inputs, outputs)

            detected_objects = _detect(results, frame, flags)
            print(f"Frame {counter}: {len(detected_objects)} objects")
            counter += 1

            frame = _draw_detections(frame, detected_objects)

            if out is not None:
                out.write(frame)
            else:
                cv2.imshow('image', frame)
                if cv2.waitKey(1) == ord('q'):
                    break

        if flags.model_info:
            _print_statistics(client, flags.model)
        print("Done")
    finally:
        # Release the streams on every exit path, including errors and
        # sys.exit(). The writer is still None when no frame was ever read.
        cap.release()
        if out is not None:
            out.release()
        if not flags.out:
            cv2.destroyAllWindows()


def main(argv=None):
    """Entry point: parse options, connect to the server and run the chosen mode.

    Args:
        argv: Optional list of argument strings; None means sys.argv.
    """
    flags = parse_args(argv)

    # Create server context
    client = _create_client(flags)

    # Health check
    _check_health(client, flags.model)

    if flags.model_info:
        _print_model_info(client, flags.model)

    # 'mode' is restricted by argparse to exactly these three choices.
    runners = {
        'dummy': _run_dummy,
        'image': _run_image,
        'video': _run_video,
    }
    runners[flags.mode](client, flags)


if __name__ == '__main__':
    main()