|
from collections import deque |
|
from threading import Thread |
|
|
|
from cv2 import VideoCapture |
|
from typing_extensions import Self |
|
|
|
|
|
class Camera: |
|
|
|
def __init__(self, capture_id: int=0, buffer_size: int=1): |
|
|
|
self.capture_id = capture_id |
|
self.buffer_size = buffer_size |
|
self.capture: VideoCapture |
|
self.capture_thread: Thread |
|
self.buffer = deque([], buffer_size) |
|
self.stop_capture = False |
|
|
|
|
|
def __enter__(self) -> Self: |
|
|
|
self.capture = VideoCapture(self.capture_id) |
|
|
|
if not self.capture.isOpened(): |
|
raise IOError("Unable to open device.") |
|
|
|
self.capture_thread = Thread(target=self.start_capture) |
|
self.capture_thread.start() |
|
|
|
return self |
|
|
|
|
|
def __exit__(self, *_): |
|
|
|
self.stop_capture = True |
|
self.capture_thread.join() |
|
self.capture.release() |
|
|
|
|
|
def start_capture(self): |
|
|
|
while True: |
|
if self.stop_capture: |
|
break |
|
|
|
_, frame = self.capture.read() |
|
|
|
if len(self.buffer) == self.buffer_size: |
|
self.buffer.popleft() |
|
|
|
self.buffer.append(frame) |
|
|
|
|
|
def is_capturing(self) -> bool: |
|
|
|
print(f"Filling buffer: {len(self.buffer)}/{self.buffer_size}") |
|
return len(self.buffer) == self.buffer_size |
|
|