|
from ultralytics import YOLO
import numpy as np
import cv2

from utils import readb64, img2base64

model_int8 = YOLO('weights/best.torchscript', task='detect')
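# The TorchScript weights are assumed to come from an Ultralytics export, e.g.
# YOLO('weights/best.pt').export(format='torchscript'); the .pt filename here
# is illustrative, not taken from this repo.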
|
|
|
labels = {
    0: 'mask_weared_incorrect',
    1: 'with_mask',
    2: 'without_mask',
}
|
|
|
|
|
def inference_on_image(path):
    """Run detection on a single image and show the annotated result."""
    results = model_int8(path)

    # cv2.imread's second argument is an IMREAD_* flag, not a color-conversion
    # code; the default color read returns BGR, which is what cv2.imshow expects.
    img = cv2.imread(path)
    for box in results[0].boxes:
        img = draw_bbox_prediction(img, box)

    cv2.imshow('Detected Image', img)
    cv2.waitKey(0)
    cv2.destroyAllWindows()

    return results
|
|
|
def inference_on_video(path, vid_stride=10):
    """Run detection on a video, annotating every vid_stride-th frame."""
    # stream=True returns a lazy generator: one Results object per strided frame.
    results = model_int8(path, vid_stride=vid_stride, stream=True)

    cap = cv2.VideoCapture(path)

    frame_counter = 0
    while True:
        ret, img = cap.read()
        if ret:
            # Advance the result generator only on stride boundaries so the
            # detections stay in step with the frames being displayed.
            if frame_counter % vid_stride == 0:
                result = next(results)
                for box in result.boxes:
                    img = draw_bbox_prediction(img, box)
        else:
            cap.release()
            cv2.destroyAllWindows()
            break

        cv2.imshow('Detected Image', img)
        frame_counter += 1

        k = cv2.waitKey(5) & 0xFF
        if k == 27:  # Esc quits early
            cap.release()
            cv2.destroyAllWindows()
            break

    return results
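# Note on stream=True: the model yields one Results object per vid_stride-th
# frame, so the loop above advances the generator only on stride boundaries;
# intermediate frames are displayed without annotations.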
|
|
|
def draw_bbox_prediction(img, box):
    """Draw one predicted box plus a filled label banner above it."""
    cls = int(box.cls.item())
    confidence = box.conf.item()
    label = labels[cls]

    x1, y1, x2, y2 = map(int, box.xyxy.numpy()[0])
    # Scale line thickness, banner height and font size with the box width,
    # using 640/8 = 80 px as the reference width. max(1, ...) guards against
    # the zero thickness cv2 rejects on small boxes.
    scaler = (x2 - x1) / (640 / 8)
    cv2.rectangle(img, (x1, y1), (x2, y2), (0, 102, 255), max(1, int(2 * scaler)))
    img = cv2.rectangle(img, (x1, y1 - int(20 * scaler)), (x1 + (x2 - x1) * 3, y1), (0, 102, 255), -1)
    img = cv2.putText(img, "{}: {:.3f}".format(label, confidence), (x1, y1 - 5),
                      cv2.FONT_HERSHEY_SIMPLEX, 0.6 * scaler, (255, 255, 255),
                      max(1, int(scaler)))
    return img
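# Worked example of the scaling rule: a 160 px-wide detection gives
# scaler = 160 / 80 = 2.0, i.e. a 4 px outline, a 40 px banner and a 1.2 font
# scale, while a 40 px-wide detection gives scaler = 0.5, which the max(1, ...)
# guards clamp to the minimum drawable thickness of 1 px.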
|
|
|
|
|
class ImagePipeline:
    def __init__(self, device='cpu', gpu_id=0, weights='weights/best.torchscript'):
        self.model = YOLO(weights, task='detect')

    def preprocess(self, data):
        # Accept either a {'images': <b64 or [b64]>} payload or a bare value.
        image_base64 = data.pop("images", data)

        if not isinstance(image_base64, list):
            image_base64 = [image_base64]
        elif len(image_base64) > 1:
            raise ValueError("ImagePipeline only accepts 1 image/frame")

        images = [readb64(image) for image in image_base64]
        return images

    def inference(self, images):
        results = self.model(images[0])
        return results
|
|
|
    def get_response(self, inference_result):
        response = []

        # Classes 0 (mask_weared_incorrect) and 2 (without_mask) are violations.
        if not {0, 2}.intersection(inference_result[0].boxes.cls.numpy()):
            message = "Everyone is wearing a mask correctly"
        else:
            message = "Someone is not wearing a mask or is wearing one incorrectly"

        for result in inference_result:
            for xywhn, cls, conf in zip(
                result.boxes.xywhn,
                result.boxes.cls,
                result.boxes.conf
            ):
                xywhn = list(xywhn.numpy())
                response.append({
                    'xywhn': {
                        'x': float(xywhn[0]),
                        'y': float(xywhn[1]),
                        'w': float(xywhn[2]),
                        'h': float(xywhn[3]),
                    },
                    'class': cls.item(),
                    'confidence': conf.item(),
                })

        return {'results': response,
                'message': message}
|
|
|
    def draw_bbox(self, images, inference_result):
        img = np.array(images[0])
        # Reverse so the first (typically highest-confidence) boxes are drawn
        # last and end up on top of overlapping ones.
        boxes = list(inference_result[0].boxes)
        boxes.reverse()

        for box in boxes:
            img = draw_bbox_prediction(img, box)

        return img
|
|
|
    def __call__(self, data, config_payload=None, draw_bbox=False):
        images = self.preprocess(data)
        inference_result = self.inference(images)
        response = self.get_response(inference_result)
        if draw_bbox:
            annotated_img = self.draw_bbox(images, inference_result)
            return response, annotated_img
        return response
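# Minimal usage sketch (hypothetical file name; assumes utils.img2base64 turns
# a cv2/NumPy image into a base64 string the pipeline can decode):
#
#     frame = cv2.imread('samples/office.jpg')
#     pipeline = ImagePipeline()
#     response = pipeline({'images': img2base64(frame)})
#     print(response['message'])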
|
|
|
class VideoPipeline:
    def __init__(self, device='cpu', gpu_id=0, weights='weights/best.torchscript'):
        self.model = YOLO(weights, task='detect')

    def preprocess(self, data):
        return data

    def inference(self, video_path, vid_stride=30):
        # vid_stride=30 runs detection on every 30th frame instead of all of them.
        results = self.model(video_path, vid_stride=vid_stride)
        return results
|
|
|
    def get_response(self, inference_result):
        response = []

        message = "Everyone is wearing a mask correctly"

        for result in inference_result:
            # Flag the whole video as soon as any sampled frame contains
            # class 0 (mask_weared_incorrect) or class 2 (without_mask).
            if {0, 2}.intersection(result.boxes.cls.numpy()):
                message = "Someone is not wearing a mask or is wearing one incorrectly"

            for xywhn, cls, conf in zip(
                result.boxes.xywhn,
                result.boxes.cls,
                result.boxes.conf
            ):
                xywhn = list(xywhn.numpy())
                response.append({
                    'xywhn': {
                        'x': float(xywhn[0]),
                        'y': float(xywhn[1]),
                        'w': float(xywhn[2]),
                        'h': float(xywhn[3]),
                    },
                    'class': cls.item(),
                    'confidence': conf.item(),
                })

        return {'results': response,
                'message': message}
|
|
|
    def __call__(self, data, config_payload=None):
        data = self.preprocess(data)
        inference_result = self.inference(data)
        response = self.get_response(inference_result)
        return response
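# Minimal usage sketch (hypothetical path): VideoPipeline takes a video file
# path directly, since Ultralytics reads the frames internally:
#
#     pipeline = VideoPipeline()
#     response = pipeline('samples/lobby.mp4')
#     print(len(response['results']), response['message'])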
|
|
|
|
|
if __name__ == '__main__':
    import argparse

    parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    parser.add_argument('--input_type',
                        default='image',
                        const='image',
                        nargs='?',
                        choices=['image', 'video'],
                        help='type of input (default: %(default)s)')
    parser.add_argument("-p", "--path", help="filepath")
    args = parser.parse_args()

    if args.input_type == 'image':
        results = inference_on_image(args.path)
    elif args.input_type == 'video':
        results = inference_on_video(args.path)

    print(results)
|
|
|
|
|
|
|
|
|
|