diff --git a/frigate/app.py b/frigate/app.py index 703877f6d..1b78181ff 100644 --- a/frigate/app.py +++ b/frigate/app.py @@ -18,7 +18,6 @@ from frigate.api.auth import hash_password from frigate.api.fastapi_app import create_fastapi_app from frigate.camera import CameraMetrics, PTZMetrics from frigate.comms.base_communicator import Communicator -from frigate.comms.config_updater import ConfigPublisher from frigate.comms.dispatcher import Dispatcher from frigate.comms.event_metadata_updater import EventMetadataPublisher from frigate.comms.inter_process import InterProcessCommunicator @@ -26,6 +25,7 @@ from frigate.comms.mqtt import MqttClient from frigate.comms.webpush import WebPushClient from frigate.comms.ws import WebSocketClient from frigate.comms.zmq_proxy import ZmqProxy +from frigate.config.camera.updater import CameraConfigUpdatePublisher from frigate.config.config import FrigateConfig from frigate.const import ( CACHE_DIR, @@ -319,7 +319,7 @@ class FrigateApp: def init_inter_process_communicator(self) -> None: self.inter_process_communicator = InterProcessCommunicator() - self.inter_config_updater = ConfigPublisher() + self.inter_config_updater = CameraConfigUpdatePublisher() self.event_metadata_updater = EventMetadataPublisher() self.inter_zmq_proxy = ZmqProxy() @@ -479,7 +479,7 @@ class FrigateApp: capture_process = util.Process( target=capture_camera, name=f"camera_capture:{name}", - args=(name, config, shm_frame_count, self.camera_metrics[name]), + args=(config, shm_frame_count, self.camera_metrics[name]), ) capture_process.daemon = True self.camera_metrics[name].capture_process = capture_process diff --git a/frigate/comms/dispatcher.py b/frigate/comms/dispatcher.py index 87891ec88..33f3ec158 100644 --- a/frigate/comms/dispatcher.py +++ b/frigate/comms/dispatcher.py @@ -8,9 +8,13 @@ from typing import Any, Callable, Optional from frigate.camera import PTZMetrics from frigate.camera.activity_manager import CameraActivityManager from frigate.comms.base_communicator import Communicator -from frigate.comms.config_updater import ConfigPublisher from frigate.comms.webpush import WebPushClient from frigate.config import BirdseyeModeEnum, FrigateConfig +from frigate.config.camera.updater import ( + CameraConfigUpdateEnum, + CameraConfigUpdatePublisher, + CameraConfigUpdateTopic, +) from frigate.const import ( CLEAR_ONGOING_REVIEW_SEGMENTS, INSERT_MANY_RECORDINGS, @@ -38,7 +42,7 @@ class Dispatcher: def __init__( self, config: FrigateConfig, - config_updater: ConfigPublisher, + config_updater: CameraConfigUpdatePublisher, onvif: OnvifController, ptz_metrics: dict[str, PTZMetrics], communicators: list[Communicator], @@ -273,8 +277,11 @@ class Dispatcher: f"Turning on motion for {camera_name} due to detection being enabled." ) motion_settings.enabled = True - self.config_updater.publish( - f"config/motion/{camera_name}", motion_settings + self.config_updater.publish_update( + CameraConfigUpdateTopic( + CameraConfigUpdateEnum.motion, camera_name + ), + motion_settings, ) self.publish(f"{camera_name}/motion/state", payload, retain=True) elif payload == "OFF": @@ -303,7 +310,10 @@ class Dispatcher: logger.info(f"Turning off camera {camera_name}") camera_settings.enabled = False - self.config_updater.publish(f"config/enabled/{camera_name}", camera_settings) + self.config_updater.publish_update( + CameraConfigUpdateTopic(CameraConfigUpdateEnum.enabled, camera_name), + camera_settings.enabled, + ) self.publish(f"{camera_name}/enabled/state", payload, retain=True) def _on_motion_command(self, camera_name: str, payload: str) -> None: @@ -326,7 +336,10 @@ class Dispatcher: logger.info(f"Turning off motion for {camera_name}") motion_settings.enabled = False - self.config_updater.publish(f"config/motion/{camera_name}", motion_settings) + self.config_updater.publish_update( + CameraConfigUpdateTopic(CameraConfigUpdateEnum.motion, camera_name), + motion_settings, + ) self.publish(f"{camera_name}/motion/state", payload, retain=True) def _on_motion_improve_contrast_command( @@ -344,7 +357,10 @@ class Dispatcher: logger.info(f"Turning off improve contrast for {camera_name}") motion_settings.improve_contrast = False # type: ignore[union-attr] - self.config_updater.publish(f"config/motion/{camera_name}", motion_settings) + self.config_updater.publish_update( + CameraConfigUpdateTopic(CameraConfigUpdateEnum.motion, camera_name), + motion_settings, + ) self.publish(f"{camera_name}/improve_contrast/state", payload, retain=True) def _on_ptz_autotracker_command(self, camera_name: str, payload: str) -> None: @@ -384,7 +400,10 @@ class Dispatcher: motion_settings = self.config.cameras[camera_name].motion logger.info(f"Setting motion contour area for {camera_name}: {payload}") motion_settings.contour_area = payload # type: ignore[union-attr] - self.config_updater.publish(f"config/motion/{camera_name}", motion_settings) + self.config_updater.publish_update( + CameraConfigUpdateTopic(CameraConfigUpdateEnum.motion, camera_name), + motion_settings, + ) self.publish(f"{camera_name}/motion_contour_area/state", payload, retain=True) def _on_motion_threshold_command(self, camera_name: str, payload: int) -> None: @@ -398,7 +417,10 @@ class Dispatcher: motion_settings = self.config.cameras[camera_name].motion logger.info(f"Setting motion threshold for {camera_name}: {payload}") motion_settings.threshold = payload # type: ignore[union-attr] - self.config_updater.publish(f"config/motion/{camera_name}", motion_settings) + self.config_updater.publish_update( + CameraConfigUpdateTopic(CameraConfigUpdateEnum.motion, camera_name), + motion_settings, + ) self.publish(f"{camera_name}/motion_threshold/state", payload, retain=True) def _on_global_notification_command(self, payload: str) -> None: @@ -410,8 +432,8 @@ class Dispatcher: notification_settings = self.config.notifications logger.info(f"Setting all notifications: {payload}") notification_settings.enabled = payload == "ON" # type: ignore[union-attr] - self.config_updater.publish( - "config/notifications", {"_global_notifications": notification_settings} + self.config_updater.publisher.publish( + "config/notifications", notification_settings ) self.publish("notifications/state", payload, retain=True) @@ -434,7 +456,10 @@ class Dispatcher: logger.info(f"Turning off audio detection for {camera_name}") audio_settings.enabled = False - self.config_updater.publish(f"config/audio/{camera_name}", audio_settings) + self.config_updater.publish_update( + CameraConfigUpdateTopic(CameraConfigUpdateEnum.audio, camera_name), + audio_settings, + ) self.publish(f"{camera_name}/audio/state", payload, retain=True) def _on_recordings_command(self, camera_name: str, payload: str) -> None: @@ -456,7 +481,10 @@ class Dispatcher: logger.info(f"Turning off recordings for {camera_name}") record_settings.enabled = False - self.config_updater.publish(f"config/record/{camera_name}", record_settings) + self.config_updater.publish_update( + CameraConfigUpdateTopic(CameraConfigUpdateEnum.record, camera_name), + record_settings, + ) self.publish(f"{camera_name}/recordings/state", payload, retain=True) def _on_snapshots_command(self, camera_name: str, payload: str) -> None: @@ -472,6 +500,10 @@ class Dispatcher: logger.info(f"Turning off snapshots for {camera_name}") snapshots_settings.enabled = False + self.config_updater.publish_update( + CameraConfigUpdateTopic(CameraConfigUpdateEnum.snapshots, camera_name), + snapshots_settings, + ) self.publish(f"{camera_name}/snapshots/state", payload, retain=True) def _on_ptz_command(self, camera_name: str, payload: str) -> None: @@ -506,7 +538,10 @@ class Dispatcher: logger.info(f"Turning off birdseye for {camera_name}") birdseye_settings.enabled = False - self.config_updater.publish(f"config/birdseye/{camera_name}", birdseye_settings) + self.config_updater.publish_update( + CameraConfigUpdateTopic(CameraConfigUpdateEnum.birdseye, camera_name), + birdseye_settings, + ) self.publish(f"{camera_name}/birdseye/state", payload, retain=True) def _on_birdseye_mode_command(self, camera_name: str, payload: str) -> None: @@ -527,7 +562,10 @@ class Dispatcher: f"Setting birdseye mode for {camera_name} to {birdseye_settings.mode}" ) - self.config_updater.publish(f"config/birdseye/{camera_name}", birdseye_settings) + self.config_updater.publish_update( + CameraConfigUpdateTopic(CameraConfigUpdateEnum.birdseye, camera_name), + birdseye_settings, + ) self.publish(f"{camera_name}/birdseye_mode/state", payload, retain=True) def _on_camera_notification_command(self, camera_name: str, payload: str) -> None: @@ -559,8 +597,9 @@ class Dispatcher: ): self.web_push_client.suspended_cameras[camera_name] = 0 - self.config_updater.publish( - "config/notifications", {camera_name: notification_settings} + self.config_updater.publish_update( + CameraConfigUpdateTopic(CameraConfigUpdateEnum.notifications, camera_name), + notification_settings, ) self.publish(f"{camera_name}/notifications/state", payload, retain=True) self.publish(f"{camera_name}/notifications/suspended", "0", retain=True) @@ -617,7 +656,10 @@ class Dispatcher: logger.info(f"Turning off alerts for {camera_name}") review_settings.alerts.enabled = False - self.config_updater.publish(f"config/review/{camera_name}", review_settings) + self.config_updater.publish_update( + CameraConfigUpdateTopic(CameraConfigUpdateEnum.review, camera_name), + review_settings, + ) self.publish(f"{camera_name}/review_alerts/state", payload, retain=True) def _on_detections_command(self, camera_name: str, payload: str) -> None: @@ -639,5 +681,8 @@ class Dispatcher: logger.info(f"Turning off detections for {camera_name}") review_settings.detections.enabled = False - self.config_updater.publish(f"config/review/{camera_name}", review_settings) + self.config_updater.publish_update( + CameraConfigUpdateTopic(CameraConfigUpdateEnum.review, camera_name), + review_settings, + ) self.publish(f"{camera_name}/review_detections/state", payload, retain=True) diff --git a/frigate/comms/webpush.py b/frigate/comms/webpush.py index c825c0617..91027d1a4 100644 --- a/frigate/comms/webpush.py +++ b/frigate/comms/webpush.py @@ -17,6 +17,10 @@ from titlecase import titlecase from frigate.comms.base_communicator import Communicator from frigate.comms.config_updater import ConfigSubscriber from frigate.config import FrigateConfig +from frigate.config.camera.updater import ( + CameraConfigUpdateEnum, + CameraConfigUpdateSubscriber, +) from frigate.const import CONFIG_DIR from frigate.models import User @@ -73,7 +77,12 @@ class WebPushClient(Communicator): # type: ignore[misc] self.web_pushers[user["username"]].append(WebPusher(sub)) # notification config updater - self.config_subscriber = ConfigSubscriber("config/notifications") + self.global_config_subscriber = ConfigSubscriber( + "config/notifications", exact=True + ) + self.config_subscriber = CameraConfigUpdateSubscriber( + self.config.cameras, [CameraConfigUpdateEnum.notifications] + ) def subscribe(self, receiver: Callable) -> None: """Wrapper for allowing dispatcher to subscribe.""" @@ -154,15 +163,14 @@ class WebPushClient(Communicator): # type: ignore[misc] def publish(self, topic: str, payload: Any, retain: bool = False) -> None: """Wrapper for publishing when client is in valid state.""" # check for updated notification config - _, updated_notification_config = self.config_subscriber.check_for_update() + _, updated_notification_config = ( + self.global_config_subscriber.check_for_update() + ) if updated_notification_config: - for key, value in updated_notification_config.items(): - if key == "_global_notifications": - self.config.notifications = value + self.config.notifications = updated_notification_config - elif key in self.config.cameras: - self.config.cameras[key].notifications = value + self.config_subscriber.check_for_updates() if topic == "reviews": decoded = json.loads(payload) diff --git a/frigate/config/camera/updater.py b/frigate/config/camera/updater.py new file mode 100644 index 000000000..5abca57eb --- /dev/null +++ b/frigate/config/camera/updater.py @@ -0,0 +1,119 @@ +"""Convenience classes for updating configurations dynamically.""" + +from dataclasses import dataclass +from enum import Enum +from typing import Any + +from frigate.comms.config_updater import ConfigPublisher, ConfigSubscriber +from frigate.config import CameraConfig + + +class CameraConfigUpdateEnum(str, Enum): + """Supported camera config update types.""" + + audio = "audio" + birdseye = "birdseye" + detect = "detect" + enabled = "enabled" + motion = "motion" # includes motion and motion masks + notifications = "notifications" + record = "record" + review = "review" + snapshots = "snapshots" + zones = "zones" + + +@dataclass +class CameraConfigUpdateTopic: + update_type: CameraConfigUpdateEnum + camera: str + + @property + def topic(self) -> str: + return f"config/cameras/{self.camera}/{self.update_type.name}" + + +class CameraConfigUpdatePublisher: + def __init__(self): + self.publisher = ConfigPublisher() + + def publish_update(self, topic: CameraConfigUpdateTopic, config: Any) -> None: + self.publisher.publish(topic.topic, config) + + def stop(self) -> None: + self.publisher.stop() + + +class CameraConfigUpdateSubscriber: + def __init__( + self, + camera_configs: dict[str, CameraConfig], + topics: list[CameraConfigUpdateEnum], + ): + self.camera_configs = camera_configs + self.topics = topics + + base_topic = "config/cameras" + + if len(self.camera_configs) == 1: + base_topic += f"/{list(self.camera_configs.keys())[0]}" + + self.subscriber = ConfigSubscriber( + base_topic, + exact=False, + ) + + def __update_config( + self, camera: str, update_type: CameraConfigUpdateEnum, updated_config: Any + ) -> None: + config = self.camera_configs[camera] + + if not config: + return + + if update_type == CameraConfigUpdateEnum.audio: + config.audio = updated_config + elif update_type == CameraConfigUpdateEnum.birdseye: + config.birdseye = updated_config + elif update_type == CameraConfigUpdateEnum.detect: + config.detect = updated_config + elif update_type == CameraConfigUpdateEnum.enabled: + config.enabled = updated_config + elif update_type == CameraConfigUpdateEnum.motion: + config.motion = updated_config + elif update_type == CameraConfigUpdateEnum.notifications: + config.notifications = updated_config + elif update_type == CameraConfigUpdateEnum.record: + config.record = updated_config + elif update_type == CameraConfigUpdateEnum.review: + config.review = updated_config + elif update_type == CameraConfigUpdateEnum.snapshots: + config.snapshots = updated_config + elif update_type == CameraConfigUpdateEnum.zones: + config.zones = updated_config + + def check_for_updates(self) -> dict[str, list[str]]: + updated_topics: dict[str, list[str]] = {} + + # get all updates available + while True: + update_topic, update_config = self.subscriber.check_for_update() + + if update_topic is None or update_config is None: + break + + _, _, camera, raw_type = update_topic.split("/") + update_type = CameraConfigUpdateEnum[raw_type] + + if update_type in self.topics: + if update_type.name in updated_topics: + updated_topics[update_type.name].append(camera) + else: + updated_topics[update_type.name] = [camera] + + self.__update_config(camera, update_type, update_config) + + return updated_topics + + def stop(self) -> None: + self.subscriber.stop() diff --git a/frigate/events/audio.py b/frigate/events/audio.py index f2a217fd3..8a929c8ff 100644 --- a/frigate/events/audio.py +++ b/frigate/events/audio.py @@ -12,7 +12,6 @@ import numpy as np import frigate.util as util from frigate.camera import CameraMetrics -from frigate.comms.config_updater import ConfigSubscriber from frigate.comms.detections_updater import DetectionPublisher, DetectionTypeEnum from frigate.comms.event_metadata_updater import ( EventMetadataPublisher, @@ -20,6 +19,10 @@ from frigate.comms.event_metadata_updater import ( ) from frigate.comms.inter_process import InterProcessRequestor from frigate.config import CameraConfig, CameraInput, FfmpegConfig +from frigate.config.camera.updater import ( + CameraConfigUpdateEnum, + CameraConfigUpdateSubscriber, +) from frigate.const import ( AUDIO_DURATION, AUDIO_FORMAT, @@ -138,9 +141,9 @@ class AudioEventMaintainer(threading.Thread): # create communication for audio detections self.requestor = InterProcessRequestor() - self.config_subscriber = ConfigSubscriber(f"config/audio/{camera.name}") - self.enabled_subscriber = ConfigSubscriber( - f"config/enabled/{camera.name}", True + self.config_subscriber = CameraConfigUpdateSubscriber( + {self.config.name: self.config}, + [CameraConfigUpdateEnum.audio, CameraConfigUpdateEnum.enabled], ) self.detection_publisher = DetectionPublisher(DetectionTypeEnum.audio) self.event_metadata_publisher = EventMetadataPublisher() @@ -308,21 +311,12 @@ class AudioEventMaintainer(threading.Thread): self.logger.error(f"Error reading audio data from ffmpeg process: {e}") log_and_restart() - def _update_enabled_state(self) -> bool: - """Fetch the latest config and update enabled state.""" - _, config_data = self.enabled_subscriber.check_for_update() - if config_data: - self.config.enabled = config_data.enabled - return config_data.enabled - - return self.config.enabled - def run(self) -> None: - if self._update_enabled_state(): + if self.config.enabled: self.start_or_restart_ffmpeg() while not self.stop_event.is_set(): - enabled = self._update_enabled_state() + enabled = self.config.enabled if enabled != self.was_enabled: if enabled: self.logger.debug( @@ -344,13 +338,7 @@ class AudioEventMaintainer(threading.Thread): continue # check if there is an updated config - ( - updated_topic, - updated_audio_config, - ) = self.config_subscriber.check_for_update() - - if updated_topic: - self.config.audio = updated_audio_config + self.config_subscriber.check_for_updates() self.read_audio() @@ -359,7 +347,6 @@ class AudioEventMaintainer(threading.Thread): self.logpipe.close() self.requestor.stop() self.config_subscriber.stop() - self.enabled_subscriber.stop() self.detection_publisher.stop() diff --git a/frigate/motion/improved_motion.py b/frigate/motion/improved_motion.py index 69de6d015..10818ea70 100644 --- a/frigate/motion/improved_motion.py +++ b/frigate/motion/improved_motion.py @@ -5,7 +5,6 @@ import numpy as np from scipy.ndimage import gaussian_filter from frigate.camera import PTZMetrics -from frigate.comms.config_updater import ConfigSubscriber from frigate.config import MotionConfig from frigate.motion import MotionDetector from frigate.util.image import grab_cv2_contours @@ -49,7 +48,6 @@ class ImprovedMotionDetector(MotionDetector): self.contrast_values = np.zeros((contrast_frame_history, 2), np.uint8) self.contrast_values[:, 1:2] = 255 self.contrast_values_index = 0 - self.config_subscriber = ConfigSubscriber(f"config/motion/{name}", True) self.ptz_metrics = ptz_metrics self.last_stop_time = None @@ -59,12 +57,6 @@ class ImprovedMotionDetector(MotionDetector): def detect(self, frame): motion_boxes = [] - # check for updated motion config - _, updated_motion_config = self.config_subscriber.check_for_update() - - if updated_motion_config: - self.config = updated_motion_config - if not self.config.enabled: return motion_boxes @@ -246,4 +238,4 @@ class ImprovedMotionDetector(MotionDetector): def stop(self) -> None: """stop the motion detector.""" - self.config_subscriber.stop() + pass diff --git a/frigate/output/birdseye.py b/frigate/output/birdseye.py index b295af82e..78686fd63 100644 --- a/frigate/output/birdseye.py +++ b/frigate/output/birdseye.py @@ -15,7 +15,6 @@ from typing import Any, Optional import cv2 import numpy as np -from frigate.comms.config_updater import ConfigSubscriber from frigate.config import BirdseyeModeEnum, FfmpegConfig, FrigateConfig from frigate.const import BASE_DIR, BIRDSEYE_PIPE, INSTALL_DIR from frigate.util.image import ( @@ -754,7 +753,6 @@ class Birdseye: "birdseye", self.converter, websocket_server, stop_event ) self.birdseye_manager = BirdsEyeFrameManager(config, stop_event) - self.birdseye_subscriber = ConfigSubscriber("config/birdseye/") self.frame_manager = SharedMemoryFrameManager() self.stop_event = stop_event @@ -791,20 +789,6 @@ class Birdseye: frame_time: float, frame: np.ndarray, ) -> None: - # check if there is an updated config - while True: - ( - updated_birdseye_topic, - updated_birdseye_config, - ) = self.birdseye_subscriber.check_for_update() - - if not updated_birdseye_topic: - break - - if updated_birdseye_config: - camera_name = updated_birdseye_topic.rpartition("/")[-1] - self.config.cameras[camera_name].birdseye = updated_birdseye_config - if self.birdseye_manager.update( camera, len([o for o in current_tracked_objects if not o["stationary"]]), @@ -815,6 +799,5 @@ class Birdseye: self.__send_new_frame() def stop(self) -> None: - self.birdseye_subscriber.stop() self.converter.join() self.broadcaster.join() diff --git a/frigate/output/output.py b/frigate/output/output.py index befb663eb..6decf0005 100644 --- a/frigate/output/output.py +++ b/frigate/output/output.py @@ -17,10 +17,13 @@ from ws4py.server.wsgirefserver import ( ) from ws4py.server.wsgiutils import WebSocketWSGIApplication -from frigate.comms.config_updater import ConfigSubscriber from frigate.comms.detections_updater import DetectionSubscriber, DetectionTypeEnum from frigate.comms.ws import WebSocket from frigate.config import FrigateConfig +from frigate.config.camera.updater import ( + CameraConfigUpdateEnum, + CameraConfigUpdateSubscriber, +) from frigate.const import CACHE_DIR, CLIPS_DIR from frigate.output.birdseye import Birdseye from frigate.output.camera import JsmpegCamera @@ -99,7 +102,14 @@ def output_frames( websocket_thread = threading.Thread(target=websocket_server.serve_forever) detection_subscriber = DetectionSubscriber(DetectionTypeEnum.video) - config_enabled_subscriber = ConfigSubscriber("config/enabled/") + config_subscriber = CameraConfigUpdateSubscriber( + config.cameras, + [ + CameraConfigUpdateEnum.birdseye, + CameraConfigUpdateEnum.enabled, + CameraConfigUpdateEnum.record, + ], + ) jsmpeg_cameras: dict[str, JsmpegCamera] = {} birdseye: Birdseye | None = None @@ -111,7 +121,7 @@ def output_frames( move_preview_frames("cache") for camera, cam_config in config.cameras.items(): - if not cam_config.enabled: + if not cam_config.enabled_in_config: continue jsmpeg_cameras[camera] = JsmpegCamera(cam_config, stop_event, websocket_server) @@ -125,18 +135,7 @@ def output_frames( while not stop_event.is_set(): # check if there is an updated config - while True: - ( - updated_enabled_topic, - updated_enabled_config, - ) = config_enabled_subscriber.check_for_update() - - if not updated_enabled_topic: - break - - if updated_enabled_config: - camera_name = updated_enabled_topic.rpartition("/")[-1] - config.cameras[camera_name].enabled = updated_enabled_config.enabled + config_subscriber.check_for_updates() (topic, data) = detection_subscriber.check_for_update(timeout=1) now = datetime.datetime.now().timestamp() @@ -240,7 +239,7 @@ def output_frames( if birdseye is not None: birdseye.stop() - config_enabled_subscriber.stop() + config_subscriber.stop() websocket_server.manager.close_all() websocket_server.manager.stop() websocket_server.manager.join() diff --git a/frigate/output/preview.py b/frigate/output/preview.py index be9da292b..1bc94ea23 100644 --- a/frigate/output/preview.py +++ b/frigate/output/preview.py @@ -13,7 +13,6 @@ from typing import Any import cv2 import numpy as np -from frigate.comms.config_updater import ConfigSubscriber from frigate.comms.inter_process import InterProcessRequestor from frigate.config import CameraConfig, RecordQualityEnum from frigate.const import CACHE_DIR, CLIPS_DIR, INSERT_PREVIEW, PREVIEW_FRAME_TYPE @@ -174,9 +173,6 @@ class PreviewRecorder: # create communication for finished previews self.requestor = InterProcessRequestor() - self.config_subscriber = ConfigSubscriber( - f"config/record/{self.config.name}", True - ) y, u1, u2, v1, v2 = get_yuv_crop( self.config.frame_shape_yuv, @@ -323,12 +319,6 @@ class PreviewRecorder: ) -> None: self.offline = False - # check for updated record config - _, updated_record_config = self.config_subscriber.check_for_update() - - if updated_record_config: - self.config.record = updated_record_config - # always write the first frame if self.start_time == 0: self.start_time = frame_time diff --git a/frigate/record/maintainer.py b/frigate/record/maintainer.py index f1b9a600e..7f13451d6 100644 --- a/frigate/record/maintainer.py +++ b/frigate/record/maintainer.py @@ -16,7 +16,6 @@ from typing import Any, Optional, Tuple import numpy as np import psutil -from frigate.comms.config_updater import ConfigSubscriber from frigate.comms.detections_updater import DetectionSubscriber, DetectionTypeEnum from frigate.comms.inter_process import InterProcessRequestor from frigate.comms.recordings_updater import ( @@ -24,6 +23,10 @@ from frigate.comms.recordings_updater import ( RecordingsDataTypeEnum, ) from frigate.config import FrigateConfig, RetainModeEnum +from frigate.config.camera.updater import ( + CameraConfigUpdateEnum, + CameraConfigUpdateSubscriber, +) from frigate.const import ( CACHE_DIR, CACHE_SEGMENT_FORMAT, @@ -71,7 +74,9 @@ class RecordingMaintainer(threading.Thread): # create communication for retained recordings self.requestor = InterProcessRequestor() - self.config_subscriber = ConfigSubscriber("config/record/") + self.config_subscriber = CameraConfigUpdateSubscriber( + self.config.cameras, [CameraConfigUpdateEnum.record] + ) self.detection_subscriber = DetectionSubscriber(DetectionTypeEnum.all) self.recordings_publisher = RecordingsDataPublisher( RecordingsDataTypeEnum.recordings_available_through @@ -518,17 +523,7 @@ class RecordingMaintainer(threading.Thread): run_start = datetime.datetime.now().timestamp() # check if there is an updated config - while True: - ( - updated_topic, - updated_record_config, - ) = self.config_subscriber.check_for_update() - - if not updated_topic: - break - - camera_name = updated_topic.rpartition("/")[-1] - self.config.cameras[camera_name].record = updated_record_config + self.config_subscriber.check_for_updates() stale_frame_count = 0 stale_frame_count_threshold = 10 diff --git a/frigate/review/maintainer.py b/frigate/review/maintainer.py index b144b6e52..7f60a0209 100644 --- a/frigate/review/maintainer.py +++ b/frigate/review/maintainer.py @@ -15,10 +15,13 @@ from typing import Any, Optional import cv2 import numpy as np -from frigate.comms.config_updater import ConfigSubscriber from frigate.comms.detections_updater import DetectionSubscriber, DetectionTypeEnum from frigate.comms.inter_process import InterProcessRequestor from frigate.config import CameraConfig, FrigateConfig +from frigate.config.camera.updater import ( + CameraConfigUpdateEnum, + CameraConfigUpdateSubscriber, +) from frigate.const import ( CLEAR_ONGOING_REVIEW_SEGMENTS, CLIPS_DIR, @@ -150,9 +153,14 @@ class ReviewSegmentMaintainer(threading.Thread): # create communication for review segments self.requestor = InterProcessRequestor() - self.record_config_subscriber = ConfigSubscriber("config/record/") - self.review_config_subscriber = ConfigSubscriber("config/review/") - self.enabled_config_subscriber = ConfigSubscriber("config/enabled/") + self.config_subscriber = CameraConfigUpdateSubscriber( + config.cameras, + [ + CameraConfigUpdateEnum.enabled, + CameraConfigUpdateEnum.record, + CameraConfigUpdateEnum.review, + ], + ) self.detection_subscriber = DetectionSubscriber(DetectionTypeEnum.all) # manual events @@ -458,50 +466,15 @@ class ReviewSegmentMaintainer(threading.Thread): def run(self) -> None: while not self.stop_event.is_set(): # check if there is an updated config - while True: - ( - updated_record_topic, - updated_record_config, - ) = self.record_config_subscriber.check_for_update() + updated_topics = self.config_subscriber.check_for_updates() - ( - updated_review_topic, - updated_review_config, - ) = self.review_config_subscriber.check_for_update() + if "record" in updated_topics: + for camera in updated_topics["record"]: + self.end_segment(camera) - ( - updated_enabled_topic, - updated_enabled_config, - ) = self.enabled_config_subscriber.check_for_update() - - if ( - not updated_record_topic - and not updated_review_topic - and not updated_enabled_topic - ): - break - - if updated_record_topic: - camera_name = updated_record_topic.rpartition("/")[-1] - self.config.cameras[camera_name].record = updated_record_config - - # immediately end segment - if not updated_record_config.enabled: - self.end_segment(camera_name) - - if updated_review_topic: - camera_name = updated_review_topic.rpartition("/")[-1] - self.config.cameras[camera_name].review = updated_review_config - - if updated_enabled_config: - camera_name = updated_enabled_topic.rpartition("/")[-1] - self.config.cameras[ - camera_name - ].enabled = updated_enabled_config.enabled - - # immediately end segment as we may not get another update - if not updated_enabled_config.enabled: - self.end_segment(camera_name) + if "enabled" in updated_topics: + for camera in updated_topics["enabled"]: + self.end_segment(camera) (topic, data) = self.detection_subscriber.check_for_update(timeout=1) @@ -730,8 +703,7 @@ class ReviewSegmentMaintainer(threading.Thread): f"Dedicated LPR camera API has been called for {camera}, but detections are disabled. LPR events will not appear as a detection." ) - self.record_config_subscriber.stop() - self.review_config_subscriber.stop() + self.config_subscriber.stop() self.requestor.stop() self.detection_subscriber.stop() logger.info("Exiting review maintainer...") diff --git a/frigate/track/object_processing.py b/frigate/track/object_processing.py index 2eb55e883..5157e0424 100644 --- a/frigate/track/object_processing.py +++ b/frigate/track/object_processing.py @@ -14,7 +14,6 @@ import numpy as np from peewee import DoesNotExist from frigate.camera.state import CameraState -from frigate.comms.config_updater import ConfigSubscriber from frigate.comms.detections_updater import DetectionPublisher, DetectionTypeEnum from frigate.comms.dispatcher import Dispatcher from frigate.comms.event_metadata_updater import ( @@ -29,6 +28,10 @@ from frigate.config import ( RecordConfig, SnapshotsConfig, ) +from frigate.config.camera.updater import ( + CameraConfigUpdateEnum, + CameraConfigUpdateSubscriber, +) from frigate.const import FAST_QUEUE_TIMEOUT, UPDATE_CAMERA_ACTIVITY from frigate.events.types import EventStateEnum, EventTypeEnum from frigate.models import Event, Timeline @@ -63,7 +66,9 @@ class TrackedObjectProcessor(threading.Thread): self.last_motion_detected: dict[str, float] = {} self.ptz_autotracker_thread = ptz_autotracker_thread - self.config_enabled_subscriber = ConfigSubscriber("config/enabled/") + self.config_subscriber = CameraConfigUpdateSubscriber( + self.config.cameras, [CameraConfigUpdateEnum.enabled] + ) self.requestor = InterProcessRequestor() self.detection_publisher = DetectionPublisher(DetectionTypeEnum.all) @@ -576,24 +581,14 @@ class TrackedObjectProcessor(threading.Thread): def run(self): while not self.stop_event.is_set(): # check for config updates - while True: - ( - updated_enabled_topic, - updated_enabled_config, - ) = self.config_enabled_subscriber.check_for_update() + updated_topics = self.config_subscriber.check_for_updates() - if not updated_enabled_topic: - break - - camera_name = updated_enabled_topic.rpartition("/")[-1] - self.config.cameras[ - camera_name - ].enabled = updated_enabled_config.enabled - - if self.camera_states[camera_name].prev_enabled is None: - self.camera_states[ - camera_name - ].prev_enabled = updated_enabled_config.enabled + if "enabled" in updated_topics: + for camera in updated_topics["enabled"]: + if self.camera_states[camera].prev_enabled is None: + self.camera_states[camera].prev_enabled = self.config.cameras[ + camera + ].enabled # manage camera disabled state for camera, config in self.config.cameras.items(): @@ -702,6 +697,6 @@ class TrackedObjectProcessor(threading.Thread): self.event_sender.stop() self.event_end_subscriber.stop() self.sub_label_subscriber.stop() - self.config_enabled_subscriber.stop() + self.config_subscriber.stop() logger.info("Exiting object processor...") diff --git a/frigate/video.py b/frigate/video.py index 754c9b764..2efabfd93 100755 --- a/frigate/video.py +++ b/frigate/video.py @@ -15,10 +15,13 @@ import cv2 from setproctitle import setproctitle from frigate.camera import CameraMetrics, PTZMetrics -from frigate.comms.config_updater import ConfigSubscriber from frigate.comms.inter_process import InterProcessRequestor from frigate.config import CameraConfig, DetectConfig, ModelConfig from frigate.config.camera.camera import CameraTypeEnum +from frigate.config.camera.updater import ( + CameraConfigUpdateEnum, + CameraConfigUpdateSubscriber, +) from frigate.const import ( CACHE_DIR, CACHE_SEGMENT_FORMAT, @@ -112,15 +115,13 @@ def capture_frames( frame_rate.start() skipped_eps = EventsPerSecond() skipped_eps.start() - config_subscriber = ConfigSubscriber(f"config/enabled/{config.name}", True) + config_subscriber = CameraConfigUpdateSubscriber( + {config.name: config}, [CameraConfigUpdateEnum.enabled] + ) def get_enabled_state(): """Fetch the latest enabled state from ZMQ.""" - _, config_data = config_subscriber.check_for_update() - - if config_data: - config.enabled = config_data.enabled - + config_subscriber.check_for_updates() return config.enabled while not stop_event.is_set(): @@ -167,7 +168,6 @@ def capture_frames( class CameraWatchdog(threading.Thread): def __init__( self, - camera_name, config: CameraConfig, shm_frame_count: int, frame_queue: Queue, @@ -177,13 +177,12 @@ class CameraWatchdog(threading.Thread): stop_event, ): threading.Thread.__init__(self) - self.logger = logging.getLogger(f"watchdog.{camera_name}") - self.camera_name = camera_name + self.logger = logging.getLogger(f"watchdog.{config.name}") self.config = config self.shm_frame_count = shm_frame_count self.capture_thread = None self.ffmpeg_detect_process = None - self.logpipe = LogPipe(f"ffmpeg.{self.camera_name}.detect") + self.logpipe = LogPipe(f"ffmpeg.{self.config.name}.detect") self.ffmpeg_other_processes: list[dict[str, Any]] = [] self.camera_fps = camera_fps self.skipped_fps = skipped_fps @@ -196,16 +195,14 @@ class CameraWatchdog(threading.Thread): self.stop_event = stop_event self.sleeptime = self.config.ffmpeg.retry_interval - self.config_subscriber = ConfigSubscriber(f"config/enabled/{camera_name}", True) + self.config_subscriber = CameraConfigUpdateSubscriber( + {config.name: config}, [CameraConfigUpdateEnum.enabled] + ) self.was_enabled = self.config.enabled def _update_enabled_state(self) -> bool: """Fetch the latest config and update enabled state.""" - _, config_data = self.config_subscriber.check_for_update() - if config_data: - self.config.enabled = config_data.enabled - return config_data.enabled - + self.config_subscriber.check_for_updates() return self.config.enabled def run(self): @@ -217,10 +214,10 @@ class CameraWatchdog(threading.Thread): enabled = self._update_enabled_state() if enabled != self.was_enabled: if enabled: - self.logger.debug(f"Enabling camera {self.camera_name}") + self.logger.debug(f"Enabling camera {self.config.name}") self.start_all_ffmpeg() else: - self.logger.debug(f"Disabling camera {self.camera_name}") + self.logger.debug(f"Disabling camera {self.config.name}") self.stop_all_ffmpeg() self.was_enabled = enabled continue @@ -233,7 +230,7 @@ class CameraWatchdog(threading.Thread): if not self.capture_thread.is_alive(): self.camera_fps.value = 0 self.logger.error( - f"Ffmpeg process crashed unexpectedly for {self.camera_name}." + f"Ffmpeg process crashed unexpectedly for {self.config.name}." ) self.logger.error( "The following ffmpeg logs include the last 100 lines prior to exit." @@ -243,7 +240,7 @@ class CameraWatchdog(threading.Thread): elif now - self.capture_thread.current_frame.value > 20: self.camera_fps.value = 0 self.logger.info( - f"No frames received from {self.camera_name} in 20 seconds. Exiting ffmpeg..." + f"No frames received from {self.config.name} in 20 seconds. Exiting ffmpeg..." ) self.ffmpeg_detect_process.terminate() try: @@ -260,7 +257,7 @@ class CameraWatchdog(threading.Thread): self.fps_overflow_count = 0 self.camera_fps.value = 0 self.logger.info( - f"{self.camera_name} exceeded fps limit. Exiting ffmpeg..." + f"{self.config.name} exceeded fps limit. Exiting ffmpeg..." ) self.ffmpeg_detect_process.terminate() try: @@ -289,7 +286,7 @@ class CameraWatchdog(threading.Thread): latest_segment_time + datetime.timedelta(seconds=120) ): self.logger.error( - f"No new recording segments were created for {self.camera_name} in the last 120s. restarting the ffmpeg record process..." + f"No new recording segments were created for {self.config.name} in the last 120s. restarting the ffmpeg record process..." ) p["process"] = start_or_restart_ffmpeg( p["cmd"], @@ -336,13 +333,13 @@ class CameraWatchdog(threading.Thread): def start_all_ffmpeg(self): """Start all ffmpeg processes (detection and others).""" - logger.debug(f"Starting all ffmpeg processes for {self.camera_name}") + logger.debug(f"Starting all ffmpeg processes for {self.config.name}") self.start_ffmpeg_detect() for c in self.config.ffmpeg_cmds: if "detect" in c["roles"]: continue logpipe = LogPipe( - f"ffmpeg.{self.camera_name}.{'_'.join(sorted(c['roles']))}" + f"ffmpeg.{self.config.name}.{'_'.join(sorted(c['roles']))}" ) self.ffmpeg_other_processes.append( { @@ -355,12 +352,12 @@ class CameraWatchdog(threading.Thread): def stop_all_ffmpeg(self): """Stop all ffmpeg processes (detection and others).""" - logger.debug(f"Stopping all ffmpeg processes for {self.camera_name}") + logger.debug(f"Stopping all ffmpeg processes for {self.config.name}") if self.capture_thread is not None and self.capture_thread.is_alive(): self.capture_thread.join(timeout=5) if self.capture_thread.is_alive(): self.logger.warning( - f"Capture thread for {self.camera_name} did not stop gracefully." + f"Capture thread for {self.config.name} did not stop gracefully." ) if self.ffmpeg_detect_process is not None: stop_ffmpeg(self.ffmpeg_detect_process, self.logger) @@ -387,7 +384,7 @@ class CameraWatchdog(threading.Thread): newest_segment_time = latest_segment for file in cache_files: - if self.camera_name in file: + if self.config.name in file: basename = os.path.splitext(file)[0] _, date = basename.rsplit("@", maxsplit=1) segment_time = datetime.datetime.strptime( @@ -444,7 +441,7 @@ class CameraCapture(threading.Thread): def capture_camera( - name, config: CameraConfig, shm_frame_count: int, camera_metrics: CameraMetrics + config: CameraConfig, shm_frame_count: int, camera_metrics: CameraMetrics ): stop_event = mp.Event() @@ -454,11 +451,10 @@ def capture_camera( signal.signal(signal.SIGTERM, receiveSignal) signal.signal(signal.SIGINT, receiveSignal) - threading.current_thread().name = f"capture:{name}" - setproctitle(f"frigate.capture:{name}") + threading.current_thread().name = f"capture:{config.name}" + setproctitle(f"frigate.capture:{config.name}") camera_watchdog = CameraWatchdog( - name, config, shm_frame_count, camera_metrics.frame_queue, @@ -526,7 +522,6 @@ def track_camera( frame_shape, model_config, config, - config.detect, frame_manager, motion_detector, object_detector, @@ -593,7 +588,6 @@ def process_frames( frame_shape: tuple[int, int], model_config: ModelConfig, camera_config: CameraConfig, - detect_config: DetectConfig, frame_manager: FrameManager, motion_detector: MotionDetector, object_detector: RemoteObjectDetector, @@ -608,8 +602,14 @@ def process_frames( exit_on_empty: bool = False, ): next_region_update = get_tomorrow_at_time(2) - detect_config_subscriber = ConfigSubscriber(f"config/detect/{camera_name}", True) - enabled_config_subscriber = ConfigSubscriber(f"config/enabled/{camera_name}", True) + config_subscriber = CameraConfigUpdateSubscriber( + {camera_name: camera_config}, + [ + CameraConfigUpdateEnum.detect, + CameraConfigUpdateEnum.enabled, + CameraConfigUpdateEnum.motion, + ], + ) fps_tracker = EventsPerSecond() fps_tracker.start() @@ -644,11 +644,11 @@ def process_frames( ] while not stop_event.is_set(): - _, updated_enabled_config = enabled_config_subscriber.check_for_update() + updated_configs = config_subscriber.check_for_updates() - if updated_enabled_config: + if "enabled" in updated_configs: prev_enabled = camera_enabled - camera_enabled = updated_enabled_config.enabled + camera_enabled = camera_config.enabled if ( not camera_enabled @@ -676,12 +676,6 @@ def process_frames( time.sleep(0.1) continue - # check for updated detect config - _, updated_detect_config = detect_config_subscriber.check_for_update() - - if updated_detect_config: - detect_config = updated_detect_config - if ( datetime.datetime.now().astimezone(datetime.timezone.utc) > next_region_update @@ -716,14 +710,14 @@ def process_frames( consolidated_detections = [] # if detection is disabled - if not detect_config.enabled: + if not camera_config.detect.enabled: object_tracker.match_and_update(frame_name, frame_time, []) else: # get stationary object ids # check every Nth frame for stationary objects # disappeared objects are not stationary # also check for overlapping motion boxes - if stationary_frame_counter == detect_config.stationary.interval: + if stationary_frame_counter == camera_config.detect.stationary.interval: stationary_frame_counter = 0 stationary_object_ids = [] else: @@ -732,7 +726,8 @@ def process_frames( obj["id"] for obj in object_tracker.tracked_objects.values() # if it has exceeded the stationary threshold - if obj["motionless_count"] >= detect_config.stationary.threshold + if obj["motionless_count"] + >= camera_config.detect.stationary.threshold # and it hasn't disappeared and object_tracker.disappeared[obj["id"]] == 0 # and it doesn't overlap with any current motion boxes when not calibrating @@ -747,7 +742,8 @@ def process_frames( ( # use existing object box for stationary objects obj["estimate"] - if obj["motionless_count"] < detect_config.stationary.threshold + if obj["motionless_count"] + < camera_config.detect.stationary.threshold else obj["box"] ) for obj in object_tracker.tracked_objects.values() @@ -821,7 +817,7 @@ def process_frames( for region in regions: detections.extend( detect( - detect_config, + camera_config.detect, object_detector, frame, model_config, @@ -968,5 +964,4 @@ def process_frames( motion_detector.stop() requestor.stop() - detect_config_subscriber.stop() - enabled_config_subscriber.stop() + config_subscriber.stop()