skip regions when the queue is too full and add more locks

This commit is contained in:
Blake Blackshear 2020-01-14 07:00:53 -06:00
parent b615b84f57
commit 3d5faa956c
3 changed files with 29 additions and 19 deletions

View File

@@ -72,7 +72,6 @@ def main():
     client.loop_start()
 
     # Queue for prepped frames, max size set to number of regions * 3
-    max_queue_size = sum([len(camera['regions'])*3 for name, camera in CONFIG['cameras'].items()])
     prepped_frame_queue = queue.Queue()
 
     cameras = {}
@@ -81,17 +80,14 @@ def main():
         prepped_frame_queue, client, MQTT_TOPIC_PREFIX)
 
     fps_tracker = EventsPerSecond()
-    queue_full_tracker = EventsPerSecond()
     prepped_queue_processor = PreppedQueueProcessor(
         cameras,
         prepped_frame_queue,
-        fps_tracker,
-        queue_full_tracker
+        fps_tracker
     )
     prepped_queue_processor.start()
     fps_tracker.start()
-    queue_full_tracker.start()
 
     for name, camera in cameras.items():
         camera.start()
@@ -111,8 +107,7 @@ def main():
             'coral': {
                 'fps': fps_tracker.eps(),
                 'inference_speed': prepped_queue_processor.avg_inference_speed,
-                'queue_length': prepped_frame_queue.qsize(),
-                'queue_full_events_per_min': queue_full_tracker.eps(60)
+                'queue_length': prepped_frame_queue.qsize()
             }
         }

View File

@@ -2,6 +2,7 @@ import datetime
 import time
 import cv2
 import threading
+import copy
 import prctl
 import numpy as np
 from edgetpu.detection.engine import DetectionEngine
@@ -9,7 +10,7 @@ from edgetpu.detection.engine import DetectionEngine
 from frigate.util import tonumpyarray, LABELS, PATH_TO_CKPT, calculate_region
 
 class PreppedQueueProcessor(threading.Thread):
-    def __init__(self, cameras, prepped_frame_queue, fps, queue_full):
+    def __init__(self, cameras, prepped_frame_queue, fps):
         threading.Thread.__init__(self)
         self.cameras = cameras
@@ -19,16 +20,12 @@ class PreppedQueueProcessor(threading.Thread):
         self.engine = DetectionEngine(PATH_TO_CKPT)
         self.labels = LABELS
         self.fps = fps
-        self.queue_full = queue_full
         self.avg_inference_speed = 10
 
     def run(self):
         prctl.set_name(self.__class__.__name__)
         # process queue...
         while True:
-            if self.prepped_frame_queue.full():
-                self.queue_full.update()
-
             frame = self.prepped_frame_queue.get()
 
             # Actual detection.
@@ -58,7 +55,8 @@ class RegionRequester(threading.Thread):
             frame_time = self.camera.frame_time.value
 
             # grab the current tracked objects
-            tracked_objects = list(self.camera.object_tracker.tracked_objects.values()).copy()
+            with self.camera.object_tracker.tracked_objects_lock:
+                tracked_objects = copy.deepcopy(self.camera.object_tracker.tracked_objects).values()
 
             with self.camera.regions_in_process_lock:
                 self.camera.regions_in_process[frame_time] = len(self.camera.config['regions'])
@@ -93,8 +91,9 @@ class RegionRequester(threading.Thread):
 
 class RegionPrepper(threading.Thread):
-    def __init__(self, frame_cache, resize_request_queue, prepped_frame_queue):
+    def __init__(self, camera, frame_cache, resize_request_queue, prepped_frame_queue):
         threading.Thread.__init__(self)
+        self.camera = camera
         self.frame_cache = frame_cache
         self.resize_request_queue = resize_request_queue
         self.prepped_frame_queue = prepped_frame_queue
@@ -105,6 +104,15 @@ class RegionPrepper(threading.Thread):
             resize_request = self.resize_request_queue.get()
 
+            # if the queue is over 100 items long, only prep dynamic regions
+            if resize_request['region_id'] != -1 and self.prepped_frame_queue.qsize() > 100:
+                with self.camera.regions_in_process_lock:
+                    self.camera.regions_in_process[resize_request['frame_time']] -= 1
+                    if self.camera.regions_in_process[resize_request['frame_time']] == 0:
+                        del self.camera.regions_in_process[resize_request['frame_time']]
+                self.camera.skipped_region_tracker.update()
+                continue
+
             frame = self.frame_cache.get(resize_request['frame_time'], None)
 
             if frame is None:

View File

@@ -167,7 +167,6 @@ class Camera:
         self.objects_tracked = mp.Condition()
 
         # Queue for prepped frames, max size set to (number of regions * 5)
-        max_queue_size = len(self.config['regions'])*5
         self.resize_queue = queue.Queue()
 
         # Queue for raw detected objects
@@ -184,6 +183,7 @@ class Camera:
         self.ffmpeg_process = None
         self.capture_thread = None
         self.fps = EventsPerSecond()
+        self.skipped_region_tracker = EventsPerSecond()
 
         # combine tracked objects lists
         self.objects_to_track = set().union(global_objects_config.get('track', ['person', 'car', 'truck']), camera_objects_config.get('track', []))
@@ -214,7 +214,7 @@ class Camera:
         self.frame_tracker.start()
 
         # start a thread to resize regions
-        self.region_prepper = RegionPrepper(self.frame_cache, self.resize_queue, prepped_frame_queue)
+        self.region_prepper = RegionPrepper(self, self.frame_cache, self.resize_queue, prepped_frame_queue)
         self.region_prepper.start()
 
         # start a thread to store the highest scoring recent frames for monitored object types
@@ -275,6 +275,7 @@ class Camera:
         print("Starting a new capture thread...")
         self.capture_thread.start()
         self.fps.start()
+        self.skipped_region_tracker.start()
 
     def start_ffmpeg(self):
         ffmpeg_cmd = (['ffmpeg'] +
@@ -310,11 +311,16 @@ class Camera:
             'finished_frame_queue': self.finished_frame_queue.qsize(),
             'refined_frame_queue': self.refined_frame_queue.qsize(),
             'regions_in_process': self.regions_in_process,
-            'dynamic_regions_per_sec': self.dynamic_region_fps.eps()
+            'dynamic_regions_per_sec': self.dynamic_region_fps.eps(),
+            'skipped_regions_per_sec': self.skipped_region_tracker.eps(60)
         }
 
     def frame_with_objects(self, frame_time, tracked_objects=None):
-        frame = self.frame_cache[frame_time].copy()
+        if not frame_time in self.frame_cache:
+            frame = np.zeros(self.frame_shape, np.uint8)
+        else:
+            frame = self.frame_cache[frame_time].copy()
+
         detected_objects = self.detected_objects[frame_time].copy()
 
         for region in self.regions:
@@ -326,7 +332,8 @@ class Camera:
         # draw the bounding boxes on the screen
         if tracked_objects is None:
-            tracked_objects = copy.deepcopy(self.object_tracker.tracked_objects)
+            with self.object_tracker.tracked_objects_lock:
+                tracked_objects = copy.deepcopy(self.object_tracker.tracked_objects)
 
         for obj in detected_objects:
             draw_box_with_label(frame, obj['box']['xmin'], obj['box']['ymin'], obj['box']['xmax'], obj['box']['ymax'], obj['name'], "{}% {}".format(int(obj['score']*100), obj['area']), thickness=3)