-
Notifications
You must be signed in to change notification settings - Fork 30
/
camera.py
56 lines (50 loc) · 1.41 KB
/
camera.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
import cv2
import threading
import time
import logging
logger = logging.getLogger(__name__)
thread = None
class Camera:
def __init__(self,fps=20,video_source=0):
logger.info(f"Initializing camera class with {fps} fps and video_source={video_source}")
self.fps = fps
self.video_source = video_source
self.camera = cv2.VideoCapture(self.video_source)
# We want a max of 5s history to be stored, thats 5s*fps
self.max_frames = 5*self.fps
self.frames = []
self.isrunning = False
def run(self):
logging.debug("Perparing thread")
global thread
if thread is None:
logging.debug("Creating thread")
thread = threading.Thread(target=self._capture_loop,daemon=True)
logger.debug("Starting thread")
self.isrunning = True
thread.start()
logger.info("Thread started")
def _capture_loop(self):
dt = 1/self.fps
logger.debug("Observation started")
while self.isrunning:
v,im = self.camera.read()
if v:
if len(self.frames)==self.max_frames:
self.frames = self.frames[1:]
self.frames.append(im)
time.sleep(dt)
logger.info("Thread stopped successfully")
def stop(self):
logger.debug("Stopping thread")
self.isrunning = False
def get_frame(self, _bytes=True):
if len(self.frames)>0:
if _bytes:
img = cv2.imencode('.png',self.frames[-1])[1].tobytes()
else:
img = self.frames[-1]
else:
with open("images/not_found.jpeg","rb") as f:
img = f.read()
return img