refactor resizing into generic priority queues

This commit is contained in:
Blake Blackshear 2019-12-21 07:15:39 -06:00
parent ab3e70b4db
commit 4180c710cd
3 changed files with 88 additions and 20 deletions

View File

@ -72,11 +72,12 @@ def main():
# Queue for prepped frames, max size set to (number of cameras * 5) # Queue for prepped frames, max size set to (number of cameras * 5)
max_queue_size = len(CONFIG['cameras'].items())*5 max_queue_size = len(CONFIG['cameras'].items())*5
prepped_frame_queue = queue.Queue(max_queue_size) prepped_frame_queue = queue.PriorityQueue(max_queue_size)
cameras = {} cameras = {}
for name, config in CONFIG['cameras'].items(): for name, config in CONFIG['cameras'].items():
cameras[name] = Camera(name, FFMPEG_DEFAULT_CONFIG, GLOBAL_OBJECT_CONFIG, config, prepped_frame_queue, client, MQTT_TOPIC_PREFIX) cameras[name] = Camera(name, FFMPEG_DEFAULT_CONFIG, GLOBAL_OBJECT_CONFIG, config,
prepped_frame_queue, client, MQTT_TOPIC_PREFIX)
prepped_queue_processor = PreppedQueueProcessor( prepped_queue_processor = PreppedQueueProcessor(
cameras, cameras,

View File

@ -27,6 +27,7 @@ class PreppedQueueProcessor(threading.Thread):
# print(self.engine.get_inference_time()) # print(self.engine.get_inference_time())
# parse and pass detected objects back to the camera # parse and pass detected objects back to the camera
# TODO: just send this back with all the same info you received and objects as a new property
parsed_objects = [] parsed_objects = []
for obj in objects: for obj in objects:
parsed_objects.append({ parsed_objects.append({
@ -92,3 +93,67 @@ class FramePrepper(threading.Thread):
}) })
else: else:
print("queue full. moving on") print("queue full. moving on")
class RegionRequester(threading.Thread):
def __init__(self, camera):
self.camera = camera
def run(self):
frame_time = 0.0
while True:
now = datetime.datetime.now().timestamp()
with self.camera.frame_ready:
# if there isnt a frame ready for processing or it is old, wait for a new frame
if self.camera.frame_time.value == frame_time or (now - self.camera.frame_time.value) > 0.5:
self.camera.frame_ready.wait()
# make a copy of the frame_time
frame_time = self.camera.frame_time.value
for index, region in enumerate(self.camera.config['regions']):
# queue with priority 1
self.camera.resize_queue.put((1, {
'camera_name': self.camera.name,
'frame_time': frame_time,
'region_id': index,
'size': region['size'],
'x_offset': region['x_offset'],
'y_offset': region['y_offset']
}))
class RegionPrepper(threading.Thread):
def __init__(self, frame_cache, resize_request_queue, prepped_frame_queue):
threading.Thread.__init__(self)
self.frame_cache = frame_cache
self.resize_request_queue = resize_request_queue
self.prepped_frame_queue = prepped_frame_queue
def run(self):
while True:
resize_request = self.resize_request_queue.get()
frame = self.frame_cache.get(resize_request['frame_time'], None)
if frame is None:
print("RegionPrepper: frame_time not in frame_cache")
continue
# make a copy of the region
cropped_frame = frame[resize_request['y_offset']:resize_request['y_offset']+resize_request['size'], resize_request['x_offset']:resize_request['x_offset']+resize_request['size']].copy()
# Resize to 300x300 if needed
if cropped_frame.shape != (300, 300, 3):
cropped_frame = cv2.resize(cropped_frame, dsize=(300, 300), interpolation=cv2.INTER_LINEAR)
# Expand dimensions since the model expects images to have shape: [1, 300, 300, 3]
frame_expanded = np.expand_dims(cropped_frame, axis=0)
# add the frame to the queue
if not self.prepped_frame_queue.full():
resize_request['frame'] = frame_expanded.flatten().copy()
# add to queue with priority 1
self.prepped_frame_queue.put((1, resize_request))
else:
print("queue full. moving on")

View File

@ -2,6 +2,7 @@ import os
import time import time
import datetime import datetime
import cv2 import cv2
import queue
import threading import threading
import ctypes import ctypes
import multiprocessing as mp import multiprocessing as mp
@ -9,11 +10,11 @@ import subprocess as sp
import numpy as np import numpy as np
from collections import defaultdict from collections import defaultdict
from . util import tonumpyarray, draw_box_with_label from . util import tonumpyarray, draw_box_with_label
from . object_detection import FramePrepper from . object_detection import FramePrepper, RegionPrepper, RegionRequester
from . objects import ObjectCleaner, BestFrames from . objects import ObjectCleaner, BestFrames
from . mqtt import MqttObjectPublisher from . mqtt import MqttObjectPublisher
# Stores 2 seconds worth of frames when motion is detected so they can be used for other threads # Stores 2 seconds worth of frames so they can be used for other threads
class FrameTracker(threading.Thread): class FrameTracker(threading.Thread):
def __init__(self, shared_frame, frame_time, frame_ready, frame_lock, recent_frames): def __init__(self, shared_frame, frame_time, frame_ready, frame_lock, recent_frames):
threading.Thread.__init__(self) threading.Thread.__init__(self)
@ -116,7 +117,7 @@ class Camera:
self.name = name self.name = name
self.config = config self.config = config
self.detected_objects = [] self.detected_objects = []
self.recent_frames = {} self.frame_cache = {}
self.ffmpeg = config.get('ffmpeg', {}) self.ffmpeg = config.get('ffmpeg', {})
self.ffmpeg_input = get_ffmpeg_input(self.ffmpeg['input']) self.ffmpeg_input = get_ffmpeg_input(self.ffmpeg['input'])
@ -145,6 +146,10 @@ class Camera:
# Condition for notifying that objects were parsed # Condition for notifying that objects were parsed
self.objects_parsed = mp.Condition() self.objects_parsed = mp.Condition()
# Queue for prepped frames, max size set to (number of regions * 5)
max_queue_size = len(self.config['regions'])*5
self.resize_queue = queue.PriorityQueue(max_queue_size)
# initialize the frame cache # initialize the frame cache
self.cached_frame_with_objects = { self.cached_frame_with_objects = {
'frame_bytes': [], 'frame_bytes': [],
@ -154,9 +159,9 @@ class Camera:
self.ffmpeg_process = None self.ffmpeg_process = None
self.capture_thread = None self.capture_thread = None
# for each region, create a separate thread to resize the region and prep for detection # for each region, merge the object config
self.detection_prep_threads = [] self.detection_prep_threads = []
for index, region in enumerate(self.config['regions']): for region in self.config['regions']:
region_objects = region.get('objects', {}) region_objects = region.get('objects', {})
# build objects config for region # build objects config for region
objects_with_config = set().union(global_objects_config.keys(), camera_objects_config.keys(), region_objects.keys()) objects_with_config = set().union(global_objects_config.keys(), camera_objects_config.keys(), region_objects.keys())
@ -166,23 +171,20 @@ class Camera:
region['objects'] = merged_objects_config region['objects'] = merged_objects_config
self.detection_prep_threads.append(FramePrepper( # start a thread to queue resize requests for regions
self.name, self.region_requester = RegionRequester(self)
self.current_frame,
self.frame_time,
self.frame_ready,
self.frame_lock,
region['size'], region['x_offset'], region['y_offset'], index,
prepped_frame_queue
))
# start a thread to store recent motion frames for processing # start a thread to cache recent frames for processing
self.frame_tracker = FrameTracker(self.current_frame, self.frame_time, self.frame_tracker = FrameTracker(self.current_frame, self.frame_time,
self.frame_ready, self.frame_lock, self.recent_frames) self.frame_ready, self.frame_lock, self.frame_cache)
self.frame_tracker.start() self.frame_tracker.start()
# start a thread to resize regions
self.region_prepper = RegionPrepper(self.frame_cache, self.resize_queue, prepped_frame_queue)
self.region_prepper.start()
# start a thread to store the highest scoring recent frames for monitored object types # start a thread to store the highest scoring recent frames for monitored object types
self.best_frames = BestFrames(self.objects_parsed, self.recent_frames, self.detected_objects) self.best_frames = BestFrames(self.objects_parsed, self.frame_cache, self.detected_objects)
self.best_frames.start() self.best_frames.start()
# start a thread to expire objects from the detected objects list # start a thread to expire objects from the detected objects list