Spaces:
Runtime error
Runtime error
import streamlit as st | |
import cv2 | |
import numpy as np | |
import tempfile | |
import time | |
from random import randint | |
# Configuration constants | |
LINE_IN_COLOR = (64, 255, 0) | |
LINE_OUT_COLOR = (0, 0, 255) | |
BOUNDING_BOX_COLOR = (255, 128, 0) | |
TRACKER_COLOR = (randint(0, 255), randint(0, 255), randint(0, 255)) | |
CENTROID_COLOR = (randint(0, 255), randint(0, 255), randint(0, 255)) | |
TEXT_COLOR = (randint(0, 255), randint(0, 255), randint(0, 255)) | |
TEXT_POSITION_BGS = (10, 50) | |
TEXT_POSITION_COUNT_CARS = (10, 100) | |
TEXT_POSITION_COUNT_TRUCKS = (10, 150) | |
TEXT_SIZE = 1.2 | |
FONT = cv2.FONT_HERSHEY_SIMPLEX | |
VIDEO_SOURCE = "videos/Traffic_4.mp4" | |
# Background Subtraction Method | |
BGS_TYPES = ["GMG", "MOG", "MOG2", "KNN", "CNT"] | |
BGS_TYPE = BGS_TYPES[2] | |
def getKernel(KERNEL_TYPE): | |
if KERNEL_TYPE == "dilation": | |
kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (2, 2)) | |
if KERNEL_TYPE == "opening": | |
kernel = np.ones((3, 3), np.uint8) | |
if KERNEL_TYPE == "closing": | |
kernel = np.ones((11, 11), np.uint8) | |
return kernel | |
def getFilter(img, filter): | |
if filter == 'closing': | |
return cv2.morphologyEx(img, cv2.MORPH_CLOSE, getKernel("closing"), iterations=2) | |
if filter == 'opening': | |
return cv2.morphologyEx(img, cv2.MORPH_OPEN, getKernel("opening"), iterations=2) | |
if filter == 'dilation': | |
return cv2.dilate(img, getKernel("dilation"), iterations=2) | |
if filter == 'combine': | |
closing = cv2.morphologyEx(img, cv2.MORPH_CLOSE, getKernel("closing"), iterations=2) | |
opening = cv2.morphologyEx(closing, cv2.MORPH_OPEN, getKernel("opening"), iterations=2) | |
dilation = cv2.dilate(opening, getKernel("dilation"), iterations=2) | |
return dilation | |
def getBGSubtractor(BGS_TYPE): | |
if BGS_TYPE == "GMG": | |
return cv2.bgsegm.createBackgroundSubtractorGMG(initializationFrames=120, decisionThreshold=.8) | |
if BGS_TYPE == "MOG": | |
return cv2.bgsegm.createBackgroundSubtractorMOG(history=200, nmixtures=5, backgroundRatio=.7, noiseSigma=0) | |
if BGS_TYPE == "MOG2": | |
return cv2.createBackgroundSubtractorMOG2(history=50, detectShadows=False, varThreshold=200) | |
if BGS_TYPE == "KNN": | |
return cv2.createBackgroundSubtractorKNN(history=100, dist2Threshold=400, detectShadows=True) | |
if BGS_TYPE == "CNT": | |
return cv2.bgsegm.createBackgroundSubtractorCNT(minPixelStability=15, useHistory=True, | |
maxPixelStability=15 * 60, isParallel=True) | |
print("Invalid detector") | |
sys.exit(1) | |
def getCentroid(x, y, w, h): | |
x1 = int(w / 2) | |
y1 = int(h / 2) | |
cx = x + x1 | |
cy = y + y1 | |
return (cx, cy) | |
def process_video(video_path): | |
cap = cv2.VideoCapture(video_path) | |
hasFrame, frame = cap.read() | |
if not hasFrame: | |
st.error("Failed to load the video.") | |
return None | |
# ROI | |
bbox = cv2.selectROI(frame, False) | |
(w1, h1, w2, h2) = bbox | |
frameArea = h2 * w2 | |
minArea = int(frameArea / 250) | |
maxArea = 15000 | |
line_IN = int(h1) | |
line_OUT = int(h2 - 20) | |
DOWN_limit = int(h1 / 4) | |
bg_subtractor = getBGSubtractor(BGS_TYPE) | |
frame_number = -1 | |
cnt_cars, cnt_trucks = 0, 0 | |
objects = [] | |
max_p_age = 2 | |
pid = 1 | |
with tempfile.NamedTemporaryFile(delete=False) as output_file: | |
output_path = output_file.name + ".avi" | |
fourcc = cv2.VideoWriter_fourcc(*'XVID') | |
writer_video = cv2.VideoWriter(output_path, fourcc, 25, (frame.shape[1], frame.shape[0]), True) | |
while cap.isOpened(): | |
ok, frame = cap.read() | |
if not ok: | |
break | |
roi = frame[h1:h1 + h2, w1:w1 + w2] | |
for i in objects: | |
i.age_one() | |
frame_number += 1 | |
bg_mask = bg_subtractor.apply(roi) | |
bg_mask = getFilter(bg_mask, 'combine') | |
(contours, _) = cv2.findContours(bg_mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE) | |
for cnt in contours: | |
area = cv2.contourArea(cnt) | |
if area > minArea and area <= maxArea: | |
x, y, w, h = cv2.boundingRect(cnt) | |
centroid = getCentroid(x, y, w, h) | |
cx = centroid[0] | |
cy = centroid[1] | |
new = True | |
cv2.rectangle(roi, (x, y), (x + 50, y - 13), TRACKER_COLOR, -1) | |
cv2.putText(roi, 'CAR', (x, y - 2), FONT, 0.5, (255, 255, 255), 1, cv2.LINE_AA) | |
for i in objects: | |
if abs(x - i.getX()) <= w and abs(y - i.getY()) <= h: | |
new = False | |
i.updateCoords(cx, cy) | |
if i.going_DOWN(DOWN_limit): | |
cnt_cars += 1 | |
break | |
if i.getState() == '1' and i.getDir() == 'down' and i.getY() > line_OUT: | |
i.setDone() | |
if i.timedOut(): | |
objects.pop(objects.index(i)) | |
del i | |
if new: | |
p = validator.MyValidator(pid, cx, cy, max_p_age) | |
objects.append(p) | |
pid += 1 | |
cv2.circle(roi, (cx, cy), 5, CENTROID_COLOR, -1) | |
elif area >= maxArea: | |
x, y, w, h = cv2.boundingRect(cnt) | |
centroid = getCentroid(x, y, w, h) | |
cx = centroid[0] | |
cy = centroid[1] | |
new = True | |
cv2.rectangle(roi, (x, y), (x + 50, y - 13), TRACKER_COLOR, -1) | |
cv2.putText(roi, 'TRUCK', (x, y - 2), FONT, .5, (255, 255, 255), 1, cv2.LINE_AA) | |
for i in objects: | |
if abs(x - i.getX()) <= w and abs(y - i.getY()) <= h: | |
new = False | |
i.updateCoords(cx, cy) | |
if i.going_DOWN(DOWN_limit): | |
cnt_trucks += 1 | |
break | |
if i.getState() == '1' and i.getDir() == 'down' and i.getY() > line_OUT: | |
i.setDone() | |
if i.timedOut(): | |
objects.pop(objects.index(i)) | |
del i | |
if new: | |
p = validator.MyValidator(pid, cx, cy, max_p_age) | |
objects.append(p) | |
pid += 1 | |
cv2.circle(roi, (cx, cy), 5, CENTROID_COLOR, -1) | |
for i in objects: | |
cv2.putText(roi, str(i.getId()), (i.getX(), i.getY()), FONT, 0.3, TEXT_COLOR, 1, cv2.LINE_AA) | |
str_cars = 'Cars: ' + str(cnt_cars) | |
str_trucks = 'Trucks: ' + str(cnt_trucks) | |
frame = cv2.line(frame, (w1, line_IN), (w1 + w2, line_IN), LINE_IN_COLOR, 2) | |
frame = cv2.line(frame, (w1, h1 + line_OUT), (w1 + w2, h1 + line_OUT), LINE_OUT_COLOR, 2) | |
cv2.putText(frame, str_cars, TEXT_POSITION_COUNT_CARS, FONT, 1, (255, 255, 255), 3, cv2.LINE_AA) | |
cv2.putText(frame, str_cars, TEXT_POSITION_COUNT_CARS, FONT, 1, (232, 162, 0), 2, cv2.LINE_AA) | |
cv2.putText(frame, str_trucks, TEXT_POSITION_COUNT_TRUCKS, FONT, 1, (255, 255, 255), 3, cv2.LINE_AA) | |
cv2.putText(frame, str_trucks, TEXT_POSITION_COUNT_TRUCKS, FONT, 1, (232, 162, 0), 2, cv2.LINE_AA) | |
writer_video.write(frame) | |
if frame_number % 30 == 0: | |
st.image(frame, channels="BGR", use_column_width=True) | |
cap.release() | |
writer_video.release() | |
return output_path | |
def main(): | |
st.title("Vehicle Counting and Tracking") | |
st.write("This application processes the video, counts vehicles, and tracks them.") | |
video_path = VIDEO_SOURCE | |
if video_path: | |
st.video(video_path) | |
st.write("Processing video...") | |
output_path = process_video(video_path) | |
if output_path: | |
st.write("Processing completed.") | |
st.video(output_path) | |
else: | |
st.write("Failed to process video.") | |
else: | |
st.error("Please upload a video file.") | |
if __name__ == "__main__": | |
main() | |