diff --git a/docker/main/requirements-wheels.txt b/docker/main/requirements-wheels.txt index 69da2a0e3..99724d57e 100644 --- a/docker/main/requirements-wheels.txt +++ b/docker/main/requirements-wheels.txt @@ -4,11 +4,11 @@ imutils == 0.5.* markupsafe == 2.1.* matplotlib == 3.7.* mypy == 1.6.1 -numpy == 1.23.* +numpy == 1.26.* onvif_zeep == 0.2.12 opencv-python-headless == 4.7.0.* paho-mqtt == 2.0.* -pandas == 2.1.4 +pandas == 2.2.* peewee == 3.17.* peewee_migrate == 1.12.* psutil == 5.9.* diff --git a/frigate/app.py b/frigate/app.py index d0d5b76c7..74d3f2f58 100644 --- a/frigate/app.py +++ b/frigate/app.py @@ -200,9 +200,6 @@ class FrigateApp: logging.getLogger("ws4py").setLevel("ERROR") def init_queues(self) -> None: - # Queues for clip processing - self.event_processed_queue: Queue = mp.Queue() - # Queue for cameras to push tracked objects to self.detected_frames_queue: Queue = mp.Queue( maxsize=sum(camera.enabled for camera in self.config.cameras.values()) * 2 @@ -420,7 +417,6 @@ class FrigateApp: self.config, self.dispatcher, self.detected_frames_queue, - self.event_processed_queue, self.ptz_autotracker_thread, self.stop_event, ) @@ -517,7 +513,6 @@ class FrigateApp: def start_event_processor(self) -> None: self.event_processor = EventProcessor( self.config, - self.event_processed_queue, self.timeline_queue, self.stop_event, ) @@ -704,7 +699,6 @@ class FrigateApp: shm.unlink() for queue in [ - self.event_processed_queue, self.detected_frames_queue, self.log_queue, ]: diff --git a/frigate/comms/detections_updater.py b/frigate/comms/detections_updater.py index fa4f56252..37da1586e 100644 --- a/frigate/comms/detections_updater.py +++ b/frigate/comms/detections_updater.py @@ -8,7 +8,7 @@ import zmq SOCKET_CONTROL = "inproc://control.detections_updater" SOCKET_PUB = "ipc:///tmp/cache/detect_pub" -SOCKET_SUB = "ipc:///tmp/cache/detect_sun" +SOCKET_SUB = "ipc:///tmp/cache/detect_sub" class DetectionTypeEnum(str, Enum): diff --git a/frigate/comms/events_updater.py b/frigate/comms/events_updater.py index cb18667a1..29207df33 100644 --- a/frigate/comms/events_updater.py +++ b/frigate/comms/events_updater.py @@ -5,6 +5,7 @@ import zmq from frigate.events.types import EventStateEnum, EventTypeEnum SOCKET_PUSH_PULL = "ipc:///tmp/cache/events" +SOCKET_PUSH_PULL_END = "ipc:///tmp/cache/events_ended" class EventUpdatePublisher: @@ -37,7 +38,53 @@ class EventUpdateSubscriber: def check_for_update( self, timeout=1 ) -> tuple[EventTypeEnum, EventStateEnum, str, dict[str, any]]: - """Returns updated config or None if no update.""" + """Returns events or None if no update.""" + try: + has_update, _, _ = zmq.select([self.socket], [], [], timeout) + + if has_update: + return self.socket.recv_pyobj() + except zmq.ZMQError: + pass + + return None + + def stop(self) -> None: + self.socket.close() + self.context.destroy() + + +class EventEndPublisher: + """Publishes events that have ended.""" + + def __init__(self) -> None: + self.context = zmq.Context() + self.socket = self.context.socket(zmq.PUSH) + self.socket.connect(SOCKET_PUSH_PULL_END) + + def publish( + self, payload: tuple[EventTypeEnum, EventStateEnum, str, dict[str, any]] + ) -> None: + """There is no communication back to the processes.""" + self.socket.send_pyobj(payload) + + def stop(self) -> None: + self.socket.close() + self.context.destroy() + + +class EventEndSubscriber: + """Receives events that have ended.""" + + def __init__(self) -> None: + self.context = zmq.Context() + self.socket = self.context.socket(zmq.PULL) + self.socket.bind(SOCKET_PUSH_PULL_END) + + def check_for_update( + self, timeout=1 + ) -> tuple[EventTypeEnum, EventStateEnum, str, dict[str, any]]: + """Returns events ended or None if no update.""" try: has_update, _, _ = zmq.select([self.socket], [], [], timeout) diff --git a/frigate/events/maintainer.py b/frigate/events/maintainer.py index 720022e05..e6f66edc3 100644 --- a/frigate/events/maintainer.py +++ b/frigate/events/maintainer.py @@ -5,7 +5,7 @@ from multiprocessing import Queue from multiprocessing.synchronize import Event as MpEvent from typing import Dict -from frigate.comms.events_updater import EventUpdateSubscriber +from frigate.comms.events_updater import EventEndPublisher, EventUpdateSubscriber from frigate.config import EventsConfig, FrigateConfig from frigate.events.types import EventStateEnum, EventTypeEnum from frigate.models import Event @@ -52,19 +52,18 @@ class EventProcessor(threading.Thread): def __init__( self, config: FrigateConfig, - event_processed_queue: Queue, timeline_queue: Queue, stop_event: MpEvent, ): threading.Thread.__init__(self) self.name = "event_processor" self.config = config - self.event_processed_queue = event_processed_queue self.timeline_queue = timeline_queue self.events_in_process: Dict[str, Event] = {} self.stop_event = stop_event self.event_receiver = EventUpdateSubscriber() + self.event_end_publisher = EventEndPublisher() def run(self) -> None: # set an end_time on events without an end_time on startup @@ -118,6 +117,7 @@ class EventProcessor(threading.Thread): Event.end_time == None ).execute() self.event_receiver.stop() + self.event_end_publisher.stop() logger.info("Exiting event processor...") def handle_object_detection( @@ -242,7 +242,7 @@ class EventProcessor(threading.Thread): if event_type == EventStateEnum.end: del self.events_in_process[event_data["id"]] - self.event_processed_queue.put((event_data["id"], camera)) + self.event_end_publisher.publish((event_data["id"], camera)) def handle_external_detection( self, event_type: EventStateEnum, event_data: Event diff --git a/frigate/object_processing.py b/frigate/object_processing.py index c5e8101dc..a244838d1 100644 --- a/frigate/object_processing.py +++ b/frigate/object_processing.py @@ -6,6 +6,7 @@ import os import queue import threading from collections import Counter, defaultdict +from multiprocessing.synchronize import Event as MpEvent from statistics import median from typing import Callable @@ -14,7 +15,7 @@ import numpy as np from frigate.comms.detections_updater import DetectionPublisher, DetectionTypeEnum from frigate.comms.dispatcher import Dispatcher -from frigate.comms.events_updater import EventUpdatePublisher +from frigate.comms.events_updater import EventEndSubscriber, EventUpdatePublisher from frigate.config import ( CameraConfig, FrigateConfig, @@ -827,7 +828,6 @@ class TrackedObjectProcessor(threading.Thread): config: FrigateConfig, dispatcher: Dispatcher, tracked_objects_queue, - event_processed_queue, ptz_autotracker_thread, stop_event, ): @@ -836,14 +836,14 @@ class TrackedObjectProcessor(threading.Thread): self.config = config self.dispatcher = dispatcher self.tracked_objects_queue = tracked_objects_queue - self.event_processed_queue = event_processed_queue - self.stop_event = stop_event + self.stop_event: MpEvent = stop_event self.camera_states: dict[str, CameraState] = {} self.frame_manager = SharedMemoryFrameManager() self.last_motion_detected: dict[str, float] = {} self.ptz_autotracker_thread = ptz_autotracker_thread self.detection_publisher = DetectionPublisher(DetectionTypeEnum.video) self.event_sender = EventUpdatePublisher() + self.event_end_subscriber = EventEndSubscriber() def start(camera, obj: TrackedObject, current_frame_time): self.event_sender.publish( @@ -1215,10 +1215,16 @@ class TrackedObjectProcessor(threading.Thread): ) # cleanup event finished queue - while not self.event_processed_queue.empty(): - event_id, camera = self.event_processed_queue.get() + while not self.stop_event.is_set(): + update = self.event_end_subscriber.check_for_update(timeout=0.01) + + if not update: + break + + event_id, camera = update self.camera_states[camera].finished(event_id) self.detection_publisher.stop() self.event_sender.stop() + self.event_end_subscriber.stop() logger.info("Exiting object processor...")