Implement broker/dealer router

This commit is contained in:
Nicolas Mowen 2025-09-22 11:14:53 -06:00
parent bdb7a18602
commit 8b78c85bda
3 changed files with 77 additions and 1 deletions

View File

@ -28,6 +28,9 @@ from frigate.comms.object_detector_signaler import DetectorProxy
from frigate.comms.webpush import WebPushClient
from frigate.comms.ws import WebSocketClient
from frigate.comms.zmq_proxy import ZmqProxy
from frigate.comms.zmq_req_router_broker import (
ZmqReqRouterBroker,
)
from frigate.config.camera.updater import CameraConfigUpdatePublisher
from frigate.config.config import FrigateConfig
from frigate.const import (
@ -307,6 +310,14 @@ class FrigateApp:
self.event_metadata_updater = EventMetadataPublisher()
self.inter_zmq_proxy = ZmqProxy()
self.detection_proxy = DetectorProxy()
self.zmq_router_broker: ZmqReqRouterBroker | None = None
zmq_detectors = [
det for det in self.config.detectors.values() if det.type == "zmq"
]
if any(zmq_detectors):
backend_endpoint = zmq_detectors[0].endpoint
self.zmq_router_broker = ZmqReqRouterBroker(backend_endpoint)
def init_onvif(self) -> None:
self.onvif_controller = OnvifController(self.config, self.ptz_metrics)
@ -644,6 +655,9 @@ class FrigateApp:
self.inter_zmq_proxy.stop()
self.detection_proxy.stop()
if self.zmq_router_broker:
self.zmq_router_broker.stop()
while len(self.detection_shms) > 0:
shm = self.detection_shms.pop()
shm.close()

View File

@ -0,0 +1,61 @@
"""ZMQ REQ/ROUTER front-end to DEALER/REP back-end broker.
This module provides a small proxy that:
- Binds a ROUTER socket on a fixed local endpoint for REQ clients
- Connects a DEALER socket to the user-configured backend endpoint (REP servers)
Pattern: REQ -> ROUTER === proxy === DEALER -> REP
The goal is to allow multiple REQ clients and/or multiple backend workers
to share a single configured connection, enabling multiple models/runners
behind the same broker while keeping local clients stable via constants.
"""
from __future__ import annotations
import threading
import zmq
REQ_ROUTER_ENDPOINT = "ipc:///tmp/cache/zmq_detector_router"
class _RouterDealerRunner(threading.Thread):
def __init__(self, context: zmq.Context[zmq.Socket], backend_endpoint: str) -> None:
super().__init__(name="zmq_router_dealer_broker", daemon=True)
self.context = context
self.backend_endpoint = backend_endpoint
def run(self) -> None:
frontend = self.context.socket(zmq.ROUTER)
frontend.bind(REQ_ROUTER_ENDPOINT)
backend = self.context.socket(zmq.DEALER)
backend.connect(self.backend_endpoint)
try:
zmq.proxy(frontend, backend)
except zmq.ZMQError:
# Unblocked when context is destroyed in the controller
pass
class ZmqReqRouterBroker:
"""Starts a ROUTER/DEALER proxy bridging local REQ clients to backend REP.
- ROUTER binds to REQ_ROUTER_ENDPOINT (constant, local)
- DEALER connects to the provided backend_endpoint (user-configured)
"""
def __init__(self, backend_endpoint: str) -> None:
self.backend_endpoint = backend_endpoint
self.context = zmq.Context()
self.runner = _RouterDealerRunner(self.context, backend_endpoint)
self.runner.start()
def stop(self) -> None:
# Destroying the context signals the proxy to stop
try:
self.context.destroy()
finally:
self.runner.join()

View File

@ -8,6 +8,7 @@ import zmq
from pydantic import Field
from typing_extensions import Literal
from frigate.comms.zmq_req_router_broker import REQ_ROUTER_ENDPOINT
from frigate.detectors.detection_api import DetectionApi
from frigate.detectors.detector_config import BaseDetectorConfig
@ -60,7 +61,7 @@ class ZmqIpcDetector(DetectionApi):
super().__init__(detector_config)
self._context = zmq.Context()
self._endpoint = detector_config.endpoint
self._endpoint = REQ_ROUTER_ENDPOINT
self._request_timeout_ms = detector_config.request_timeout_ms
self._linger_ms = detector_config.linger_ms
self._socket = None