use a queue instead

blakeblackshear 2019-03-25 06:24:36 -05:00
parent 7d3027e056
commit bca4e78e9a
2 changed files with 159 additions and 98 deletions
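In outline, the commit replaces the per-region prep_for_detection processes (each with its own shared array, shared time value, and lock) with per-region FramePrepper threads that push prepped frames onto a single queue.Queue, drained by one PreppedQueueProcessor thread that feeds the lone detection process. Below is a minimal sketch of that producer/consumer shape; all names in it are illustrative, not from the commit:

    import queue
    import threading

    # illustrative stand-in for the real frame prep work (not from the commit)
    def prep_frame(region_id):
        return {'region': region_id, 'frame': 'prepped-bytes'}

    class RegionPrepper(threading.Thread):
        """One producer per region, mirroring FramePrepper's role."""
        def __init__(self, region_id, frame_queue):
            threading.Thread.__init__(self)
            self.region_id = region_id
            self.frame_queue = frame_queue

        def run(self):
            # each region preps frames independently and enqueues them
            for _ in range(3):
                self.frame_queue.put(prep_frame(self.region_id))

    class QueueDrainer(threading.Thread):
        """Single consumer, mirroring PreppedQueueProcessor's role."""
        def __init__(self, frame_queue):
            threading.Thread.__init__(self)
            self.frame_queue = frame_queue
            self.daemon = True

        def run(self):
            # one consumer serializes access to the single detector
            while True:
                item = self.frame_queue.get()
                print('detect on region', item['region'])

    frame_queue = queue.Queue()
    QueueDrainer(frame_queue).start()
    preppers = [RegionPrepper(i, frame_queue) for i in range(2)]
    for p in preppers:
        p.start()
    for p in preppers:
        p.join()

Because the preppers are threads rather than processes, they can read the shared frame array directly, and the queue replaces the per-region array/time/lock triples.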

detect_objects.py

@@ -6,6 +6,7 @@ import datetime
 import ctypes
 import logging
 import multiprocessing as mp
+import queue
 import threading
 import json
 from contextlib import closing
@@ -19,7 +20,7 @@ from frigate.mqtt import MqttMotionPublisher, MqttObjectPublisher
 from frigate.objects import ObjectParser, ObjectCleaner, BestPersonFrame
 from frigate.motion import detect_motion
 from frigate.video import fetch_frames, FrameTracker
-from frigate.object_detection import prep_for_detection, detect_objects
+from frigate.object_detection import FramePrepper, PreppedQueueProcessor, detect_objects
 
 RTSP_URL = os.getenv('RTSP_URL')
@@ -82,10 +83,20 @@ def main():
     frame_ready = mp.Condition()
     # Condition for notifying that motion status changed globally
     motion_changed = mp.Condition()
+    prepped_frame_array = mp.Array(ctypes.c_uint8, 300*300*3)
+    # create shared value for storing the frame_time
+    prepped_frame_time = mp.Value('d', 0.0)
+    # Event for notifying that object detection needs a new frame
+    prepped_frame_grabbed = mp.Event()
+    prepped_frame_ready = mp.Event()
     # Condition for notifying that objects were parsed
     objects_parsed = mp.Condition()
     # Queue for detected objects
     object_queue = mp.Queue()
+    # Queue for prepped frames
+    prepped_frame_queue = queue.Queue()
+    prepped_frame_box = mp.Array(ctypes.c_uint16, 3)
 
     # shape current frame so it can be treated as an image
     frame_arr = tonumpyarray(shared_arr).reshape(frame_shape)
@@ -96,21 +107,18 @@ def main():
     capture_process.daemon = True
 
     # for each region, start a separate process for motion detection and object detection
-    detection_prep_processes = []
+    detection_prep_threads = []
     motion_processes = []
     for region in regions:
-        # possibly try putting these on threads and putting prepped
-        # frames in a queue
-        detection_prep_process = mp.Process(target=prep_for_detection, args=(shared_arr,
-            shared_frame_time,
-            frame_lock, frame_ready,
-            region['motion_detected'],
-            frame_shape,
-            region['size'], region['x_offset'], region['y_offset'],
-            region['prepped_frame_array'], region['prepped_frame_time'],
-            region['prepped_frame_lock']))
-        detection_prep_process.daemon = True
-        detection_prep_processes.append(detection_prep_process)
+        detection_prep_threads.append(FramePrepper(
+            frame_arr,
+            shared_frame_time,
+            frame_ready,
+            frame_lock,
+            region['motion_detected'],
+            region['size'], region['x_offset'], region['y_offset'],
+            prepped_frame_queue
+        ))
 
         motion_process = mp.Process(target=detect_motion, args=(shared_arr,
             shared_frame_time,
@@ -124,13 +132,25 @@ def main():
         motion_process.daemon = True
         motion_processes.append(motion_process)
 
+    prepped_queue_processor = PreppedQueueProcessor(
+        prepped_frame_array,
+        prepped_frame_time,
+        prepped_frame_ready,
+        prepped_frame_grabbed,
+        prepped_frame_box,
+        prepped_frame_queue
+    )
+    prepped_queue_processor.start()
+
     # create a process for object detection
-    # if the coprocessor is doing the work, can this run as a thread
-    # since it is waiting for IO?
     detection_process = mp.Process(target=detect_objects, args=(
-        [region['prepped_frame_array'] for region in regions],
-        [region['prepped_frame_time'] for region in regions],
-        [region['prepped_frame_lock'] for region in regions],
-        [[region['size'], region['x_offset'], region['y_offset']] for region in regions],
-        motion_changed, [region['motion_detected'] for region in regions],
+        prepped_frame_array,
+        prepped_frame_time,
+        prepped_frame_ready,
+        prepped_frame_grabbed,
+        prepped_frame_box,
         object_queue, DEBUG
     ))
     detection_process.daemon = True
@@ -181,9 +201,8 @@ def main():
     print("capture_process pid ", capture_process.pid)
 
     # start the object detection prep processes
-    for detection_prep_process in detection_prep_processes:
-        detection_prep_process.start()
-        print("detection_prep_process pid ", detection_prep_process.pid)
+    for detection_prep_thread in detection_prep_threads:
+        detection_prep_thread.start()
 
     detection_process.start()
     print("detection_process pid ", detection_process.pid)
@@ -256,8 +275,8 @@ def main():
     app.run(host='0.0.0.0', debug=False)
 
     capture_process.join()
-    for detection_prep_process in detection_prep_processes:
-        detection_prep_process.join()
+    for detection_prep_thread in detection_prep_threads:
+        detection_prep_thread.join()
     for motion_process in motion_processes:
         motion_process.join()
     detection_process.join()
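The handoff between the PreppedQueueProcessor thread started above and the detection process rides on two mp.Event flags: the processor copies a frame into shared memory, sets prepped_frame_ready, and waits for prepped_frame_grabbed before reusing the buffer, while the detector does the mirror image. A simplified sketch of a two-Event, single-slot handoff across a process boundary follows; this variant treats the Events as "slot full"/"slot empty" flags (a slightly different but race-free arrangement), and every name in it is illustrative:

    import ctypes
    import multiprocessing as mp

    def detector(slot, slot_full, slot_empty):
        # consumer side: wait for a published value, copy it out,
        # then hand the slot back to the producer
        for _ in range(3):
            slot_full.wait()
            slot_full.clear()
            value = slot.value
            slot_empty.set()
            print('consumed', value)

    if __name__ == '__main__':
        slot = mp.Value(ctypes.c_double, 0.0)
        slot_full = mp.Event()
        slot_empty = mp.Event()
        slot_empty.set()  # the slot starts out free
        proc = mp.Process(target=detector, args=(slot, slot_full, slot_empty))
        proc.start()
        for i in range(3):
            slot_empty.wait()   # wait until the consumer is done with the slot
            slot_empty.clear()
            slot.value = float(i)  # publish into the shared slot
            slot_full.set()
        proc.join()

The shared slot plus event pair keeps only one frame in flight at a time, which is why the commit also needs the in-process queue to buffer frames from the many prepper threads.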

frigate/object_detection.py

@@ -1,5 +1,6 @@
 import datetime
 import cv2
+import threading
 import numpy as np
 from edgetpu.detection.engine import DetectionEngine
 from . util import tonumpyarray
@@ -19,9 +20,11 @@ def ReadLabelFile(file_path):
             ret[int(pair[0])] = pair[1].strip()
     return ret
 
-def detect_objects(prepped_frame_arrays, prepped_frame_times, prepped_frame_locks,
-                   prepped_frame_boxes, motion_changed, motion_regions, object_queue, debug):
-    prepped_frame_nps = [tonumpyarray(prepped_frame_array) for prepped_frame_array in prepped_frame_arrays]
+def detect_objects(prepped_frame_array, prepped_frame_time,
+                   prepped_frame_ready, prepped_frame_grabbed,
+                   prepped_frame_box, object_queue, debug):
+    prepped_frame_np = tonumpyarray(prepped_frame_array)
     # Load the edgetpu engine and labels
     engine = DetectionEngine(PATH_TO_CKPT)
     labels = ReadLabelFile(PATH_TO_LABELS)
@@ -29,85 +32,124 @@ def detect_objects(prepped_frame_arrays, prepped_frame_times, prepped_frame_locks,
     frame_time = 0.0
     region_box = [0,0,0]
     while True:
-        # while there is motion
-        while len([r for r in motion_regions if r.is_set()]) > 0:
-            # loop over all the motion regions and look for objects
-            for i, motion_region in enumerate(motion_regions):
-                # skip the region if no motion
-                if not motion_region.is_set():
-                    continue
-                # make a copy of the cropped frame
-                with prepped_frame_locks[i]:
-                    prepped_frame_copy = prepped_frame_nps[i].copy()
-                    frame_time = prepped_frame_times[i].value
-                    region_box[:] = prepped_frame_boxes[i]
-                # Actual detection.
-                objects = engine.DetectWithInputTensor(prepped_frame_copy, threshold=0.5, top_k=3)
-                # print(engine.get_inference_time())
-                # put detected objects in the queue
-                if objects:
-                    for obj in objects:
-                        box = obj.bounding_box.flatten().tolist()
-                        object_queue.put({
-                            'frame_time': frame_time,
-                            'name': str(labels[obj.label_id]),
-                            'score': float(obj.score),
-                            'xmin': int((box[0] * region_box[0]) + region_box[1]),
-                            'ymin': int((box[1] * region_box[0]) + region_box[2]),
-                            'xmax': int((box[2] * region_box[0]) + region_box[1]),
-                            'ymax': int((box[3] * region_box[0]) + region_box[2])
-                        })
-                else:
-                    object_queue.put({
-                        'frame_time': frame_time,
-                        'name': 'dummy',
-                        'score': 0.99,
-                        'xmin': int(0 + region_box[1]),
-                        'ymin': int(0 + region_box[2]),
-                        'xmax': int(10 + region_box[1]),
-                        'ymax': int(10 + region_box[2])
-                    })
-        # wait for the global motion flag to change
-        with motion_changed:
-            motion_changed.wait()
-
-def prep_for_detection(shared_whole_frame_array, shared_frame_time, frame_lock, frame_ready,
-                       motion_detected, frame_shape, region_size, region_x_offset, region_y_offset,
-                       prepped_frame_array, prepped_frame_time, prepped_frame_lock):
-    # shape shared input array into frame for processing
-    shared_whole_frame = tonumpyarray(shared_whole_frame_array).reshape(frame_shape)
-
-    shared_prepped_frame = tonumpyarray(prepped_frame_array).reshape((1,300,300,3))
-
-    frame_time = 0.0
-    while True:
-        now = datetime.datetime.now().timestamp()
-
-        # wait until motion is detected
-        motion_detected.wait()
-
-        with frame_ready:
-            # if there isnt a frame ready for processing or it is old, wait for a new frame
-            if shared_frame_time.value == frame_time or (now - shared_frame_time.value) > 0.5:
-                frame_ready.wait()
-
-        # make a copy of the cropped frame
-        with frame_lock:
-            cropped_frame = shared_whole_frame[region_y_offset:region_y_offset+region_size, region_x_offset:region_x_offset+region_size].copy()
-            frame_time = shared_frame_time.value
-
-        # convert to RGB
-        cropped_frame_rgb = cv2.cvtColor(cropped_frame, cv2.COLOR_BGR2RGB)
-        # Resize to 300x300 if needed
-        if cropped_frame_rgb.shape != (300, 300, 3):
-            cropped_frame_rgb = cv2.resize(cropped_frame_rgb, dsize=(300, 300), interpolation=cv2.INTER_LINEAR)
-        # Expand dimensions since the model expects images to have shape: [1, 300, 300, 3]
-        frame_expanded = np.expand_dims(cropped_frame_rgb, axis=0)
-
-        # copy the prepped frame to the shared output array
-        with prepped_frame_lock:
-            shared_prepped_frame[:] = frame_expanded
-            prepped_frame_time.value = frame_time
+        # wait until a frame is ready
+        prepped_frame_grabbed.clear()
+        prepped_frame_ready.wait()
+
+        prepped_frame_copy = prepped_frame_np.copy()
+        frame_time = prepped_frame_time.value
+        region_box[:] = prepped_frame_box
+
+        prepped_frame_grabbed.set()
+
+        # Actual detection.
+        objects = engine.DetectWithInputTensor(prepped_frame_copy, threshold=0.5, top_k=3)
+        # print(engine.get_inference_time())
+        # put detected objects in the queue
+        if objects:
+            for obj in objects:
+                box = obj.bounding_box.flatten().tolist()
+                object_queue.put({
+                    'frame_time': frame_time,
+                    'name': str(labels[obj.label_id]),
+                    'score': float(obj.score),
+                    'xmin': int((box[0] * region_box[0]) + region_box[1]),
+                    'ymin': int((box[1] * region_box[0]) + region_box[2]),
+                    'xmax': int((box[2] * region_box[0]) + region_box[1]),
+                    'ymax': int((box[3] * region_box[0]) + region_box[2])
+                })
+        else:
+            object_queue.put({
+                'frame_time': frame_time,
+                'name': 'dummy',
+                'score': 0.99,
+                'xmin': int(0 + region_box[1]),
+                'ymin': int(0 + region_box[2]),
+                'xmax': int(10 + region_box[1]),
+                'ymax': int(10 + region_box[2])
+            })
+
+class PreppedQueueProcessor(threading.Thread):
+    def __init__(self, prepped_frame_array,
+                 prepped_frame_time,
+                 prepped_frame_ready,
+                 prepped_frame_grabbed,
+                 prepped_frame_box,
+                 prepped_frame_queue):
+        threading.Thread.__init__(self)
+        self.prepped_frame_array = prepped_frame_array
+        self.prepped_frame_time = prepped_frame_time
+        self.prepped_frame_ready = prepped_frame_ready
+        self.prepped_frame_grabbed = prepped_frame_grabbed
+        self.prepped_frame_box = prepped_frame_box
+        self.prepped_frame_queue = prepped_frame_queue
+
+    def run(self):
+        prepped_frame_np = tonumpyarray(self.prepped_frame_array)
+        # process queue...
+        while True:
+            frame = self.prepped_frame_queue.get()
+            print(self.prepped_frame_queue.qsize())
+            prepped_frame_np[:] = frame['frame']
+            self.prepped_frame_time.value = frame['frame_time']
+            self.prepped_frame_box[0] = frame['region_size']
+            self.prepped_frame_box[1] = frame['region_x_offset']
+            self.prepped_frame_box[2] = frame['region_y_offset']
+            self.prepped_frame_ready.set()
+            self.prepped_frame_grabbed.wait()
+            self.prepped_frame_ready.clear()
+
+# should this be a region class?
+class FramePrepper(threading.Thread):
+    def __init__(self, shared_frame, frame_time, frame_ready,
+                 frame_lock, motion_detected,
+                 region_size, region_x_offset, region_y_offset,
+                 prepped_frame_queue):
+        threading.Thread.__init__(self)
+        self.shared_frame = shared_frame
+        self.frame_time = frame_time
+        self.frame_ready = frame_ready
+        self.frame_lock = frame_lock
+        self.motion_detected = motion_detected
+        self.region_size = region_size
+        self.region_x_offset = region_x_offset
+        self.region_y_offset = region_y_offset
+        self.prepped_frame_queue = prepped_frame_queue
+
+    def run(self):
+        frame_time = 0.0
+        while True:
+            now = datetime.datetime.now().timestamp()
+
+            # wait until motion is detected
+            self.motion_detected.wait()
+
+            with self.frame_ready:
+                # if there isnt a frame ready for processing or it is old, wait for a new frame
+                if self.frame_time.value == frame_time or (now - self.frame_time.value) > 0.5:
+                    self.frame_ready.wait()
+
+            # make a copy of the cropped frame
+            with self.frame_lock:
+                cropped_frame = self.shared_frame[self.region_y_offset:self.region_y_offset+self.region_size, self.region_x_offset:self.region_x_offset+self.region_size].copy()
+                frame_time = self.frame_time.value
+
+            # convert to RGB
+            cropped_frame_rgb = cv2.cvtColor(cropped_frame, cv2.COLOR_BGR2RGB)
+            # Resize to 300x300 if needed
+            if cropped_frame_rgb.shape != (300, 300, 3):
+                cropped_frame_rgb = cv2.resize(cropped_frame_rgb, dsize=(300, 300), interpolation=cv2.INTER_LINEAR)
+            # Expand dimensions since the model expects images to have shape: [1, 300, 300, 3]
+            frame_expanded = np.expand_dims(cropped_frame_rgb, axis=0)
+
+            # add the frame to the queue
+            self.prepped_frame_queue.put({
+                'frame_time': frame_time,
+                'frame': frame_expanded.flatten().copy(),
+                'region_size': self.region_size,
+                'region_x_offset': self.region_x_offset,
+                'region_y_offset': self.region_y_offset
+            })