mirror of
https://github.com/blakeblackshear/frigate.git
synced 2024-11-21 19:07:46 +01:00
skip regions when the queue is too full and add more locks
This commit is contained in:
parent
cfffb219ae
commit
50bcad8b77
@ -72,7 +72,6 @@ def main():
|
|||||||
client.loop_start()
|
client.loop_start()
|
||||||
|
|
||||||
# Queue for prepped frames, max size set to number of regions * 3
|
# Queue for prepped frames, max size set to number of regions * 3
|
||||||
max_queue_size = sum([len(camera['regions'])*3 for name, camera in CONFIG['cameras'].items()])
|
|
||||||
prepped_frame_queue = queue.Queue()
|
prepped_frame_queue = queue.Queue()
|
||||||
|
|
||||||
cameras = {}
|
cameras = {}
|
||||||
@ -81,17 +80,14 @@ def main():
|
|||||||
prepped_frame_queue, client, MQTT_TOPIC_PREFIX)
|
prepped_frame_queue, client, MQTT_TOPIC_PREFIX)
|
||||||
|
|
||||||
fps_tracker = EventsPerSecond()
|
fps_tracker = EventsPerSecond()
|
||||||
queue_full_tracker = EventsPerSecond()
|
|
||||||
|
|
||||||
prepped_queue_processor = PreppedQueueProcessor(
|
prepped_queue_processor = PreppedQueueProcessor(
|
||||||
cameras,
|
cameras,
|
||||||
prepped_frame_queue,
|
prepped_frame_queue,
|
||||||
fps_tracker,
|
fps_tracker
|
||||||
queue_full_tracker
|
|
||||||
)
|
)
|
||||||
prepped_queue_processor.start()
|
prepped_queue_processor.start()
|
||||||
fps_tracker.start()
|
fps_tracker.start()
|
||||||
queue_full_tracker.start()
|
|
||||||
|
|
||||||
for name, camera in cameras.items():
|
for name, camera in cameras.items():
|
||||||
camera.start()
|
camera.start()
|
||||||
@ -111,8 +107,7 @@ def main():
|
|||||||
'coral': {
|
'coral': {
|
||||||
'fps': fps_tracker.eps(),
|
'fps': fps_tracker.eps(),
|
||||||
'inference_speed': prepped_queue_processor.avg_inference_speed,
|
'inference_speed': prepped_queue_processor.avg_inference_speed,
|
||||||
'queue_length': prepped_frame_queue.qsize(),
|
'queue_length': prepped_frame_queue.qsize()
|
||||||
'queue_full_events_per_min': queue_full_tracker.eps(60)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2,6 +2,7 @@ import datetime
|
|||||||
import time
|
import time
|
||||||
import cv2
|
import cv2
|
||||||
import threading
|
import threading
|
||||||
|
import copy
|
||||||
import prctl
|
import prctl
|
||||||
import numpy as np
|
import numpy as np
|
||||||
from edgetpu.detection.engine import DetectionEngine
|
from edgetpu.detection.engine import DetectionEngine
|
||||||
@ -9,7 +10,7 @@ from edgetpu.detection.engine import DetectionEngine
|
|||||||
from frigate.util import tonumpyarray, LABELS, PATH_TO_CKPT, calculate_region
|
from frigate.util import tonumpyarray, LABELS, PATH_TO_CKPT, calculate_region
|
||||||
|
|
||||||
class PreppedQueueProcessor(threading.Thread):
|
class PreppedQueueProcessor(threading.Thread):
|
||||||
def __init__(self, cameras, prepped_frame_queue, fps, queue_full):
|
def __init__(self, cameras, prepped_frame_queue, fps):
|
||||||
|
|
||||||
threading.Thread.__init__(self)
|
threading.Thread.__init__(self)
|
||||||
self.cameras = cameras
|
self.cameras = cameras
|
||||||
@ -19,16 +20,12 @@ class PreppedQueueProcessor(threading.Thread):
|
|||||||
self.engine = DetectionEngine(PATH_TO_CKPT)
|
self.engine = DetectionEngine(PATH_TO_CKPT)
|
||||||
self.labels = LABELS
|
self.labels = LABELS
|
||||||
self.fps = fps
|
self.fps = fps
|
||||||
self.queue_full = queue_full
|
|
||||||
self.avg_inference_speed = 10
|
self.avg_inference_speed = 10
|
||||||
|
|
||||||
def run(self):
|
def run(self):
|
||||||
prctl.set_name(self.__class__.__name__)
|
prctl.set_name(self.__class__.__name__)
|
||||||
# process queue...
|
# process queue...
|
||||||
while True:
|
while True:
|
||||||
if self.prepped_frame_queue.full():
|
|
||||||
self.queue_full.update()
|
|
||||||
|
|
||||||
frame = self.prepped_frame_queue.get()
|
frame = self.prepped_frame_queue.get()
|
||||||
|
|
||||||
# Actual detection.
|
# Actual detection.
|
||||||
@ -58,7 +55,8 @@ class RegionRequester(threading.Thread):
|
|||||||
frame_time = self.camera.frame_time.value
|
frame_time = self.camera.frame_time.value
|
||||||
|
|
||||||
# grab the current tracked objects
|
# grab the current tracked objects
|
||||||
tracked_objects = list(self.camera.object_tracker.tracked_objects.values()).copy()
|
with self.camera.object_tracker.tracked_objects_lock:
|
||||||
|
tracked_objects = copy.deepcopy(self.camera.object_tracker.tracked_objects).values()
|
||||||
|
|
||||||
with self.camera.regions_in_process_lock:
|
with self.camera.regions_in_process_lock:
|
||||||
self.camera.regions_in_process[frame_time] = len(self.camera.config['regions'])
|
self.camera.regions_in_process[frame_time] = len(self.camera.config['regions'])
|
||||||
@ -93,8 +91,9 @@ class RegionRequester(threading.Thread):
|
|||||||
|
|
||||||
|
|
||||||
class RegionPrepper(threading.Thread):
|
class RegionPrepper(threading.Thread):
|
||||||
def __init__(self, frame_cache, resize_request_queue, prepped_frame_queue):
|
def __init__(self, camera, frame_cache, resize_request_queue, prepped_frame_queue):
|
||||||
threading.Thread.__init__(self)
|
threading.Thread.__init__(self)
|
||||||
|
self.camera = camera
|
||||||
self.frame_cache = frame_cache
|
self.frame_cache = frame_cache
|
||||||
self.resize_request_queue = resize_request_queue
|
self.resize_request_queue = resize_request_queue
|
||||||
self.prepped_frame_queue = prepped_frame_queue
|
self.prepped_frame_queue = prepped_frame_queue
|
||||||
@ -105,6 +104,15 @@ class RegionPrepper(threading.Thread):
|
|||||||
|
|
||||||
resize_request = self.resize_request_queue.get()
|
resize_request = self.resize_request_queue.get()
|
||||||
|
|
||||||
|
# if the queue is over 100 items long, only prep dynamic regions
|
||||||
|
if resize_request['region_id'] != -1 and self.prepped_frame_queue.qsize() > 100:
|
||||||
|
with self.camera.regions_in_process_lock:
|
||||||
|
self.camera.regions_in_process[resize_request['frame_time']] -= 1
|
||||||
|
if self.camera.regions_in_process[resize_request['frame_time']] == 0:
|
||||||
|
del self.camera.regions_in_process[resize_request['frame_time']]
|
||||||
|
self.camera.skipped_region_tracker.update()
|
||||||
|
continue
|
||||||
|
|
||||||
frame = self.frame_cache.get(resize_request['frame_time'], None)
|
frame = self.frame_cache.get(resize_request['frame_time'], None)
|
||||||
|
|
||||||
if frame is None:
|
if frame is None:
|
||||||
|
@ -167,7 +167,6 @@ class Camera:
|
|||||||
self.objects_tracked = mp.Condition()
|
self.objects_tracked = mp.Condition()
|
||||||
|
|
||||||
# Queue for prepped frames, max size set to (number of regions * 5)
|
# Queue for prepped frames, max size set to (number of regions * 5)
|
||||||
max_queue_size = len(self.config['regions'])*5
|
|
||||||
self.resize_queue = queue.Queue()
|
self.resize_queue = queue.Queue()
|
||||||
|
|
||||||
# Queue for raw detected objects
|
# Queue for raw detected objects
|
||||||
@ -184,6 +183,7 @@ class Camera:
|
|||||||
self.ffmpeg_process = None
|
self.ffmpeg_process = None
|
||||||
self.capture_thread = None
|
self.capture_thread = None
|
||||||
self.fps = EventsPerSecond()
|
self.fps = EventsPerSecond()
|
||||||
|
self.skipped_region_tracker = EventsPerSecond()
|
||||||
|
|
||||||
# combine tracked objects lists
|
# combine tracked objects lists
|
||||||
self.objects_to_track = set().union(global_objects_config.get('track', ['person', 'car', 'truck']), camera_objects_config.get('track', []))
|
self.objects_to_track = set().union(global_objects_config.get('track', ['person', 'car', 'truck']), camera_objects_config.get('track', []))
|
||||||
@ -214,7 +214,7 @@ class Camera:
|
|||||||
self.frame_tracker.start()
|
self.frame_tracker.start()
|
||||||
|
|
||||||
# start a thread to resize regions
|
# start a thread to resize regions
|
||||||
self.region_prepper = RegionPrepper(self.frame_cache, self.resize_queue, prepped_frame_queue)
|
self.region_prepper = RegionPrepper(self, self.frame_cache, self.resize_queue, prepped_frame_queue)
|
||||||
self.region_prepper.start()
|
self.region_prepper.start()
|
||||||
|
|
||||||
# start a thread to store the highest scoring recent frames for monitored object types
|
# start a thread to store the highest scoring recent frames for monitored object types
|
||||||
@ -275,6 +275,7 @@ class Camera:
|
|||||||
print("Starting a new capture thread...")
|
print("Starting a new capture thread...")
|
||||||
self.capture_thread.start()
|
self.capture_thread.start()
|
||||||
self.fps.start()
|
self.fps.start()
|
||||||
|
self.skipped_region_tracker.start()
|
||||||
|
|
||||||
def start_ffmpeg(self):
|
def start_ffmpeg(self):
|
||||||
ffmpeg_cmd = (['ffmpeg'] +
|
ffmpeg_cmd = (['ffmpeg'] +
|
||||||
@ -310,11 +311,16 @@ class Camera:
|
|||||||
'finished_frame_queue': self.finished_frame_queue.qsize(),
|
'finished_frame_queue': self.finished_frame_queue.qsize(),
|
||||||
'refined_frame_queue': self.refined_frame_queue.qsize(),
|
'refined_frame_queue': self.refined_frame_queue.qsize(),
|
||||||
'regions_in_process': self.regions_in_process,
|
'regions_in_process': self.regions_in_process,
|
||||||
'dynamic_regions_per_sec': self.dynamic_region_fps.eps()
|
'dynamic_regions_per_sec': self.dynamic_region_fps.eps(),
|
||||||
|
'skipped_regions_per_sec': self.skipped_region_tracker.eps(60)
|
||||||
}
|
}
|
||||||
|
|
||||||
def frame_with_objects(self, frame_time, tracked_objects=None):
|
def frame_with_objects(self, frame_time, tracked_objects=None):
|
||||||
|
if not frame_time in self.frame_cache:
|
||||||
|
frame = np.zeros(self.frame_shape, np.uint8)
|
||||||
|
else:
|
||||||
frame = self.frame_cache[frame_time].copy()
|
frame = self.frame_cache[frame_time].copy()
|
||||||
|
|
||||||
detected_objects = self.detected_objects[frame_time].copy()
|
detected_objects = self.detected_objects[frame_time].copy()
|
||||||
|
|
||||||
for region in self.regions:
|
for region in self.regions:
|
||||||
@ -326,6 +332,7 @@ class Camera:
|
|||||||
# draw the bounding boxes on the screen
|
# draw the bounding boxes on the screen
|
||||||
|
|
||||||
if tracked_objects is None:
|
if tracked_objects is None:
|
||||||
|
with self.object_tracker.tracked_objects_lock:
|
||||||
tracked_objects = copy.deepcopy(self.object_tracker.tracked_objects)
|
tracked_objects = copy.deepcopy(self.object_tracker.tracked_objects)
|
||||||
|
|
||||||
for obj in detected_objects:
|
for obj in detected_objects:
|
||||||
|
Loading…
Reference in New Issue
Block a user