Merge pull request #34 from blakeblackshear/watchdog

0.1.2
This commit is contained in:
Blake Blackshear 2019-05-11 07:43:09 -05:00 committed by GitHub
commit e791d6646b
3 changed files with 54 additions and 8 deletions

View File

@@ -21,11 +21,14 @@ cameras:
           x_offset: 0
           y_offset: 300
           min_person_area: 5000
+          threshold: 0.5
         - size: 400
           x_offset: 350
           y_offset: 250
           min_person_area: 2000
+          threshold: 0.5
         - size: 400
           x_offset: 750
           y_offset: 250
           min_person_area: 2000
+          threshold: 0.5
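Each region can now carry its own detection threshold, and regions that omit it fall back to 0.5 (see the defaulting logic further down in video.py). As a rough illustration only, the Python sketch below parses a made-up regions list with PyYAML and applies that same fallback; the values and the use of yaml.safe_load are assumptions, not part of this PR.

import yaml  # PyYAML, assumed available

# made-up regions list mirroring the structure of the config above
example = """
regions:
  - size: 400
    x_offset: 350
    y_offset: 250
    min_person_area: 2000
    threshold: 0.5
  - size: 400
    x_offset: 750
    y_offset: 250
    min_person_area: 2000
"""

for region in yaml.safe_load(example)['regions']:
    # fall back to 0.5 when a region omits the key, matching the new default
    print(region['x_offset'], region['y_offset'], region.get('threshold', 0.5))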

View File

@@ -38,7 +38,7 @@ class PreppedQueueProcessor(threading.Thread):
             frame = self.prepped_frame_queue.get()

             # Actual detection.
-            objects = self.engine.DetectWithInputTensor(frame['frame'], threshold=0.5, top_k=3)
+            objects = self.engine.DetectWithInputTensor(frame['frame'], threshold=frame['region_threshold'], top_k=3)
             # parse and pass detected objects back to the camera
             parsed_objects = []
             for obj in objects:
@@ -59,7 +59,7 @@ class PreppedQueueProcessor(threading.Thread):
 class FramePrepper(threading.Thread):
     def __init__(self, camera_name, shared_frame, frame_time, frame_ready,
         frame_lock,
-        region_size, region_x_offset, region_y_offset,
+        region_size, region_x_offset, region_y_offset, region_threshold,
         prepped_frame_queue):

         threading.Thread.__init__(self)
@@ -71,6 +71,7 @@ class FramePrepper(threading.Thread):
         self.region_size = region_size
         self.region_x_offset = region_x_offset
         self.region_y_offset = region_y_offset
+        self.region_threshold = region_threshold
         self.prepped_frame_queue = prepped_frame_queue

     def run(self):
@@ -103,6 +104,7 @@ class FramePrepper(threading.Thread):
                     'frame_time': frame_time,
                     'frame': frame_expanded.flatten().copy(),
                     'region_size': self.region_size,
+                    'region_threshold': self.region_threshold,
                     'region_x_offset': self.region_x_offset,
                     'region_y_offset': self.region_y_offset
                 })
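The net effect of these changes: each FramePrepper tags the frames it queues with its region's threshold, and the single detection thread passes that value to the engine instead of a hard-coded 0.5. The sketch below imitates that hand-off with a plain queue and a stand-in detect() function; both are hypothetical and only show the shape of the data flow.

import queue

prepped_frame_queue = queue.Queue()

def detect(frame, threshold):
    # hypothetical stand-in for engine.DetectWithInputTensor()
    print("running detection with threshold", threshold)
    return []

# producer side: roughly what FramePrepper.run() puts on the queue per region
prepped_frame_queue.put({
    'frame': b'...flattened tensor bytes...',
    'region_threshold': 0.7,  # per-region value from the config
})

# consumer side: roughly what PreppedQueueProcessor.run() does with it
item = prepped_frame_queue.get()
detect(item['frame'], threshold=item['region_threshold'])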

View File

@@ -5,6 +5,7 @@ import cv2
 import threading
 import ctypes
 import multiprocessing as mp
+import numpy as np
 from object_detection.utils import visualization_utils as vis_util
 from . util import tonumpyarray
 from . object_detection import FramePrepper
@@ -19,6 +20,7 @@ def fetch_frames(shared_arr, shared_frame_time, frame_lock, frame_ready, frame_s
     # start the video capture
     video = cv2.VideoCapture()
     video.open(rtsp_url)
+    print("Opening the RTSP Url...")

     # keep the buffer small so we minimize old data
     video.set(cv2.CAP_PROP_BUFFERSIZE,1)
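The functional changes here are small: a numpy import used elsewhere in the file and a log line when the RTSP stream is opened. For context, a minimal stand-alone version of this capture setup looks roughly like the following; the URL is a placeholder.

import cv2

rtsp_url = "rtsp://user:password@camera-host:554/stream"  # placeholder

video = cv2.VideoCapture()
video.open(rtsp_url)
print("Opening the RTSP Url...")

# keep the buffer small so reads return the freshest frame available
video.set(cv2.CAP_PROP_BUFFERSIZE, 1)

ret, frame = video.read()
if not ret:
    print("No frame received; the stream may be down.")
video.release()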
@@ -108,6 +110,22 @@ def get_rtsp_url(rtsp_config):
         rtsp_config['password'], rtsp_config['host'], rtsp_config['port'],
         rtsp_config['path'])

+class CameraWatchdog(threading.Thread):
+    def __init__(self, camera):
+        threading.Thread.__init__(self)
+        self.camera = camera
+
+    def run(self):
+
+        while True:
+            # wait a bit before checking
+            time.sleep(60)
+
+            if (datetime.datetime.now().timestamp() - self.camera.shared_frame_time.value) > 2:
+                print("last frame is more than 2 seconds old, restarting camera capture...")
+                self.camera.start_or_restart_capture()
+                time.sleep(5)
+
 class Camera:
     def __init__(self, name, config, prepped_frame_queue, mqtt_client, mqtt_prefix):
         self.name = name
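The watchdog can only judge staleness because the capture process stamps every frame into a shared timestamp that both sides can read. A minimal sketch of that check, assuming shared_frame_time is a multiprocessing.Value('d') holding the epoch time of the last captured frame (that type is an assumption, not shown in this diff):

import datetime
import multiprocessing as mp
import time

# assumed: a shared double updated by the capture process on every frame
shared_frame_time = mp.Value('d', 0.0)

# capture side: stamp the shared value when a frame arrives
shared_frame_time.value = datetime.datetime.now().timestamp()

# watchdog side: anything older than 2 seconds counts as a stalled capture
time.sleep(3)
if (datetime.datetime.now().timestamp() - shared_frame_time.value) > 2:
    print("last frame is more than 2 seconds old, restarting camera capture...")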
@@ -136,21 +154,24 @@ class Camera:
         # shape current frame so it can be treated as a numpy image
         self.shared_frame_np = tonumpyarray(self.shared_frame_array).reshape(self.frame_shape)

-        # create the process to capture frames from the RTSP stream and store in a shared array
-        self.capture_process = mp.Process(target=fetch_frames, args=(self.shared_frame_array,
-            self.shared_frame_time, self.frame_lock, self.frame_ready, self.frame_shape, self.rtsp_url))
-        self.capture_process.daemon = True
+        self.capture_process = None

         # for each region, create a separate thread to resize the region and prep for detection
         self.detection_prep_threads = []
         for region in self.config['regions']:
+            # set a default threshold of 0.5 if not defined
+            if not 'threshold' in region:
+                region['threshold'] = 0.5
+            if not isinstance(region['threshold'], float):
+                print('Threshold is not a float. Setting to 0.5 default.')
+                region['threshold'] = 0.5
             self.detection_prep_threads.append(FramePrepper(
                 self.name,
                 self.shared_frame_np,
                 self.shared_frame_time,
                 self.frame_ready,
                 self.frame_lock,
-                region['size'], region['x_offset'], region['y_offset'],
+                region['size'], region['x_offset'], region['y_offset'], region['threshold'],
                 prepped_frame_queue
             ))
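For reference, the defaulting above could be read as a small helper with the same behavior (hypothetical, not part of this PR). Note that because the check requires a float specifically, an integer value such as 1 would also be reset to 0.5.

def normalize_threshold(region):
    # hypothetical helper mirroring the defaulting in Camera.__init__
    if not 'threshold' in region:
        region['threshold'] = 0.5
    if not isinstance(region['threshold'], float):
        print('Threshold is not a float. Setting to 0.5 default.')
        region['threshold'] = 0.5
    return region['threshold']

print(normalize_threshold({'size': 400}))         # missing key -> 0.5
print(normalize_threshold({'threshold': '0.7'}))  # wrong type -> 0.5
print(normalize_threshold({'threshold': 0.7}))    # valid float -> 0.7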
@@ -171,6 +192,9 @@ class Camera:
         mqtt_publisher = MqttObjectPublisher(self.mqtt_client, self.mqtt_topic_prefix, self.objects_parsed, self.detected_objects)
         mqtt_publisher.start()

+        # create a watchdog thread for capture process
+        self.watchdog = CameraWatchdog(self)
+
         # load in the mask for person detection
         if 'mask' in self.config:
             self.mask = cv2.imread("/config/{}".format(self.config['mask']), cv2.IMREAD_GRAYSCALE)
@@ -178,11 +202,28 @@ class Camera:
             self.mask = np.zeros((self.frame_shape[0], self.frame_shape[1], 1), np.uint8)
             self.mask[:] = 255

-    def start(self):
+    def start_or_restart_capture(self):
+        if not self.capture_process is None:
+            print("Terminating the existing capture process...")
+            self.capture_process.terminate()
+            del self.capture_process
+            self.capture_process = None
+
+        # create the process to capture frames from the RTSP stream and store in a shared array
+        print("Creating a new capture process...")
+        self.capture_process = mp.Process(target=fetch_frames, args=(self.shared_frame_array,
+            self.shared_frame_time, self.frame_lock, self.frame_ready, self.frame_shape, self.rtsp_url))
+        self.capture_process.daemon = True
+
+        print("Starting a new capture process...")
         self.capture_process.start()
+
+    def start(self):
+        self.start_or_restart_capture()
         # start the object detection prep threads
         for detection_prep_thread in self.detection_prep_threads:
             detection_prep_thread.start()
+        self.watchdog.start()

     def join(self):
         self.capture_process.join()
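Taken together, start() now delegates to start_or_restart_capture(), and CameraWatchdog calls the same method to tear down a stalled capture process and create a fresh one. A stripped-down sketch of that terminate-and-recreate pattern, with a dummy worker standing in for fetch_frames:

import multiprocessing as mp
import time

def fake_fetch_frames():
    # dummy stand-in for fetch_frames(); just idles like a capture loop
    while True:
        time.sleep(1)

capture_process = None

def start_or_restart_capture():
    global capture_process
    if capture_process is not None:
        print("Terminating the existing capture process...")
        capture_process.terminate()
        capture_process = None
    print("Creating a new capture process...")
    capture_process = mp.Process(target=fake_fetch_frames)
    capture_process.daemon = True
    capture_process.start()

if __name__ == '__main__':
    start_or_restart_capture()  # initial start
    time.sleep(2)
    start_or_restart_capture()  # what the watchdog does when frames go stale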