From dba21b606dc8eff41a5ffaf5364b4d0e0c480e8d Mon Sep 17 00:00:00 2001 From: Nicolas Mowen Date: Mon, 19 Feb 2024 06:26:59 -0700 Subject: [PATCH] Convert config updating and video/audio queues to use zmq (#9893) * Add config pub / sub pattern * remove recording from feature metrics * remove audio and feature metrics * Check for updates from all cameras * remove birdseye from camera metrics * remove motion and detection camera metrics * Ensure that all processes are stopped * Stop communicators * Detections * Cleanup video output queue * Use select for time sensitive polls * Use ipc instead of tcp --- frigate/app.py | 107 ++++--------------------- frigate/comms/config_updater.py | 51 ++++++++++++ frigate/comms/detections_updater.py | 102 ++++++++++++++++++++++++ frigate/comms/dispatcher.py | 87 ++++++++++---------- frigate/comms/inter_process.py | 19 ++--- frigate/config.py | 16 +++- frigate/const.py | 4 - frigate/events/audio.py | 31 +++++--- frigate/motion/__init__.py | 4 + frigate/motion/improved_motion.py | 35 +++++--- frigate/object_processing.py | 21 ++--- frigate/output/birdseye.py | 41 +++++----- frigate/output/output.py | 42 +++++----- frigate/record/maintainer.py | 119 ++++++++++++---------------- frigate/record/record.py | 11 +-- frigate/stats.py | 2 +- frigate/types.py | 12 --- frigate/video.py | 26 +++--- 18 files changed, 394 insertions(+), 336 deletions(-) create mode 100644 frigate/comms/config_updater.py create mode 100644 frigate/comms/detections_updater.py diff --git a/frigate/app.py b/frigate/app.py index 08bb694a0..81050e30b 100644 --- a/frigate/app.py +++ b/frigate/app.py @@ -17,11 +17,13 @@ from peewee_migrate import Router from playhouse.sqlite_ext import SqliteExtDatabase from playhouse.sqliteq import SqliteQueueDatabase +from frigate.comms.config_updater import ConfigPublisher +from frigate.comms.detections_updater import DetectionProxy from frigate.comms.dispatcher import Communicator, Dispatcher from frigate.comms.inter_process import InterProcessCommunicator from frigate.comms.mqtt import MqttClient from frigate.comms.ws import WebSocketClient -from frigate.config import BirdseyeModeEnum, FrigateConfig +from frigate.config import FrigateConfig from frigate.const import ( CACHE_DIR, CLIPS_DIR, @@ -56,7 +58,7 @@ from frigate.record.record import manage_recordings from frigate.stats import StatsEmitter, stats_init from frigate.storage import StorageMaintainer from frigate.timeline import TimelineProcessor -from frigate.types import CameraMetricsTypes, FeatureMetricsTypes, PTZMetricsTypes +from frigate.types import CameraMetricsTypes, PTZMetricsTypes from frigate.util.object import get_camera_regions_grid from frigate.version import VERSION from frigate.video import capture_camera, track_camera @@ -75,7 +77,6 @@ class FrigateApp: self.log_queue: Queue = mp.Queue() self.plus_api = PlusApi() self.camera_metrics: dict[str, CameraMetricsTypes] = {} - self.feature_metrics: dict[str, FeatureMetricsTypes] = {} self.ptz_metrics: dict[str, PTZMetricsTypes] = {} self.processes: dict[str, int] = {} self.region_grids: dict[str, list[list[dict[str, int]]]] = {} @@ -129,35 +130,6 @@ class FrigateApp: # issue https://github.com/python/typeshed/issues/8799 # from mypy 0.981 onwards "process_fps": mp.Value("d", 0.0), # type: ignore[typeddict-item] - # issue https://github.com/python/typeshed/issues/8799 - # from mypy 0.981 onwards - "detection_enabled": mp.Value( # type: ignore[typeddict-item] - # issue https://github.com/python/typeshed/issues/8799 - # from mypy 0.981 onwards - "i", - self.config.cameras[camera_name].detect.enabled, - ), - "motion_enabled": mp.Value("i", True), # type: ignore[typeddict-item] - # issue https://github.com/python/typeshed/issues/8799 - # from mypy 0.981 onwards - "improve_contrast_enabled": mp.Value( # type: ignore[typeddict-item] - # issue https://github.com/python/typeshed/issues/8799 - # from mypy 0.981 onwards - "i", - self.config.cameras[camera_name].motion.improve_contrast, - ), - "motion_threshold": mp.Value( # type: ignore[typeddict-item] - # issue https://github.com/python/typeshed/issues/8799 - # from mypy 0.981 onwards - "i", - self.config.cameras[camera_name].motion.threshold, - ), - "motion_contour_area": mp.Value( # type: ignore[typeddict-item] - # issue https://github.com/python/typeshed/issues/8799 - # from mypy 0.981 onwards - "i", - self.config.cameras[camera_name].motion.contour_area, - ), "detection_fps": mp.Value("d", 0.0), # type: ignore[typeddict-item] # issue https://github.com/python/typeshed/issues/8799 # from mypy 0.981 onwards @@ -175,20 +147,6 @@ class FrigateApp: "process": None, "audio_rms": mp.Value("d", 0.0), # type: ignore[typeddict-item] "audio_dBFS": mp.Value("d", 0.0), # type: ignore[typeddict-item] - "birdseye_enabled": mp.Value( # type: ignore[typeddict-item] - # issue https://github.com/python/typeshed/issues/8799 - # from mypy 0.981 onwards - "i", - self.config.cameras[camera_name].birdseye.enabled, - ), - "birdseye_mode": mp.Value( # type: ignore[typeddict-item] - # issue https://github.com/python/typeshed/issues/8799 - # from mypy 0.981 onwards - "i", - BirdseyeModeEnum.get_index( - self.config.cameras[camera_name].birdseye.mode.value - ), - ), } self.ptz_metrics[camera_name] = { "ptz_autotracker_enabled": mp.Value( # type: ignore[typeddict-item] @@ -220,20 +178,6 @@ class FrigateApp: # from mypy 0.981 onwards } self.ptz_metrics[camera_name]["ptz_motor_stopped"].set() - self.feature_metrics[camera_name] = { - "audio_enabled": mp.Value( # type: ignore[typeddict-item] - # issue https://github.com/python/typeshed/issues/8799 - # from mypy 0.981 onwards - "i", - self.config.cameras[camera_name].audio.enabled, - ), - "record_enabled": mp.Value( # type: ignore[typeddict-item] - # issue https://github.com/python/typeshed/issues/8799 - # from mypy 0.981 onwards - "i", - self.config.cameras[camera_name].record.enabled, - ), - } def set_log_levels(self) -> None: logging.getLogger().setLevel(self.config.logger.default.value.upper()) @@ -250,25 +194,12 @@ class FrigateApp: # Queues for clip processing self.event_queue: Queue = mp.Queue() self.event_processed_queue: Queue = mp.Queue() - self.video_output_queue: Queue = mp.Queue( - maxsize=sum(camera.enabled for camera in self.config.cameras.values()) * 2 - ) # Queue for cameras to push tracked objects to self.detected_frames_queue: Queue = mp.Queue( maxsize=sum(camera.enabled for camera in self.config.cameras.values()) * 2 ) - # Queue for object recordings info - self.object_recordings_info_queue: Queue = mp.Queue() - - # Queue for audio recordings info if enabled - self.audio_recordings_info_queue: Optional[Queue] = ( - mp.Queue() - if len([c for c in self.config.cameras.values() if c.audio.enabled]) > 0 - else None - ) - # Queue for timeline events self.timeline_queue: Queue = mp.Queue() @@ -344,12 +275,7 @@ class FrigateApp: recording_process = mp.Process( target=manage_recordings, name="recording_manager", - args=( - self.config, - self.object_recordings_info_queue, - self.audio_recordings_info_queue, - self.feature_metrics, - ), + args=(self.config,), ) recording_process.daemon = True self.recording_process = recording_process @@ -386,6 +312,8 @@ class FrigateApp: def init_inter_process_communicator(self) -> None: self.inter_process_communicator = InterProcessCommunicator() + self.inter_config_updater = ConfigPublisher() + self.inter_detection_proxy = DetectionProxy() def init_web_server(self) -> None: self.flask_app = create_app( @@ -413,9 +341,8 @@ class FrigateApp: self.dispatcher = Dispatcher( self.config, + self.inter_config_updater, self.onvif_controller, - self.camera_metrics, - self.feature_metrics, self.ptz_metrics, comms, ) @@ -474,8 +401,6 @@ class FrigateApp: self.detected_frames_queue, self.event_queue, self.event_processed_queue, - self.video_output_queue, - self.object_recordings_info_queue, self.ptz_autotracker_thread, self.stop_event, ) @@ -485,11 +410,7 @@ class FrigateApp: output_processor = mp.Process( target=output_frames, name="output_processor", - args=( - self.config, - self.video_output_queue, - self.camera_metrics, - ), + args=(self.config,), ) output_processor.daemon = True self.output_processor = output_processor @@ -559,9 +480,7 @@ class FrigateApp: name="audio_capture", args=( self.config, - self.audio_recordings_info_queue, self.camera_metrics, - self.feature_metrics, ), ) audio_process.daemon = True @@ -762,10 +681,7 @@ class FrigateApp: for queue in [ self.event_queue, self.event_processed_queue, - self.video_output_queue, self.detected_frames_queue, - self.object_recordings_info_queue, - self.audio_recordings_info_queue, self.log_queue, ]: if queue is not None: @@ -773,3 +689,8 @@ class FrigateApp: queue.get_nowait() queue.close() queue.join_thread() + + # Stop Communicators + self.inter_process_communicator.stop() + self.inter_config_updater.stop() + self.inter_detection_proxy.stop() diff --git a/frigate/comms/config_updater.py b/frigate/comms/config_updater.py new file mode 100644 index 000000000..273103911 --- /dev/null +++ b/frigate/comms/config_updater.py @@ -0,0 +1,51 @@ +"""Facilitates communication between processes.""" + +import multiprocessing as mp +from multiprocessing.synchronize import Event as MpEvent +from typing import Optional + +import zmq + +SOCKET_PUB_SUB = "ipc:///tmp/cache/config" + + +class ConfigPublisher: + """Publishes config changes to different processes.""" + + def __init__(self) -> None: + self.context = zmq.Context() + self.socket = self.context.socket(zmq.PUB) + self.socket.bind(SOCKET_PUB_SUB) + self.stop_event: MpEvent = mp.Event() + + def publish(self, topic: str, payload: any) -> None: + """There is no communication back to the processes.""" + self.socket.send_string(topic, flags=zmq.SNDMORE) + self.socket.send_pyobj(payload) + + def stop(self) -> None: + self.stop_event.set() + self.socket.close() + self.context.destroy() + + +class ConfigSubscriber: + """Simplifies receiving an updated config.""" + + def __init__(self, topic: str) -> None: + self.context = zmq.Context() + self.socket = self.context.socket(zmq.SUB) + self.socket.setsockopt_string(zmq.SUBSCRIBE, topic) + self.socket.connect(SOCKET_PUB_SUB) + + def check_for_update(self) -> Optional[tuple[str, any]]: + """Returns updated config or None if no update.""" + try: + topic = self.socket.recv_string(flags=zmq.NOBLOCK) + return (topic, self.socket.recv_pyobj()) + except zmq.ZMQError: + return (None, None) + + def stop(self) -> None: + self.socket.close() + self.context.destroy() diff --git a/frigate/comms/detections_updater.py b/frigate/comms/detections_updater.py new file mode 100644 index 000000000..ff544dfbd --- /dev/null +++ b/frigate/comms/detections_updater.py @@ -0,0 +1,102 @@ +"""Facilitates communication between processes.""" + +import threading +from enum import Enum +from typing import Optional + +import zmq + +SOCKET_CONTROL = "inproc://control.detections_updater" +SOCKET_PUB = "ipc:///tmp/cache/detect_pub" +SOCKET_SUB = "ipc:///tmp/cache/detect_sun" + + +class DetectionTypeEnum(str, Enum): + all = "" + video = "video" + audio = "audio" + + +class DetectionProxyRunner(threading.Thread): + def __init__(self, context: zmq.Context[zmq.Socket]) -> None: + threading.Thread.__init__(self) + self.name = "detection_proxy" + self.context = context + + def run(self) -> None: + """Run the proxy.""" + control = self.context.socket(zmq.SUB) + control.connect(SOCKET_CONTROL) + control.setsockopt_string(zmq.SUBSCRIBE, "") + incoming = self.context.socket(zmq.XSUB) + incoming.bind(SOCKET_PUB) + outgoing = self.context.socket(zmq.XPUB) + outgoing.bind(SOCKET_SUB) + + zmq.proxy_steerable( + incoming, outgoing, None, control + ) # blocking, will unblock terminate message is received + incoming.close() + outgoing.close() + + +class DetectionProxy: + """Proxies video and audio detections.""" + + def __init__(self) -> None: + self.context = zmq.Context() + self.control = self.context.socket(zmq.PUB) + self.control.bind(SOCKET_CONTROL) + self.runner = DetectionProxyRunner(self.context) + self.runner.start() + + def stop(self) -> None: + self.control.send_string("TERMINATE") # tell the proxy to stop + self.runner.join() + self.context.destroy() + + +class DetectionPublisher: + """Simplifies receiving video and audio detections.""" + + def __init__(self, topic: DetectionTypeEnum) -> None: + self.topic = topic + self.context = zmq.Context() + self.socket = self.context.socket(zmq.PUB) + self.socket.connect(SOCKET_PUB) + + def send_data(self, payload: any) -> None: + """Publish detection.""" + self.socket.send_string(self.topic.value, flags=zmq.SNDMORE) + self.socket.send_pyobj(payload) + + def stop(self) -> None: + self.socket.close() + self.context.destroy() + + +class DetectionSubscriber: + """Simplifies receiving video and audio detections.""" + + def __init__(self, topic: DetectionTypeEnum) -> None: + self.context = zmq.Context() + self.socket = self.context.socket(zmq.SUB) + self.socket.setsockopt_string(zmq.SUBSCRIBE, topic.value) + self.socket.connect(SOCKET_SUB) + + def get_data(self, timeout: float = None) -> Optional[tuple[str, any]]: + """Returns detections or None if no update.""" + try: + has_update, _, _ = zmq.select([self.socket], [], [], timeout) + + if has_update: + topic = DetectionTypeEnum[self.socket.recv_string(flags=zmq.NOBLOCK)] + return (topic, self.socket.recv_pyobj()) + except zmq.ZMQError: + pass + + return (None, None) + + def stop(self) -> None: + self.socket.close() + self.context.destroy() diff --git a/frigate/comms/dispatcher.py b/frigate/comms/dispatcher.py index 0af4ff37d..84b84eb3c 100644 --- a/frigate/comms/dispatcher.py +++ b/frigate/comms/dispatcher.py @@ -4,11 +4,12 @@ import logging from abc import ABC, abstractmethod from typing import Any, Callable, Optional +from frigate.comms.config_updater import ConfigPublisher from frigate.config import BirdseyeModeEnum, FrigateConfig from frigate.const import INSERT_MANY_RECORDINGS, INSERT_PREVIEW, REQUEST_REGION_GRID from frigate.models import Previews, Recordings from frigate.ptz.onvif import OnvifCommandEnum, OnvifController -from frigate.types import CameraMetricsTypes, FeatureMetricsTypes, PTZMetricsTypes +from frigate.types import PTZMetricsTypes from frigate.util.object import get_camera_regions_grid from frigate.util.services import restart_frigate @@ -40,16 +41,14 @@ class Dispatcher: def __init__( self, config: FrigateConfig, + config_updater: ConfigPublisher, onvif: OnvifController, - camera_metrics: dict[str, CameraMetricsTypes], - feature_metrics: dict[str, FeatureMetricsTypes], ptz_metrics: dict[str, PTZMetricsTypes], communicators: list[Communicator], ) -> None: self.config = config + self.config_updater = config_updater self.onvif = onvif - self.camera_metrics = camera_metrics - self.feature_metrics = feature_metrics self.ptz_metrics = ptz_metrics self.comms = communicators @@ -118,44 +117,51 @@ class Dispatcher: def _on_detect_command(self, camera_name: str, payload: str) -> None: """Callback for detect topic.""" detect_settings = self.config.cameras[camera_name].detect + motion_settings = self.config.cameras[camera_name].motion if payload == "ON": - if not self.camera_metrics[camera_name]["detection_enabled"].value: + if not detect_settings.enabled: logger.info(f"Turning on detection for {camera_name}") - self.camera_metrics[camera_name]["detection_enabled"].value = True detect_settings.enabled = True - if not self.camera_metrics[camera_name]["motion_enabled"].value: + if not motion_settings.enabled: logger.info( f"Turning on motion for {camera_name} due to detection being enabled." ) - self.camera_metrics[camera_name]["motion_enabled"].value = True + motion_settings.enabled = True + self.config_updater.publish( + f"config/motion/{camera_name}", motion_settings + ) self.publish(f"{camera_name}/motion/state", payload, retain=True) elif payload == "OFF": - if self.camera_metrics[camera_name]["detection_enabled"].value: + if detect_settings.enabled: logger.info(f"Turning off detection for {camera_name}") - self.camera_metrics[camera_name]["detection_enabled"].value = False detect_settings.enabled = False + self.config_updater.publish(f"config/detect/{camera_name}", detect_settings) self.publish(f"{camera_name}/detect/state", payload, retain=True) def _on_motion_command(self, camera_name: str, payload: str) -> None: """Callback for motion topic.""" + detect_settings = self.config.cameras[camera_name].detect + motion_settings = self.config.cameras[camera_name].motion + if payload == "ON": - if not self.camera_metrics[camera_name]["motion_enabled"].value: + if not motion_settings.enabled: logger.info(f"Turning on motion for {camera_name}") - self.camera_metrics[camera_name]["motion_enabled"].value = True + motion_settings.enabled = True elif payload == "OFF": - if self.camera_metrics[camera_name]["detection_enabled"].value: + if detect_settings.enabled: logger.error( "Turning off motion is not allowed when detection is enabled." ) return - if self.camera_metrics[camera_name]["motion_enabled"].value: + if motion_settings.enabled: logger.info(f"Turning off motion for {camera_name}") - self.camera_metrics[camera_name]["motion_enabled"].value = False + motion_settings.enabled = False + self.config_updater.publish(f"config/motion/{camera_name}", motion_settings) self.publish(f"{camera_name}/motion/state", payload, retain=True) def _on_motion_improve_contrast_command( @@ -165,20 +171,15 @@ class Dispatcher: motion_settings = self.config.cameras[camera_name].motion if payload == "ON": - if not self.camera_metrics[camera_name]["improve_contrast_enabled"].value: + if not motion_settings.improve_contrast: logger.info(f"Turning on improve contrast for {camera_name}") - self.camera_metrics[camera_name][ - "improve_contrast_enabled" - ].value = True motion_settings.improve_contrast = True # type: ignore[union-attr] elif payload == "OFF": - if self.camera_metrics[camera_name]["improve_contrast_enabled"].value: + if motion_settings.improve_contrast: logger.info(f"Turning off improve contrast for {camera_name}") - self.camera_metrics[camera_name][ - "improve_contrast_enabled" - ].value = False motion_settings.improve_contrast = False # type: ignore[union-attr] + self.config_updater.publish(f"config/motion/{camera_name}", motion_settings) self.publish(f"{camera_name}/improve_contrast/state", payload, retain=True) def _on_ptz_autotracker_command(self, camera_name: str, payload: str) -> None: @@ -217,8 +218,8 @@ class Dispatcher: motion_settings = self.config.cameras[camera_name].motion logger.info(f"Setting motion contour area for {camera_name}: {payload}") - self.camera_metrics[camera_name]["motion_contour_area"].value = payload motion_settings.contour_area = payload # type: ignore[union-attr] + self.config_updater.publish(f"config/motion/{camera_name}", motion_settings) self.publish(f"{camera_name}/motion_contour_area/state", payload, retain=True) def _on_motion_threshold_command(self, camera_name: str, payload: int) -> None: @@ -231,8 +232,8 @@ class Dispatcher: motion_settings = self.config.cameras[camera_name].motion logger.info(f"Setting motion threshold for {camera_name}: {payload}") - self.camera_metrics[camera_name]["motion_threshold"].value = payload motion_settings.threshold = payload # type: ignore[union-attr] + self.config_updater.publish(f"config/motion/{camera_name}", motion_settings) self.publish(f"{camera_name}/motion_threshold/state", payload, retain=True) def _on_audio_command(self, camera_name: str, payload: str) -> None: @@ -249,13 +250,12 @@ class Dispatcher: if not audio_settings.enabled: logger.info(f"Turning on audio detection for {camera_name}") audio_settings.enabled = True - self.feature_metrics[camera_name]["audio_enabled"].value = True elif payload == "OFF": - if self.feature_metrics[camera_name]["audio_enabled"].value: + if audio_settings.enabled: logger.info(f"Turning off audio detection for {camera_name}") audio_settings.enabled = False - self.feature_metrics[camera_name]["audio_enabled"].value = False + self.config_updater.publish(f"config/audio/{camera_name}", audio_settings) self.publish(f"{camera_name}/audio/state", payload, retain=True) def _on_recordings_command(self, camera_name: str, payload: str) -> None: @@ -272,13 +272,12 @@ class Dispatcher: if not record_settings.enabled: logger.info(f"Turning on recordings for {camera_name}") record_settings.enabled = True - self.feature_metrics[camera_name]["record_enabled"].value = True elif payload == "OFF": - if self.feature_metrics[camera_name]["record_enabled"].value: + if record_settings.enabled: logger.info(f"Turning off recordings for {camera_name}") record_settings.enabled = False - self.feature_metrics[camera_name]["record_enabled"].value = False + self.config_updater.publish(f"config/record/{camera_name}", record_settings) self.publish(f"{camera_name}/recordings/state", payload, retain=True) def _on_snapshots_command(self, camera_name: str, payload: str) -> None: @@ -316,17 +315,16 @@ class Dispatcher: birdseye_settings = self.config.cameras[camera_name].birdseye if payload == "ON": - if not self.camera_metrics[camera_name]["birdseye_enabled"].value: + if not birdseye_settings.enabled: logger.info(f"Turning on birdseye for {camera_name}") - self.camera_metrics[camera_name]["birdseye_enabled"].value = True birdseye_settings.enabled = True elif payload == "OFF": - if self.camera_metrics[camera_name]["birdseye_enabled"].value: + if birdseye_settings.enabled: logger.info(f"Turning off birdseye for {camera_name}") - self.camera_metrics[camera_name]["birdseye_enabled"].value = False birdseye_settings.enabled = False + self.config_updater.publish(f"config/birdseye/{camera_name}", birdseye_settings) self.publish(f"{camera_name}/birdseye/state", payload, retain=True) def _on_birdseye_mode_command(self, camera_name: str, payload: str) -> None: @@ -336,17 +334,16 @@ class Dispatcher: logger.info(f"Invalid birdseye_mode command: {payload}") return - birdseye_config = self.config.cameras[camera_name].birdseye - if not birdseye_config.enabled: + birdseye_settings = self.config.cameras[camera_name].birdseye + + if not birdseye_settings.enabled: logger.info(f"Birdseye mode not enabled for {camera_name}") return - new_birdseye_mode = BirdseyeModeEnum(payload.lower()) - logger.info(f"Setting birdseye mode for {camera_name} to {new_birdseye_mode}") - - # update the metric (need the mode converted to an int) - self.camera_metrics[camera_name][ - "birdseye_mode" - ].value = BirdseyeModeEnum.get_index(new_birdseye_mode) + birdseye_settings.mode = BirdseyeModeEnum(payload.lower()) + logger.info( + f"Setting birdseye mode for {camera_name} to {birdseye_settings.mode}" + ) + self.config_updater.publish(f"config/birdseye/{camera_name}", birdseye_settings) self.publish(f"{camera_name}/birdseye_mode/state", payload, retain=True) diff --git a/frigate/comms/inter_process.py b/frigate/comms/inter_process.py index c312bf869..deec77a40 100644 --- a/frigate/comms/inter_process.py +++ b/frigate/comms/inter_process.py @@ -1,7 +1,6 @@ """Facilitates communication between processes.""" import multiprocessing as mp -import os import threading from multiprocessing.synchronize import Event as MpEvent from typing import Callable @@ -9,17 +8,15 @@ from typing import Callable import zmq from frigate.comms.dispatcher import Communicator -from frigate.const import PORT_INTER_PROCESS_COMM + +SOCKET_REP_REQ = "ipc:///tmp/cache/comms" class InterProcessCommunicator(Communicator): def __init__(self) -> None: - INTER_PROCESS_COMM_PORT = ( - os.environ.get("INTER_PROCESS_COMM_PORT") or PORT_INTER_PROCESS_COMM - ) self.context = zmq.Context() self.socket = self.context.socket(zmq.REP) - self.socket.bind(f"tcp://127.0.0.1:{INTER_PROCESS_COMM_PORT}") + self.socket.bind(SOCKET_REP_REQ) self.stop_event: MpEvent = mp.Event() def publish(self, topic: str, payload: str, retain: bool) -> None: @@ -32,8 +29,13 @@ class InterProcessCommunicator(Communicator): self.reader_thread.start() def read(self) -> None: - while not self.stop_event.wait(0.5): + while not self.stop_event.is_set(): while True: # load all messages that are queued + has_message, _, _ = zmq.select([self.socket], [], [], 1) + + if not has_message: + break + try: (topic, value) = self.socket.recv_pyobj(flags=zmq.NOBLOCK) @@ -57,10 +59,9 @@ class InterProcessRequestor: """Simplifies sending data to InterProcessCommunicator and getting a reply.""" def __init__(self) -> None: - port = os.environ.get("INTER_PROCESS_COMM_PORT") or PORT_INTER_PROCESS_COMM self.context = zmq.Context() self.socket = self.context.socket(zmq.REQ) - self.socket.connect(f"tcp://127.0.0.1:{port}") + self.socket.connect(SOCKET_REP_REQ) def send_data(self, topic: str, data: any) -> any: """Sends data and then waits for reply.""" diff --git a/frigate/config.py b/frigate/config.py index 5ab51b026..3e240b9c8 100644 --- a/frigate/config.py +++ b/frigate/config.py @@ -300,6 +300,7 @@ class RecordConfig(FrigateBaseModel): class MotionConfig(FrigateBaseModel): + enabled: bool = Field(default=True, title="Enable motion on all cameras.") threshold: int = Field( default=30, title="Motion detection threshold (1-255).", @@ -321,6 +322,9 @@ class MotionConfig(FrigateBaseModel): default=30, title="Delay for updating MQTT with no motion detected.", ) + enabled_in_config: Optional[bool] = Field( + title="Keep track of original state of motion detection." + ) class RuntimeMotionConfig(MotionConfig): @@ -1041,6 +1045,14 @@ def verify_autotrack_zones(camera_config: CameraConfig) -> ValueError | None: ) +def verify_motion_and_detect(camera_config: CameraConfig) -> ValueError | None: + """Verify that required_zones are specified when autotracking is enabled.""" + if camera_config.detect.enabled and not camera_config.motion.enabled: + raise ValueError( + f"Camera {camera_config.name} has motion detection disabled and object detection enabled but object detection requires motion detection." + ) + + class FrigateConfig(FrigateBaseModel): mqtt: MqttConfig = Field(title="MQTT Configuration.") database: DatabaseConfig = Field( @@ -1202,8 +1214,8 @@ class FrigateConfig(FrigateBaseModel): **FRIGATE_ENV_VARS ) # set config pre-value - camera_config.record.enabled_in_config = camera_config.record.enabled camera_config.audio.enabled_in_config = camera_config.audio.enabled + camera_config.record.enabled_in_config = camera_config.record.enabled camera_config.onvif.autotracking.enabled_in_config = ( camera_config.onvif.autotracking.enabled ) @@ -1250,6 +1262,7 @@ class FrigateConfig(FrigateBaseModel): raw_mask=camera_config.motion.mask, **camera_config.motion.dict(exclude_unset=True), ) + camera_config.motion.enabled_in_config = camera_config.motion.enabled # Set live view stream if none is set if not camera_config.live.stream_name: @@ -1261,6 +1274,7 @@ class FrigateConfig(FrigateBaseModel): verify_recording_segments_setup_with_reasonable_time(camera_config) verify_zone_objects_are_tracked(camera_config) verify_autotrack_zones(camera_config) + verify_motion_and_detect(camera_config) # generate the ffmpeg commands camera_config.create_ffmpeg_cmds() diff --git a/frigate/const.py b/frigate/const.py index 158996116..73f66af2f 100644 --- a/frigate/const.py +++ b/frigate/const.py @@ -57,10 +57,6 @@ DRIVER_AMD = "radeonsi" DRIVER_INTEL_i965 = "i965" DRIVER_INTEL_iHD = "iHD" -# Ports - -PORT_INTER_PROCESS_COMM = 4892 - # Record Values CACHE_SEGMENT_FORMAT = "%Y%m%d%H%M%S%z" diff --git a/frigate/events/audio.py b/frigate/events/audio.py index a28b00e68..c1b38cfb4 100644 --- a/frigate/events/audio.py +++ b/frigate/events/audio.py @@ -13,6 +13,8 @@ import numpy as np import requests from setproctitle import setproctitle +from frigate.comms.config_updater import ConfigSubscriber +from frigate.comms.detections_updater import DetectionPublisher, DetectionTypeEnum from frigate.comms.inter_process import InterProcessRequestor from frigate.config import CameraConfig, CameraInput, FfmpegConfig, FrigateConfig from frigate.const import ( @@ -26,7 +28,7 @@ from frigate.const import ( from frigate.ffmpeg_presets import parse_preset_input from frigate.log import LogPipe from frigate.object_detection import load_labels -from frigate.types import CameraMetricsTypes, FeatureMetricsTypes +from frigate.types import CameraMetricsTypes from frigate.util.builtin import get_ffmpeg_arg_list from frigate.util.services import listen from frigate.video import start_or_restart_ffmpeg, stop_ffmpeg @@ -67,9 +69,7 @@ def get_ffmpeg_command(ffmpeg: FfmpegConfig) -> list[str]: def listen_to_audio( config: FrigateConfig, - recordings_info_queue: mp.Queue, camera_metrics: dict[str, CameraMetricsTypes], - process_info: dict[str, FeatureMetricsTypes], ) -> None: stop_event = mp.Event() audio_threads: list[threading.Thread] = [] @@ -95,9 +95,7 @@ def listen_to_audio( if camera.enabled and camera.audio.enabled_in_config: audio = AudioEventMaintainer( camera, - recordings_info_queue, camera_metrics, - process_info, stop_event, ) audio_threads.append(audio) @@ -168,17 +166,13 @@ class AudioEventMaintainer(threading.Thread): def __init__( self, camera: CameraConfig, - recordings_info_queue: mp.Queue, camera_metrics: dict[str, CameraMetricsTypes], - feature_metrics: dict[str, FeatureMetricsTypes], stop_event: mp.Event, ) -> None: threading.Thread.__init__(self) self.name = f"{camera.name}_audio_event_processor" self.config = camera - self.recordings_info_queue = recordings_info_queue self.camera_metrics = camera_metrics - self.feature_metrics = feature_metrics self.detections: dict[dict[str, any]] = {} self.stop_event = stop_event self.detector = AudioTfl(stop_event, self.config.audio.num_threads) @@ -191,9 +185,11 @@ class AudioEventMaintainer(threading.Thread): # create communication for audio detections self.requestor = InterProcessRequestor() + self.config_subscriber = ConfigSubscriber(f"config/audio/{camera.name}") + self.detection_publisher = DetectionPublisher(DetectionTypeEnum.audio) def detect_audio(self, audio) -> None: - if not self.feature_metrics[self.config.name]["audio_enabled"].value: + if not self.config.audio.enabled or self.stop_event.is_set(): return audio_as_float = audio.astype(np.float32) @@ -221,8 +217,8 @@ class AudioEventMaintainer(threading.Thread): self.handle_detection(label, score) audio_detections.append(label) - # add audio info to recordings queue - self.recordings_info_queue.put( + # send audio detection data + self.detection_publisher.send_data( ( self.config.name, datetime.datetime.now().timestamp(), @@ -339,8 +335,19 @@ class AudioEventMaintainer(threading.Thread): self.start_or_restart_ffmpeg() while not self.stop_event.is_set(): + # check if there is an updated config + ( + updated_topic, + updated_audio_config, + ) = self.config_subscriber.check_for_update() + + if updated_topic: + self.config.audio = updated_audio_config + self.read_audio() stop_ffmpeg(self.audio_listener, self.logger) self.logpipe.close() self.requestor.stop() + self.config_subscriber.stop() + self.detection_publisher.stop() diff --git a/frigate/motion/__init__.py b/frigate/motion/__init__.py index 248c37092..db5f25879 100644 --- a/frigate/motion/__init__.py +++ b/frigate/motion/__init__.py @@ -24,3 +24,7 @@ class MotionDetector(ABC): @abstractmethod def is_calibrating(self): pass + + @abstractmethod + def stop(self): + pass diff --git a/frigate/motion/improved_motion.py b/frigate/motion/improved_motion.py index 603d8fda4..e09c1547f 100644 --- a/frigate/motion/improved_motion.py +++ b/frigate/motion/improved_motion.py @@ -5,6 +5,7 @@ import imutils import numpy as np from scipy.ndimage import gaussian_filter +from frigate.comms.config_updater import ConfigSubscriber from frigate.config import MotionConfig from frigate.motion import MotionDetector @@ -17,9 +18,6 @@ class ImprovedMotionDetector(MotionDetector): frame_shape, config: MotionConfig, fps: int, - improve_contrast, - threshold, - contour_area, name="improved", blur_radius=1, interpolation=cv2.INTER_NEAREST, @@ -44,14 +42,12 @@ class ImprovedMotionDetector(MotionDetector): self.mask = np.where(resized_mask == [0]) self.save_images = False self.calibrating = True - self.improve_contrast = improve_contrast - self.threshold = threshold - self.contour_area = contour_area self.blur_radius = blur_radius self.interpolation = interpolation self.contrast_values = np.zeros((contrast_frame_history, 2), np.uint8) self.contrast_values[:, 1:2] = 255 self.contrast_values_index = 0 + self.config_subscriber = ConfigSubscriber(f"config/motion/{name}") def is_calibrating(self): return self.calibrating @@ -59,6 +55,15 @@ class ImprovedMotionDetector(MotionDetector): def detect(self, frame): motion_boxes = [] + # check for updated motion config + _, updated_motion_config = self.config_subscriber.check_for_update() + + if updated_motion_config: + self.config = updated_motion_config + + if not self.config.enabled: + return motion_boxes + gray = frame[0 : self.frame_shape[0], 0 : self.frame_shape[1]] # resize frame @@ -72,7 +77,7 @@ class ImprovedMotionDetector(MotionDetector): resized_saved = resized_frame.copy() # Improve contrast - if self.improve_contrast.value: + if self.config.improve_contrast: # TODO tracking moving average of min/max to avoid sudden contrast changes minval = np.percentile(resized_frame, 4).astype(np.uint8) maxval = np.percentile(resized_frame, 96).astype(np.uint8) @@ -110,7 +115,7 @@ class ImprovedMotionDetector(MotionDetector): # compute the threshold image for the current frame thresh = cv2.threshold( - frameDelta, self.threshold.value, 255, cv2.THRESH_BINARY + frameDelta, self.config.threshold, 255, cv2.THRESH_BINARY )[1] # dilate the thresholded image to fill in holes, then find contours @@ -127,7 +132,7 @@ class ImprovedMotionDetector(MotionDetector): # if the contour is big enough, count it as motion contour_area = cv2.contourArea(c) total_contour_area += contour_area - if contour_area > self.contour_area.value: + if contour_area > self.config.contour_area: x, y, w, h = cv2.boundingRect(c) motion_boxes.append( ( @@ -170,9 +175,11 @@ class ImprovedMotionDetector(MotionDetector): ] cv2.imwrite( f"debug/frames/{self.name}-{self.frame_counter}.jpg", - cv2.hconcat(frames) - if self.frame_shape[0] > self.frame_shape[1] - else cv2.vconcat(frames), + ( + cv2.hconcat(frames) + if self.frame_shape[0] > self.frame_shape[1] + else cv2.vconcat(frames) + ), ) if len(motion_boxes) > 0: @@ -194,3 +201,7 @@ class ImprovedMotionDetector(MotionDetector): self.motion_frame_count = 0 return motion_boxes + + def stop(self) -> None: + """stop the motion detector.""" + self.config_subscriber.stop() diff --git a/frigate/object_processing.py b/frigate/object_processing.py index 2d141b970..68cac4ec0 100644 --- a/frigate/object_processing.py +++ b/frigate/object_processing.py @@ -12,6 +12,7 @@ from typing import Callable import cv2 import numpy as np +from frigate.comms.detections_updater import DetectionPublisher, DetectionTypeEnum from frigate.comms.dispatcher import Dispatcher from frigate.config import ( CameraConfig, @@ -813,8 +814,6 @@ class TrackedObjectProcessor(threading.Thread): tracked_objects_queue, event_queue, event_processed_queue, - video_output_queue, - recordings_info_queue, ptz_autotracker_thread, stop_event, ): @@ -825,13 +824,12 @@ class TrackedObjectProcessor(threading.Thread): self.tracked_objects_queue = tracked_objects_queue self.event_queue = event_queue self.event_processed_queue = event_processed_queue - self.video_output_queue = video_output_queue - self.recordings_info_queue = recordings_info_queue self.stop_event = stop_event self.camera_states: dict[str, CameraState] = {} self.frame_manager = SharedMemoryFrameManager() self.last_motion_detected: dict[str, float] = {} self.ptz_autotracker_thread = ptz_autotracker_thread + self.detection_publisher = DetectionPublisher(DetectionTypeEnum.video) def start(camera, obj: TrackedObject, current_frame_time): self.event_queue.put( @@ -1116,18 +1114,8 @@ class TrackedObjectProcessor(threading.Thread): o.to_dict() for o in camera_state.tracked_objects.values() ] - self.video_output_queue.put( - ( - camera, - frame_time, - tracked_objects, - motion_boxes, - regions, - ) - ) - - # send info on this frame to the recordings maintainer - self.recordings_info_queue.put( + # publish info on this frame + self.detection_publisher.send_data( ( camera, frame_time, @@ -1212,4 +1200,5 @@ class TrackedObjectProcessor(threading.Thread): event_id, camera = self.event_processed_queue.get() self.camera_states[camera].finished(event_id) + self.detection_publisher.stop() logger.info("Exiting object processor...") diff --git a/frigate/output/birdseye.py b/frigate/output/birdseye.py index ff3b7521f..99ff3a6fa 100644 --- a/frigate/output/birdseye.py +++ b/frigate/output/birdseye.py @@ -14,9 +14,9 @@ import traceback import cv2 import numpy as np +from frigate.comms.config_updater import ConfigSubscriber from frigate.config import BirdseyeModeEnum, FrigateConfig from frigate.const import BASE_DIR, BIRDSEYE_PIPE -from frigate.types import CameraMetricsTypes from frigate.util.image import ( SharedMemoryFrameManager, copy_yuv_to_position, @@ -267,7 +267,6 @@ class BirdsEyeFrameManager: config: FrigateConfig, frame_manager: SharedMemoryFrameManager, stop_event: mp.Event, - camera_metrics: dict[str, CameraMetricsTypes], ): self.config = config self.mode = config.birdseye.mode @@ -278,7 +277,6 @@ class BirdsEyeFrameManager: self.frame = np.ndarray(self.yuv_shape, dtype=np.uint8) self.canvas = Canvas(width, height, config.birdseye.layout.scaling_factor) self.stop_event = stop_event - self.camera_metrics = camera_metrics self.inactivity_threshold = config.birdseye.inactivity_threshold if config.birdseye.layout.max_cameras: @@ -435,10 +433,7 @@ class BirdsEyeFrameManager: # check if we need to reset the layout because there is a different number of cameras if len(self.active_cameras) - len(active_cameras) == 0: - if ( - len(self.active_cameras) == 1 - and self.active_cameras[0] == active_cameras[0] - ): + if len(self.active_cameras) == 1 and self.active_cameras != active_cameras: reset_layout = True elif max_camera_refresh: reset_layout = True @@ -675,15 +670,12 @@ class BirdsEyeFrameManager: def update(self, camera, object_count, motion_count, frame_time, frame) -> bool: # don't process if birdseye is disabled for this camera camera_config = self.config.cameras[camera].birdseye + if not camera_config.enabled: return False - # get our metrics (sync'd across processes) - # which allows us to control it via mqtt (or any other dispatcher) - camera_metrics = self.camera_metrics[camera] - # disabling birdseye is a little tricky - if not camera_metrics["birdseye_enabled"].value: + if not camera_config.enabled: # if we've rendered a frame (we have a value for last_active_frame) # then we need to set it to zero if self.cameras[camera]["last_active_frame"] > 0: @@ -691,12 +683,9 @@ class BirdsEyeFrameManager: return False - # get the birdseye mode state from camera metrics - birdseye_mode = BirdseyeModeEnum.get(camera_metrics["birdseye_mode"].value) - # update the last active frame for the camera self.cameras[camera]["current_frame"] = frame_time - if self.camera_active(birdseye_mode, object_count, motion_count): + if self.camera_active(camera_config.mode, object_count, motion_count): self.cameras[camera]["last_active_frame"] = frame_time now = datetime.datetime.now().timestamp() @@ -725,7 +714,6 @@ class Birdseye: self, config: FrigateConfig, frame_manager: SharedMemoryFrameManager, - camera_metrics: dict[str, CameraMetricsTypes], stop_event: mp.Event, websocket_server, ) -> None: @@ -745,9 +733,8 @@ class Birdseye: self.broadcaster = BroadcastThread( "birdseye", self.converter, websocket_server, stop_event ) - self.birdseye_manager = BirdsEyeFrameManager( - config, frame_manager, stop_event, camera_metrics - ) + self.birdseye_manager = BirdsEyeFrameManager(config, frame_manager, stop_event) + self.config_subscriber = ConfigSubscriber("config/birdseye/") if config.birdseye.restream: self.birdseye_buffer = frame_manager.create( @@ -766,6 +753,19 @@ class Birdseye: frame_time: float, frame, ) -> None: + # check if there is an updated config + while True: + ( + updated_topic, + updated_birdseye_config, + ) = self.config_subscriber.check_for_update() + + if not updated_topic: + break + + camera_name = updated_topic.rpartition("/")[-1] + self.config.cameras[camera_name].birdseye = updated_birdseye_config + if self.birdseye_manager.update( camera, len([o for o in current_tracked_objects if not o["stationary"]]), @@ -785,5 +785,6 @@ class Birdseye: pass def stop(self) -> None: + self.config_subscriber.stop() self.converter.join() self.broadcaster.join() diff --git a/frigate/output/output.py b/frigate/output/output.py index efcff4b7c..e717463b1 100644 --- a/frigate/output/output.py +++ b/frigate/output/output.py @@ -2,7 +2,6 @@ import logging import multiprocessing as mp -import queue import signal import threading from typing import Optional @@ -16,12 +15,12 @@ from ws4py.server.wsgirefserver import ( ) from ws4py.server.wsgiutils import WebSocketWSGIApplication +from frigate.comms.detections_updater import DetectionSubscriber, DetectionTypeEnum from frigate.comms.ws import WebSocket from frigate.config import FrigateConfig from frigate.output.birdseye import Birdseye from frigate.output.camera import JsmpegCamera from frigate.output.preview import PreviewRecorder -from frigate.types import CameraMetricsTypes from frigate.util.image import SharedMemoryFrameManager logger = logging.getLogger(__name__) @@ -29,8 +28,6 @@ logger = logging.getLogger(__name__) def output_frames( config: FrigateConfig, - video_output_queue: mp.Queue, - camera_metrics: dict[str, CameraMetricsTypes], ): threading.current_thread().name = "output" setproctitle("frigate.output") @@ -58,6 +55,8 @@ def output_frames( websocket_server.initialize_websockets_manager() websocket_thread = threading.Thread(target=websocket_server.serve_forever) + detection_subscriber = DetectionSubscriber(DetectionTypeEnum.video) + jsmpeg_cameras: dict[str, JsmpegCamera] = {} birdseye: Optional[Birdseye] = None preview_recorders: dict[str, PreviewRecorder] = {} @@ -70,24 +69,24 @@ def output_frames( preview_recorders[camera] = PreviewRecorder(cam_config) if config.birdseye.enabled: - birdseye = Birdseye( - config, frame_manager, camera_metrics, stop_event, websocket_server - ) + birdseye = Birdseye(config, frame_manager, stop_event, websocket_server) websocket_thread.start() while not stop_event.is_set(): - try: - ( - camera, - frame_time, - current_tracked_objects, - motion_boxes, - regions, - ) = video_output_queue.get(True, 1) - except queue.Empty: + (topic, data) = detection_subscriber.get_data(timeout=10) + + if not topic: continue + ( + camera, + frame_time, + current_tracked_objects, + motion_boxes, + regions, + ) = data + frame_id = f"{camera}{frame_time}" frame = frame_manager.get(frame_id, config.cameras[camera].frame_shape_yuv) @@ -126,19 +125,26 @@ def output_frames( previous_frames[camera] = frame_time - while not video_output_queue.empty(): + while True: + (topic, data) = detection_subscriber.get_data(timeout=0) + + if not topic: + break + ( camera, frame_time, current_tracked_objects, motion_boxes, regions, - ) = video_output_queue.get(True, 10) + ) = data frame_id = f"{camera}{frame_time}" frame = frame_manager.get(frame_id, config.cameras[camera].frame_shape_yuv) frame_manager.delete(frame_id) + detection_subscriber.stop() + for jsmpeg in jsmpeg_cameras.values(): jsmpeg.stop() diff --git a/frigate/record/maintainer.py b/frigate/record/maintainer.py index a67d84c16..2715dec89 100644 --- a/frigate/record/maintainer.py +++ b/frigate/record/maintainer.py @@ -3,9 +3,7 @@ import asyncio import datetime import logging -import multiprocessing as mp import os -import queue import random import string import threading @@ -17,6 +15,8 @@ from typing import Any, Optional, Tuple import numpy as np import psutil +from frigate.comms.config_updater import ConfigSubscriber +from frigate.comms.detections_updater import DetectionSubscriber, DetectionTypeEnum from frigate.comms.inter_process import InterProcessRequestor from frigate.config import FrigateConfig, RetainModeEnum from frigate.const import ( @@ -28,7 +28,6 @@ from frigate.const import ( RECORD_DIR, ) from frigate.models import Event, Recordings -from frigate.types import FeatureMetricsTypes from frigate.util.image import area from frigate.util.services import get_video_properties @@ -57,24 +56,16 @@ class SegmentInfo: class RecordingMaintainer(threading.Thread): - def __init__( - self, - config: FrigateConfig, - object_recordings_info_queue: mp.Queue, - audio_recordings_info_queue: Optional[mp.Queue], - process_info: dict[str, FeatureMetricsTypes], - stop_event: MpEvent, - ): + def __init__(self, config: FrigateConfig, stop_event: MpEvent): threading.Thread.__init__(self) self.name = "recording_maintainer" self.config = config # create communication for retained recordings self.requestor = InterProcessRequestor() + self.config_subscriber = ConfigSubscriber("config/record/") + self.detection_subscriber = DetectionSubscriber(DetectionTypeEnum.all) - self.object_recordings_info_queue = object_recordings_info_queue - self.audio_recordings_info_queue = audio_recordings_info_queue - self.process_info = process_info self.stop_event = stop_event self.object_recordings_info: dict[str, list] = defaultdict(list) self.audio_recordings_info: dict[str, list] = defaultdict(list) @@ -200,7 +191,7 @@ class RecordingMaintainer(threading.Thread): # Just delete files if recordings are turned off if ( camera not in self.config.cameras - or not self.process_info[camera]["record_enabled"].value + or not self.config.cameras[camera].record.enabled ): Path(cache_path).unlink(missing_ok=True) self.end_time_cache.pop(cache_path, None) @@ -437,30 +428,45 @@ class RecordingMaintainer(threading.Thread): return None def run(self) -> None: - camera_count = sum(camera.enabled for camera in self.config.cameras.values()) # Check for new files every 5 seconds wait_time = 0.0 while not self.stop_event.wait(wait_time): run_start = datetime.datetime.now().timestamp() + + # check if there is an updated config + while True: + ( + updated_topic, + updated_record_config, + ) = self.config_subscriber.check_for_update() + + if not updated_topic: + break + + camera_name = updated_topic.rpartition("/")[-1] + self.config.cameras[camera_name].record = updated_record_config + stale_frame_count = 0 stale_frame_count_threshold = 10 # empty the object recordings info queue while True: - try: + (topic, data) = self.detection_subscriber.get_data( + timeout=QUEUE_READ_TIMEOUT + ) + + if not topic: + break + + if topic == DetectionTypeEnum.video: ( camera, frame_time, current_tracked_objects, motion_boxes, regions, - ) = self.object_recordings_info_queue.get( - True, timeout=QUEUE_READ_TIMEOUT - ) + ) = data - if frame_time < run_start - stale_frame_count_threshold: - stale_frame_count += 1 - - if self.process_info[camera]["record_enabled"].value: + if self.config.cameras[camera].record.enabled: self.object_recordings_info[camera].append( ( frame_time, @@ -469,56 +475,29 @@ class RecordingMaintainer(threading.Thread): regions, ) ) - except queue.Empty: - q_size = self.object_recordings_info_queue.qsize() - if q_size > camera_count: - logger.debug( - f"object_recordings_info loop queue not empty ({q_size})." + elif topic == DetectionTypeEnum.audio: + ( + camera, + frame_time, + dBFS, + audio_detections, + ) = data + + if self.config.cameras[camera].record.enabled: + self.audio_recordings_info[camera].append( + ( + frame_time, + dBFS, + audio_detections, + ) ) - break + + if frame_time < run_start - stale_frame_count_threshold: + stale_frame_count += 1 if stale_frame_count > 0: logger.debug(f"Found {stale_frame_count} old frames.") - # empty the audio recordings info queue if audio is enabled - if self.audio_recordings_info_queue: - stale_frame_count = 0 - - while True: - try: - ( - camera, - frame_time, - dBFS, - audio_detections, - ) = self.audio_recordings_info_queue.get( - True, timeout=QUEUE_READ_TIMEOUT - ) - - if frame_time < run_start - stale_frame_count_threshold: - stale_frame_count += 1 - - if self.process_info[camera]["record_enabled"].value: - self.audio_recordings_info[camera].append( - ( - frame_time, - dBFS, - audio_detections, - ) - ) - except queue.Empty: - q_size = self.audio_recordings_info_queue.qsize() - if q_size > camera_count: - logger.debug( - f"object_recordings_info loop audio queue not empty ({q_size})." - ) - break - - if stale_frame_count > 0: - logger.error( - f"Found {stale_frame_count} old audio frames, segments from recordings may be missing" - ) - try: asyncio.run(self.move_files()) except Exception as e: @@ -530,4 +509,6 @@ class RecordingMaintainer(threading.Thread): wait_time = max(0, 5 - duration) self.requestor.stop() + self.config_subscriber.stop() + self.detection_subscriber.stop() logger.info("Exiting recording maintenance...") diff --git a/frigate/record/record.py b/frigate/record/record.py index 8fc2ed2b0..1ffd5394d 100644 --- a/frigate/record/record.py +++ b/frigate/record/record.py @@ -13,18 +13,12 @@ from setproctitle import setproctitle from frigate.config import FrigateConfig from frigate.models import Event, Recordings from frigate.record.maintainer import RecordingMaintainer -from frigate.types import FeatureMetricsTypes from frigate.util.services import listen logger = logging.getLogger(__name__) -def manage_recordings( - config: FrigateConfig, - object_recordings_info_queue: mp.Queue, - audio_recordings_info_queue: mp.Queue, - process_info: dict[str, FeatureMetricsTypes], -) -> None: +def manage_recordings(config: FrigateConfig) -> None: stop_event = mp.Event() def receiveSignal(signalNumber: int, frame: Optional[FrameType]) -> None: @@ -51,9 +45,6 @@ def manage_recordings( maintainer = RecordingMaintainer( config, - object_recordings_info_queue, - audio_recordings_info_queue, - process_info, stop_event, ) maintainer.start() diff --git a/frigate/stats.py b/frigate/stats.py index addcd4a5b..c542e8022 100644 --- a/frigate/stats.py +++ b/frigate/stats.py @@ -265,7 +265,7 @@ def stats_snapshot( "process_fps": round(camera_stats["process_fps"].value, 2), "skipped_fps": round(camera_stats["skipped_fps"].value, 2), "detection_fps": round(camera_stats["detection_fps"].value, 2), - "detection_enabled": camera_stats["detection_enabled"].value, + "detection_enabled": config.cameras[name].detect.enabled, "pid": pid, "capture_pid": cpid, "ffmpeg_pid": ffmpeg_pid, diff --git a/frigate/types.py b/frigate/types.py index 4963b13db..e93391fb0 100644 --- a/frigate/types.py +++ b/frigate/types.py @@ -10,23 +10,16 @@ from frigate.object_detection import ObjectDetectProcess class CameraMetricsTypes(TypedDict): camera_fps: Synchronized capture_process: Optional[Process] - detection_enabled: Synchronized detection_fps: Synchronized detection_frame: Synchronized ffmpeg_pid: Synchronized frame_queue: Queue - motion_enabled: Synchronized - improve_contrast_enabled: Synchronized - motion_threshold: Synchronized - motion_contour_area: Synchronized process: Optional[Process] process_fps: Synchronized read_start: Synchronized skipped_fps: Synchronized audio_rms: Synchronized audio_dBFS: Synchronized - birdseye_enabled: Synchronized - birdseye_mode: Synchronized class PTZMetricsTypes(TypedDict): @@ -42,11 +35,6 @@ class PTZMetricsTypes(TypedDict): ptz_min_zoom: Synchronized -class FeatureMetricsTypes(TypedDict): - audio_enabled: Synchronized - record_enabled: Synchronized - - class StatsTrackingTypes(TypedDict): camera_metrics: dict[str, CameraMetricsTypes] detectors: dict[str, ObjectDetectProcess] diff --git a/frigate/video.py b/frigate/video.py index 774da4c99..6d6230de7 100755 --- a/frigate/video.py +++ b/frigate/video.py @@ -11,6 +11,7 @@ import time import cv2 from setproctitle import setproctitle +from frigate.comms.config_updater import ConfigSubscriber from frigate.comms.inter_process import InterProcessRequestor from frigate.config import CameraConfig, DetectConfig, ModelConfig from frigate.const import ( @@ -406,11 +407,6 @@ def track_camera( listen() frame_queue = process_info["frame_queue"] - detection_enabled = process_info["detection_enabled"] - motion_enabled = process_info["motion_enabled"] - improve_contrast_enabled = process_info["improve_contrast_enabled"] - motion_threshold = process_info["motion_threshold"] - motion_contour_area = process_info["motion_contour_area"] frame_shape = config.frame_shape objects_to_track = config.objects.track @@ -420,9 +416,6 @@ def track_camera( frame_shape, config.motion, config.detect.fps, - improve_contrast_enabled, - motion_threshold, - motion_contour_area, ) object_detector = RemoteObjectDetector( name, labelmap, detection_queue, result_connection, model_config, stop_event @@ -450,8 +443,6 @@ def track_camera( process_info, objects_to_track, object_filters, - detection_enabled, - motion_enabled, stop_event, ptz_metrics, region_grid, @@ -519,8 +510,6 @@ def process_frames( process_info: dict, objects_to_track: list[str], object_filters, - detection_enabled: mp.Value, - motion_enabled: mp.Value, stop_event, ptz_metrics: PTZMetricsTypes, region_grid, @@ -530,6 +519,7 @@ def process_frames( detection_fps = process_info["detection_fps"] current_frame_time = process_info["detection_frame"] next_region_update = get_tomorrow_at_time(2) + config_subscriber = ConfigSubscriber(f"config/detect/{camera_name}") fps_tracker = EventsPerSecond() fps_tracker.start() @@ -540,6 +530,12 @@ def process_frames( region_min_size = get_min_region_size(model_config) while not stop_event.is_set(): + # check for updated detect config + _, updated_detect_config = config_subscriber.check_for_update() + + if updated_detect_config: + detect_config = updated_detect_config + if ( datetime.datetime.now().astimezone(datetime.timezone.utc) > next_region_update @@ -570,13 +566,13 @@ def process_frames( continue # look for motion if enabled - motion_boxes = motion_detector.detect(frame) if motion_enabled.value else [] + motion_boxes = motion_detector.detect(frame) regions = [] consolidated_detections = [] # if detection is disabled - if not detection_enabled.value: + if not detect_config.enabled: object_tracker.match_and_update(frame_time, []) else: # get stationary object ids @@ -821,4 +817,6 @@ def process_frames( detection_fps.value = object_detector.fps.eps() frame_manager.close(f"{camera_name}{frame_time}") + motion_detector.stop() requestor.stop() + config_subscriber.stop()