From bca4e78e9a3fa9cefcfe51e58ac06bee3cbcdbe0 Mon Sep 17 00:00:00 2001 From: blakeblackshear Date: Mon, 25 Mar 2019 06:24:36 -0500 Subject: [PATCH] use a queue instead --- detect_objects.py | 61 +++++++---- frigate/object_detection.py | 196 ++++++++++++++++++++++-------------- 2 files changed, 159 insertions(+), 98 deletions(-) diff --git a/detect_objects.py b/detect_objects.py index bcfa10a49..7c52f115a 100644 --- a/detect_objects.py +++ b/detect_objects.py @@ -6,6 +6,7 @@ import datetime import ctypes import logging import multiprocessing as mp +import queue import threading import json from contextlib import closing @@ -19,7 +20,7 @@ from frigate.mqtt import MqttMotionPublisher, MqttObjectPublisher from frigate.objects import ObjectParser, ObjectCleaner, BestPersonFrame from frigate.motion import detect_motion from frigate.video import fetch_frames, FrameTracker -from frigate.object_detection import prep_for_detection, detect_objects +from frigate.object_detection import FramePrepper, PreppedQueueProcessor, detect_objects RTSP_URL = os.getenv('RTSP_URL') @@ -82,10 +83,20 @@ def main(): frame_ready = mp.Condition() # Condition for notifying that motion status changed globally motion_changed = mp.Condition() + + prepped_frame_array = mp.Array(ctypes.c_uint8, 300*300*3) + # create shared value for storing the frame_time + prepped_frame_time = mp.Value('d', 0.0) + # Event for notifying that object detection needs a new frame + prepped_frame_grabbed = mp.Event() + prepped_frame_ready = mp.Event() # Condition for notifying that objects were parsed objects_parsed = mp.Condition() # Queue for detected objects object_queue = mp.Queue() + # Queue for prepped frames + prepped_frame_queue = queue.Queue() + prepped_frame_box = mp.Array(ctypes.c_uint16, 3) # shape current frame so it can be treated as an image frame_arr = tonumpyarray(shared_arr).reshape(frame_shape) @@ -96,21 +107,18 @@ def main(): capture_process.daemon = True # for each region, start a separate process for motion detection and object detection - detection_prep_processes = [] + detection_prep_threads = [] motion_processes = [] for region in regions: - # possibly try putting these on threads and putting prepped - # frames in a queue - detection_prep_process = mp.Process(target=prep_for_detection, args=(shared_arr, + detection_prep_threads.append(FramePrepper( + frame_arr, shared_frame_time, - frame_lock, frame_ready, + frame_ready, + frame_lock, region['motion_detected'], - frame_shape, region['size'], region['x_offset'], region['y_offset'], - region['prepped_frame_array'], region['prepped_frame_time'], - region['prepped_frame_lock'])) - detection_prep_process.daemon = True - detection_prep_processes.append(detection_prep_process) + prepped_frame_queue + )) motion_process = mp.Process(target=detect_motion, args=(shared_arr, shared_frame_time, @@ -124,13 +132,25 @@ def main(): motion_process.daemon = True motion_processes.append(motion_process) + prepped_queue_processor = PreppedQueueProcessor( + prepped_frame_array, + prepped_frame_time, + prepped_frame_ready, + prepped_frame_grabbed, + prepped_frame_box, + prepped_frame_queue + ) + prepped_queue_processor.start() + # create a process for object detection + # if the coprocessor is doing the work, can this run as a thread + # since it is waiting for IO? detection_process = mp.Process(target=detect_objects, args=( - [region['prepped_frame_array'] for region in regions], - [region['prepped_frame_time'] for region in regions], - [region['prepped_frame_lock'] for region in regions], - [[region['size'], region['x_offset'], region['y_offset']] for region in regions], - motion_changed, [region['motion_detected'] for region in regions], + prepped_frame_array, + prepped_frame_time, + prepped_frame_ready, + prepped_frame_grabbed, + prepped_frame_box, object_queue, DEBUG )) detection_process.daemon = True @@ -181,9 +201,8 @@ def main(): print("capture_process pid ", capture_process.pid) # start the object detection prep processes - for detection_prep_process in detection_prep_processes: - detection_prep_process.start() - print("detection_prep_process pid ", detection_prep_process.pid) + for detection_prep_thread in detection_prep_threads: + detection_prep_thread.start() detection_process.start() print("detection_process pid ", detection_process.pid) @@ -256,8 +275,8 @@ def main(): app.run(host='0.0.0.0', debug=False) capture_process.join() - for detection_prep_process in detection_prep_processes: - detection_prep_process.join() + for detection_prep_thread in detection_prep_threads: + detection_prep_thread.join() for motion_process in motion_processes: motion_process.join() detection_process.join() diff --git a/frigate/object_detection.py b/frigate/object_detection.py index da0375e96..faad222da 100644 --- a/frigate/object_detection.py +++ b/frigate/object_detection.py @@ -1,5 +1,6 @@ import datetime import cv2 +import threading import numpy as np from edgetpu.detection.engine import DetectionEngine from . util import tonumpyarray @@ -19,9 +20,11 @@ def ReadLabelFile(file_path): ret[int(pair[0])] = pair[1].strip() return ret -def detect_objects(prepped_frame_arrays, prepped_frame_times, prepped_frame_locks, - prepped_frame_boxes, motion_changed, motion_regions, object_queue, debug): - prepped_frame_nps = [tonumpyarray(prepped_frame_array) for prepped_frame_array in prepped_frame_arrays] +def detect_objects(prepped_frame_array, prepped_frame_time, + prepped_frame_ready, prepped_frame_grabbed, + prepped_frame_box, object_queue, debug): + prepped_frame_np = tonumpyarray(prepped_frame_array) + # Load the edgetpu engine and labels engine = DetectionEngine(PATH_TO_CKPT) labels = ReadLabelFile(PATH_TO_LABELS) @@ -29,85 +32,124 @@ def detect_objects(prepped_frame_arrays, prepped_frame_times, prepped_frame_lock frame_time = 0.0 region_box = [0,0,0] while True: - # while there is motion - while len([r for r in motion_regions if r.is_set()]) > 0: - - # loop over all the motion regions and look for objects - for i, motion_region in enumerate(motion_regions): - # skip the region if no motion - if not motion_region.is_set(): - continue + # wait until a frame is ready + prepped_frame_grabbed.clear() + prepped_frame_ready.wait() - # make a copy of the cropped frame - with prepped_frame_locks[i]: - prepped_frame_copy = prepped_frame_nps[i].copy() - frame_time = prepped_frame_times[i].value - region_box[:] = prepped_frame_boxes[i] + prepped_frame_copy = prepped_frame_np.copy() + frame_time = prepped_frame_time.value + region_box[:] = prepped_frame_box - # Actual detection. - objects = engine.DetectWithInputTensor(prepped_frame_copy, threshold=0.5, top_k=3) - # print(engine.get_inference_time()) - # put detected objects in the queue - if objects: - for obj in objects: - box = obj.bounding_box.flatten().tolist() - object_queue.put({ - 'frame_time': frame_time, - 'name': str(labels[obj.label_id]), - 'score': float(obj.score), - 'xmin': int((box[0] * region_box[0]) + region_box[1]), - 'ymin': int((box[1] * region_box[0]) + region_box[2]), - 'xmax': int((box[2] * region_box[0]) + region_box[1]), - 'ymax': int((box[3] * region_box[0]) + region_box[2]) - }) - else: - object_queue.put({ - 'frame_time': frame_time, - 'name': 'dummy', - 'score': 0.99, - 'xmin': int(0 + region_box[1]), - 'ymin': int(0 + region_box[2]), - 'xmax': int(10 + region_box[1]), - 'ymax': int(10 + region_box[2]) - }) - # wait for the global motion flag to change - with motion_changed: - motion_changed.wait() + prepped_frame_grabbed.set() -def prep_for_detection(shared_whole_frame_array, shared_frame_time, frame_lock, frame_ready, - motion_detected, frame_shape, region_size, region_x_offset, region_y_offset, - prepped_frame_array, prepped_frame_time, prepped_frame_lock): - # shape shared input array into frame for processing - shared_whole_frame = tonumpyarray(shared_whole_frame_array).reshape(frame_shape) + # Actual detection. + objects = engine.DetectWithInputTensor(prepped_frame_copy, threshold=0.5, top_k=3) + # print(engine.get_inference_time()) + # put detected objects in the queue + if objects: + for obj in objects: + box = obj.bounding_box.flatten().tolist() + object_queue.put({ + 'frame_time': frame_time, + 'name': str(labels[obj.label_id]), + 'score': float(obj.score), + 'xmin': int((box[0] * region_box[0]) + region_box[1]), + 'ymin': int((box[1] * region_box[0]) + region_box[2]), + 'xmax': int((box[2] * region_box[0]) + region_box[1]), + 'ymax': int((box[3] * region_box[0]) + region_box[2]) + }) + else: + object_queue.put({ + 'frame_time': frame_time, + 'name': 'dummy', + 'score': 0.99, + 'xmin': int(0 + region_box[1]), + 'ymin': int(0 + region_box[2]), + 'xmax': int(10 + region_box[1]), + 'ymax': int(10 + region_box[2]) + }) - shared_prepped_frame = tonumpyarray(prepped_frame_array).reshape((1,300,300,3)) +class PreppedQueueProcessor(threading.Thread): + def __init__(self, prepped_frame_array, + prepped_frame_time, + prepped_frame_ready, + prepped_frame_grabbed, + prepped_frame_box, + prepped_frame_queue): - frame_time = 0.0 - while True: - now = datetime.datetime.now().timestamp() + threading.Thread.__init__(self) + self.prepped_frame_array = prepped_frame_array + self.prepped_frame_time = prepped_frame_time + self.prepped_frame_ready = prepped_frame_ready + self.prepped_frame_grabbed = prepped_frame_grabbed + self.prepped_frame_box = prepped_frame_box + self.prepped_frame_queue = prepped_frame_queue - # wait until motion is detected - motion_detected.wait() + def run(self): + prepped_frame_np = tonumpyarray(self.prepped_frame_array) + # process queue... + while True: + frame = self.prepped_frame_queue.get() + print(self.prepped_frame_queue.qsize()) + prepped_frame_np[:] = frame['frame'] + self.prepped_frame_time.value = frame['frame_time'] + self.prepped_frame_box[0] = frame['region_size'] + self.prepped_frame_box[1] = frame['region_x_offset'] + self.prepped_frame_box[2] = frame['region_y_offset'] + self.prepped_frame_ready.set() + self.prepped_frame_grabbed.wait() + self.prepped_frame_ready.clear() - with frame_ready: - # if there isnt a frame ready for processing or it is old, wait for a new frame - if shared_frame_time.value == frame_time or (now - shared_frame_time.value) > 0.5: - frame_ready.wait() - - # make a copy of the cropped frame - with frame_lock: - cropped_frame = shared_whole_frame[region_y_offset:region_y_offset+region_size, region_x_offset:region_x_offset+region_size].copy() - frame_time = shared_frame_time.value - - # convert to RGB - cropped_frame_rgb = cv2.cvtColor(cropped_frame, cv2.COLOR_BGR2RGB) - # Resize to 300x300 if needed - if cropped_frame_rgb.shape != (300, 300, 3): - cropped_frame_rgb = cv2.resize(cropped_frame_rgb, dsize=(300, 300), interpolation=cv2.INTER_LINEAR) - # Expand dimensions since the model expects images to have shape: [1, 300, 300, 3] - frame_expanded = np.expand_dims(cropped_frame_rgb, axis=0) - # copy the prepped frame to the shared output array - with prepped_frame_lock: - shared_prepped_frame[:] = frame_expanded - prepped_frame_time.value = frame_time +# should this be a region class? +class FramePrepper(threading.Thread): + def __init__(self, shared_frame, frame_time, frame_ready, + frame_lock, motion_detected, + region_size, region_x_offset, region_y_offset, + prepped_frame_queue): + + threading.Thread.__init__(self) + self.shared_frame = shared_frame + self.frame_time = frame_time + self.frame_ready = frame_ready + self.frame_lock = frame_lock + self.motion_detected = motion_detected + self.region_size = region_size + self.region_x_offset = region_x_offset + self.region_y_offset = region_y_offset + self.prepped_frame_queue = prepped_frame_queue + + def run(self): + frame_time = 0.0 + while True: + now = datetime.datetime.now().timestamp() + + # wait until motion is detected + self.motion_detected.wait() + + with self.frame_ready: + # if there isnt a frame ready for processing or it is old, wait for a new frame + if self.frame_time.value == frame_time or (now - self.frame_time.value) > 0.5: + self.frame_ready.wait() + + # make a copy of the cropped frame + with self.frame_lock: + cropped_frame = self.shared_frame[self.region_y_offset:self.region_y_offset+self.region_size, self.region_x_offset:self.region_x_offset+self.region_size].copy() + frame_time = self.frame_time.value + + # convert to RGB + cropped_frame_rgb = cv2.cvtColor(cropped_frame, cv2.COLOR_BGR2RGB) + # Resize to 300x300 if needed + if cropped_frame_rgb.shape != (300, 300, 3): + cropped_frame_rgb = cv2.resize(cropped_frame_rgb, dsize=(300, 300), interpolation=cv2.INTER_LINEAR) + # Expand dimensions since the model expects images to have shape: [1, 300, 300, 3] + frame_expanded = np.expand_dims(cropped_frame_rgb, axis=0) + + # add the frame to the queue + self.prepped_frame_queue.put({ + 'frame_time': frame_time, + 'frame': frame_expanded.flatten().copy(), + 'region_size': self.region_size, + 'region_x_offset': self.region_x_offset, + 'region_y_offset': self.region_y_offset + })