Rewrite events communication to use zmq instead of mp.Queue (#10627)

* Move to using zmq for events updating

* Use event updater in manual events handler

* Formatting
This commit is contained in:
Nicolas Mowen 2024-03-23 10:11:32 -06:00 committed by GitHub
parent 4159334520
commit 76a114a3cd
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 117 additions and 47 deletions

View File

@ -195,7 +195,6 @@ class FrigateApp:
def init_queues(self) -> None: def init_queues(self) -> None:
# Queues for clip processing # Queues for clip processing
self.event_queue: Queue = mp.Queue()
self.event_processed_queue: Queue = mp.Queue() self.event_processed_queue: Queue = mp.Queue()
# Queue for cameras to push tracked objects to # Queue for cameras to push tracked objects to
@ -324,9 +323,7 @@ class FrigateApp:
self.db.bind(models) self.db.bind(models)
def init_external_event_processor(self) -> None: def init_external_event_processor(self) -> None:
self.external_event_processor = ExternalEventProcessor( self.external_event_processor = ExternalEventProcessor(self.config)
self.config, self.event_queue
)
def init_inter_process_communicator(self) -> None: def init_inter_process_communicator(self) -> None:
self.inter_process_communicator = InterProcessCommunicator() self.inter_process_communicator = InterProcessCommunicator()
@ -417,7 +414,6 @@ class FrigateApp:
self.config, self.config,
self.dispatcher, self.dispatcher,
self.detected_frames_queue, self.detected_frames_queue,
self.event_queue,
self.event_processed_queue, self.event_processed_queue,
self.ptz_autotracker_thread, self.ptz_autotracker_thread,
self.stop_event, self.stop_event,
@ -515,8 +511,6 @@ class FrigateApp:
def start_event_processor(self) -> None: def start_event_processor(self) -> None:
self.event_processor = EventProcessor( self.event_processor = EventProcessor(
self.config, self.config,
self.camera_metrics,
self.event_queue,
self.event_processed_queue, self.event_processed_queue,
self.timeline_queue, self.timeline_queue,
self.stop_event, self.stop_event,
@ -682,6 +676,7 @@ class FrigateApp:
self.detection_queue.close() self.detection_queue.close()
self.detection_queue.join_thread() self.detection_queue.join_thread()
self.external_event_processor.stop()
self.dispatcher.stop() self.dispatcher.stop()
self.detected_frames_processor.join() self.detected_frames_processor.join()
self.ptz_autotracker_thread.join() self.ptz_autotracker_thread.join()
@ -698,7 +693,6 @@ class FrigateApp:
shm.unlink() shm.unlink()
for queue in [ for queue in [
self.event_queue,
self.event_processed_queue, self.event_processed_queue,
self.detected_frames_queue, self.detected_frames_queue,
self.log_queue, self.log_queue,

View File

@ -0,0 +1,53 @@
"""Facilitates communication between processes."""
import zmq
from frigate.events.types import EventStateEnum, EventTypeEnum
SOCKET_PUSH_PULL = "ipc:///tmp/cache/events"
class EventUpdatePublisher:
"""Publishes events (objects, audio, manual)."""
def __init__(self) -> None:
self.context = zmq.Context()
self.socket = self.context.socket(zmq.PUSH)
self.socket.connect(SOCKET_PUSH_PULL)
def publish(
self, payload: tuple[EventTypeEnum, EventStateEnum, str, dict[str, any]]
) -> None:
"""There is no communication back to the processes."""
self.socket.send_pyobj(payload)
def stop(self) -> None:
self.socket.close()
self.context.destroy()
class EventUpdateSubscriber:
"""Receives event updates."""
def __init__(self) -> None:
self.context = zmq.Context()
self.socket = self.context.socket(zmq.PULL)
self.socket.bind(SOCKET_PUSH_PULL)
def check_for_update(
self, timeout=1
) -> tuple[EventTypeEnum, EventStateEnum, str, dict[str, any]]:
"""Returns updated config or None if no update."""
try:
has_update, _, _ = zmq.select([self.socket], [], [], timeout)
if has_update:
return self.socket.recv_pyobj()
except zmq.ZMQError:
pass
return None
def stop(self) -> None:
self.socket.close()
self.context.destroy()

View File

@ -6,24 +6,24 @@ import logging
import os import os
import random import random
import string import string
from multiprocessing import Queue
from typing import Optional from typing import Optional
import cv2 import cv2
from frigate.comms.events_updater import EventUpdatePublisher
from frigate.config import CameraConfig, FrigateConfig from frigate.config import CameraConfig, FrigateConfig
from frigate.const import CLIPS_DIR from frigate.const import CLIPS_DIR
from frigate.events.maintainer import EventTypeEnum from frigate.events.types import EventStateEnum, EventTypeEnum
from frigate.util.image import draw_box_with_label from frigate.util.image import draw_box_with_label
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
class ExternalEventProcessor: class ExternalEventProcessor:
def __init__(self, config: FrigateConfig, queue: Queue) -> None: def __init__(self, config: FrigateConfig) -> None:
self.config = config self.config = config
self.queue = queue
self.default_thumbnail = None self.default_thumbnail = None
self.event_sender = EventUpdatePublisher()
def create_manual_event( def create_manual_event(
self, self,
@ -48,10 +48,10 @@ class ExternalEventProcessor:
camera_config, label, event_id, draw, snapshot_frame camera_config, label, event_id, draw, snapshot_frame
) )
self.queue.put( self.event_sender.publish(
( (
EventTypeEnum.api, EventTypeEnum.api,
"new", EventStateEnum.start,
camera, camera,
{ {
"id": event_id, "id": event_id,
@ -77,8 +77,13 @@ class ExternalEventProcessor:
def finish_manual_event(self, event_id: str, end_time: float) -> None: def finish_manual_event(self, event_id: str, end_time: float) -> None:
"""Finish external event with indeterminate duration.""" """Finish external event with indeterminate duration."""
self.queue.put( self.event_sender.publish(
(EventTypeEnum.api, "end", None, {"id": event_id, "end_time": end_time}) (
EventTypeEnum.api,
EventStateEnum.end,
None,
{"id": event_id, "end_time": end_time},
)
) )
def _write_images( def _write_images(
@ -135,3 +140,6 @@ class ExternalEventProcessor:
thumb = cv2.resize(img_frame, dsize=(width, 175), interpolation=cv2.INTER_AREA) thumb = cv2.resize(img_frame, dsize=(width, 175), interpolation=cv2.INTER_AREA)
ret, jpg = cv2.imencode(".jpg", thumb) ret, jpg = cv2.imencode(".jpg", thumb)
return base64.b64encode(jpg.tobytes()).decode("utf-8") return base64.b64encode(jpg.tobytes()).decode("utf-8")
def stop(self):
self.event_sender.stop()

View File

@ -1,25 +1,19 @@
import datetime import datetime
import logging import logging
import queue
import threading import threading
from enum import Enum
from multiprocessing import Queue from multiprocessing import Queue
from multiprocessing.synchronize import Event as MpEvent from multiprocessing.synchronize import Event as MpEvent
from typing import Dict from typing import Dict
from frigate.comms.events_updater import EventUpdateSubscriber
from frigate.config import EventsConfig, FrigateConfig from frigate.config import EventsConfig, FrigateConfig
from frigate.events.types import EventStateEnum, EventTypeEnum
from frigate.models import Event from frigate.models import Event
from frigate.types import CameraMetricsTypes
from frigate.util.builtin import to_relative_box from frigate.util.builtin import to_relative_box
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
class EventTypeEnum(str, Enum):
api = "api"
tracked_object = "tracked_object"
def should_update_db(prev_event: Event, current_event: Event) -> bool: def should_update_db(prev_event: Event, current_event: Event) -> bool:
"""If current_event has updated fields and (clip or snapshot).""" """If current_event has updated fields and (clip or snapshot)."""
if current_event["has_clip"] or current_event["has_snapshot"]: if current_event["has_clip"] or current_event["has_snapshot"]:
@ -58,8 +52,6 @@ class EventProcessor(threading.Thread):
def __init__( def __init__(
self, self,
config: FrigateConfig, config: FrigateConfig,
camera_processes: dict[str, CameraMetricsTypes],
event_queue: Queue,
event_processed_queue: Queue, event_processed_queue: Queue,
timeline_queue: Queue, timeline_queue: Queue,
stop_event: MpEvent, stop_event: MpEvent,
@ -67,13 +59,13 @@ class EventProcessor(threading.Thread):
threading.Thread.__init__(self) threading.Thread.__init__(self)
self.name = "event_processor" self.name = "event_processor"
self.config = config self.config = config
self.camera_processes = camera_processes
self.event_queue = event_queue
self.event_processed_queue = event_processed_queue self.event_processed_queue = event_processed_queue
self.timeline_queue = timeline_queue self.timeline_queue = timeline_queue
self.events_in_process: Dict[str, Event] = {} self.events_in_process: Dict[str, Event] = {}
self.stop_event = stop_event self.stop_event = stop_event
self.event_receiver = EventUpdateSubscriber()
def run(self) -> None: def run(self) -> None:
# set an end_time on events without an end_time on startup # set an end_time on events without an end_time on startup
Event.update(end_time=Event.start_time + 30).where( Event.update(end_time=Event.start_time + 30).where(
@ -81,13 +73,13 @@ class EventProcessor(threading.Thread):
).execute() ).execute()
while not self.stop_event.is_set(): while not self.stop_event.is_set():
try: update = self.event_receiver.check_for_update()
source_type, event_type, camera, event_data = self.event_queue.get(
timeout=1 if update == None:
)
except queue.Empty:
continue continue
source_type, event_type, camera, event_data = update
logger.debug( logger.debug(
f"Event received: {source_type} {event_type} {camera} {event_data['id']}" f"Event received: {source_type} {event_type} {camera} {event_data['id']}"
) )
@ -103,7 +95,7 @@ class EventProcessor(threading.Thread):
) )
) )
if event_type == "start": if event_type == EventStateEnum.start:
self.events_in_process[event_data["id"]] = event_data self.events_in_process[event_data["id"]] = event_data
continue continue
@ -125,6 +117,7 @@ class EventProcessor(threading.Thread):
Event.update(end_time=datetime.datetime.now().timestamp()).where( Event.update(end_time=datetime.datetime.now().timestamp()).where(
Event.end_time == None Event.end_time == None
).execute() ).execute()
self.event_receiver.stop()
logger.info("Exiting event processor...") logger.info("Exiting event processor...")
def handle_object_detection( def handle_object_detection(
@ -247,12 +240,14 @@ class EventProcessor(threading.Thread):
# update the stored copy for comparison on future update messages # update the stored copy for comparison on future update messages
self.events_in_process[event_data["id"]] = event_data self.events_in_process[event_data["id"]] = event_data
if event_type == "end": if event_type == EventStateEnum.end:
del self.events_in_process[event_data["id"]] del self.events_in_process[event_data["id"]]
self.event_processed_queue.put((event_data["id"], camera)) self.event_processed_queue.put((event_data["id"], camera))
def handle_external_detection(self, event_type: str, event_data: Event) -> None: def handle_external_detection(
if event_type == "new": self, event_type: EventStateEnum, event_data: Event
) -> None:
if event_type == EventStateEnum.start:
event = { event = {
Event.id: event_data["id"], Event.id: event_data["id"],
Event.label: event_data["label"], Event.label: event_data["label"],
@ -271,7 +266,7 @@ class EventProcessor(threading.Thread):
}, },
} }
Event.insert(event).execute() Event.insert(event).execute()
elif event_type == "end": elif event_type == EventStateEnum.end:
event = { event = {
Event.id: event_data["id"], Event.id: event_data["id"],
Event.end_time: event_data["end_time"], Event.end_time: event_data["end_time"],

14
frigate/events/types.py Normal file
View File

@ -0,0 +1,14 @@
"""Types for event management."""
from enum import Enum
class EventTypeEnum(str, Enum):
api = "api"
tracked_object = "tracked_object"
class EventStateEnum(str, Enum):
start = "start"
update = "update"
end = "end"

View File

@ -14,6 +14,7 @@ import numpy as np
from frigate.comms.detections_updater import DetectionPublisher, DetectionTypeEnum from frigate.comms.detections_updater import DetectionPublisher, DetectionTypeEnum
from frigate.comms.dispatcher import Dispatcher from frigate.comms.dispatcher import Dispatcher
from frigate.comms.events_updater import EventUpdatePublisher
from frigate.config import ( from frigate.config import (
CameraConfig, CameraConfig,
FrigateConfig, FrigateConfig,
@ -23,7 +24,7 @@ from frigate.config import (
ZoomingModeEnum, ZoomingModeEnum,
) )
from frigate.const import CLIPS_DIR from frigate.const import CLIPS_DIR
from frigate.events.maintainer import EventTypeEnum from frigate.events.types import EventStateEnum, EventTypeEnum
from frigate.ptz.autotrack import PtzAutoTrackerThread from frigate.ptz.autotrack import PtzAutoTrackerThread
from frigate.util.image import ( from frigate.util.image import (
SharedMemoryFrameManager, SharedMemoryFrameManager,
@ -826,7 +827,6 @@ class TrackedObjectProcessor(threading.Thread):
config: FrigateConfig, config: FrigateConfig,
dispatcher: Dispatcher, dispatcher: Dispatcher,
tracked_objects_queue, tracked_objects_queue,
event_queue,
event_processed_queue, event_processed_queue,
ptz_autotracker_thread, ptz_autotracker_thread,
stop_event, stop_event,
@ -836,7 +836,6 @@ class TrackedObjectProcessor(threading.Thread):
self.config = config self.config = config
self.dispatcher = dispatcher self.dispatcher = dispatcher
self.tracked_objects_queue = tracked_objects_queue self.tracked_objects_queue = tracked_objects_queue
self.event_queue = event_queue
self.event_processed_queue = event_processed_queue self.event_processed_queue = event_processed_queue
self.stop_event = stop_event self.stop_event = stop_event
self.camera_states: dict[str, CameraState] = {} self.camera_states: dict[str, CameraState] = {}
@ -844,10 +843,16 @@ class TrackedObjectProcessor(threading.Thread):
self.last_motion_detected: dict[str, float] = {} self.last_motion_detected: dict[str, float] = {}
self.ptz_autotracker_thread = ptz_autotracker_thread self.ptz_autotracker_thread = ptz_autotracker_thread
self.detection_publisher = DetectionPublisher(DetectionTypeEnum.video) self.detection_publisher = DetectionPublisher(DetectionTypeEnum.video)
self.event_sender = EventUpdatePublisher()
def start(camera, obj: TrackedObject, current_frame_time): def start(camera, obj: TrackedObject, current_frame_time):
self.event_queue.put( self.event_sender.publish(
(EventTypeEnum.tracked_object, "start", camera, obj.to_dict()) (
EventTypeEnum.tracked_object,
EventStateEnum.start,
camera,
obj.to_dict(),
)
) )
def update(camera, obj: TrackedObject, current_frame_time): def update(camera, obj: TrackedObject, current_frame_time):
@ -861,10 +866,10 @@ class TrackedObjectProcessor(threading.Thread):
} }
self.dispatcher.publish("events", json.dumps(message), retain=False) self.dispatcher.publish("events", json.dumps(message), retain=False)
obj.previous = after obj.previous = after
self.event_queue.put( self.event_sender.publish(
( (
EventTypeEnum.tracked_object, EventTypeEnum.tracked_object,
"update", EventStateEnum.update,
camera, camera,
obj.to_dict(include_thumbnail=True), obj.to_dict(include_thumbnail=True),
) )
@ -923,10 +928,10 @@ class TrackedObjectProcessor(threading.Thread):
self.dispatcher.publish("events", json.dumps(message), retain=False) self.dispatcher.publish("events", json.dumps(message), retain=False)
self.ptz_autotracker_thread.ptz_autotracker.end_object(camera, obj) self.ptz_autotracker_thread.ptz_autotracker.end_object(camera, obj)
self.event_queue.put( self.event_sender.publish(
( (
EventTypeEnum.tracked_object, EventTypeEnum.tracked_object,
"end", EventStateEnum.end,
camera, camera,
obj.to_dict(include_thumbnail=True), obj.to_dict(include_thumbnail=True),
) )
@ -1215,4 +1220,5 @@ class TrackedObjectProcessor(threading.Thread):
self.camera_states[camera].finished(event_id) self.camera_states[camera].finished(event_id)
self.detection_publisher.stop() self.detection_publisher.stop()
self.event_sender.stop()
logger.info("Exiting object processor...") logger.info("Exiting object processor...")