File size: 4,607 Bytes
6781da9 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 |
"""Object detection demo with MobileNet SSD.
This model and code are based on
https://github.com/robmarkcole/object-detection-app
"""
import logging
import queue
from pathlib import Path
from typing import List, NamedTuple
import av
import cv2
import numpy as np
import streamlit as st
from streamlit_webrtc import WebRtcMode, webrtc_streamer
from utils.download import download_file
from utils.turn import get_ice_servers
# Directory that holds this script, and the directory one level above it.
HERE = Path(__file__).parent
ROOT = HERE.parent

logger = logging.getLogger(__name__)

# Remote locations of the MobileNet SSD Caffe weights and network definition.
MODEL_URL = "https://github.com/robmarkcole/object-detection-app/raw/master/model/MobileNetSSD_deploy.caffemodel"  # noqa: E501
PROTOTXT_URL = "https://github.com/robmarkcole/object-detection-app/raw/master/model/MobileNetSSD_deploy.prototxt.txt"  # noqa: E501

# Local counterparts of the two files above, kept under ROOT/models.
MODEL_LOCAL_PATH = ROOT.joinpath("./models/MobileNetSSD_deploy.caffemodel")
PROTOTXT_LOCAL_PATH = ROOT.joinpath("./models/MobileNetSSD_deploy.prototxt.txt")
# Class names of the detector. The class id found in each detection row is
# used directly as an index into this list, so the order must not change.
CLASSES = [
    "background", "aeroplane", "bicycle", "bird", "boat",
    "bottle", "bus", "car", "cat", "chair",
    "cow", "diningtable", "dog", "horse", "motorbike",
    "person", "pottedplant", "sheep", "sofa", "train",
    "tvmonitor",
]
class Detection(NamedTuple):
    """One object found by the detector in a single video frame.

    Attributes:
        class_id: Index of the detected class in ``CLASSES``.
        label: Class name, i.e. ``CLASSES[class_id]``.
        score: Score of the detection as reported by the network; it is
            compared against the user-selected score threshold.
        box: Bounding box as ``[xmin, ymin, xmax, ymax]``, already scaled
            to the pixel size of the frame.
    """

    class_id: int
    label: str
    score: float
    box: np.ndarray
@st.cache_resource  # type: ignore
def generate_label_colors():
    """Return one random color per entry of ``CLASSES``.

    The result is a float array of shape ``(len(CLASSES), 3)`` whose values
    are drawn uniformly from ``[0, 255)``.

    NOTE(review): the ``st.cache_resource`` wrapper presumably keeps the
    colors stable across Streamlit reruns -- confirm against Streamlit docs.
    """
    num_classes = len(CLASSES)
    return np.random.uniform(low=0, high=255, size=(num_classes, 3))
# One drawing color per class, looked up by class id when rendering boxes.
COLORS = generate_label_colors()

# Hand both model files to the project's download helper together with the
# byte size each file is expected to have.
download_file(MODEL_URL, MODEL_LOCAL_PATH, expected_size=23147564)
download_file(PROTOTXT_URL, PROTOTXT_LOCAL_PATH, expected_size=29353)

# Session-specific caching: the network is loaded only when this session
# does not hold one yet, and is read back from the session state afterwards.
cache_key = "object_detection_dnn"
if cache_key not in st.session_state:
    st.session_state[cache_key] = cv2.dnn.readNetFromCaffe(
        str(PROTOTXT_LOCAL_PATH), str(MODEL_LOCAL_PATH)
    )
net = st.session_state[cache_key]

# Detections scoring below this value are dropped by the frame callback.
score_threshold = st.slider(
    "Score threshold", min_value=0.0, max_value=1.0, value=0.5, step=0.05
)

# NOTE: The frame callback is called in another thread, so its results are
#       handed to this script through a queue for thread-safety.
# TODO: A general-purpose shared state object may be more useful.
result_queue: "queue.Queue[List[Detection]]" = queue.Queue()
def video_frame_callback(frame: av.VideoFrame) -> av.VideoFrame:
    """Detect objects in ``frame`` and return the frame with them drawn on.

    The frame is converted to a BGR image, passed through the module-level
    ``net``, and every detection whose score is at least ``score_threshold``
    is drawn as a rectangle with a ``"<label>: <score>%"`` caption. The list
    of kept detections is also put on ``result_queue`` (an empty list when
    nothing passes the threshold).

    Args:
        frame: Incoming video frame.

    Returns:
        A new ``av.VideoFrame`` built from the annotated BGR image.
    """
    image = frame.to_ndarray(format="bgr24")
    h, w = image.shape[:2]

    # Run inference on a 300x300 copy of the frame; 0.007843 is passed as the
    # blob scale factor and 127.5 as the mean.
    blob = cv2.dnn.blobFromImage(
        cv2.resize(image, (300, 300)), 0.007843, (300, 300), 127.5
    )
    net.setInput(blob)
    output = net.forward()

    # Convert the output array into a structured form: (1, 1, N, 7) -> (N, 7).
    # reshape() is used instead of squeeze() because squeeze() drops *every*
    # size-1 axis: with N == 1 it would yield a 1-D array of shape (7,) and
    # the 2-D indexing below would raise IndexError.
    output = output.reshape(-1, 7)
    output = output[output[:, 2] >= score_threshold]

    # Each row holds the class id at [1], the score at [2] and the box at
    # [3:7]; the box is multiplied by the frame size to get pixel units.
    box_scale = np.array([w, h, w, h])
    detections = [
        Detection(
            class_id=int(detection[1]),
            label=CLASSES[int(detection[1])],
            score=float(detection[2]),
            box=(detection[3:7] * box_scale),
        )
        for detection in output
    ]

    # Render bounding boxes and captions
    for detection in detections:
        caption = f"{detection.label}: {round(detection.score * 100, 2)}%"
        color = COLORS[detection.class_id]
        xmin, ymin, xmax, ymax = detection.box.astype("int")

        cv2.rectangle(image, (xmin, ymin), (xmax, ymax), color, 2)
        cv2.putText(
            image,
            caption,
            # Put the caption above the box, or just inside its top edge
            # when there is not enough room above.
            (xmin, ymin - 15 if ymin - 15 > 15 else ymin + 15),
            cv2.FONT_HERSHEY_SIMPLEX,
            0.5,
            color,
            2,
        )

    # Hand the results to the script thread that displays the labels table.
    result_queue.put(detections)

    return av.VideoFrame.from_ndarray(image, format="bgr24")
# Start the WebRTC component: video only, with every frame routed through
# video_frame_callback.
webrtc_ctx = webrtc_streamer(
    key="object-detection",
    mode=WebRtcMode.SENDRECV,
    rtc_configuration={"iceServers": get_ice_servers()},
    video_frame_callback=video_frame_callback,
    media_stream_constraints={"video": True, "audio": False},
    async_processing=True,
)

# The checkbox is always rendered; the playing state is only consulted when
# the box is ticked.
if st.checkbox("Show the detected labels", value=True) and webrtc_ctx.state.playing:
    labels_placeholder = st.empty()
    # NOTE: The frame callback that produces the detections and this loop
    #       that displays them run asynchronously in different threads, so
    #       the rendered video frames and the labels shown here are not
    #       strictly synchronized.
    while True:
        # Blocks until the callback publishes the next list of detections.
        result = result_queue.get()
        labels_placeholder.table(result)

st.markdown(
    "This demo uses a model and code from "
    "https://github.com/robmarkcole/object-detection-app. "
    "Many thanks to the project."
)
|