From 6f16ecdd483d4b00ad68b777b6eed6524a080efb Mon Sep 17 00:00:00 2001 From: Nicolas Mowen Date: Wed, 11 Jun 2025 11:25:30 -0600 Subject: [PATCH] Dynamic Management of Cameras (#18671) * Add base class for global config updates * Add or remove camera states * Move camera process management to separate thread * Move camera management fully to separate class * Cleanup * Stop camera processes when stop command is sent * Start processes dynamically when needed * Adjust * Leave extra room in tracked object queue for two cameras * Dynamically set extra config pieces * Add some TODOs * Fix type check * Simplify config updates * Improve typing * Correctly handle indexed entries * Cleanup * Create out SHM * Use ZMQ for signaling object detectoin is completed * Get camera correctly created * Cleanup for updating the cameras config * Cleanup * Don't enable audio if no cameras have audio transcription * Use exact string so similar camera names don't interfere * Add ability to update config via json body to config/set endpoint Additionally, update the config in a single rather than multiple calls for each updated key * fix autotracking calibration to support new config updater function --------- Co-authored-by: Josh Hawkins <32435876+hawkeye217@users.noreply.github.com> --- frigate/api/app.py | 45 +++- frigate/api/defs/request/app_body.py | 3 +- frigate/app.py | 166 +++------------ frigate/camera/activity_manager.py | 35 +-- frigate/camera/maintainer.py | 248 ++++++++++++++++++++++ frigate/comms/object_detector_signaler.py | 21 ++ frigate/comms/webpush.py | 9 +- frigate/config/camera/updater.py | 19 +- frigate/embeddings/maintainer.py | 11 + frigate/events/audio.py | 20 +- frigate/object_detection/base.py | 41 ++-- frigate/output/output.py | 12 +- frigate/ptz/autotrack.py | 11 +- frigate/record/maintainer.py | 4 +- frigate/review/maintainer.py | 3 + frigate/track/object_processing.py | 52 +++-- frigate/util/builtin.py | 53 +++-- frigate/video.py | 8 +- 18 files changed, 533 insertions(+), 228 deletions(-) create mode 100644 frigate/camera/maintainer.py create mode 100644 frigate/comms/object_detector_signaler.py diff --git a/frigate/api/app.py b/frigate/api/app.py index 38116f6d6..bcdd5c954 100644 --- a/frigate/api/app.py +++ b/frigate/api/app.py @@ -6,6 +6,7 @@ import json import logging import os import traceback +import urllib from datetime import datetime, timedelta from functools import reduce from io import StringIO @@ -36,8 +37,10 @@ from frigate.models import Event, Timeline from frigate.stats.prometheus import get_metrics, update_metrics from frigate.util.builtin import ( clean_camera_user_pass, + flatten_config_data, get_tz_modifiers, - update_yaml_from_url, + process_config_query_string, + update_yaml_file_bulk, ) from frigate.util.config import find_config_file from frigate.util.services import ( @@ -358,14 +361,37 @@ def config_set(request: Request, body: AppConfigSetBody): with open(config_file, "r") as f: old_raw_config = f.read() - f.close() try: - update_yaml_from_url(config_file, str(request.url)) + updates = {} + + # process query string parameters (takes precedence over body.config_data) + parsed_url = urllib.parse.urlparse(str(request.url)) + query_string = urllib.parse.parse_qs(parsed_url.query, keep_blank_values=True) + + # Filter out empty keys but keep blank values for non-empty keys + query_string = {k: v for k, v in query_string.items() if k} + + if query_string: + updates = process_config_query_string(query_string) + elif body.config_data: + updates = flatten_config_data(body.config_data) + + if not updates: + return JSONResponse( + content=( + {"success": False, "message": "No configuration data provided"} + ), + status_code=400, + ) + + # apply all updates in a single operation + update_yaml_file_bulk(config_file, updates) + + # validate the updated config with open(config_file, "r") as f: new_raw_config = f.read() - f.close() - # Validate the config schema + try: config = FrigateConfig.parse(new_raw_config) except Exception: @@ -390,12 +416,19 @@ def config_set(request: Request, body: AppConfigSetBody): ) if body.requires_restart == 0 or body.update_topic: + old_config: FrigateConfig = request.app.frigate_config request.app.frigate_config = config if body.update_topic and body.update_topic.startswith("config/cameras/"): _, _, camera, field = body.update_topic.split("/") - settings = config.get_nested_object(body.update_topic) + if field == "add": + settings = config.cameras[camera] + elif field == "remove": + settings = old_config.cameras[camera] + else: + settings = config.get_nested_object(body.update_topic) + request.app.config_publisher.publish_update( CameraConfigUpdateTopic(CameraConfigUpdateEnum[field], camera), settings, diff --git a/frigate/api/defs/request/app_body.py b/frigate/api/defs/request/app_body.py index 7456a6c77..7f8ca40ec 100644 --- a/frigate/api/defs/request/app_body.py +++ b/frigate/api/defs/request/app_body.py @@ -1,4 +1,4 @@ -from typing import Optional +from typing import Any, Dict, Optional from pydantic import BaseModel @@ -6,6 +6,7 @@ from pydantic import BaseModel class AppConfigSetBody(BaseModel): requires_restart: int = 1 update_topic: str | None = None + config_data: Optional[Dict[str, Any]] = None class AppPutPasswordBody(BaseModel): diff --git a/frigate/app.py b/frigate/app.py index 04cdb2920..cccbce53e 100644 --- a/frigate/app.py +++ b/frigate/app.py @@ -17,6 +17,7 @@ import frigate.util as util from frigate.api.auth import hash_password from frigate.api.fastapi_app import create_fastapi_app from frigate.camera import CameraMetrics, PTZMetrics +from frigate.camera.maintainer import CameraMaintainer from frigate.comms.base_communicator import Communicator from frigate.comms.dispatcher import Dispatcher from frigate.comms.event_metadata_updater import EventMetadataPublisher @@ -35,7 +36,6 @@ from frigate.const import ( FACE_DIR, MODEL_CACHE_DIR, RECORD_DIR, - SHM_FRAMES_VAR, THUMB_DIR, ) from frigate.data_processing.types import DataProcessorMetrics @@ -69,10 +69,8 @@ from frigate.storage import StorageMaintainer from frigate.timeline import TimelineProcessor from frigate.track.object_processing import TrackedObjectProcessor from frigate.util.builtin import empty_and_close_queue -from frigate.util.image import SharedMemoryFrameManager, UntrackedSharedMemory -from frigate.util.object import get_camera_regions_grid +from frigate.util.image import UntrackedSharedMemory from frigate.version import VERSION -from frigate.video import capture_camera, track_camera from frigate.watchdog import FrigateWatchdog logger = logging.getLogger(__name__) @@ -84,7 +82,6 @@ class FrigateApp: self.stop_event: MpEvent = mp.Event() self.detection_queue: Queue = mp.Queue() self.detectors: dict[str, ObjectDetectProcess] = {} - self.detection_out_events: dict[str, MpEvent] = {} self.detection_shms: list[mp.shared_memory.SharedMemory] = [] self.log_queue: Queue = mp.Queue() self.camera_metrics: dict[str, CameraMetrics] = {} @@ -101,8 +98,6 @@ class FrigateApp: self.ptz_metrics: dict[str, PTZMetrics] = {} self.processes: dict[str, int] = {} self.embeddings: Optional[EmbeddingsContext] = None - self.region_grids: dict[str, list[list[dict[str, int]]]] = {} - self.frame_manager = SharedMemoryFrameManager() self.config = config def ensure_dirs(self) -> None: @@ -138,8 +133,16 @@ class FrigateApp: def init_queues(self) -> None: # Queue for cameras to push tracked objects to + # leaving room for 2 extra cameras to be added self.detected_frames_queue: Queue = mp.Queue( - maxsize=sum(camera.enabled for camera in self.config.cameras.values()) * 2 + maxsize=( + sum( + camera.enabled_in_config == True + for camera in self.config.cameras.values() + ) + + 2 + ) + * 2 ) # Queue for timeline events @@ -276,7 +279,9 @@ class FrigateApp: "synchronous": "NORMAL", # Safe when using WAL https://www.sqlite.org/pragma.html#pragma_synchronous }, timeout=max( - 60, 10 * len([c for c in self.config.cameras.values() if c.enabled]) + 60, + 10 + * len([c for c in self.config.cameras.values() if c.enabled_in_config]), ), load_vec_extension=self.config.semantic_search.enabled, ) @@ -306,7 +311,9 @@ class FrigateApp: def init_embeddings_client(self) -> None: genai_cameras = [ - c for c in self.config.cameras.values() if c.enabled and c.genai.enabled + c + for c in self.config.cameras.values() + if c.enabled_in_config and c.genai.enabled ] if ( @@ -355,8 +362,6 @@ class FrigateApp: def start_detectors(self) -> None: for name in self.config.cameras.keys(): - self.detection_out_events[name] = mp.Event() - try: largest_frame = max( [ @@ -388,7 +393,7 @@ class FrigateApp: self.detectors[name] = ObjectDetectProcess( name, self.detection_queue, - self.detection_out_events, + list(self.config.cameras.keys()), detector_config, ) @@ -423,69 +428,16 @@ class FrigateApp: output_processor.start() logger.info(f"Output process started: {output_processor.pid}") - def init_historical_regions(self) -> None: - # delete region grids for removed or renamed cameras - cameras = list(self.config.cameras.keys()) - Regions.delete().where(~(Regions.camera << cameras)).execute() - - # create or update region grids for each camera - for camera in self.config.cameras.values(): - assert camera.name is not None - self.region_grids[camera.name] = get_camera_regions_grid( - camera.name, - camera.detect, - max(self.config.model.width, self.config.model.height), - ) - - def start_camera_processors(self) -> None: - for name, config in self.config.cameras.items(): - if not self.config.cameras[name].enabled_in_config: - logger.info(f"Camera processor not started for disabled camera {name}") - continue - - camera_process = util.Process( - target=track_camera, - name=f"camera_processor:{name}", - args=( - name, - config, - self.config.model, - self.config.model.merged_labelmap, - self.detection_queue, - self.detection_out_events[name], - self.detected_frames_queue, - self.camera_metrics[name], - self.ptz_metrics[name], - self.region_grids[name], - ), - daemon=True, - ) - self.camera_metrics[name].process = camera_process - camera_process.start() - logger.info(f"Camera processor started for {name}: {camera_process.pid}") - - def start_camera_capture_processes(self) -> None: - shm_frame_count = self.shm_frame_count() - - for name, config in self.config.cameras.items(): - if not self.config.cameras[name].enabled_in_config: - logger.info(f"Capture process not started for disabled camera {name}") - continue - - # pre-create shms - for i in range(shm_frame_count): - frame_size = config.frame_shape_yuv[0] * config.frame_shape_yuv[1] - self.frame_manager.create(f"{config.name}_frame{i}", frame_size) - - capture_process = util.Process( - target=capture_camera, - name=f"camera_capture:{name}", - args=(config, shm_frame_count, self.camera_metrics[name]), - ) - capture_process.daemon = True - self.camera_metrics[name].capture_process = capture_process - capture_process.start() - logger.info(f"Capture process started for {name}: {capture_process.pid}") + def start_camera_processor(self) -> None: + self.camera_maintainer = CameraMaintainer( + self.config, + self.detection_queue, + self.detected_frames_queue, + self.camera_metrics, + self.ptz_metrics, + self.stop_event, + ) + self.camera_maintainer.start() def start_audio_processor(self) -> None: audio_cameras = [ @@ -545,45 +497,6 @@ class FrigateApp: self.frigate_watchdog = FrigateWatchdog(self.detectors, self.stop_event) self.frigate_watchdog.start() - def shm_frame_count(self) -> int: - total_shm = round(shutil.disk_usage("/dev/shm").total / pow(2, 20), 1) - - # required for log files + nginx cache - min_req_shm = 40 + 10 - - if self.config.birdseye.restream: - min_req_shm += 8 - - available_shm = total_shm - min_req_shm - cam_total_frame_size = 0.0 - - for camera in self.config.cameras.values(): - if camera.enabled and camera.detect.width and camera.detect.height: - cam_total_frame_size += round( - (camera.detect.width * camera.detect.height * 1.5 + 270480) - / 1048576, - 1, - ) - - if cam_total_frame_size == 0.0: - return 0 - - shm_frame_count = min( - int(os.environ.get(SHM_FRAMES_VAR, "50")), - int(available_shm / (cam_total_frame_size)), - ) - - logger.debug( - f"Calculated total camera size {available_shm} / {cam_total_frame_size} :: {shm_frame_count} frames for each camera in SHM" - ) - - if shm_frame_count < 20: - logger.warning( - f"The current SHM size of {total_shm}MB is too small, recommend increasing it to at least {round(min_req_shm + cam_total_frame_size * 20)}MB." - ) - - return shm_frame_count - def init_auth(self) -> None: if self.config.auth.enabled: if User.select().count() == 0: @@ -650,10 +563,8 @@ class FrigateApp: self.init_embeddings_client() self.start_video_output_processor() self.start_ptz_autotracker() - self.init_historical_regions() self.start_detected_frames_processor() - self.start_camera_processors() - self.start_camera_capture_processes() + self.start_camera_processor() self.start_audio_processor() self.start_storage_maintainer() self.start_stats_emitter() @@ -707,24 +618,6 @@ class FrigateApp: if self.onvif_controller: self.onvif_controller.close() - # ensure the capture processes are done - for camera, metrics in self.camera_metrics.items(): - capture_process = metrics.capture_process - if capture_process is not None: - logger.info(f"Waiting for capture process for {camera} to stop") - capture_process.terminate() - capture_process.join() - - # ensure the camera processors are done - for camera, metrics in self.camera_metrics.items(): - camera_process = metrics.process - if camera_process is not None: - logger.info(f"Waiting for process for {camera} to stop") - camera_process.terminate() - camera_process.join() - logger.info(f"Closing frame queue for {camera}") - empty_and_close_queue(metrics.frame_queue) - # ensure the detectors are done for detector in self.detectors.values(): detector.stop() @@ -769,7 +662,6 @@ class FrigateApp: self.event_metadata_updater.stop() self.inter_zmq_proxy.stop() - self.frame_manager.cleanup() while len(self.detection_shms) > 0: shm = self.detection_shms.pop() shm.close() diff --git a/frigate/camera/activity_manager.py b/frigate/camera/activity_manager.py index 6039a07f6..e10730931 100644 --- a/frigate/camera/activity_manager.py +++ b/frigate/camera/activity_manager.py @@ -3,7 +3,7 @@ from collections import Counter from typing import Any, Callable -from frigate.config.config import FrigateConfig +from frigate.config import CameraConfig, FrigateConfig class CameraActivityManager: @@ -23,26 +23,33 @@ class CameraActivityManager: if not camera_config.enabled_in_config: continue - self.last_camera_activity[camera_config.name] = {} - self.camera_all_object_counts[camera_config.name] = Counter() - self.camera_active_object_counts[camera_config.name] = Counter() + self.__init_camera(camera_config) - for zone, zone_config in camera_config.zones.items(): - if zone not in self.all_zone_labels: - self.zone_all_object_counts[zone] = Counter() - self.zone_active_object_counts[zone] = Counter() - self.all_zone_labels[zone] = set() + def __init_camera(self, camera_config: CameraConfig) -> None: + self.last_camera_activity[camera_config.name] = {} + self.camera_all_object_counts[camera_config.name] = Counter() + self.camera_active_object_counts[camera_config.name] = Counter() - self.all_zone_labels[zone].update( - zone_config.objects - if zone_config.objects - else camera_config.objects.track - ) + for zone, zone_config in camera_config.zones.items(): + if zone not in self.all_zone_labels: + self.zone_all_object_counts[zone] = Counter() + self.zone_active_object_counts[zone] = Counter() + self.all_zone_labels[zone] = set() + + self.all_zone_labels[zone].update( + zone_config.objects + if zone_config.objects + else camera_config.objects.track + ) def update_activity(self, new_activity: dict[str, dict[str, Any]]) -> None: all_objects: list[dict[str, Any]] = [] for camera in new_activity.keys(): + # handle cameras that were added dynamically + if camera not in self.camera_all_object_counts: + self.__init_camera(self.config.cameras[camera]) + new_objects = new_activity[camera].get("objects", []) all_objects.extend(new_objects) diff --git a/frigate/camera/maintainer.py b/frigate/camera/maintainer.py new file mode 100644 index 000000000..6abeb762e --- /dev/null +++ b/frigate/camera/maintainer.py @@ -0,0 +1,248 @@ +"""Create and maintain camera processes / management.""" + +import logging +import os +import shutil +import threading +from multiprocessing import Queue +from multiprocessing.synchronize import Event as MpEvent + +from frigate.camera import CameraMetrics, PTZMetrics +from frigate.config import FrigateConfig +from frigate.config.camera import CameraConfig +from frigate.config.camera.updater import ( + CameraConfigUpdateEnum, + CameraConfigUpdateSubscriber, +) +from frigate.const import SHM_FRAMES_VAR +from frigate.models import Regions +from frigate.util import Process as FrigateProcess +from frigate.util.builtin import empty_and_close_queue +from frigate.util.image import SharedMemoryFrameManager, UntrackedSharedMemory +from frigate.util.object import get_camera_regions_grid +from frigate.video import capture_camera, track_camera + +logger = logging.getLogger(__name__) + + +class CameraMaintainer(threading.Thread): + def __init__( + self, + config: FrigateConfig, + detection_queue: Queue, + detected_frames_queue: Queue, + camera_metrics: dict[str, CameraMetrics], + ptz_metrics: dict[str, PTZMetrics], + stop_event: MpEvent, + ): + super().__init__(name="camera_processor") + self.config = config + self.detection_queue = detection_queue + self.detected_frames_queue = detected_frames_queue + self.stop_event = stop_event + self.camera_metrics = camera_metrics + self.ptz_metrics = ptz_metrics + self.frame_manager = SharedMemoryFrameManager() + self.region_grids: dict[str, list[list[dict[str, int]]]] = {} + self.update_subscriber = CameraConfigUpdateSubscriber( + self.config, + {}, + [ + CameraConfigUpdateEnum.add, + CameraConfigUpdateEnum.remove, + ], + ) + self.shm_count = self.__calculate_shm_frame_count() + + def __init_historical_regions(self) -> None: + # delete region grids for removed or renamed cameras + cameras = list(self.config.cameras.keys()) + Regions.delete().where(~(Regions.camera << cameras)).execute() + + # create or update region grids for each camera + for camera in self.config.cameras.values(): + assert camera.name is not None + self.region_grids[camera.name] = get_camera_regions_grid( + camera.name, + camera.detect, + max(self.config.model.width, self.config.model.height), + ) + + def __calculate_shm_frame_count(self) -> int: + total_shm = round(shutil.disk_usage("/dev/shm").total / pow(2, 20), 1) + + # required for log files + nginx cache + min_req_shm = 40 + 10 + + if self.config.birdseye.restream: + min_req_shm += 8 + + available_shm = total_shm - min_req_shm + cam_total_frame_size = 0.0 + + for camera in self.config.cameras.values(): + if ( + camera.enabled_in_config + and camera.detect.width + and camera.detect.height + ): + cam_total_frame_size += round( + (camera.detect.width * camera.detect.height * 1.5 + 270480) + / 1048576, + 1, + ) + + # leave room for 2 cameras that are added dynamically, if a user wants to add more cameras they may need to increase the SHM size and restart after adding them. + cam_total_frame_size += 2 * round( + (camera.detect.width * camera.detect.height * 1.5 + 270480) / 1048576, + 1, + ) + + if cam_total_frame_size == 0.0: + return 0 + + shm_frame_count = min( + int(os.environ.get(SHM_FRAMES_VAR, "50")), + int(available_shm / (cam_total_frame_size)), + ) + + logger.debug( + f"Calculated total camera size {available_shm} / {cam_total_frame_size} :: {shm_frame_count} frames for each camera in SHM" + ) + + if shm_frame_count < 20: + logger.warning( + f"The current SHM size of {total_shm}MB is too small, recommend increasing it to at least {round(min_req_shm + cam_total_frame_size * 20)}MB." + ) + + return shm_frame_count + + def __start_camera_processor( + self, name: str, config: CameraConfig, runtime: bool = False + ) -> None: + if not config.enabled_in_config: + logger.info(f"Camera processor not started for disabled camera {name}") + return + + if runtime: + self.camera_metrics[name] = CameraMetrics() + self.ptz_metrics[name] = PTZMetrics(autotracker_enabled=False) + self.region_grids[name] = get_camera_regions_grid( + name, + config.detect, + max(self.config.model.width, self.config.model.height), + ) + + try: + largest_frame = max( + [ + det.model.height * det.model.width * 3 + if det.model is not None + else 320 + for det in self.config.detectors.values() + ] + ) + UntrackedSharedMemory(name=f"out-{name}", create=True, size=20 * 6 * 4) + UntrackedSharedMemory( + name=name, + create=True, + size=largest_frame, + ) + except FileExistsError: + pass + + camera_process = FrigateProcess( + target=track_camera, + name=f"camera_processor:{name}", + args=( + config.name, + config, + self.config.model, + self.config.model.merged_labelmap, + self.detection_queue, + self.detected_frames_queue, + self.camera_metrics[name], + self.ptz_metrics[name], + self.region_grids[name], + ), + daemon=True, + ) + self.camera_metrics[config.name].process = camera_process + camera_process.start() + logger.info(f"Camera processor started for {config.name}: {camera_process.pid}") + + def __start_camera_capture( + self, name: str, config: CameraConfig, runtime: bool = False + ) -> None: + if not config.enabled_in_config: + logger.info(f"Capture process not started for disabled camera {name}") + return + + # pre-create shms + for i in range(10 if runtime else self.shm_count): + frame_size = config.frame_shape_yuv[0] * config.frame_shape_yuv[1] + self.frame_manager.create(f"{config.name}_frame{i}", frame_size) + + capture_process = FrigateProcess( + target=capture_camera, + name=f"camera_capture:{name}", + args=(config, self.shm_count, self.camera_metrics[name]), + ) + capture_process.daemon = True + self.camera_metrics[name].capture_process = capture_process + capture_process.start() + logger.info(f"Capture process started for {name}: {capture_process.pid}") + + def __stop_camera_capture_process(self, camera: str) -> None: + capture_process = self.camera_metrics[camera].capture_process + if capture_process is not None: + logger.info(f"Waiting for capture process for {camera} to stop") + capture_process.terminate() + capture_process.join() + + def __stop_camera_process(self, camera: str) -> None: + metrics = self.camera_metrics[camera] + camera_process = metrics.process + if camera_process is not None: + logger.info(f"Waiting for process for {camera} to stop") + camera_process.terminate() + camera_process.join() + logger.info(f"Closing frame queue for {camera}") + empty_and_close_queue(metrics.frame_queue) + + def run(self): + self.__init_historical_regions() + + # start camera processes + for camera, config in self.config.cameras.items(): + self.__start_camera_processor(camera, config) + self.__start_camera_capture(camera, config) + + while not self.stop_event.wait(1): + updates = self.update_subscriber.check_for_updates() + + for update_type, updated_cameras in updates.items(): + if update_type == CameraConfigUpdateEnum.add.name: + for camera in updated_cameras: + self.__start_camera_processor( + camera, + self.update_subscriber.camera_configs[camera], + runtime=True, + ) + self.__start_camera_capture( + camera, self.update_subscriber.camera_configs[camera] + ) + elif update_type == CameraConfigUpdateEnum.remove.name: + self.__stop_camera_capture_process(camera) + self.__stop_camera_process(camera) + + # ensure the capture processes are done + for camera in self.camera_metrics.keys(): + self.__stop_camera_capture_process(camera) + + # ensure the camera processors are done + for camera in self.camera_metrics.keys(): + self.__stop_camera_process(camera) + + self.update_subscriber.stop() + self.frame_manager.cleanup() diff --git a/frigate/comms/object_detector_signaler.py b/frigate/comms/object_detector_signaler.py new file mode 100644 index 000000000..befc83e4d --- /dev/null +++ b/frigate/comms/object_detector_signaler.py @@ -0,0 +1,21 @@ +"""Facilitates communication between processes for object detection signals.""" + +from .zmq_proxy import Publisher, Subscriber + + +class ObjectDetectorPublisher(Publisher): + """Publishes signal for object detection to different processes.""" + + topic_base = "object_detector/" + + +class ObjectDetectorSubscriber(Subscriber): + """Simplifies receiving a signal for object detection.""" + + topic_base = "object_detector/" + + def __init__(self, topic: str) -> None: + super().__init__(topic) + + def check_for_update(self): + return super().check_for_update(timeout=5) diff --git a/frigate/comms/webpush.py b/frigate/comms/webpush.py index 91027d1a4..c50a91e94 100644 --- a/frigate/comms/webpush.py +++ b/frigate/comms/webpush.py @@ -81,7 +81,7 @@ class WebPushClient(Communicator): # type: ignore[misc] "config/notifications", exact=True ) self.config_subscriber = CameraConfigUpdateSubscriber( - self.config.cameras, [CameraConfigUpdateEnum.notifications] + self.config, self.config.cameras, [CameraConfigUpdateEnum.notifications] ) def subscribe(self, receiver: Callable) -> None: @@ -170,7 +170,12 @@ class WebPushClient(Communicator): # type: ignore[misc] if updated_notification_config: self.config.notifications = updated_notification_config - self.config_subscriber.check_for_updates() + updates = self.config_subscriber.check_for_updates() + + if "add" in updates: + for camera in updates["add"]: + self.suspended_cameras[camera] = 0 + self.last_camera_notification_time[camera] = 0 if topic == "reviews": decoded = json.loads(payload) diff --git a/frigate/config/camera/updater.py b/frigate/config/camera/updater.py index 5ddc26d44..83536fc46 100644 --- a/frigate/config/camera/updater.py +++ b/frigate/config/camera/updater.py @@ -5,12 +5,13 @@ from enum import Enum from typing import Any from frigate.comms.config_updater import ConfigPublisher, ConfigSubscriber -from frigate.config import CameraConfig +from frigate.config import CameraConfig, FrigateConfig class CameraConfigUpdateEnum(str, Enum): """Supported camera config update types.""" + add = "add" # for adding a camera audio = "audio" audio_transcription = "audio_transcription" birdseye = "birdseye" @@ -20,6 +21,7 @@ class CameraConfigUpdateEnum(str, Enum): notifications = "notifications" objects = "objects" record = "record" + remove = "remove" # for removing a camera review = "review" snapshots = "snapshots" zones = "zones" @@ -49,9 +51,11 @@ class CameraConfigUpdatePublisher: class CameraConfigUpdateSubscriber: def __init__( self, + config: FrigateConfig | None, camera_configs: dict[str, CameraConfig], topics: list[CameraConfigUpdateEnum], ): + self.config = config self.camera_configs = camera_configs self.topics = topics @@ -68,14 +72,23 @@ class CameraConfigUpdateSubscriber: def __update_config( self, camera: str, update_type: CameraConfigUpdateEnum, updated_config: Any ) -> None: - config = self.camera_configs[camera] + if update_type == CameraConfigUpdateEnum.add: + self.config.cameras[camera] = updated_config + self.camera_configs[camera] = updated_config + return + elif update_type == CameraConfigUpdateEnum.remove: + self.config.cameras.pop(camera) + self.camera_configs.pop(camera) + return + + config = self.camera_configs.get(camera) if not config: return if update_type == CameraConfigUpdateEnum.audio: config.audio = updated_config - if update_type == CameraConfigUpdateEnum.audio_transcription: + elif update_type == CameraConfigUpdateEnum.audio_transcription: config.audio_transcription = updated_config elif update_type == CameraConfigUpdateEnum.birdseye: config.birdseye = updated_config diff --git a/frigate/embeddings/maintainer.py b/frigate/embeddings/maintainer.py index ce81c2bc4..0980a8ae8 100644 --- a/frigate/embeddings/maintainer.py +++ b/frigate/embeddings/maintainer.py @@ -29,6 +29,10 @@ from frigate.comms.recordings_updater import ( ) from frigate.config import FrigateConfig from frigate.config.camera.camera import CameraTypeEnum +from frigate.config.camera.updater import ( + CameraConfigUpdateEnum, + CameraConfigUpdateSubscriber, +) from frigate.const import ( CLIPS_DIR, UPDATE_EVENT_DESCRIPTION, @@ -87,6 +91,11 @@ class EmbeddingMaintainer(threading.Thread): self.config = config self.metrics = metrics self.embeddings = None + self.config_updater = CameraConfigUpdateSubscriber( + self.config, + self.config.cameras, + [CameraConfigUpdateEnum.add, CameraConfigUpdateEnum.remove], + ) if config.semantic_search.enabled: self.embeddings = Embeddings(config, db, metrics) @@ -198,6 +207,7 @@ class EmbeddingMaintainer(threading.Thread): def run(self) -> None: """Maintain a SQLite-vec database for semantic search.""" while not self.stop_event.is_set(): + self.config_updater.check_for_updates() self._process_requests() self._process_updates() self._process_recordings_updates() @@ -206,6 +216,7 @@ class EmbeddingMaintainer(threading.Thread): self._process_finalized() self._process_event_metadata() + self.config_updater.stop() self.event_subscriber.stop() self.event_end_subscriber.stop() self.recordings_subscriber.stop() diff --git a/frigate/events/audio.py b/frigate/events/audio.py index aeeaf3b4f..797a767ba 100644 --- a/frigate/events/audio.py +++ b/frigate/events/audio.py @@ -90,10 +90,19 @@ class AudioProcessor(util.Process): self.camera_metrics = camera_metrics self.cameras = cameras self.config = config - self.transcription_model_runner = AudioTranscriptionModelRunner( - self.config.audio_transcription.device, - self.config.audio_transcription.model_size, - ) + + if any( + [ + conf.audio_transcription.enabled_in_config + for conf in config.cameras.values() + ] + ): + self.transcription_model_runner = AudioTranscriptionModelRunner( + self.config.audio_transcription.device, + self.config.audio_transcription.model_size, + ) + else: + self.transcription_model_runner = None def run(self) -> None: audio_threads: list[AudioEventMaintainer] = [] @@ -138,7 +147,7 @@ class AudioEventMaintainer(threading.Thread): camera: CameraConfig, config: FrigateConfig, camera_metrics: dict[str, CameraMetrics], - audio_transcription_model_runner: AudioTranscriptionModelRunner, + audio_transcription_model_runner: AudioTranscriptionModelRunner | None, stop_event: threading.Event, ) -> None: super().__init__(name=f"{camera.name}_audio_event_processor") @@ -162,6 +171,7 @@ class AudioEventMaintainer(threading.Thread): # create communication for audio detections self.requestor = InterProcessRequestor() self.config_subscriber = CameraConfigUpdateSubscriber( + None, {self.camera_config.name: self.camera_config}, [ CameraConfigUpdateEnum.audio, diff --git a/frigate/object_detection/base.py b/frigate/object_detection/base.py index c77a720a0..86febc6a7 100644 --- a/frigate/object_detection/base.py +++ b/frigate/object_detection/base.py @@ -13,6 +13,10 @@ import numpy as np from setproctitle import setproctitle import frigate.util as util +from frigate.comms.object_detector_signaler import ( + ObjectDetectorPublisher, + ObjectDetectorSubscriber, +) from frigate.detectors import create_detector from frigate.detectors.detector_config import ( BaseDetectorConfig, @@ -89,7 +93,7 @@ class LocalObjectDetector(ObjectDetector): def run_detector( name: str, detection_queue: Queue, - out_events: dict[str, MpEvent], + cameras: list[str], avg_speed: Value, start: Value, detector_config: BaseDetectorConfig, @@ -108,15 +112,19 @@ def run_detector( signal.signal(signal.SIGTERM, receiveSignal) signal.signal(signal.SIGINT, receiveSignal) - frame_manager = SharedMemoryFrameManager() - object_detector = LocalObjectDetector(detector_config=detector_config) - - outputs = {} - for name in out_events.keys(): + def create_output_shm(name: str): out_shm = UntrackedSharedMemory(name=f"out-{name}", create=False) out_np = np.ndarray((20, 6), dtype=np.float32, buffer=out_shm.buf) outputs[name] = {"shm": out_shm, "np": out_np} + frame_manager = SharedMemoryFrameManager() + object_detector = LocalObjectDetector(detector_config=detector_config) + detector_publisher = ObjectDetectorPublisher() + + outputs = {} + for name in cameras: + create_output_shm(name) + while not stop_event.is_set(): try: connection_id = detection_queue.get(timeout=1) @@ -136,12 +144,18 @@ def run_detector( detections = object_detector.detect_raw(input_frame) duration = datetime.datetime.now().timestamp() - start.value frame_manager.close(connection_id) + + if connection_id not in outputs: + create_output_shm(connection_id) + outputs[connection_id]["np"][:] = detections[:] - out_events[connection_id].set() + signal_id = f"{connection_id}/update" + detector_publisher.publish(signal_id, signal_id) start.value = 0.0 avg_speed.value = (avg_speed.value * 9 + duration) / 10 + detector_publisher.stop() logger.info("Exited detection process...") @@ -150,11 +164,11 @@ class ObjectDetectProcess: self, name: str, detection_queue: Queue, - out_events: dict[str, MpEvent], + cameras: list[str], detector_config: BaseDetectorConfig, ): self.name = name - self.out_events = out_events + self.cameras = cameras self.detection_queue = detection_queue self.avg_inference_speed = Value("d", 0.01) self.detection_start = Value("d", 0.0) @@ -185,7 +199,7 @@ class ObjectDetectProcess: args=( self.name, self.detection_queue, - self.out_events, + self.cameras, self.avg_inference_speed, self.detection_start, self.detector_config, @@ -201,7 +215,6 @@ class RemoteObjectDetector: name: str, labels: dict[int, str], detection_queue: Queue, - event: MpEvent, model_config: ModelConfig, stop_event: MpEvent, ): @@ -209,7 +222,6 @@ class RemoteObjectDetector: self.name = name self.fps = EventsPerSecond() self.detection_queue = detection_queue - self.event = event self.stop_event = stop_event self.shm = UntrackedSharedMemory(name=self.name, create=False) self.np_shm = np.ndarray( @@ -219,6 +231,7 @@ class RemoteObjectDetector: ) self.out_shm = UntrackedSharedMemory(name=f"out-{self.name}", create=False) self.out_np_shm = np.ndarray((20, 6), dtype=np.float32, buffer=self.out_shm.buf) + self.detector_subscriber = ObjectDetectorSubscriber(f"{name}/update") def detect(self, tensor_input, threshold=0.4): detections = [] @@ -228,9 +241,8 @@ class RemoteObjectDetector: # copy input to shared memory self.np_shm[:] = tensor_input[:] - self.event.clear() self.detection_queue.put(self.name) - result = self.event.wait(timeout=5.0) + result = self.detector_subscriber.check_for_update() # if it timed out if result is None: @@ -246,5 +258,6 @@ class RemoteObjectDetector: return detections def cleanup(self): + self.detector_subscriber.stop() self.shm.unlink() self.out_shm.unlink() diff --git a/frigate/output/output.py b/frigate/output/output.py index 6decf0005..d323596fe 100644 --- a/frigate/output/output.py +++ b/frigate/output/output.py @@ -103,8 +103,10 @@ def output_frames( detection_subscriber = DetectionSubscriber(DetectionTypeEnum.video) config_subscriber = CameraConfigUpdateSubscriber( + config, config.cameras, [ + CameraConfigUpdateEnum.add, CameraConfigUpdateEnum.birdseye, CameraConfigUpdateEnum.enabled, CameraConfigUpdateEnum.record, @@ -135,7 +137,15 @@ def output_frames( while not stop_event.is_set(): # check if there is an updated config - config_subscriber.check_for_updates() + updates = config_subscriber.check_for_updates() + + if "add" in updates: + for camera in updates["add"]: + jsmpeg_cameras[camera] = JsmpegCamera( + cam_config, stop_event, websocket_server + ) + preview_recorders[camera] = PreviewRecorder(cam_config) + preview_write_times[camera] = 0 (topic, data) = detection_subscriber.check_for_update(timeout=1) now = datetime.datetime.now().timestamp() diff --git a/frigate/ptz/autotrack.py b/frigate/ptz/autotrack.py index f38bf1f5f..f0d8419dd 100644 --- a/frigate/ptz/autotrack.py +++ b/frigate/ptz/autotrack.py @@ -31,7 +31,7 @@ from frigate.const import ( ) from frigate.ptz.onvif import OnvifController from frigate.track.tracked_object import TrackedObject -from frigate.util.builtin import update_yaml_file +from frigate.util.builtin import update_yaml_file_bulk from frigate.util.config import find_config_file from frigate.util.image import SharedMemoryFrameManager, intersection_over_union @@ -348,10 +348,13 @@ class PtzAutoTracker: f"{camera}: Writing new config with autotracker motion coefficients: {self.config.cameras[camera].onvif.autotracking.movement_weights}" ) - update_yaml_file( + update_yaml_file_bulk( config_file, - ["cameras", camera, "onvif", "autotracking", "movement_weights"], - self.config.cameras[camera].onvif.autotracking.movement_weights, + { + f"cameras.{camera}.onvif.autotracking.movement_weights": self.config.cameras[ + camera + ].onvif.autotracking.movement_weights + }, ) async def _calibrate_camera(self, camera): diff --git a/frigate/record/maintainer.py b/frigate/record/maintainer.py index ace9a5d24..0883437da 100644 --- a/frigate/record/maintainer.py +++ b/frigate/record/maintainer.py @@ -75,7 +75,9 @@ class RecordingMaintainer(threading.Thread): # create communication for retained recordings self.requestor = InterProcessRequestor() self.config_subscriber = CameraConfigUpdateSubscriber( - self.config.cameras, [CameraConfigUpdateEnum.record] + self.config, + self.config.cameras, + [CameraConfigUpdateEnum.add, CameraConfigUpdateEnum.record], ) self.detection_subscriber = DetectionSubscriber(DetectionTypeEnum.all) self.recordings_publisher = RecordingsDataPublisher( diff --git a/frigate/review/maintainer.py b/frigate/review/maintainer.py index 7f60a0209..778717db3 100644 --- a/frigate/review/maintainer.py +++ b/frigate/review/maintainer.py @@ -154,10 +154,13 @@ class ReviewSegmentMaintainer(threading.Thread): # create communication for review segments self.requestor = InterProcessRequestor() self.config_subscriber = CameraConfigUpdateSubscriber( + config, config.cameras, [ + CameraConfigUpdateEnum.add, CameraConfigUpdateEnum.enabled, CameraConfigUpdateEnum.record, + CameraConfigUpdateEnum.remove, CameraConfigUpdateEnum.review, ], ) diff --git a/frigate/track/object_processing.py b/frigate/track/object_processing.py index af4f7bd42..6409dd925 100644 --- a/frigate/track/object_processing.py +++ b/frigate/track/object_processing.py @@ -66,9 +66,15 @@ class TrackedObjectProcessor(threading.Thread): self.last_motion_detected: dict[str, float] = {} self.ptz_autotracker_thread = ptz_autotracker_thread - self.config_subscriber = CameraConfigUpdateSubscriber( + self.camera_config_subscriber = CameraConfigUpdateSubscriber( + self.config, self.config.cameras, - [CameraConfigUpdateEnum.enabled, CameraConfigUpdateEnum.zones], + [ + CameraConfigUpdateEnum.add, + CameraConfigUpdateEnum.enabled, + CameraConfigUpdateEnum.remove, + CameraConfigUpdateEnum.zones, + ], ) self.requestor = InterProcessRequestor() @@ -91,6 +97,12 @@ class TrackedObjectProcessor(threading.Thread): self.zone_data = defaultdict(lambda: defaultdict(dict)) self.active_zone_data = defaultdict(lambda: defaultdict(dict)) + for camera in self.config.cameras.keys(): + self.create_camera_state(camera) + + def create_camera_state(self, camera: str) -> None: + """Creates a new camera state.""" + def start(camera: str, obj: TrackedObject, frame_name: str): self.event_sender.publish( ( @@ -198,17 +210,16 @@ class TrackedObjectProcessor(threading.Thread): self.camera_activity[camera] = activity self.requestor.send_data(UPDATE_CAMERA_ACTIVITY, self.camera_activity) - for camera in self.config.cameras.keys(): - camera_state = CameraState( - camera, self.config, self.frame_manager, self.ptz_autotracker_thread - ) - camera_state.on("start", start) - camera_state.on("autotrack", autotrack) - camera_state.on("update", update) - camera_state.on("end", end) - camera_state.on("snapshot", snapshot) - camera_state.on("camera_activity", camera_activity) - self.camera_states[camera] = camera_state + camera_state = CameraState( + camera, self.config, self.frame_manager, self.ptz_autotracker_thread + ) + camera_state.on("start", start) + camera_state.on("autotrack", autotrack) + camera_state.on("update", update) + camera_state.on("end", end) + camera_state.on("snapshot", snapshot) + camera_state.on("camera_activity", camera_activity) + self.camera_states[camera] = camera_state def should_save_snapshot(self, camera, obj: TrackedObject): if obj.false_positive: @@ -582,7 +593,7 @@ class TrackedObjectProcessor(threading.Thread): def run(self): while not self.stop_event.is_set(): # check for config updates - updated_topics = self.config_subscriber.check_for_updates() + updated_topics = self.camera_config_subscriber.check_for_updates() if "enabled" in updated_topics: for camera in updated_topics["enabled"]: @@ -590,6 +601,17 @@ class TrackedObjectProcessor(threading.Thread): self.camera_states[camera].prev_enabled = self.config.cameras[ camera ].enabled + elif "add" in updated_topics: + for camera in updated_topics["add"]: + self.config.cameras[camera] = ( + self.camera_config_subscriber.camera_configs[camera] + ) + self.create_camera_state(camera) + elif "remove" in updated_topics: + for camera in updated_topics["remove"]: + camera_state = self.camera_states[camera] + camera_state.shutdown() + self.camera_states.pop(camera) # manage camera disabled state for camera, config in self.config.cameras.items(): @@ -698,6 +720,6 @@ class TrackedObjectProcessor(threading.Thread): self.event_sender.stop() self.event_end_subscriber.stop() self.sub_label_subscriber.stop() - self.config_subscriber.stop() + self.camera_config_subscriber.stop() logger.info("Exiting object processor...") diff --git a/frigate/util/builtin.py b/frigate/util/builtin.py index 52280ecd8..0433af18e 100644 --- a/frigate/util/builtin.py +++ b/frigate/util/builtin.py @@ -14,7 +14,7 @@ import urllib.parse from collections.abc import Mapping from multiprocessing.sharedctypes import Synchronized from pathlib import Path -from typing import Any, Optional, Tuple, Union +from typing import Any, Dict, Optional, Tuple, Union from zoneinfo import ZoneInfoNotFoundError import numpy as np @@ -184,25 +184,12 @@ def create_mask(frame_shape, mask): mask_img[:] = 255 -def update_yaml_from_url(file_path, url): - parsed_url = urllib.parse.urlparse(url) - query_string = urllib.parse.parse_qs(parsed_url.query, keep_blank_values=True) - - # Filter out empty keys but keep blank values for non-empty keys - query_string = {k: v for k, v in query_string.items() if k} - +def process_config_query_string(query_string: Dict[str, list]) -> Dict[str, Any]: + updates = {} for key_path_str, new_value_list in query_string.items(): - key_path = key_path_str.split(".") - for i in range(len(key_path)): - try: - index = int(key_path[i]) - key_path[i] = (key_path[i - 1], index) - key_path.pop(i - 1) - except ValueError: - pass - + # use the string key as-is for updates dictionary if len(new_value_list) > 1: - update_yaml_file(file_path, key_path, new_value_list) + updates[key_path_str] = new_value_list else: value = new_value_list[0] try: @@ -210,10 +197,24 @@ def update_yaml_from_url(file_path, url): value = ast.literal_eval(value) if "," not in value else value except (ValueError, SyntaxError): pass - update_yaml_file(file_path, key_path, value) + updates[key_path_str] = value + return updates -def update_yaml_file(file_path, key_path, new_value): +def flatten_config_data( + config_data: Dict[str, Any], parent_key: str = "" +) -> Dict[str, Any]: + items = [] + for key, value in config_data.items(): + new_key = f"{parent_key}.{key}" if parent_key else key + if isinstance(value, dict): + items.extend(flatten_config_data(value, new_key).items()) + else: + items.append((new_key, value)) + return dict(items) + + +def update_yaml_file_bulk(file_path: str, updates: Dict[str, Any]): yaml = YAML() yaml.indent(mapping=2, sequence=4, offset=2) @@ -226,7 +227,17 @@ def update_yaml_file(file_path, key_path, new_value): ) return - data = update_yaml(data, key_path, new_value) + # Apply all updates + for key_path_str, new_value in updates.items(): + key_path = key_path_str.split(".") + for i in range(len(key_path)): + try: + index = int(key_path[i]) + key_path[i] = (key_path[i - 1], index) + key_path.pop(i - 1) + except ValueError: + pass + data = update_yaml(data, key_path, new_value) try: with open(file_path, "w") as f: diff --git a/frigate/video.py b/frigate/video.py index 5fc70ca02..03377d01a 100755 --- a/frigate/video.py +++ b/frigate/video.py @@ -116,7 +116,7 @@ def capture_frames( skipped_eps = EventsPerSecond() skipped_eps.start() config_subscriber = CameraConfigUpdateSubscriber( - {config.name: config}, [CameraConfigUpdateEnum.enabled] + None, {config.name: config}, [CameraConfigUpdateEnum.enabled] ) def get_enabled_state(): @@ -196,7 +196,7 @@ class CameraWatchdog(threading.Thread): self.sleeptime = self.config.ffmpeg.retry_interval self.config_subscriber = CameraConfigUpdateSubscriber( - {config.name: config}, [CameraConfigUpdateEnum.enabled] + None, {config.name: config}, [CameraConfigUpdateEnum.enabled] ) self.was_enabled = self.config.enabled @@ -473,7 +473,6 @@ def track_camera( model_config: ModelConfig, labelmap: dict[int, str], detection_queue: Queue, - result_connection: MpEvent, detected_objects_queue, camera_metrics: CameraMetrics, ptz_metrics: PTZMetrics, @@ -503,7 +502,7 @@ def track_camera( ptz_metrics=ptz_metrics, ) object_detector = RemoteObjectDetector( - name, labelmap, detection_queue, result_connection, model_config, stop_event + name, labelmap, detection_queue, model_config, stop_event ) object_tracker = NorfairTracker(config, ptz_metrics) @@ -597,6 +596,7 @@ def process_frames( ): next_region_update = get_tomorrow_at_time(2) config_subscriber = CameraConfigUpdateSubscriber( + None, {camera_name: camera_config}, [ CameraConfigUpdateEnum.detect,