From 7a6ffb10321f642d68576ee4698ce4f358c425b9 Mon Sep 17 00:00:00 2001 From: Blake Blackshear Date: Sat, 29 May 2021 13:27:00 -0500 Subject: [PATCH] adding output process to handle downstream processing of frames --- frigate/app.py | 19 ++++++++++++++++++- frigate/object_processing.py | 19 +++++++++++-------- frigate/output.py | 29 +++++++++++++++++++++++++++++ 3 files changed, 58 insertions(+), 9 deletions(-) create mode 100644 frigate/output.py diff --git a/frigate/app.py b/frigate/app.py index f838c8d65..a58931710 100644 --- a/frigate/app.py +++ b/frigate/app.py @@ -24,6 +24,7 @@ from frigate.log import log_process, root_configurer from frigate.models import Event, Recordings from frigate.mqtt import create_mqtt_client from frigate.object_processing import TrackedObjectProcessor +from frigate.output import output_frames from frigate.record import RecordingMaintainer from frigate.stats import StatsEmitter, stats_init from frigate.video import capture_camera, track_camera @@ -128,6 +129,7 @@ class FrigateApp: # Queues for clip processing self.event_queue = mp.Queue() self.event_processed_queue = mp.Queue() + self.video_output_queue = mp.Queue() # Queue for cameras to push tracked objects to self.detected_frames_queue = mp.Queue( @@ -214,10 +216,25 @@ class FrigateApp: self.detected_frames_queue, self.event_queue, self.event_processed_queue, + self.video_output_queue, self.stop_event, ) self.detected_frames_processor.start() + def start_video_output_processor(self): + output_processor = mp.Process( + target=output_frames, + name=f"output_processor", + args=( + self.config, + self.video_output_queue, + self.stop_event, + ), + ) + output_processor.daemon = True + self.output_processor = output_processor + output_processor.start() + def start_camera_processors(self): model_shape = (self.config.model.height, self.config.model.width) for name, config in self.config.cameras.items(): @@ -308,10 +325,10 @@ class FrigateApp: self.log_process.terminate() sys.exit(1) self.start_detectors() + self.start_video_output_processor() self.start_detected_frames_processor() self.start_camera_processors() self.start_camera_capture_processes() - self.start_birdseye_outputter() self.init_stats() self.init_web_server() self.start_event_processor() diff --git a/frigate/object_processing.py b/frigate/object_processing.py index 835b6d9c8..6359cfd85 100644 --- a/frigate/object_processing.py +++ b/frigate/object_processing.py @@ -538,7 +538,7 @@ class CameraState: self.regions = regions self._current_frame = current_frame if self.previous_frame_id is not None: - self.frame_manager.delete(self.previous_frame_id) + self.frame_manager.close(self.previous_frame_id) self.previous_frame_id = frame_id @@ -551,6 +551,7 @@ class TrackedObjectProcessor(threading.Thread): tracked_objects_queue, event_queue, event_processed_queue, + video_output_queue, stop_event, ): threading.Thread.__init__(self) @@ -561,10 +562,10 @@ class TrackedObjectProcessor(threading.Thread): self.tracked_objects_queue = tracked_objects_queue self.event_queue = event_queue self.event_processed_queue = event_processed_queue + self.video_output_queue = video_output_queue self.stop_event = stop_event self.camera_states: Dict[str, CameraState] = {} self.frame_manager = SharedMemoryFrameManager() - self.birdseye_frame_manager = BirdsEyeFrameManager() def start(camera, obj: TrackedObject, current_frame_time): self.event_queue.put(("start", camera, obj.to_dict())) @@ -719,12 +720,14 @@ class TrackedObjectProcessor(threading.Thread): frame_time, current_tracked_objects, motion_boxes, regions ) - self.birdseye_frame_manager.update_frame( - camera, - len(current_tracked_objects), - len(motion_boxes), - camera_state.current_frame_time, - camera_state._current_frame, + self.video_output_queue.put( + ( + camera, + frame_time, + current_tracked_objects, + motion_boxes, + regions, + ) ) # update zone counts for each label diff --git a/frigate/output.py b/frigate/output.py new file mode 100644 index 000000000..d6809b1e1 --- /dev/null +++ b/frigate/output.py @@ -0,0 +1,29 @@ +import queue +from multiprocessing import shared_memory +from frigate.util import SharedMemoryFrameManager + + +def output_frames(config, video_output_queue, stop_event): + frame_manager = SharedMemoryFrameManager() + previous_frames = {} + + while True: + try: + ( + camera, + frame_time, + current_tracked_objects, + motion_boxes, + regions, + ) = video_output_queue.get(True, 10) + except queue.Empty: + continue + + frame_id = f"{camera}{frame_time}" + + frame = frame_manager.get(frame_id, config.cameras[camera].frame_shape_yuv) + + if camera in previous_frames: + frame_manager.delete(previous_frames[camera]) + + previous_frames[camera] = frame_id