import streamlit as st
import cv2
import numpy as np
import tempfile
from random import randint

import validator  # local tracker module used below (provides MyValidator); assumed to ship alongside this app
# Configuration constants
LINE_IN_COLOR = (64, 255, 0)
LINE_OUT_COLOR = (0, 0, 255)
BOUNDING_BOX_COLOR = (255, 128, 0)
TRACKER_COLOR = (randint(0, 255), randint(0, 255), randint(0, 255))
CENTROID_COLOR = (randint(0, 255), randint(0, 255), randint(0, 255))
TEXT_COLOR = (randint(0, 255), randint(0, 255), randint(0, 255))
TEXT_POSITION_BGS = (10, 50)
TEXT_POSITION_COUNT_CARS = (10, 100)
TEXT_POSITION_COUNT_TRUCKS = (10, 150)
TEXT_SIZE = 1.2
FONT = cv2.FONT_HERSHEY_SIMPLEX
VIDEO_SOURCE = "videos/Traffic_4.mp4"
# Background subtraction method (GMG, MOG, and CNT require the opencv-contrib-python build, which provides cv2.bgsegm)
BGS_TYPES = ["GMG", "MOG", "MOG2", "KNN", "CNT"]
BGS_TYPE = BGS_TYPES[2]  # MOG2
def getKernel(KERNEL_TYPE):
    if KERNEL_TYPE == "dilation":
        kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (2, 2))
    elif KERNEL_TYPE == "opening":
        kernel = np.ones((3, 3), np.uint8)
    elif KERNEL_TYPE == "closing":
        kernel = np.ones((11, 11), np.uint8)
    else:
        raise ValueError(f"Unknown kernel type: {KERNEL_TYPE}")
    return kernel
def getFilter(img, filter_type):
    if filter_type == 'closing':
        return cv2.morphologyEx(img, cv2.MORPH_CLOSE, getKernel("closing"), iterations=2)
    if filter_type == 'opening':
        return cv2.morphologyEx(img, cv2.MORPH_OPEN, getKernel("opening"), iterations=2)
    if filter_type == 'dilation':
        return cv2.dilate(img, getKernel("dilation"), iterations=2)
    if filter_type == 'combine':
        closing = cv2.morphologyEx(img, cv2.MORPH_CLOSE, getKernel("closing"), iterations=2)
        opening = cv2.morphologyEx(closing, cv2.MORPH_OPEN, getKernel("opening"), iterations=2)
        dilation = cv2.dilate(opening, getKernel("dilation"), iterations=2)
        return dilation
def getBGSubtractor(BGS_TYPE):
if BGS_TYPE == "GMG":
return cv2.bgsegm.createBackgroundSubtractorGMG(initializationFrames=120, decisionThreshold=.8)
if BGS_TYPE == "MOG":
return cv2.bgsegm.createBackgroundSubtractorMOG(history=200, nmixtures=5, backgroundRatio=.7, noiseSigma=0)
if BGS_TYPE == "MOG2":
return cv2.createBackgroundSubtractorMOG2(history=50, detectShadows=False, varThreshold=200)
if BGS_TYPE == "KNN":
return cv2.createBackgroundSubtractorKNN(history=100, dist2Threshold=400, detectShadows=True)
if BGS_TYPE == "CNT":
return cv2.bgsegm.createBackgroundSubtractorCNT(minPixelStability=15, useHistory=True,
maxPixelStability=15 * 60, isParallel=True)
print("Invalid detector")
sys.exit(1)
def getCentroid(x, y, w, h):
x1 = int(w / 2)
y1 = int(h / 2)
cx = x + x1
cy = y + y1
return (cx, cy)
def process_video(video_path):
cap = cv2.VideoCapture(video_path)
hasFrame, frame = cap.read()
if not hasFrame:
st.error("Failed to load the video.")
return None
    # Let the user select the region of interest; cv2.selectROI opens a native
    # OpenCV window, so this step needs a display and will not run headless.
    bbox = cv2.selectROI(frame, False)
    (w1, h1, w2, h2) = bbox  # x, y, width, height of the selected ROI
    frameArea = h2 * w2
    minArea = int(frameArea / 250)  # blobs smaller than this are ignored
    maxArea = 15000                 # blobs at least this large are labelled as trucks
    line_IN = int(h1)
    line_OUT = int(h2 - 20)
    DOWN_limit = int(h1 / 4)
bg_subtractor = getBGSubtractor(BGS_TYPE)
    frame_number = -1
    cnt_cars, cnt_trucks = 0, 0
    objects = []    # currently tracked objects
    max_p_age = 2   # maximum age passed to MyValidator for each track
    pid = 1         # id assigned to the next new track
with tempfile.NamedTemporaryFile(delete=False) as output_file:
output_path = output_file.name + ".avi"
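        # Annotated frames are written to this temp path as an XVID-encoded AVI;
        # the 25 fps output rate below is an assumption, independent of the source video's rate.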
fourcc = cv2.VideoWriter_fourcc(*'XVID')
writer_video = cv2.VideoWriter(output_path, fourcc, 25, (frame.shape[1], frame.shape[0]), True)
while cap.isOpened():
ok, frame = cap.read()
if not ok:
break
roi = frame[h1:h1 + h2, w1:w1 + w2]
for i in objects:
i.age_one()
frame_number += 1
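            # Foreground mask from the background subtractor, cleaned with the
            # 'combine' filter (closing -> opening -> dilation) before contour extraction.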
bg_mask = bg_subtractor.apply(roi)
bg_mask = getFilter(bg_mask, 'combine')
(contours, _) = cv2.findContours(bg_mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
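            # Each external contour is a candidate vehicle; its area decides whether
            # it is labelled and counted as a car or as a truck.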
for cnt in contours:
area = cv2.contourArea(cnt)
if area > minArea and area <= maxArea:
x, y, w, h = cv2.boundingRect(cnt)
centroid = getCentroid(x, y, w, h)
cx = centroid[0]
cy = centroid[1]
new = True
cv2.rectangle(roi, (x, y), (x + 50, y - 13), TRACKER_COLOR, -1)
cv2.putText(roi, 'CAR', (x, y - 2), FONT, 0.5, (255, 255, 255), 1, cv2.LINE_AA)
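                    # Match the detection against existing tracks; a track that crosses
                    # DOWN_limit while moving down increments the car counter once.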
for i in objects:
if abs(x - i.getX()) <= w and abs(y - i.getY()) <= h:
new = False
i.updateCoords(cx, cy)
if i.going_DOWN(DOWN_limit):
cnt_cars += 1
break
if i.getState() == '1' and i.getDir() == 'down' and i.getY() > line_OUT:
i.setDone()
if i.timedOut():
objects.pop(objects.index(i))
del i
if new:
p = validator.MyValidator(pid, cx, cy, max_p_age)
objects.append(p)
pid += 1
cv2.circle(roi, (cx, cy), 5, CENTROID_COLOR, -1)
elif area >= maxArea:
x, y, w, h = cv2.boundingRect(cnt)
centroid = getCentroid(x, y, w, h)
cx = centroid[0]
cy = centroid[1]
new = True
cv2.rectangle(roi, (x, y), (x + 50, y - 13), TRACKER_COLOR, -1)
cv2.putText(roi, 'TRUCK', (x, y - 2), FONT, .5, (255, 255, 255), 1, cv2.LINE_AA)
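                    # Same matching logic as the car branch, but a downward crossing
                    # increments the truck counter instead.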
for i in objects:
if abs(x - i.getX()) <= w and abs(y - i.getY()) <= h:
new = False
i.updateCoords(cx, cy)
if i.going_DOWN(DOWN_limit):
cnt_trucks += 1
break
if i.getState() == '1' and i.getDir() == 'down' and i.getY() > line_OUT:
i.setDone()
if i.timedOut():
objects.pop(objects.index(i))
del i
if new:
p = validator.MyValidator(pid, cx, cy, max_p_age)
objects.append(p)
pid += 1
cv2.circle(roi, (cx, cy), 5, CENTROID_COLOR, -1)
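            # Overlay the id of every live track on the ROI.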
for i in objects:
cv2.putText(roi, str(i.getId()), (i.getX(), i.getY()), FONT, 0.3, TEXT_COLOR, 1, cv2.LINE_AA)
str_cars = 'Cars: ' + str(cnt_cars)
str_trucks = 'Trucks: ' + str(cnt_trucks)
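            # Draw the IN/OUT counting lines and the running totals on the full frame:
            # white text first as an outline, then colored text on top for readability.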
frame = cv2.line(frame, (w1, line_IN), (w1 + w2, line_IN), LINE_IN_COLOR, 2)
frame = cv2.line(frame, (w1, h1 + line_OUT), (w1 + w2, h1 + line_OUT), LINE_OUT_COLOR, 2)
cv2.putText(frame, str_cars, TEXT_POSITION_COUNT_CARS, FONT, 1, (255, 255, 255), 3, cv2.LINE_AA)
cv2.putText(frame, str_cars, TEXT_POSITION_COUNT_CARS, FONT, 1, (232, 162, 0), 2, cv2.LINE_AA)
cv2.putText(frame, str_trucks, TEXT_POSITION_COUNT_TRUCKS, FONT, 1, (255, 255, 255), 3, cv2.LINE_AA)
cv2.putText(frame, str_trucks, TEXT_POSITION_COUNT_TRUCKS, FONT, 1, (232, 162, 0), 2, cv2.LINE_AA)
writer_video.write(frame)
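            # Stream roughly one annotated frame per second to the Streamlit UI
            # (every 30th frame, assuming a ~30 fps source).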
if frame_number % 30 == 0:
st.image(frame, channels="BGR", use_column_width=True)
cap.release()
writer_video.release()
return output_path
def main():
st.title("Vehicle Counting and Tracking")
st.write("This application processes the video, counts vehicles, and tracks them.")
video_path = VIDEO_SOURCE
if video_path:
st.video(video_path)
st.write("Processing video...")
output_path = process_video(video_path)
if output_path:
st.write("Processing completed.")
st.video(output_path)
else:
st.write("Failed to process video.")
else:
st.error("Please upload a video file.")
if __name__ == "__main__":
main()