from collections import deque from threading import Thread from cv2 import VideoCapture from typing_extensions import Self class Camera: def __init__(self, capture_id: int=0, buffer_size: int=1): self.capture_id = capture_id self.buffer_size = buffer_size self.capture: VideoCapture self.capture_thread: Thread self.buffer = deque([], buffer_size) self.stop_capture = False def __enter__(self) -> Self: self.capture = VideoCapture(self.capture_id) if not self.capture.isOpened(): raise IOError("Unable to open device.") self.capture_thread = Thread(target=self.start_capture) self.capture_thread.start() return self def __exit__(self, *_): self.stop_capture = True self.capture_thread.join() self.capture.release() def start_capture(self): while True: if self.stop_capture: break _, frame = self.capture.read() if len(self.buffer) == self.buffer_size: self.buffer.popleft() self.buffer.append(frame) def is_capturing(self) -> bool: print(f"Filling buffer: {len(self.buffer)}/{self.buffer_size}") return len(self.buffer) == self.buffer_size