diff --git a/frigate/app.py b/frigate/app.py index 858247866..4c7b175b8 100644 --- a/frigate/app.py +++ b/frigate/app.py @@ -28,6 +28,9 @@ from frigate.comms.object_detector_signaler import DetectorProxy from frigate.comms.webpush import WebPushClient from frigate.comms.ws import WebSocketClient from frigate.comms.zmq_proxy import ZmqProxy +from frigate.comms.zmq_req_router_broker import ( + ZmqReqRouterBroker, +) from frigate.config.camera.updater import CameraConfigUpdatePublisher from frigate.config.config import FrigateConfig from frigate.const import ( @@ -307,6 +310,14 @@ class FrigateApp: self.event_metadata_updater = EventMetadataPublisher() self.inter_zmq_proxy = ZmqProxy() self.detection_proxy = DetectorProxy() + self.zmq_router_broker: ZmqReqRouterBroker | None = None + + zmq_detectors = [ + det for det in self.config.detectors.values() if det.type == "zmq" + ] + if any(zmq_detectors): + backend_endpoint = zmq_detectors[0].endpoint + self.zmq_router_broker = ZmqReqRouterBroker(backend_endpoint) def init_onvif(self) -> None: self.onvif_controller = OnvifController(self.config, self.ptz_metrics) @@ -644,6 +655,9 @@ class FrigateApp: self.inter_zmq_proxy.stop() self.detection_proxy.stop() + if self.zmq_router_broker: + self.zmq_router_broker.stop() + while len(self.detection_shms) > 0: shm = self.detection_shms.pop() shm.close() diff --git a/frigate/comms/zmq_req_router_broker.py b/frigate/comms/zmq_req_router_broker.py new file mode 100644 index 000000000..cfdde2586 --- /dev/null +++ b/frigate/comms/zmq_req_router_broker.py @@ -0,0 +1,61 @@ +"""ZMQ REQ/ROUTER front-end to DEALER/REP back-end broker. + +This module provides a small proxy that: +- Binds a ROUTER socket on a fixed local endpoint for REQ clients +- Connects a DEALER socket to the user-configured backend endpoint (REP servers) + +Pattern: REQ -> ROUTER === proxy === DEALER -> REP + +The goal is to allow multiple REQ clients and/or multiple backend workers +to share a single configured connection, enabling multiple models/runners +behind the same broker while keeping local clients stable via constants. +""" + +from __future__ import annotations + +import threading + +import zmq + +REQ_ROUTER_ENDPOINT = "ipc:///tmp/cache/zmq_detector_router" + + +class _RouterDealerRunner(threading.Thread): + def __init__(self, context: zmq.Context[zmq.Socket], backend_endpoint: str) -> None: + super().__init__(name="zmq_router_dealer_broker", daemon=True) + self.context = context + self.backend_endpoint = backend_endpoint + + def run(self) -> None: + frontend = self.context.socket(zmq.ROUTER) + frontend.bind(REQ_ROUTER_ENDPOINT) + + backend = self.context.socket(zmq.DEALER) + backend.connect(self.backend_endpoint) + + try: + zmq.proxy(frontend, backend) + except zmq.ZMQError: + # Unblocked when context is destroyed in the controller + pass + + +class ZmqReqRouterBroker: + """Starts a ROUTER/DEALER proxy bridging local REQ clients to backend REP. + + - ROUTER binds to REQ_ROUTER_ENDPOINT (constant, local) + - DEALER connects to the provided backend_endpoint (user-configured) + """ + + def __init__(self, backend_endpoint: str) -> None: + self.backend_endpoint = backend_endpoint + self.context = zmq.Context() + self.runner = _RouterDealerRunner(self.context, backend_endpoint) + self.runner.start() + + def stop(self) -> None: + # Destroying the context signals the proxy to stop + try: + self.context.destroy() + finally: + self.runner.join() diff --git a/frigate/detectors/plugins/zmq_ipc.py b/frigate/detectors/plugins/zmq_ipc.py index cd397aefa..2ef3d4e03 100644 --- a/frigate/detectors/plugins/zmq_ipc.py +++ b/frigate/detectors/plugins/zmq_ipc.py @@ -8,6 +8,7 @@ import zmq from pydantic import Field from typing_extensions import Literal +from frigate.comms.zmq_req_router_broker import REQ_ROUTER_ENDPOINT from frigate.detectors.detection_api import DetectionApi from frigate.detectors.detector_config import BaseDetectorConfig @@ -60,7 +61,7 @@ class ZmqIpcDetector(DetectionApi): super().__init__(detector_config) self._context = zmq.Context() - self._endpoint = detector_config.endpoint + self._endpoint = REQ_ROUTER_ENDPOINT self._request_timeout_ms = detector_config.request_timeout_ms self._linger_ms = detector_config.linger_ms self._socket = None