adding output process to handle downstream processing of frames

This commit is contained in:
Blake Blackshear 2021-05-29 13:27:00 -05:00
parent fd51c7a955
commit 7a6ffb1032
3 changed files with 58 additions and 9 deletions

View File

@ -24,6 +24,7 @@ from frigate.log import log_process, root_configurer
from frigate.models import Event, Recordings from frigate.models import Event, Recordings
from frigate.mqtt import create_mqtt_client from frigate.mqtt import create_mqtt_client
from frigate.object_processing import TrackedObjectProcessor from frigate.object_processing import TrackedObjectProcessor
from frigate.output import output_frames
from frigate.record import RecordingMaintainer from frigate.record import RecordingMaintainer
from frigate.stats import StatsEmitter, stats_init from frigate.stats import StatsEmitter, stats_init
from frigate.video import capture_camera, track_camera from frigate.video import capture_camera, track_camera
@ -128,6 +129,7 @@ class FrigateApp:
# Queues for clip processing # Queues for clip processing
self.event_queue = mp.Queue() self.event_queue = mp.Queue()
self.event_processed_queue = mp.Queue() self.event_processed_queue = mp.Queue()
self.video_output_queue = mp.Queue()
# Queue for cameras to push tracked objects to # Queue for cameras to push tracked objects to
self.detected_frames_queue = mp.Queue( self.detected_frames_queue = mp.Queue(
@ -214,10 +216,25 @@ class FrigateApp:
self.detected_frames_queue, self.detected_frames_queue,
self.event_queue, self.event_queue,
self.event_processed_queue, self.event_processed_queue,
self.video_output_queue,
self.stop_event, self.stop_event,
) )
self.detected_frames_processor.start() self.detected_frames_processor.start()
def start_video_output_processor(self):
output_processor = mp.Process(
target=output_frames,
name=f"output_processor",
args=(
self.config,
self.video_output_queue,
self.stop_event,
),
)
output_processor.daemon = True
self.output_processor = output_processor
output_processor.start()
def start_camera_processors(self): def start_camera_processors(self):
model_shape = (self.config.model.height, self.config.model.width) model_shape = (self.config.model.height, self.config.model.width)
for name, config in self.config.cameras.items(): for name, config in self.config.cameras.items():
@ -308,10 +325,10 @@ class FrigateApp:
self.log_process.terminate() self.log_process.terminate()
sys.exit(1) sys.exit(1)
self.start_detectors() self.start_detectors()
self.start_video_output_processor()
self.start_detected_frames_processor() self.start_detected_frames_processor()
self.start_camera_processors() self.start_camera_processors()
self.start_camera_capture_processes() self.start_camera_capture_processes()
self.start_birdseye_outputter()
self.init_stats() self.init_stats()
self.init_web_server() self.init_web_server()
self.start_event_processor() self.start_event_processor()

View File

@ -538,7 +538,7 @@ class CameraState:
self.regions = regions self.regions = regions
self._current_frame = current_frame self._current_frame = current_frame
if self.previous_frame_id is not None: if self.previous_frame_id is not None:
self.frame_manager.delete(self.previous_frame_id) self.frame_manager.close(self.previous_frame_id)
self.previous_frame_id = frame_id self.previous_frame_id = frame_id
@ -551,6 +551,7 @@ class TrackedObjectProcessor(threading.Thread):
tracked_objects_queue, tracked_objects_queue,
event_queue, event_queue,
event_processed_queue, event_processed_queue,
video_output_queue,
stop_event, stop_event,
): ):
threading.Thread.__init__(self) threading.Thread.__init__(self)
@ -561,10 +562,10 @@ class TrackedObjectProcessor(threading.Thread):
self.tracked_objects_queue = tracked_objects_queue self.tracked_objects_queue = tracked_objects_queue
self.event_queue = event_queue self.event_queue = event_queue
self.event_processed_queue = event_processed_queue self.event_processed_queue = event_processed_queue
self.video_output_queue = video_output_queue
self.stop_event = stop_event self.stop_event = stop_event
self.camera_states: Dict[str, CameraState] = {} self.camera_states: Dict[str, CameraState] = {}
self.frame_manager = SharedMemoryFrameManager() self.frame_manager = SharedMemoryFrameManager()
self.birdseye_frame_manager = BirdsEyeFrameManager()
def start(camera, obj: TrackedObject, current_frame_time): def start(camera, obj: TrackedObject, current_frame_time):
self.event_queue.put(("start", camera, obj.to_dict())) self.event_queue.put(("start", camera, obj.to_dict()))
@ -719,12 +720,14 @@ class TrackedObjectProcessor(threading.Thread):
frame_time, current_tracked_objects, motion_boxes, regions frame_time, current_tracked_objects, motion_boxes, regions
) )
self.birdseye_frame_manager.update_frame( self.video_output_queue.put(
(
camera, camera,
len(current_tracked_objects), frame_time,
len(motion_boxes), current_tracked_objects,
camera_state.current_frame_time, motion_boxes,
camera_state._current_frame, regions,
)
) )
# update zone counts for each label # update zone counts for each label

29
frigate/output.py Normal file
View File

@ -0,0 +1,29 @@
import queue
from multiprocessing import shared_memory
from frigate.util import SharedMemoryFrameManager
def output_frames(config, video_output_queue, stop_event):
frame_manager = SharedMemoryFrameManager()
previous_frames = {}
while True:
try:
(
camera,
frame_time,
current_tracked_objects,
motion_boxes,
regions,
) = video_output_queue.get(True, 10)
except queue.Empty:
continue
frame_id = f"{camera}{frame_time}"
frame = frame_manager.get(frame_id, config.cameras[camera].frame_shape_yuv)
if camera in previous_frames:
frame_manager.delete(previous_frames[camera])
previous_frames[camera] = frame_id