Dynamic Config Updates (#18353)

* Create classes to handle publishing and subscribing config updates

* Cleanup

* Use config updater

* Update handling for enabled config

* Cleanup

* Recording config updates

* Birdseye config updates

* Handle notifications

* handle review

* Update motion
This commit is contained in:
Nicolas Mowen
2025-05-22 12:16:51 -06:00
committed by Blake Blackshear
parent b7dbcce6e5
commit dc187eee1c
13 changed files with 316 additions and 236 deletions

View File

@@ -17,6 +17,10 @@ from titlecase import titlecase
from frigate.comms.base_communicator import Communicator
from frigate.comms.config_updater import ConfigSubscriber
from frigate.config import FrigateConfig
from frigate.config.camera.updater import (
CameraConfigUpdateEnum,
CameraConfigUpdateSubscriber,
)
from frigate.const import CONFIG_DIR
from frigate.models import User
@@ -73,7 +77,12 @@ class WebPushClient(Communicator): # type: ignore[misc]
self.web_pushers[user["username"]].append(WebPusher(sub))
# notification config updater
self.config_subscriber = ConfigSubscriber("config/notifications")
self.global_config_subscriber = ConfigSubscriber(
"config/notifications", exact=True
)
self.config_subscriber = CameraConfigUpdateSubscriber(
self.config.cameras, [CameraConfigUpdateEnum.notifications]
)
def subscribe(self, receiver: Callable) -> None:
"""Wrapper for allowing dispatcher to subscribe."""
@@ -154,15 +163,14 @@ class WebPushClient(Communicator): # type: ignore[misc]
def publish(self, topic: str, payload: Any, retain: bool = False) -> None:
"""Wrapper for publishing when client is in valid state."""
# check for updated notification config
_, updated_notification_config = self.config_subscriber.check_for_update()
_, updated_notification_config = (
self.global_config_subscriber.check_for_update()
)
if updated_notification_config:
for key, value in updated_notification_config.items():
if key == "_global_notifications":
self.config.notifications = value
self.config.notifications = updated_notification_config
elif key in self.config.cameras:
self.config.cameras[key].notifications = value
self.config_subscriber.check_for_updates()
if topic == "reviews":
decoded = json.loads(payload)