test1 / app /camera /__init__.py
supArs's picture
Upload 19 files
3afff35 verified
raw
history blame
1.34 kB
from collections import deque
from threading import Thread
from cv2 import VideoCapture
from typing_extensions import Self
class Camera:
def __init__(self, capture_id: int=0, buffer_size: int=1):
self.capture_id = capture_id
self.buffer_size = buffer_size
self.capture: VideoCapture
self.capture_thread: Thread
self.buffer = deque([], buffer_size)
self.stop_capture = False
def __enter__(self) -> Self:
self.capture = VideoCapture(self.capture_id)
if not self.capture.isOpened():
raise IOError("Unable to open device.")
self.capture_thread = Thread(target=self.start_capture)
self.capture_thread.start()
return self
def __exit__(self, *_):
self.stop_capture = True
self.capture_thread.join()
self.capture.release()
def start_capture(self):
while True:
if self.stop_capture:
break
_, frame = self.capture.read()
if len(self.buffer) == self.buffer_size:
self.buffer.popleft()
self.buffer.append(frame)
def is_capturing(self) -> bool:
print(f"Filling buffer: {len(self.buffer)}/{self.buffer_size}")
return len(self.buffer) == self.buffer_size