From 76a114a3cd9725942afe7e3aec9d62daaea08cfa Mon Sep 17 00:00:00 2001 From: Nicolas Mowen Date: Sat, 23 Mar 2024 10:11:32 -0600 Subject: [PATCH] Rewrite events communication to use zmq instead of mp.Queue (#10627) * Move to using zmq for events updating * Use event updater in manual events handler * Formatting --- frigate/app.py | 10 ++----- frigate/comms/events_updater.py | 53 +++++++++++++++++++++++++++++++++ frigate/events/external.py | 24 ++++++++++----- frigate/events/maintainer.py | 39 +++++++++++------------- frigate/events/types.py | 14 +++++++++ frigate/object_processing.py | 24 +++++++++------ 6 files changed, 117 insertions(+), 47 deletions(-) create mode 100644 frigate/comms/events_updater.py create mode 100644 frigate/events/types.py diff --git a/frigate/app.py b/frigate/app.py index 9d02ddeeb..2b1198278 100644 --- a/frigate/app.py +++ b/frigate/app.py @@ -195,7 +195,6 @@ class FrigateApp: def init_queues(self) -> None: # Queues for clip processing - self.event_queue: Queue = mp.Queue() self.event_processed_queue: Queue = mp.Queue() # Queue for cameras to push tracked objects to @@ -324,9 +323,7 @@ class FrigateApp: self.db.bind(models) def init_external_event_processor(self) -> None: - self.external_event_processor = ExternalEventProcessor( - self.config, self.event_queue - ) + self.external_event_processor = ExternalEventProcessor(self.config) def init_inter_process_communicator(self) -> None: self.inter_process_communicator = InterProcessCommunicator() @@ -417,7 +414,6 @@ class FrigateApp: self.config, self.dispatcher, self.detected_frames_queue, - self.event_queue, self.event_processed_queue, self.ptz_autotracker_thread, self.stop_event, @@ -515,8 +511,6 @@ class FrigateApp: def start_event_processor(self) -> None: self.event_processor = EventProcessor( self.config, - self.camera_metrics, - self.event_queue, self.event_processed_queue, self.timeline_queue, self.stop_event, @@ -682,6 +676,7 @@ class FrigateApp: self.detection_queue.close() self.detection_queue.join_thread() + self.external_event_processor.stop() self.dispatcher.stop() self.detected_frames_processor.join() self.ptz_autotracker_thread.join() @@ -698,7 +693,6 @@ class FrigateApp: shm.unlink() for queue in [ - self.event_queue, self.event_processed_queue, self.detected_frames_queue, self.log_queue, diff --git a/frigate/comms/events_updater.py b/frigate/comms/events_updater.py new file mode 100644 index 000000000..cb18667a1 --- /dev/null +++ b/frigate/comms/events_updater.py @@ -0,0 +1,53 @@ +"""Facilitates communication between processes.""" + +import zmq + +from frigate.events.types import EventStateEnum, EventTypeEnum + +SOCKET_PUSH_PULL = "ipc:///tmp/cache/events" + + +class EventUpdatePublisher: + """Publishes events (objects, audio, manual).""" + + def __init__(self) -> None: + self.context = zmq.Context() + self.socket = self.context.socket(zmq.PUSH) + self.socket.connect(SOCKET_PUSH_PULL) + + def publish( + self, payload: tuple[EventTypeEnum, EventStateEnum, str, dict[str, any]] + ) -> None: + """There is no communication back to the processes.""" + self.socket.send_pyobj(payload) + + def stop(self) -> None: + self.socket.close() + self.context.destroy() + + +class EventUpdateSubscriber: + """Receives event updates.""" + + def __init__(self) -> None: + self.context = zmq.Context() + self.socket = self.context.socket(zmq.PULL) + self.socket.bind(SOCKET_PUSH_PULL) + + def check_for_update( + self, timeout=1 + ) -> tuple[EventTypeEnum, EventStateEnum, str, dict[str, any]]: + """Returns updated config or None if no update.""" + try: + has_update, _, _ = zmq.select([self.socket], [], [], timeout) + + if has_update: + return self.socket.recv_pyobj() + except zmq.ZMQError: + pass + + return None + + def stop(self) -> None: + self.socket.close() + self.context.destroy() diff --git a/frigate/events/external.py b/frigate/events/external.py index 9c99ef50c..7bae21071 100644 --- a/frigate/events/external.py +++ b/frigate/events/external.py @@ -6,24 +6,24 @@ import logging import os import random import string -from multiprocessing import Queue from typing import Optional import cv2 +from frigate.comms.events_updater import EventUpdatePublisher from frigate.config import CameraConfig, FrigateConfig from frigate.const import CLIPS_DIR -from frigate.events.maintainer import EventTypeEnum +from frigate.events.types import EventStateEnum, EventTypeEnum from frigate.util.image import draw_box_with_label logger = logging.getLogger(__name__) class ExternalEventProcessor: - def __init__(self, config: FrigateConfig, queue: Queue) -> None: + def __init__(self, config: FrigateConfig) -> None: self.config = config - self.queue = queue self.default_thumbnail = None + self.event_sender = EventUpdatePublisher() def create_manual_event( self, @@ -48,10 +48,10 @@ class ExternalEventProcessor: camera_config, label, event_id, draw, snapshot_frame ) - self.queue.put( + self.event_sender.publish( ( EventTypeEnum.api, - "new", + EventStateEnum.start, camera, { "id": event_id, @@ -77,8 +77,13 @@ class ExternalEventProcessor: def finish_manual_event(self, event_id: str, end_time: float) -> None: """Finish external event with indeterminate duration.""" - self.queue.put( - (EventTypeEnum.api, "end", None, {"id": event_id, "end_time": end_time}) + self.event_sender.publish( + ( + EventTypeEnum.api, + EventStateEnum.end, + None, + {"id": event_id, "end_time": end_time}, + ) ) def _write_images( @@ -135,3 +140,6 @@ class ExternalEventProcessor: thumb = cv2.resize(img_frame, dsize=(width, 175), interpolation=cv2.INTER_AREA) ret, jpg = cv2.imencode(".jpg", thumb) return base64.b64encode(jpg.tobytes()).decode("utf-8") + + def stop(self): + self.event_sender.stop() diff --git a/frigate/events/maintainer.py b/frigate/events/maintainer.py index eadf888c9..720022e05 100644 --- a/frigate/events/maintainer.py +++ b/frigate/events/maintainer.py @@ -1,25 +1,19 @@ import datetime import logging -import queue import threading -from enum import Enum from multiprocessing import Queue from multiprocessing.synchronize import Event as MpEvent from typing import Dict +from frigate.comms.events_updater import EventUpdateSubscriber from frigate.config import EventsConfig, FrigateConfig +from frigate.events.types import EventStateEnum, EventTypeEnum from frigate.models import Event -from frigate.types import CameraMetricsTypes from frigate.util.builtin import to_relative_box logger = logging.getLogger(__name__) -class EventTypeEnum(str, Enum): - api = "api" - tracked_object = "tracked_object" - - def should_update_db(prev_event: Event, current_event: Event) -> bool: """If current_event has updated fields and (clip or snapshot).""" if current_event["has_clip"] or current_event["has_snapshot"]: @@ -58,8 +52,6 @@ class EventProcessor(threading.Thread): def __init__( self, config: FrigateConfig, - camera_processes: dict[str, CameraMetricsTypes], - event_queue: Queue, event_processed_queue: Queue, timeline_queue: Queue, stop_event: MpEvent, @@ -67,13 +59,13 @@ class EventProcessor(threading.Thread): threading.Thread.__init__(self) self.name = "event_processor" self.config = config - self.camera_processes = camera_processes - self.event_queue = event_queue self.event_processed_queue = event_processed_queue self.timeline_queue = timeline_queue self.events_in_process: Dict[str, Event] = {} self.stop_event = stop_event + self.event_receiver = EventUpdateSubscriber() + def run(self) -> None: # set an end_time on events without an end_time on startup Event.update(end_time=Event.start_time + 30).where( @@ -81,13 +73,13 @@ class EventProcessor(threading.Thread): ).execute() while not self.stop_event.is_set(): - try: - source_type, event_type, camera, event_data = self.event_queue.get( - timeout=1 - ) - except queue.Empty: + update = self.event_receiver.check_for_update() + + if update == None: continue + source_type, event_type, camera, event_data = update + logger.debug( f"Event received: {source_type} {event_type} {camera} {event_data['id']}" ) @@ -103,7 +95,7 @@ class EventProcessor(threading.Thread): ) ) - if event_type == "start": + if event_type == EventStateEnum.start: self.events_in_process[event_data["id"]] = event_data continue @@ -125,6 +117,7 @@ class EventProcessor(threading.Thread): Event.update(end_time=datetime.datetime.now().timestamp()).where( Event.end_time == None ).execute() + self.event_receiver.stop() logger.info("Exiting event processor...") def handle_object_detection( @@ -247,12 +240,14 @@ class EventProcessor(threading.Thread): # update the stored copy for comparison on future update messages self.events_in_process[event_data["id"]] = event_data - if event_type == "end": + if event_type == EventStateEnum.end: del self.events_in_process[event_data["id"]] self.event_processed_queue.put((event_data["id"], camera)) - def handle_external_detection(self, event_type: str, event_data: Event) -> None: - if event_type == "new": + def handle_external_detection( + self, event_type: EventStateEnum, event_data: Event + ) -> None: + if event_type == EventStateEnum.start: event = { Event.id: event_data["id"], Event.label: event_data["label"], @@ -271,7 +266,7 @@ class EventProcessor(threading.Thread): }, } Event.insert(event).execute() - elif event_type == "end": + elif event_type == EventStateEnum.end: event = { Event.id: event_data["id"], Event.end_time: event_data["end_time"], diff --git a/frigate/events/types.py b/frigate/events/types.py new file mode 100644 index 000000000..1750b3e7b --- /dev/null +++ b/frigate/events/types.py @@ -0,0 +1,14 @@ +"""Types for event management.""" + +from enum import Enum + + +class EventTypeEnum(str, Enum): + api = "api" + tracked_object = "tracked_object" + + +class EventStateEnum(str, Enum): + start = "start" + update = "update" + end = "end" diff --git a/frigate/object_processing.py b/frigate/object_processing.py index 4cbaa11d6..c5e8101dc 100644 --- a/frigate/object_processing.py +++ b/frigate/object_processing.py @@ -14,6 +14,7 @@ import numpy as np from frigate.comms.detections_updater import DetectionPublisher, DetectionTypeEnum from frigate.comms.dispatcher import Dispatcher +from frigate.comms.events_updater import EventUpdatePublisher from frigate.config import ( CameraConfig, FrigateConfig, @@ -23,7 +24,7 @@ from frigate.config import ( ZoomingModeEnum, ) from frigate.const import CLIPS_DIR -from frigate.events.maintainer import EventTypeEnum +from frigate.events.types import EventStateEnum, EventTypeEnum from frigate.ptz.autotrack import PtzAutoTrackerThread from frigate.util.image import ( SharedMemoryFrameManager, @@ -826,7 +827,6 @@ class TrackedObjectProcessor(threading.Thread): config: FrigateConfig, dispatcher: Dispatcher, tracked_objects_queue, - event_queue, event_processed_queue, ptz_autotracker_thread, stop_event, @@ -836,7 +836,6 @@ class TrackedObjectProcessor(threading.Thread): self.config = config self.dispatcher = dispatcher self.tracked_objects_queue = tracked_objects_queue - self.event_queue = event_queue self.event_processed_queue = event_processed_queue self.stop_event = stop_event self.camera_states: dict[str, CameraState] = {} @@ -844,10 +843,16 @@ class TrackedObjectProcessor(threading.Thread): self.last_motion_detected: dict[str, float] = {} self.ptz_autotracker_thread = ptz_autotracker_thread self.detection_publisher = DetectionPublisher(DetectionTypeEnum.video) + self.event_sender = EventUpdatePublisher() def start(camera, obj: TrackedObject, current_frame_time): - self.event_queue.put( - (EventTypeEnum.tracked_object, "start", camera, obj.to_dict()) + self.event_sender.publish( + ( + EventTypeEnum.tracked_object, + EventStateEnum.start, + camera, + obj.to_dict(), + ) ) def update(camera, obj: TrackedObject, current_frame_time): @@ -861,10 +866,10 @@ class TrackedObjectProcessor(threading.Thread): } self.dispatcher.publish("events", json.dumps(message), retain=False) obj.previous = after - self.event_queue.put( + self.event_sender.publish( ( EventTypeEnum.tracked_object, - "update", + EventStateEnum.update, camera, obj.to_dict(include_thumbnail=True), ) @@ -923,10 +928,10 @@ class TrackedObjectProcessor(threading.Thread): self.dispatcher.publish("events", json.dumps(message), retain=False) self.ptz_autotracker_thread.ptz_autotracker.end_object(camera, obj) - self.event_queue.put( + self.event_sender.publish( ( EventTypeEnum.tracked_object, - "end", + EventStateEnum.end, camera, obj.to_dict(include_thumbnail=True), ) @@ -1215,4 +1220,5 @@ class TrackedObjectProcessor(threading.Thread): self.camera_states[camera].finished(event_id) self.detection_publisher.stop() + self.event_sender.stop() logger.info("Exiting object processor...")