From 4180c710cd93a6a67d163d5f565ef308c50e6dc9 Mon Sep 17 00:00:00 2001
From: Blake Blackshear
Date: Sat, 21 Dec 2019 07:15:39 -0600
Subject: [PATCH] refactor resizing into generic priority queues

---
 detect_objects.py           |  5 +--
 frigate/object_detection.py | 66 ++++++++++++++++++++++++++++++++++++++
 frigate/video.py            | 38 ++++++++++++-----------
 3 files changed, 89 insertions(+), 20 deletions(-)

diff --git a/detect_objects.py b/detect_objects.py
index 565f36f30..186490417 100644
--- a/detect_objects.py
+++ b/detect_objects.py
@@ -72,11 +72,12 @@ def main():
 
     # Queue for prepped frames, max size set to (number of cameras * 5)
     max_queue_size = len(CONFIG['cameras'].items())*5
-    prepped_frame_queue = queue.Queue(max_queue_size)
+    prepped_frame_queue = queue.PriorityQueue(max_queue_size)
 
     cameras = {}
     for name, config in CONFIG['cameras'].items():
-        cameras[name] = Camera(name, FFMPEG_DEFAULT_CONFIG, GLOBAL_OBJECT_CONFIG, config, prepped_frame_queue, client, MQTT_TOPIC_PREFIX)
+        cameras[name] = Camera(name, FFMPEG_DEFAULT_CONFIG, GLOBAL_OBJECT_CONFIG, config,
+            prepped_frame_queue, client, MQTT_TOPIC_PREFIX)
 
     prepped_queue_processor = PreppedQueueProcessor(
         cameras,
diff --git a/frigate/object_detection.py b/frigate/object_detection.py
index b45a08918..98ffbad52 100644
--- a/frigate/object_detection.py
+++ b/frigate/object_detection.py
@@ -27,6 +27,7 @@ class PreppedQueueProcessor(threading.Thread):
             # print(self.engine.get_inference_time())
 
             # parse and pass detected objects back to the camera
+            # TODO: just send this back with all the same info you received and objects as a new property
             parsed_objects = []
             for obj in objects:
                 parsed_objects.append({
@@ -92,3 +93,68 @@ class FramePrepper(threading.Thread):
                 })
             else:
                 print("queue full. moving on")
+
+class RegionRequester(threading.Thread):
+    def __init__(self, camera):
+        threading.Thread.__init__(self)
+        self.camera = camera
+
+    def run(self):
+        frame_time = 0.0
+        while True:
+            now = datetime.datetime.now().timestamp()
+
+            with self.camera.frame_ready:
+                # if there isnt a frame ready for processing or it is old, wait for a new frame
+                if self.camera.frame_time.value == frame_time or (now - self.camera.frame_time.value) > 0.5:
+                    self.camera.frame_ready.wait()
+
+            # make a copy of the frame_time
+            frame_time = self.camera.frame_time.value
+
+            for index, region in enumerate(self.camera.config['regions']):
+                # queue with priority 1
+                self.camera.resize_queue.put((1, {
+                    'camera_name': self.camera.name,
+                    'frame_time': frame_time,
+                    'region_id': index,
+                    'size': region['size'],
+                    'x_offset': region['x_offset'],
+                    'y_offset': region['y_offset']
+                }))
+
+class RegionPrepper(threading.Thread):
+    def __init__(self, frame_cache, resize_request_queue, prepped_frame_queue):
+
+        threading.Thread.__init__(self)
+        self.frame_cache = frame_cache
+        self.resize_request_queue = resize_request_queue
+        self.prepped_frame_queue = prepped_frame_queue
+
+    def run(self):
+        while True:
+
+            resize_request = self.resize_request_queue.get()
+
+            frame = self.frame_cache.get(resize_request['frame_time'], None)
+
+            if frame is None:
+                print("RegionPrepper: frame_time not in frame_cache")
+                continue
+
+            # make a copy of the region
+            cropped_frame = frame[resize_request['y_offset']:resize_request['y_offset']+resize_request['size'], resize_request['x_offset']:resize_request['x_offset']+resize_request['size']].copy()
+
+            # Resize to 300x300 if needed
+            if cropped_frame.shape != (300, 300, 3):
+                cropped_frame = cv2.resize(cropped_frame, dsize=(300, 300), interpolation=cv2.INTER_LINEAR)
+            # Expand dimensions since the model expects images to have shape: [1, 300, 300, 3]
+            frame_expanded = np.expand_dims(cropped_frame, axis=0)
+
+            # add the frame to the queue
+            if not self.prepped_frame_queue.full():
+                resize_request['frame'] = frame_expanded.flatten().copy()
+                # add to queue with priority 1
+                self.prepped_frame_queue.put((1, resize_request))
+            else:
+                print("queue full. moving on")
\ No newline at end of file
diff --git a/frigate/video.py b/frigate/video.py
index e8532f6d3..ed3ab5ce3 100644
--- a/frigate/video.py
+++ b/frigate/video.py
@@ -2,6 +2,7 @@ import os
 import time
 import datetime
 import cv2
+import queue
 import threading
 import ctypes
 import multiprocessing as mp
@@ -9,11 +10,11 @@ import subprocess as sp
 import numpy as np
 from collections import defaultdict
 from . util import tonumpyarray, draw_box_with_label
-from . object_detection import FramePrepper
+from . object_detection import FramePrepper, RegionPrepper, RegionRequester
 from . objects import ObjectCleaner, BestFrames
 from . mqtt import MqttObjectPublisher
 
-# Stores 2 seconds worth of frames when motion is detected so they can be used for other threads
+# Stores 2 seconds worth of frames so they can be used for other threads
 class FrameTracker(threading.Thread):
     def __init__(self, shared_frame, frame_time, frame_ready, frame_lock, recent_frames):
         threading.Thread.__init__(self)
@@ -116,7 +117,7 @@ class Camera:
         self.name = name
         self.config = config
         self.detected_objects = []
-        self.recent_frames = {}
+        self.frame_cache = {}
 
         self.ffmpeg = config.get('ffmpeg', {})
         self.ffmpeg_input = get_ffmpeg_input(self.ffmpeg['input'])
@@ -144,6 +145,10 @@ class Camera:
         self.frame_ready = mp.Condition()
         # Condition for notifying that objects were parsed
         self.objects_parsed = mp.Condition()
+
+        # Queue for prepped frames, max size set to (number of regions * 5)
+        max_queue_size = len(self.config['regions'])*5
+        self.resize_queue = queue.PriorityQueue(max_queue_size)
 
         # initialize the frame cache
         self.cached_frame_with_objects = {
@@ -154,9 +159,9 @@ class Camera:
         self.ffmpeg_process = None
         self.capture_thread = None
 
-        # for each region, create a separate thread to resize the region and prep for detection
+        # for each region, merge the object config
        self.detection_prep_threads = []
-        for index, region in enumerate(self.config['regions']):
+        for region in self.config['regions']:
             region_objects = region.get('objects', {})
             # build objects config for region
             objects_with_config = set().union(global_objects_config.keys(), camera_objects_config.keys(), region_objects.keys())
@@ -166,23 +171,20 @@ class Camera:
 
             region['objects'] = merged_objects_config
 
-        self.detection_prep_threads.append(FramePrepper(
-            self.name,
-            self.current_frame,
-            self.frame_time,
-            self.frame_ready,
-            self.frame_lock,
-            region['size'], region['x_offset'], region['y_offset'], index,
-            prepped_frame_queue
-        ))
-
-        # start a thread to store recent motion frames for processing
+        # start a thread to queue resize requests for regions
+        self.region_requester = RegionRequester(self)
+
+        # start a thread to cache recent frames for processing
         self.frame_tracker = FrameTracker(self.current_frame, self.frame_time,
-            self.frame_ready, self.frame_lock, self.recent_frames)
+            self.frame_ready, self.frame_lock, self.frame_cache)
         self.frame_tracker.start()
 
+        # start a thread to resize regions
+        self.region_prepper = RegionPrepper(self.frame_cache, self.resize_queue, prepped_frame_queue)
+        self.region_prepper.start()
+
         # start a thread to store the highest scoring recent frames for monitored object types
-        self.best_frames = BestFrames(self.objects_parsed, self.recent_frames, self.detected_objects)
+        self.best_frames = BestFrames(self.objects_parsed, self.frame_cache, self.detected_objects)
         self.best_frames.start()
 
         # start a thread to expire objects from the detected objects list