manfredmichael committed on
Commit
93c1293
1 Parent(s): d95d48a

Add pipeline

Browse files
Files changed (3) hide show
  1. .gitignore +4 -0
  2. app.py +101 -0
  3. pipelines.py +218 -0
.gitignore ADDED
@@ -0,0 +1,4 @@
 
 
 
 
 
1
+ __pycache__/
2
+ temps/*
3
+ runs/*
4
+ .ipynb_checkpoints/
app.py ADDED
@@ -0,0 +1,101 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import base64
2
+ import json
3
+ import os, shutil
4
+ import re
5
+ import time
6
+ import uuid
7
+
8
+ import cv2
9
+
10
+ import numpy as np
11
+ import streamlit as st
12
+ from PIL import Image
13
+ # from extract_video import extract_method_single_video
14
+
15
+ from utils import st_file_selector, img2base64
16
+ from pipelines import ImagePipeline, VideoPipeline
17
+
18
+ import os
19
+
20
+ DEBUG = True
21
+
22
def main():
    """Streamlit page body.

    Accepts either an uploaded image/video or a bundled sample file, runs the
    matching pipeline (ImagePipeline for stills, VideoPipeline for mp4), and
    renders the annotated result plus the raw response JSON.
    """
    st.markdown("###")
    uploaded_file = st.file_uploader(
        'Upload a picture',
        type=['mp4', 'jpg', 'jpeg', 'png'],
        accept_multiple_files=False,
    )

    # Block until the samples directory exists (it may still be syncing on a
    # fresh deployment).
    with st.spinner(f'Loading samples...'):
        while not os.path.isdir("sample_files"):
            time.sleep(1)
    st.markdown("### or")
    selected_file = st_file_selector(st, path='sample_files', key='selected',
                                     label='Choose a sample image/video')

    if uploaded_file:
        # Persist the upload under a unique name so concurrent sessions do not
        # clobber each other's files.
        random_id = uuid.uuid1()
        base_folder = "temps"
        filename = "{}.{}".format(random_id, uploaded_file.type.split("/")[-1])
        file_type = uploaded_file.type.split("/")[0]
        # Fix: the path previously contained a literal placeholder instead of
        # the generated filename, so videos were never saved where they were
        # read back from.
        filepath = f"{base_folder}/{filename}"
        if DEBUG:
            st.write(filepath)
        if uploaded_file.type == 'video/mp4':
            with open(filepath, mode='wb') as f:
                f.write(uploaded_file.read())
            video_path = filepath
            st.video(uploaded_file)
        else:
            img = Image.open(uploaded_file).convert('RGB')
            st.image(img)
    elif selected_file:
        base_folder = "sample_files"
        file_type = selected_file.split(".")[-1]
        filename = selected_file.split("/")[-1]
        filepath = f"{base_folder}/{selected_file}"

        if file_type == 'mp4':
            # Use a context manager so the sample video handle is closed
            # (the original leaked an open file object).
            with open(filepath, 'rb') as video_file:
                video_bytes = video_file.read()
            st.video(video_bytes)
            video_path = filepath
        else:
            img = Image.open(filepath).convert('RGB')
            st.image(img)
    else:
        # Nothing uploaded and nothing selected: render only the widgets.
        return

    annotated_img = None
    with st.spinner(f'Analyzing {file_type}...'):
        # Uploads report MIME major type ('video'); samples report the
        # extension ('mp4') — accept either.
        if file_type == 'video' or file_type == 'mp4':
            result = video_pipeline(video_path)
        else:
            result, annotated_img = image_pipeline(
                {'images': [img2base64(np.array(img))]}, draw_bbox=True)

    if annotated_img is not None:
        st.image(annotated_img)

    # The pipelines' warning message contains "incorrectly"; use that to pick
    # the severity of the banner.
    if 'incorrectly' in result['message']:
        st.error(result['message'], icon="🚨")
    else:
        st.success(result['message'], icon="✅")

    st.divider()
    st.write('## Response JSON')
    st.write(result)
86
+
87
+
88
def setup():
    """Ensure the temp upload directory exists before the app handles files."""
    os.makedirs("temps", exist_ok=True)
92
+
93
if __name__ == "__main__":
    # Build both pipelines once per process (each loads the YOLO weights)
    # before any page content is rendered.
    image_pipeline = ImagePipeline()
    video_pipeline = VideoPipeline()

    # with st.sidebar:

    st.title("Improper Mask Wearing Detection")
    setup()
    main()
pipelines.py ADDED
@@ -0,0 +1,218 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from PIL import Image
2
+ from ultralytics import YOLO
3
+ import numpy as np
4
+ import cv2
5
+ import torch
6
+
7
+
8
+
9
+ from utils import readb64, img2base64
10
+
11
# TorchScript export of the trained YOLO mask detector, loaded once at import
# time; used by the standalone inference_on_image / inference_on_video helpers.
model_int8 = YOLO('weights/best.torchscript', task='detect')

# Mapping from the model's numeric class ids to human-readable labels.
labels = {
    0: 'mask_weared_incorrect',
    1: 'with_mask',
    2: 'without_mask',
}
18
+
19
+
20
def inference_on_image(path):
    """Run the mask detector on one image file and display the annotated result.

    Blocks in cv2.waitKey(0) until a key is pressed.

    Args:
        path: filesystem path to the image.

    Returns:
        The raw ultralytics results list.
    """
    results = model_int8(path)

    # Fix: imread's second argument is an IMREAD_* flag; cv2.COLOR_BGR2RGB is a
    # color-conversion code and was being misused as one. Read the image with
    # the default flag (BGR), which is what cv2 drawing/display expects.
    img = cv2.imread(path)
    for box in results[0].boxes:
        img = draw_bbox_prediction(img, box)

    cv2.imshow('Detected Image', img)
    cv2.waitKey(0)

    return results
31
+
32
def inference_on_video(path, vid_stride=10):
    """Run the mask detector on a video and display annotated frames.

    Detections are refreshed every `vid_stride` frames; intermediate frames are
    shown with the most recent boxes. Press ESC to stop early.

    Args:
        path: filesystem path to the video.
        vid_stride: frame sampling stride for the detector.

    Returns:
        The (partially consumed) ultralytics results generator.
    """
    # Fix: honor the vid_stride parameter — it was hardcoded to 10 both here
    # and in the modulo below.
    results = model_int8(path, vid_stride=vid_stride, stream=True)

    cap = cv2.VideoCapture(path)

    # Fix: the original read (and discarded) the first frame before the loop,
    # desynchronizing frame_counter from the detector's sampled frames.
    frame_counter = 0
    while True:
        ret, img = cap.read()
        if ret:
            if frame_counter % vid_stride == 0:
                result = next(results)
            for box in result.boxes:
                img = draw_bbox_prediction(img, box)
        else:
            cap.release()
            break

        cv2.imshow('Detected Image', img)
        frame_counter += 1

        k = cv2.waitKey(5) & 0xFF
        if k == 27:  # ESC
            cap.release()
            cv2.destroyAllWindows()
            break

    return results
60
+
61
def draw_bbox_prediction(img, box):
    """Draw one detection (rectangle plus label banner) onto `img`.

    Args:
        img: uint8 numpy image to draw on (also modified in place).
        box: one ultralytics box with `.cls`, `.conf` and `.xyxy`.

    Returns:
        The annotated image.
    """
    cls = box.cls.item()
    confidence = box.conf.item()
    label = labels[cls]

    x1, y1, x2, y2 = map(int, list(box.xyxy.numpy()[0]))
    # Scale line and font size with the box width (an 80 px-wide box → 1.0).
    scaler = (x2 - x1) / (640 / 8)
    # Fix: for narrow boxes int(2*scaler) truncates to 0, which cv2 rejects as
    # a line thickness — clamp thicknesses to at least 1.
    cv2.rectangle(img, (x1, y1), (x2, y2), (0, 102, 255), max(1, int(2 * scaler)))
    img = cv2.rectangle(img, (x1, y1 - int(20 * scaler)), (x1 + (x2 - x1) * 3, y1),
                        (0, 102, 255), -1)
    img = cv2.putText(img, "{}: {:.3f}".format(label, confidence), (x1, y1 - 5),
                      cv2.FONT_HERSHEY_SIMPLEX, 0.6 * scaler, (255, 255, 255),
                      max(1, int(1 * scaler)))
    return img
72
+
73
+
74
class ImagePipeline:
    """Detect mask-wearing status on a single base64-encoded image."""

    def __init__(self, device='cpu', gpu_id=0, weights='weights/best.torchscript'):
        # device/gpu_id are accepted for interface compatibility only.
        self.model = YOLO(weights, task='detect')

    def preprocess(self, data):
        """Decode the base64 payload into a one-element list of images."""
        payload = data.pop("images", data)

        if type(payload) is not list:
            payload = [payload]
        elif len(payload) > 1:
            raise Exception("ImagePipeline only accepts 1 image/frame")

        return [readb64(encoded) for encoded in payload]

    def inference(self, images):
        """Run the detector on the single decoded image."""
        return self.model(images[0])

    def get_response(self, inference_result):
        """Summarize the detections as a JSON-serializable dict."""
        detections = []

        # Classes 0 (mask worn incorrectly) and 2 (no mask) are violations;
        # any occurrence of either flips the message.
        flagged = set([0, 2]).intersection(inference_result[0].boxes.cls.numpy())
        message = ("Someone is not wearing mask or incorrectly wearing mask"
                   if flagged else
                   "Everyone is wearing mask correctly")

        for result in inference_result:
            for xywhn, cls, conf in zip(result.boxes.xywhn,
                                        result.boxes.cls,
                                        result.boxes.conf):
                coords = list(xywhn.numpy())
                detections.append({
                    'xywhn': {
                        'x': float(coords[0]),
                        'y': float(coords[1]),
                        'w': float(coords[2]),
                        'h': float(coords[3]),
                    },
                    'class': cls.item(),
                    'confidence': conf.item(),
                })

        return {'results': detections,
                'message': message}

    def draw_bbox(self, images, inference_result):
        """Return the image with every detection drawn, in reverse box order."""
        annotated = np.array(images[0])
        for box in reversed(list(inference_result[0].boxes)):
            annotated = draw_bbox_prediction(annotated, box)
        return annotated

    def __call__(self, data, config_payload=None, draw_bbox=False):
        images = self.preprocess(data)
        inference_result = self.inference(images)
        response = self.get_response(inference_result)
        if draw_bbox:
            return response, self.draw_bbox(images, inference_result)
        return response
142
+
143
class VideoPipeline:
    """Detect mask-wearing status across the sampled frames of a video."""

    def __init__(self, device='cpu', gpu_id=0, weights='weights/best.torchscript'):
        # device/gpu_id are accepted for interface compatibility only.
        self.model = YOLO(weights, task='detect')

    def preprocess(self, data):
        """Videos are passed through as a file path; nothing to decode."""
        return data

    def inference(self, video_path, vid_stride=30):
        """Run the detector on the video, sampling every `vid_stride` frames."""
        results = self.model(video_path, vid_stride=vid_stride)
        return results

    def get_response(self, inference_result):
        """Summarize the detections of every sampled frame as a dict."""
        response = []

        # default message
        message = "Everyone is wearing mask correctly"

        for i, result in enumerate(inference_result):
            # Fix: inspect the CURRENT frame's result — the original indexed
            # inference_result[0] on every iteration, so violations after the
            # first frame were never detected. Also use intersection (either
            # class 0 or 2 triggers the warning), consistent with
            # ImagePipeline; issubset required BOTH classes to be present.
            if set([0, 2]).intersection(result.boxes.cls.numpy()):
                message = "Someone is not wearing mask or incorrectly wearing mask"

            for xywhn, cls, conf in zip(
                result.boxes.xywhn,
                result.boxes.cls,
                result.boxes.conf
            ):
                xywhn = list(xywhn.numpy())
                response.append({
                    'xywhn': {
                        'x': float(xywhn[0]),
                        'y': float(xywhn[1]),
                        'w': float(xywhn[2]),
                        'h': float(xywhn[3]),
                    },
                    'class': cls.item(),
                    'confidence': conf.item(),
                })

        return {'results': response,
                'message': message}

    def __call__(self, data, config_payload=None):
        data = self.preprocess(data)
        inference_result = self.inference(data)
        response = self.get_response(inference_result)
        return response
191
+
192
+
193
if __name__ == '__main__':
    # Simple CLI for running the standalone helpers on a local file.
    import cv2
    import argparse

    parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    parser.add_argument('--input_type',
                        default='image',
                        const='image',
                        nargs='?',
                        choices=['image', 'video'],
                        help='type of input (default: %(default)s)')
    parser.add_argument("-p", "--path", help="filepath")
    args = parser.parse_args()

    # choices=['image', 'video'] guarantees one of these branches runs.
    if args.input_type=='image':
        results = inference_on_image(args.path)
    elif args.input_type == 'video':
        results = inference_on_video(args.path)

    print(results)


# Examples
# python pipelines.py --input_type image --path sample_files/image-1.jpeg
# python pipelines.py --input_type video --path sample_files/video-1.mp4