Use zmq for event end queue and update python deps (#10886)

* Use zmq for events ended

* Cleanup

* Update deps

* formatting
This commit is contained in:
Nicolas Mowen 2024-04-08 17:19:45 -06:00 committed by GitHub
parent c577361923
commit 8163c036ef
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 67 additions and 20 deletions

View File

@ -4,11 +4,11 @@ imutils == 0.5.*
markupsafe == 2.1.* markupsafe == 2.1.*
matplotlib == 3.7.* matplotlib == 3.7.*
mypy == 1.6.1 mypy == 1.6.1
numpy == 1.23.* numpy == 1.26.*
onvif_zeep == 0.2.12 onvif_zeep == 0.2.12
opencv-python-headless == 4.7.0.* opencv-python-headless == 4.7.0.*
paho-mqtt == 2.0.* paho-mqtt == 2.0.*
pandas == 2.1.4 pandas == 2.2.*
peewee == 3.17.* peewee == 3.17.*
peewee_migrate == 1.12.* peewee_migrate == 1.12.*
psutil == 5.9.* psutil == 5.9.*

View File

@ -200,9 +200,6 @@ class FrigateApp:
logging.getLogger("ws4py").setLevel("ERROR") logging.getLogger("ws4py").setLevel("ERROR")
def init_queues(self) -> None: def init_queues(self) -> None:
# Queues for clip processing
self.event_processed_queue: Queue = mp.Queue()
# Queue for cameras to push tracked objects to # Queue for cameras to push tracked objects to
self.detected_frames_queue: Queue = mp.Queue( self.detected_frames_queue: Queue = mp.Queue(
maxsize=sum(camera.enabled for camera in self.config.cameras.values()) * 2 maxsize=sum(camera.enabled for camera in self.config.cameras.values()) * 2
@ -420,7 +417,6 @@ class FrigateApp:
self.config, self.config,
self.dispatcher, self.dispatcher,
self.detected_frames_queue, self.detected_frames_queue,
self.event_processed_queue,
self.ptz_autotracker_thread, self.ptz_autotracker_thread,
self.stop_event, self.stop_event,
) )
@ -517,7 +513,6 @@ class FrigateApp:
def start_event_processor(self) -> None: def start_event_processor(self) -> None:
self.event_processor = EventProcessor( self.event_processor = EventProcessor(
self.config, self.config,
self.event_processed_queue,
self.timeline_queue, self.timeline_queue,
self.stop_event, self.stop_event,
) )
@ -704,7 +699,6 @@ class FrigateApp:
shm.unlink() shm.unlink()
for queue in [ for queue in [
self.event_processed_queue,
self.detected_frames_queue, self.detected_frames_queue,
self.log_queue, self.log_queue,
]: ]:

View File

@ -8,7 +8,7 @@ import zmq
SOCKET_CONTROL = "inproc://control.detections_updater" SOCKET_CONTROL = "inproc://control.detections_updater"
SOCKET_PUB = "ipc:///tmp/cache/detect_pub" SOCKET_PUB = "ipc:///tmp/cache/detect_pub"
SOCKET_SUB = "ipc:///tmp/cache/detect_sun" SOCKET_SUB = "ipc:///tmp/cache/detect_sub"
class DetectionTypeEnum(str, Enum): class DetectionTypeEnum(str, Enum):

View File

@ -5,6 +5,7 @@ import zmq
from frigate.events.types import EventStateEnum, EventTypeEnum from frigate.events.types import EventStateEnum, EventTypeEnum
SOCKET_PUSH_PULL = "ipc:///tmp/cache/events" SOCKET_PUSH_PULL = "ipc:///tmp/cache/events"
SOCKET_PUSH_PULL_END = "ipc:///tmp/cache/events_ended"
class EventUpdatePublisher: class EventUpdatePublisher:
@ -37,7 +38,53 @@ class EventUpdateSubscriber:
def check_for_update( def check_for_update(
self, timeout=1 self, timeout=1
) -> tuple[EventTypeEnum, EventStateEnum, str, dict[str, any]]: ) -> tuple[EventTypeEnum, EventStateEnum, str, dict[str, any]]:
"""Returns updated config or None if no update.""" """Returns events or None if no update."""
try:
has_update, _, _ = zmq.select([self.socket], [], [], timeout)
if has_update:
return self.socket.recv_pyobj()
except zmq.ZMQError:
pass
return None
def stop(self) -> None:
self.socket.close()
self.context.destroy()
class EventEndPublisher:
"""Publishes events that have ended."""
def __init__(self) -> None:
self.context = zmq.Context()
self.socket = self.context.socket(zmq.PUSH)
self.socket.connect(SOCKET_PUSH_PULL_END)
def publish(
self, payload: tuple[EventTypeEnum, EventStateEnum, str, dict[str, any]]
) -> None:
"""There is no communication back to the processes."""
self.socket.send_pyobj(payload)
def stop(self) -> None:
self.socket.close()
self.context.destroy()
class EventEndSubscriber:
"""Receives events that have ended."""
def __init__(self) -> None:
self.context = zmq.Context()
self.socket = self.context.socket(zmq.PULL)
self.socket.bind(SOCKET_PUSH_PULL_END)
def check_for_update(
self, timeout=1
) -> tuple[EventTypeEnum, EventStateEnum, str, dict[str, any]]:
"""Returns events ended or None if no update."""
try: try:
has_update, _, _ = zmq.select([self.socket], [], [], timeout) has_update, _, _ = zmq.select([self.socket], [], [], timeout)

View File

@ -5,7 +5,7 @@ from multiprocessing import Queue
from multiprocessing.synchronize import Event as MpEvent from multiprocessing.synchronize import Event as MpEvent
from typing import Dict from typing import Dict
from frigate.comms.events_updater import EventUpdateSubscriber from frigate.comms.events_updater import EventEndPublisher, EventUpdateSubscriber
from frigate.config import EventsConfig, FrigateConfig from frigate.config import EventsConfig, FrigateConfig
from frigate.events.types import EventStateEnum, EventTypeEnum from frigate.events.types import EventStateEnum, EventTypeEnum
from frigate.models import Event from frigate.models import Event
@ -52,19 +52,18 @@ class EventProcessor(threading.Thread):
def __init__( def __init__(
self, self,
config: FrigateConfig, config: FrigateConfig,
event_processed_queue: Queue,
timeline_queue: Queue, timeline_queue: Queue,
stop_event: MpEvent, stop_event: MpEvent,
): ):
threading.Thread.__init__(self) threading.Thread.__init__(self)
self.name = "event_processor" self.name = "event_processor"
self.config = config self.config = config
self.event_processed_queue = event_processed_queue
self.timeline_queue = timeline_queue self.timeline_queue = timeline_queue
self.events_in_process: Dict[str, Event] = {} self.events_in_process: Dict[str, Event] = {}
self.stop_event = stop_event self.stop_event = stop_event
self.event_receiver = EventUpdateSubscriber() self.event_receiver = EventUpdateSubscriber()
self.event_end_publisher = EventEndPublisher()
def run(self) -> None: def run(self) -> None:
# set an end_time on events without an end_time on startup # set an end_time on events without an end_time on startup
@ -118,6 +117,7 @@ class EventProcessor(threading.Thread):
Event.end_time == None Event.end_time == None
).execute() ).execute()
self.event_receiver.stop() self.event_receiver.stop()
self.event_end_publisher.stop()
logger.info("Exiting event processor...") logger.info("Exiting event processor...")
def handle_object_detection( def handle_object_detection(
@ -242,7 +242,7 @@ class EventProcessor(threading.Thread):
if event_type == EventStateEnum.end: if event_type == EventStateEnum.end:
del self.events_in_process[event_data["id"]] del self.events_in_process[event_data["id"]]
self.event_processed_queue.put((event_data["id"], camera)) self.event_end_publisher.publish((event_data["id"], camera))
def handle_external_detection( def handle_external_detection(
self, event_type: EventStateEnum, event_data: Event self, event_type: EventStateEnum, event_data: Event

View File

@ -6,6 +6,7 @@ import os
import queue import queue
import threading import threading
from collections import Counter, defaultdict from collections import Counter, defaultdict
from multiprocessing.synchronize import Event as MpEvent
from statistics import median from statistics import median
from typing import Callable from typing import Callable
@ -14,7 +15,7 @@ import numpy as np
from frigate.comms.detections_updater import DetectionPublisher, DetectionTypeEnum from frigate.comms.detections_updater import DetectionPublisher, DetectionTypeEnum
from frigate.comms.dispatcher import Dispatcher from frigate.comms.dispatcher import Dispatcher
from frigate.comms.events_updater import EventUpdatePublisher from frigate.comms.events_updater import EventEndSubscriber, EventUpdatePublisher
from frigate.config import ( from frigate.config import (
CameraConfig, CameraConfig,
FrigateConfig, FrigateConfig,
@ -827,7 +828,6 @@ class TrackedObjectProcessor(threading.Thread):
config: FrigateConfig, config: FrigateConfig,
dispatcher: Dispatcher, dispatcher: Dispatcher,
tracked_objects_queue, tracked_objects_queue,
event_processed_queue,
ptz_autotracker_thread, ptz_autotracker_thread,
stop_event, stop_event,
): ):
@ -836,14 +836,14 @@ class TrackedObjectProcessor(threading.Thread):
self.config = config self.config = config
self.dispatcher = dispatcher self.dispatcher = dispatcher
self.tracked_objects_queue = tracked_objects_queue self.tracked_objects_queue = tracked_objects_queue
self.event_processed_queue = event_processed_queue self.stop_event: MpEvent = stop_event
self.stop_event = stop_event
self.camera_states: dict[str, CameraState] = {} self.camera_states: dict[str, CameraState] = {}
self.frame_manager = SharedMemoryFrameManager() self.frame_manager = SharedMemoryFrameManager()
self.last_motion_detected: dict[str, float] = {} self.last_motion_detected: dict[str, float] = {}
self.ptz_autotracker_thread = ptz_autotracker_thread self.ptz_autotracker_thread = ptz_autotracker_thread
self.detection_publisher = DetectionPublisher(DetectionTypeEnum.video) self.detection_publisher = DetectionPublisher(DetectionTypeEnum.video)
self.event_sender = EventUpdatePublisher() self.event_sender = EventUpdatePublisher()
self.event_end_subscriber = EventEndSubscriber()
def start(camera, obj: TrackedObject, current_frame_time): def start(camera, obj: TrackedObject, current_frame_time):
self.event_sender.publish( self.event_sender.publish(
@ -1215,10 +1215,16 @@ class TrackedObjectProcessor(threading.Thread):
) )
# cleanup event finished queue # cleanup event finished queue
while not self.event_processed_queue.empty(): while not self.stop_event.is_set():
event_id, camera = self.event_processed_queue.get() update = self.event_end_subscriber.check_for_update(timeout=0.01)
if not update:
break
event_id, camera = update
self.camera_states[camera].finished(event_id) self.camera_states[camera].finished(event_id)
self.detection_publisher.stop() self.detection_publisher.stop()
self.event_sender.stop() self.event_sender.stop()
self.event_end_subscriber.stop()
logger.info("Exiting object processor...") logger.info("Exiting object processor...")