Dynamic Management of Cameras (#18671)

* Add base class for global config updates

* Add or remove camera states

* Move camera process management to separate thread

* Move camera management fully to separate class

* Cleanup

* Stop camera processes when stop command is sent

* Start processes dynamically when needed

* Adjust

* Leave extra room in tracked object queue for two cameras

* Dynamically set extra config pieces

* Add some TODOs

* Fix type check

* Simplify config updates

* Improve typing

* Correctly handle indexed entries

* Cleanup

* Create out SHM

* Use ZMQ for signaling object detectoin is completed

* Get camera correctly created

* Cleanup for updating the cameras config

* Cleanup

* Don't enable audio if no cameras have audio transcription

* Use exact string so similar camera names don't interfere

* Add ability to update config via json body to config/set endpoint

Additionally, update the config in a single rather than multiple calls for each updated key

* fix autotracking calibration to support new config updater function

---------

Co-authored-by: Josh Hawkins <32435876+hawkeye217@users.noreply.github.com>
This commit is contained in:
Nicolas Mowen 2025-06-11 11:25:30 -06:00 committed by GitHub
parent 28fba7122d
commit 6f16ecdd48
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
18 changed files with 533 additions and 228 deletions

View File

@ -6,6 +6,7 @@ import json
import logging
import os
import traceback
import urllib
from datetime import datetime, timedelta
from functools import reduce
from io import StringIO
@ -36,8 +37,10 @@ from frigate.models import Event, Timeline
from frigate.stats.prometheus import get_metrics, update_metrics
from frigate.util.builtin import (
clean_camera_user_pass,
flatten_config_data,
get_tz_modifiers,
update_yaml_from_url,
process_config_query_string,
update_yaml_file_bulk,
)
from frigate.util.config import find_config_file
from frigate.util.services import (
@ -358,14 +361,37 @@ def config_set(request: Request, body: AppConfigSetBody):
with open(config_file, "r") as f:
old_raw_config = f.read()
f.close()
try:
update_yaml_from_url(config_file, str(request.url))
updates = {}
# process query string parameters (takes precedence over body.config_data)
parsed_url = urllib.parse.urlparse(str(request.url))
query_string = urllib.parse.parse_qs(parsed_url.query, keep_blank_values=True)
# Filter out empty keys but keep blank values for non-empty keys
query_string = {k: v for k, v in query_string.items() if k}
if query_string:
updates = process_config_query_string(query_string)
elif body.config_data:
updates = flatten_config_data(body.config_data)
if not updates:
return JSONResponse(
content=(
{"success": False, "message": "No configuration data provided"}
),
status_code=400,
)
# apply all updates in a single operation
update_yaml_file_bulk(config_file, updates)
# validate the updated config
with open(config_file, "r") as f:
new_raw_config = f.read()
f.close()
# Validate the config schema
try:
config = FrigateConfig.parse(new_raw_config)
except Exception:
@ -390,12 +416,19 @@ def config_set(request: Request, body: AppConfigSetBody):
)
if body.requires_restart == 0 or body.update_topic:
old_config: FrigateConfig = request.app.frigate_config
request.app.frigate_config = config
if body.update_topic and body.update_topic.startswith("config/cameras/"):
_, _, camera, field = body.update_topic.split("/")
settings = config.get_nested_object(body.update_topic)
if field == "add":
settings = config.cameras[camera]
elif field == "remove":
settings = old_config.cameras[camera]
else:
settings = config.get_nested_object(body.update_topic)
request.app.config_publisher.publish_update(
CameraConfigUpdateTopic(CameraConfigUpdateEnum[field], camera),
settings,

View File

@ -1,4 +1,4 @@
from typing import Optional
from typing import Any, Dict, Optional
from pydantic import BaseModel
@ -6,6 +6,7 @@ from pydantic import BaseModel
class AppConfigSetBody(BaseModel):
requires_restart: int = 1
update_topic: str | None = None
config_data: Optional[Dict[str, Any]] = None
class AppPutPasswordBody(BaseModel):

View File

@ -17,6 +17,7 @@ import frigate.util as util
from frigate.api.auth import hash_password
from frigate.api.fastapi_app import create_fastapi_app
from frigate.camera import CameraMetrics, PTZMetrics
from frigate.camera.maintainer import CameraMaintainer
from frigate.comms.base_communicator import Communicator
from frigate.comms.dispatcher import Dispatcher
from frigate.comms.event_metadata_updater import EventMetadataPublisher
@ -35,7 +36,6 @@ from frigate.const import (
FACE_DIR,
MODEL_CACHE_DIR,
RECORD_DIR,
SHM_FRAMES_VAR,
THUMB_DIR,
)
from frigate.data_processing.types import DataProcessorMetrics
@ -69,10 +69,8 @@ from frigate.storage import StorageMaintainer
from frigate.timeline import TimelineProcessor
from frigate.track.object_processing import TrackedObjectProcessor
from frigate.util.builtin import empty_and_close_queue
from frigate.util.image import SharedMemoryFrameManager, UntrackedSharedMemory
from frigate.util.object import get_camera_regions_grid
from frigate.util.image import UntrackedSharedMemory
from frigate.version import VERSION
from frigate.video import capture_camera, track_camera
from frigate.watchdog import FrigateWatchdog
logger = logging.getLogger(__name__)
@ -84,7 +82,6 @@ class FrigateApp:
self.stop_event: MpEvent = mp.Event()
self.detection_queue: Queue = mp.Queue()
self.detectors: dict[str, ObjectDetectProcess] = {}
self.detection_out_events: dict[str, MpEvent] = {}
self.detection_shms: list[mp.shared_memory.SharedMemory] = []
self.log_queue: Queue = mp.Queue()
self.camera_metrics: dict[str, CameraMetrics] = {}
@ -101,8 +98,6 @@ class FrigateApp:
self.ptz_metrics: dict[str, PTZMetrics] = {}
self.processes: dict[str, int] = {}
self.embeddings: Optional[EmbeddingsContext] = None
self.region_grids: dict[str, list[list[dict[str, int]]]] = {}
self.frame_manager = SharedMemoryFrameManager()
self.config = config
def ensure_dirs(self) -> None:
@ -138,8 +133,16 @@ class FrigateApp:
def init_queues(self) -> None:
# Queue for cameras to push tracked objects to
# leaving room for 2 extra cameras to be added
self.detected_frames_queue: Queue = mp.Queue(
maxsize=sum(camera.enabled for camera in self.config.cameras.values()) * 2
maxsize=(
sum(
camera.enabled_in_config == True
for camera in self.config.cameras.values()
)
+ 2
)
* 2
)
# Queue for timeline events
@ -276,7 +279,9 @@ class FrigateApp:
"synchronous": "NORMAL", # Safe when using WAL https://www.sqlite.org/pragma.html#pragma_synchronous
},
timeout=max(
60, 10 * len([c for c in self.config.cameras.values() if c.enabled])
60,
10
* len([c for c in self.config.cameras.values() if c.enabled_in_config]),
),
load_vec_extension=self.config.semantic_search.enabled,
)
@ -306,7 +311,9 @@ class FrigateApp:
def init_embeddings_client(self) -> None:
genai_cameras = [
c for c in self.config.cameras.values() if c.enabled and c.genai.enabled
c
for c in self.config.cameras.values()
if c.enabled_in_config and c.genai.enabled
]
if (
@ -355,8 +362,6 @@ class FrigateApp:
def start_detectors(self) -> None:
for name in self.config.cameras.keys():
self.detection_out_events[name] = mp.Event()
try:
largest_frame = max(
[
@ -388,7 +393,7 @@ class FrigateApp:
self.detectors[name] = ObjectDetectProcess(
name,
self.detection_queue,
self.detection_out_events,
list(self.config.cameras.keys()),
detector_config,
)
@ -423,69 +428,16 @@ class FrigateApp:
output_processor.start()
logger.info(f"Output process started: {output_processor.pid}")
def init_historical_regions(self) -> None:
# delete region grids for removed or renamed cameras
cameras = list(self.config.cameras.keys())
Regions.delete().where(~(Regions.camera << cameras)).execute()
# create or update region grids for each camera
for camera in self.config.cameras.values():
assert camera.name is not None
self.region_grids[camera.name] = get_camera_regions_grid(
camera.name,
camera.detect,
max(self.config.model.width, self.config.model.height),
)
def start_camera_processors(self) -> None:
for name, config in self.config.cameras.items():
if not self.config.cameras[name].enabled_in_config:
logger.info(f"Camera processor not started for disabled camera {name}")
continue
camera_process = util.Process(
target=track_camera,
name=f"camera_processor:{name}",
args=(
name,
config,
self.config.model,
self.config.model.merged_labelmap,
self.detection_queue,
self.detection_out_events[name],
self.detected_frames_queue,
self.camera_metrics[name],
self.ptz_metrics[name],
self.region_grids[name],
),
daemon=True,
)
self.camera_metrics[name].process = camera_process
camera_process.start()
logger.info(f"Camera processor started for {name}: {camera_process.pid}")
def start_camera_capture_processes(self) -> None:
shm_frame_count = self.shm_frame_count()
for name, config in self.config.cameras.items():
if not self.config.cameras[name].enabled_in_config:
logger.info(f"Capture process not started for disabled camera {name}")
continue
# pre-create shms
for i in range(shm_frame_count):
frame_size = config.frame_shape_yuv[0] * config.frame_shape_yuv[1]
self.frame_manager.create(f"{config.name}_frame{i}", frame_size)
capture_process = util.Process(
target=capture_camera,
name=f"camera_capture:{name}",
args=(config, shm_frame_count, self.camera_metrics[name]),
)
capture_process.daemon = True
self.camera_metrics[name].capture_process = capture_process
capture_process.start()
logger.info(f"Capture process started for {name}: {capture_process.pid}")
def start_camera_processor(self) -> None:
self.camera_maintainer = CameraMaintainer(
self.config,
self.detection_queue,
self.detected_frames_queue,
self.camera_metrics,
self.ptz_metrics,
self.stop_event,
)
self.camera_maintainer.start()
def start_audio_processor(self) -> None:
audio_cameras = [
@ -545,45 +497,6 @@ class FrigateApp:
self.frigate_watchdog = FrigateWatchdog(self.detectors, self.stop_event)
self.frigate_watchdog.start()
def shm_frame_count(self) -> int:
total_shm = round(shutil.disk_usage("/dev/shm").total / pow(2, 20), 1)
# required for log files + nginx cache
min_req_shm = 40 + 10
if self.config.birdseye.restream:
min_req_shm += 8
available_shm = total_shm - min_req_shm
cam_total_frame_size = 0.0
for camera in self.config.cameras.values():
if camera.enabled and camera.detect.width and camera.detect.height:
cam_total_frame_size += round(
(camera.detect.width * camera.detect.height * 1.5 + 270480)
/ 1048576,
1,
)
if cam_total_frame_size == 0.0:
return 0
shm_frame_count = min(
int(os.environ.get(SHM_FRAMES_VAR, "50")),
int(available_shm / (cam_total_frame_size)),
)
logger.debug(
f"Calculated total camera size {available_shm} / {cam_total_frame_size} :: {shm_frame_count} frames for each camera in SHM"
)
if shm_frame_count < 20:
logger.warning(
f"The current SHM size of {total_shm}MB is too small, recommend increasing it to at least {round(min_req_shm + cam_total_frame_size * 20)}MB."
)
return shm_frame_count
def init_auth(self) -> None:
if self.config.auth.enabled:
if User.select().count() == 0:
@ -650,10 +563,8 @@ class FrigateApp:
self.init_embeddings_client()
self.start_video_output_processor()
self.start_ptz_autotracker()
self.init_historical_regions()
self.start_detected_frames_processor()
self.start_camera_processors()
self.start_camera_capture_processes()
self.start_camera_processor()
self.start_audio_processor()
self.start_storage_maintainer()
self.start_stats_emitter()
@ -707,24 +618,6 @@ class FrigateApp:
if self.onvif_controller:
self.onvif_controller.close()
# ensure the capture processes are done
for camera, metrics in self.camera_metrics.items():
capture_process = metrics.capture_process
if capture_process is not None:
logger.info(f"Waiting for capture process for {camera} to stop")
capture_process.terminate()
capture_process.join()
# ensure the camera processors are done
for camera, metrics in self.camera_metrics.items():
camera_process = metrics.process
if camera_process is not None:
logger.info(f"Waiting for process for {camera} to stop")
camera_process.terminate()
camera_process.join()
logger.info(f"Closing frame queue for {camera}")
empty_and_close_queue(metrics.frame_queue)
# ensure the detectors are done
for detector in self.detectors.values():
detector.stop()
@ -769,7 +662,6 @@ class FrigateApp:
self.event_metadata_updater.stop()
self.inter_zmq_proxy.stop()
self.frame_manager.cleanup()
while len(self.detection_shms) > 0:
shm = self.detection_shms.pop()
shm.close()

View File

@ -3,7 +3,7 @@
from collections import Counter
from typing import Any, Callable
from frigate.config.config import FrigateConfig
from frigate.config import CameraConfig, FrigateConfig
class CameraActivityManager:
@ -23,26 +23,33 @@ class CameraActivityManager:
if not camera_config.enabled_in_config:
continue
self.last_camera_activity[camera_config.name] = {}
self.camera_all_object_counts[camera_config.name] = Counter()
self.camera_active_object_counts[camera_config.name] = Counter()
self.__init_camera(camera_config)
for zone, zone_config in camera_config.zones.items():
if zone not in self.all_zone_labels:
self.zone_all_object_counts[zone] = Counter()
self.zone_active_object_counts[zone] = Counter()
self.all_zone_labels[zone] = set()
def __init_camera(self, camera_config: CameraConfig) -> None:
self.last_camera_activity[camera_config.name] = {}
self.camera_all_object_counts[camera_config.name] = Counter()
self.camera_active_object_counts[camera_config.name] = Counter()
self.all_zone_labels[zone].update(
zone_config.objects
if zone_config.objects
else camera_config.objects.track
)
for zone, zone_config in camera_config.zones.items():
if zone not in self.all_zone_labels:
self.zone_all_object_counts[zone] = Counter()
self.zone_active_object_counts[zone] = Counter()
self.all_zone_labels[zone] = set()
self.all_zone_labels[zone].update(
zone_config.objects
if zone_config.objects
else camera_config.objects.track
)
def update_activity(self, new_activity: dict[str, dict[str, Any]]) -> None:
all_objects: list[dict[str, Any]] = []
for camera in new_activity.keys():
# handle cameras that were added dynamically
if camera not in self.camera_all_object_counts:
self.__init_camera(self.config.cameras[camera])
new_objects = new_activity[camera].get("objects", [])
all_objects.extend(new_objects)

View File

@ -0,0 +1,248 @@
"""Create and maintain camera processes / management."""
import logging
import os
import shutil
import threading
from multiprocessing import Queue
from multiprocessing.synchronize import Event as MpEvent
from frigate.camera import CameraMetrics, PTZMetrics
from frigate.config import FrigateConfig
from frigate.config.camera import CameraConfig
from frigate.config.camera.updater import (
CameraConfigUpdateEnum,
CameraConfigUpdateSubscriber,
)
from frigate.const import SHM_FRAMES_VAR
from frigate.models import Regions
from frigate.util import Process as FrigateProcess
from frigate.util.builtin import empty_and_close_queue
from frigate.util.image import SharedMemoryFrameManager, UntrackedSharedMemory
from frigate.util.object import get_camera_regions_grid
from frigate.video import capture_camera, track_camera
logger = logging.getLogger(__name__)
class CameraMaintainer(threading.Thread):
def __init__(
self,
config: FrigateConfig,
detection_queue: Queue,
detected_frames_queue: Queue,
camera_metrics: dict[str, CameraMetrics],
ptz_metrics: dict[str, PTZMetrics],
stop_event: MpEvent,
):
super().__init__(name="camera_processor")
self.config = config
self.detection_queue = detection_queue
self.detected_frames_queue = detected_frames_queue
self.stop_event = stop_event
self.camera_metrics = camera_metrics
self.ptz_metrics = ptz_metrics
self.frame_manager = SharedMemoryFrameManager()
self.region_grids: dict[str, list[list[dict[str, int]]]] = {}
self.update_subscriber = CameraConfigUpdateSubscriber(
self.config,
{},
[
CameraConfigUpdateEnum.add,
CameraConfigUpdateEnum.remove,
],
)
self.shm_count = self.__calculate_shm_frame_count()
def __init_historical_regions(self) -> None:
# delete region grids for removed or renamed cameras
cameras = list(self.config.cameras.keys())
Regions.delete().where(~(Regions.camera << cameras)).execute()
# create or update region grids for each camera
for camera in self.config.cameras.values():
assert camera.name is not None
self.region_grids[camera.name] = get_camera_regions_grid(
camera.name,
camera.detect,
max(self.config.model.width, self.config.model.height),
)
def __calculate_shm_frame_count(self) -> int:
total_shm = round(shutil.disk_usage("/dev/shm").total / pow(2, 20), 1)
# required for log files + nginx cache
min_req_shm = 40 + 10
if self.config.birdseye.restream:
min_req_shm += 8
available_shm = total_shm - min_req_shm
cam_total_frame_size = 0.0
for camera in self.config.cameras.values():
if (
camera.enabled_in_config
and camera.detect.width
and camera.detect.height
):
cam_total_frame_size += round(
(camera.detect.width * camera.detect.height * 1.5 + 270480)
/ 1048576,
1,
)
# leave room for 2 cameras that are added dynamically, if a user wants to add more cameras they may need to increase the SHM size and restart after adding them.
cam_total_frame_size += 2 * round(
(camera.detect.width * camera.detect.height * 1.5 + 270480) / 1048576,
1,
)
if cam_total_frame_size == 0.0:
return 0
shm_frame_count = min(
int(os.environ.get(SHM_FRAMES_VAR, "50")),
int(available_shm / (cam_total_frame_size)),
)
logger.debug(
f"Calculated total camera size {available_shm} / {cam_total_frame_size} :: {shm_frame_count} frames for each camera in SHM"
)
if shm_frame_count < 20:
logger.warning(
f"The current SHM size of {total_shm}MB is too small, recommend increasing it to at least {round(min_req_shm + cam_total_frame_size * 20)}MB."
)
return shm_frame_count
def __start_camera_processor(
self, name: str, config: CameraConfig, runtime: bool = False
) -> None:
if not config.enabled_in_config:
logger.info(f"Camera processor not started for disabled camera {name}")
return
if runtime:
self.camera_metrics[name] = CameraMetrics()
self.ptz_metrics[name] = PTZMetrics(autotracker_enabled=False)
self.region_grids[name] = get_camera_regions_grid(
name,
config.detect,
max(self.config.model.width, self.config.model.height),
)
try:
largest_frame = max(
[
det.model.height * det.model.width * 3
if det.model is not None
else 320
for det in self.config.detectors.values()
]
)
UntrackedSharedMemory(name=f"out-{name}", create=True, size=20 * 6 * 4)
UntrackedSharedMemory(
name=name,
create=True,
size=largest_frame,
)
except FileExistsError:
pass
camera_process = FrigateProcess(
target=track_camera,
name=f"camera_processor:{name}",
args=(
config.name,
config,
self.config.model,
self.config.model.merged_labelmap,
self.detection_queue,
self.detected_frames_queue,
self.camera_metrics[name],
self.ptz_metrics[name],
self.region_grids[name],
),
daemon=True,
)
self.camera_metrics[config.name].process = camera_process
camera_process.start()
logger.info(f"Camera processor started for {config.name}: {camera_process.pid}")
def __start_camera_capture(
self, name: str, config: CameraConfig, runtime: bool = False
) -> None:
if not config.enabled_in_config:
logger.info(f"Capture process not started for disabled camera {name}")
return
# pre-create shms
for i in range(10 if runtime else self.shm_count):
frame_size = config.frame_shape_yuv[0] * config.frame_shape_yuv[1]
self.frame_manager.create(f"{config.name}_frame{i}", frame_size)
capture_process = FrigateProcess(
target=capture_camera,
name=f"camera_capture:{name}",
args=(config, self.shm_count, self.camera_metrics[name]),
)
capture_process.daemon = True
self.camera_metrics[name].capture_process = capture_process
capture_process.start()
logger.info(f"Capture process started for {name}: {capture_process.pid}")
def __stop_camera_capture_process(self, camera: str) -> None:
capture_process = self.camera_metrics[camera].capture_process
if capture_process is not None:
logger.info(f"Waiting for capture process for {camera} to stop")
capture_process.terminate()
capture_process.join()
def __stop_camera_process(self, camera: str) -> None:
metrics = self.camera_metrics[camera]
camera_process = metrics.process
if camera_process is not None:
logger.info(f"Waiting for process for {camera} to stop")
camera_process.terminate()
camera_process.join()
logger.info(f"Closing frame queue for {camera}")
empty_and_close_queue(metrics.frame_queue)
def run(self):
self.__init_historical_regions()
# start camera processes
for camera, config in self.config.cameras.items():
self.__start_camera_processor(camera, config)
self.__start_camera_capture(camera, config)
while not self.stop_event.wait(1):
updates = self.update_subscriber.check_for_updates()
for update_type, updated_cameras in updates.items():
if update_type == CameraConfigUpdateEnum.add.name:
for camera in updated_cameras:
self.__start_camera_processor(
camera,
self.update_subscriber.camera_configs[camera],
runtime=True,
)
self.__start_camera_capture(
camera, self.update_subscriber.camera_configs[camera]
)
elif update_type == CameraConfigUpdateEnum.remove.name:
self.__stop_camera_capture_process(camera)
self.__stop_camera_process(camera)
# ensure the capture processes are done
for camera in self.camera_metrics.keys():
self.__stop_camera_capture_process(camera)
# ensure the camera processors are done
for camera in self.camera_metrics.keys():
self.__stop_camera_process(camera)
self.update_subscriber.stop()
self.frame_manager.cleanup()

View File

@ -0,0 +1,21 @@
"""Facilitates communication between processes for object detection signals."""
from .zmq_proxy import Publisher, Subscriber
class ObjectDetectorPublisher(Publisher):
"""Publishes signal for object detection to different processes."""
topic_base = "object_detector/"
class ObjectDetectorSubscriber(Subscriber):
"""Simplifies receiving a signal for object detection."""
topic_base = "object_detector/"
def __init__(self, topic: str) -> None:
super().__init__(topic)
def check_for_update(self):
return super().check_for_update(timeout=5)

View File

@ -81,7 +81,7 @@ class WebPushClient(Communicator): # type: ignore[misc]
"config/notifications", exact=True
)
self.config_subscriber = CameraConfigUpdateSubscriber(
self.config.cameras, [CameraConfigUpdateEnum.notifications]
self.config, self.config.cameras, [CameraConfigUpdateEnum.notifications]
)
def subscribe(self, receiver: Callable) -> None:
@ -170,7 +170,12 @@ class WebPushClient(Communicator): # type: ignore[misc]
if updated_notification_config:
self.config.notifications = updated_notification_config
self.config_subscriber.check_for_updates()
updates = self.config_subscriber.check_for_updates()
if "add" in updates:
for camera in updates["add"]:
self.suspended_cameras[camera] = 0
self.last_camera_notification_time[camera] = 0
if topic == "reviews":
decoded = json.loads(payload)

View File

@ -5,12 +5,13 @@ from enum import Enum
from typing import Any
from frigate.comms.config_updater import ConfigPublisher, ConfigSubscriber
from frigate.config import CameraConfig
from frigate.config import CameraConfig, FrigateConfig
class CameraConfigUpdateEnum(str, Enum):
"""Supported camera config update types."""
add = "add" # for adding a camera
audio = "audio"
audio_transcription = "audio_transcription"
birdseye = "birdseye"
@ -20,6 +21,7 @@ class CameraConfigUpdateEnum(str, Enum):
notifications = "notifications"
objects = "objects"
record = "record"
remove = "remove" # for removing a camera
review = "review"
snapshots = "snapshots"
zones = "zones"
@ -49,9 +51,11 @@ class CameraConfigUpdatePublisher:
class CameraConfigUpdateSubscriber:
def __init__(
self,
config: FrigateConfig | None,
camera_configs: dict[str, CameraConfig],
topics: list[CameraConfigUpdateEnum],
):
self.config = config
self.camera_configs = camera_configs
self.topics = topics
@ -68,14 +72,23 @@ class CameraConfigUpdateSubscriber:
def __update_config(
self, camera: str, update_type: CameraConfigUpdateEnum, updated_config: Any
) -> None:
config = self.camera_configs[camera]
if update_type == CameraConfigUpdateEnum.add:
self.config.cameras[camera] = updated_config
self.camera_configs[camera] = updated_config
return
elif update_type == CameraConfigUpdateEnum.remove:
self.config.cameras.pop(camera)
self.camera_configs.pop(camera)
return
config = self.camera_configs.get(camera)
if not config:
return
if update_type == CameraConfigUpdateEnum.audio:
config.audio = updated_config
if update_type == CameraConfigUpdateEnum.audio_transcription:
elif update_type == CameraConfigUpdateEnum.audio_transcription:
config.audio_transcription = updated_config
elif update_type == CameraConfigUpdateEnum.birdseye:
config.birdseye = updated_config

View File

@ -29,6 +29,10 @@ from frigate.comms.recordings_updater import (
)
from frigate.config import FrigateConfig
from frigate.config.camera.camera import CameraTypeEnum
from frigate.config.camera.updater import (
CameraConfigUpdateEnum,
CameraConfigUpdateSubscriber,
)
from frigate.const import (
CLIPS_DIR,
UPDATE_EVENT_DESCRIPTION,
@ -87,6 +91,11 @@ class EmbeddingMaintainer(threading.Thread):
self.config = config
self.metrics = metrics
self.embeddings = None
self.config_updater = CameraConfigUpdateSubscriber(
self.config,
self.config.cameras,
[CameraConfigUpdateEnum.add, CameraConfigUpdateEnum.remove],
)
if config.semantic_search.enabled:
self.embeddings = Embeddings(config, db, metrics)
@ -198,6 +207,7 @@ class EmbeddingMaintainer(threading.Thread):
def run(self) -> None:
"""Maintain a SQLite-vec database for semantic search."""
while not self.stop_event.is_set():
self.config_updater.check_for_updates()
self._process_requests()
self._process_updates()
self._process_recordings_updates()
@ -206,6 +216,7 @@ class EmbeddingMaintainer(threading.Thread):
self._process_finalized()
self._process_event_metadata()
self.config_updater.stop()
self.event_subscriber.stop()
self.event_end_subscriber.stop()
self.recordings_subscriber.stop()

View File

@ -90,10 +90,19 @@ class AudioProcessor(util.Process):
self.camera_metrics = camera_metrics
self.cameras = cameras
self.config = config
self.transcription_model_runner = AudioTranscriptionModelRunner(
self.config.audio_transcription.device,
self.config.audio_transcription.model_size,
)
if any(
[
conf.audio_transcription.enabled_in_config
for conf in config.cameras.values()
]
):
self.transcription_model_runner = AudioTranscriptionModelRunner(
self.config.audio_transcription.device,
self.config.audio_transcription.model_size,
)
else:
self.transcription_model_runner = None
def run(self) -> None:
audio_threads: list[AudioEventMaintainer] = []
@ -138,7 +147,7 @@ class AudioEventMaintainer(threading.Thread):
camera: CameraConfig,
config: FrigateConfig,
camera_metrics: dict[str, CameraMetrics],
audio_transcription_model_runner: AudioTranscriptionModelRunner,
audio_transcription_model_runner: AudioTranscriptionModelRunner | None,
stop_event: threading.Event,
) -> None:
super().__init__(name=f"{camera.name}_audio_event_processor")
@ -162,6 +171,7 @@ class AudioEventMaintainer(threading.Thread):
# create communication for audio detections
self.requestor = InterProcessRequestor()
self.config_subscriber = CameraConfigUpdateSubscriber(
None,
{self.camera_config.name: self.camera_config},
[
CameraConfigUpdateEnum.audio,

View File

@ -13,6 +13,10 @@ import numpy as np
from setproctitle import setproctitle
import frigate.util as util
from frigate.comms.object_detector_signaler import (
ObjectDetectorPublisher,
ObjectDetectorSubscriber,
)
from frigate.detectors import create_detector
from frigate.detectors.detector_config import (
BaseDetectorConfig,
@ -89,7 +93,7 @@ class LocalObjectDetector(ObjectDetector):
def run_detector(
name: str,
detection_queue: Queue,
out_events: dict[str, MpEvent],
cameras: list[str],
avg_speed: Value,
start: Value,
detector_config: BaseDetectorConfig,
@ -108,15 +112,19 @@ def run_detector(
signal.signal(signal.SIGTERM, receiveSignal)
signal.signal(signal.SIGINT, receiveSignal)
frame_manager = SharedMemoryFrameManager()
object_detector = LocalObjectDetector(detector_config=detector_config)
outputs = {}
for name in out_events.keys():
def create_output_shm(name: str):
out_shm = UntrackedSharedMemory(name=f"out-{name}", create=False)
out_np = np.ndarray((20, 6), dtype=np.float32, buffer=out_shm.buf)
outputs[name] = {"shm": out_shm, "np": out_np}
frame_manager = SharedMemoryFrameManager()
object_detector = LocalObjectDetector(detector_config=detector_config)
detector_publisher = ObjectDetectorPublisher()
outputs = {}
for name in cameras:
create_output_shm(name)
while not stop_event.is_set():
try:
connection_id = detection_queue.get(timeout=1)
@ -136,12 +144,18 @@ def run_detector(
detections = object_detector.detect_raw(input_frame)
duration = datetime.datetime.now().timestamp() - start.value
frame_manager.close(connection_id)
if connection_id not in outputs:
create_output_shm(connection_id)
outputs[connection_id]["np"][:] = detections[:]
out_events[connection_id].set()
signal_id = f"{connection_id}/update"
detector_publisher.publish(signal_id, signal_id)
start.value = 0.0
avg_speed.value = (avg_speed.value * 9 + duration) / 10
detector_publisher.stop()
logger.info("Exited detection process...")
@ -150,11 +164,11 @@ class ObjectDetectProcess:
self,
name: str,
detection_queue: Queue,
out_events: dict[str, MpEvent],
cameras: list[str],
detector_config: BaseDetectorConfig,
):
self.name = name
self.out_events = out_events
self.cameras = cameras
self.detection_queue = detection_queue
self.avg_inference_speed = Value("d", 0.01)
self.detection_start = Value("d", 0.0)
@ -185,7 +199,7 @@ class ObjectDetectProcess:
args=(
self.name,
self.detection_queue,
self.out_events,
self.cameras,
self.avg_inference_speed,
self.detection_start,
self.detector_config,
@ -201,7 +215,6 @@ class RemoteObjectDetector:
name: str,
labels: dict[int, str],
detection_queue: Queue,
event: MpEvent,
model_config: ModelConfig,
stop_event: MpEvent,
):
@ -209,7 +222,6 @@ class RemoteObjectDetector:
self.name = name
self.fps = EventsPerSecond()
self.detection_queue = detection_queue
self.event = event
self.stop_event = stop_event
self.shm = UntrackedSharedMemory(name=self.name, create=False)
self.np_shm = np.ndarray(
@ -219,6 +231,7 @@ class RemoteObjectDetector:
)
self.out_shm = UntrackedSharedMemory(name=f"out-{self.name}", create=False)
self.out_np_shm = np.ndarray((20, 6), dtype=np.float32, buffer=self.out_shm.buf)
self.detector_subscriber = ObjectDetectorSubscriber(f"{name}/update")
def detect(self, tensor_input, threshold=0.4):
detections = []
@ -228,9 +241,8 @@ class RemoteObjectDetector:
# copy input to shared memory
self.np_shm[:] = tensor_input[:]
self.event.clear()
self.detection_queue.put(self.name)
result = self.event.wait(timeout=5.0)
result = self.detector_subscriber.check_for_update()
# if it timed out
if result is None:
@ -246,5 +258,6 @@ class RemoteObjectDetector:
return detections
def cleanup(self):
self.detector_subscriber.stop()
self.shm.unlink()
self.out_shm.unlink()

View File

@ -103,8 +103,10 @@ def output_frames(
detection_subscriber = DetectionSubscriber(DetectionTypeEnum.video)
config_subscriber = CameraConfigUpdateSubscriber(
config,
config.cameras,
[
CameraConfigUpdateEnum.add,
CameraConfigUpdateEnum.birdseye,
CameraConfigUpdateEnum.enabled,
CameraConfigUpdateEnum.record,
@ -135,7 +137,15 @@ def output_frames(
while not stop_event.is_set():
# check if there is an updated config
config_subscriber.check_for_updates()
updates = config_subscriber.check_for_updates()
if "add" in updates:
for camera in updates["add"]:
jsmpeg_cameras[camera] = JsmpegCamera(
cam_config, stop_event, websocket_server
)
preview_recorders[camera] = PreviewRecorder(cam_config)
preview_write_times[camera] = 0
(topic, data) = detection_subscriber.check_for_update(timeout=1)
now = datetime.datetime.now().timestamp()

View File

@ -31,7 +31,7 @@ from frigate.const import (
)
from frigate.ptz.onvif import OnvifController
from frigate.track.tracked_object import TrackedObject
from frigate.util.builtin import update_yaml_file
from frigate.util.builtin import update_yaml_file_bulk
from frigate.util.config import find_config_file
from frigate.util.image import SharedMemoryFrameManager, intersection_over_union
@ -348,10 +348,13 @@ class PtzAutoTracker:
f"{camera}: Writing new config with autotracker motion coefficients: {self.config.cameras[camera].onvif.autotracking.movement_weights}"
)
update_yaml_file(
update_yaml_file_bulk(
config_file,
["cameras", camera, "onvif", "autotracking", "movement_weights"],
self.config.cameras[camera].onvif.autotracking.movement_weights,
{
f"cameras.{camera}.onvif.autotracking.movement_weights": self.config.cameras[
camera
].onvif.autotracking.movement_weights
},
)
async def _calibrate_camera(self, camera):

View File

@ -75,7 +75,9 @@ class RecordingMaintainer(threading.Thread):
# create communication for retained recordings
self.requestor = InterProcessRequestor()
self.config_subscriber = CameraConfigUpdateSubscriber(
self.config.cameras, [CameraConfigUpdateEnum.record]
self.config,
self.config.cameras,
[CameraConfigUpdateEnum.add, CameraConfigUpdateEnum.record],
)
self.detection_subscriber = DetectionSubscriber(DetectionTypeEnum.all)
self.recordings_publisher = RecordingsDataPublisher(

View File

@ -154,10 +154,13 @@ class ReviewSegmentMaintainer(threading.Thread):
# create communication for review segments
self.requestor = InterProcessRequestor()
self.config_subscriber = CameraConfigUpdateSubscriber(
config,
config.cameras,
[
CameraConfigUpdateEnum.add,
CameraConfigUpdateEnum.enabled,
CameraConfigUpdateEnum.record,
CameraConfigUpdateEnum.remove,
CameraConfigUpdateEnum.review,
],
)

View File

@ -66,9 +66,15 @@ class TrackedObjectProcessor(threading.Thread):
self.last_motion_detected: dict[str, float] = {}
self.ptz_autotracker_thread = ptz_autotracker_thread
self.config_subscriber = CameraConfigUpdateSubscriber(
self.camera_config_subscriber = CameraConfigUpdateSubscriber(
self.config,
self.config.cameras,
[CameraConfigUpdateEnum.enabled, CameraConfigUpdateEnum.zones],
[
CameraConfigUpdateEnum.add,
CameraConfigUpdateEnum.enabled,
CameraConfigUpdateEnum.remove,
CameraConfigUpdateEnum.zones,
],
)
self.requestor = InterProcessRequestor()
@ -91,6 +97,12 @@ class TrackedObjectProcessor(threading.Thread):
self.zone_data = defaultdict(lambda: defaultdict(dict))
self.active_zone_data = defaultdict(lambda: defaultdict(dict))
for camera in self.config.cameras.keys():
self.create_camera_state(camera)
def create_camera_state(self, camera: str) -> None:
"""Creates a new camera state."""
def start(camera: str, obj: TrackedObject, frame_name: str):
self.event_sender.publish(
(
@ -198,17 +210,16 @@ class TrackedObjectProcessor(threading.Thread):
self.camera_activity[camera] = activity
self.requestor.send_data(UPDATE_CAMERA_ACTIVITY, self.camera_activity)
for camera in self.config.cameras.keys():
camera_state = CameraState(
camera, self.config, self.frame_manager, self.ptz_autotracker_thread
)
camera_state.on("start", start)
camera_state.on("autotrack", autotrack)
camera_state.on("update", update)
camera_state.on("end", end)
camera_state.on("snapshot", snapshot)
camera_state.on("camera_activity", camera_activity)
self.camera_states[camera] = camera_state
camera_state = CameraState(
camera, self.config, self.frame_manager, self.ptz_autotracker_thread
)
camera_state.on("start", start)
camera_state.on("autotrack", autotrack)
camera_state.on("update", update)
camera_state.on("end", end)
camera_state.on("snapshot", snapshot)
camera_state.on("camera_activity", camera_activity)
self.camera_states[camera] = camera_state
def should_save_snapshot(self, camera, obj: TrackedObject):
if obj.false_positive:
@ -582,7 +593,7 @@ class TrackedObjectProcessor(threading.Thread):
def run(self):
while not self.stop_event.is_set():
# check for config updates
updated_topics = self.config_subscriber.check_for_updates()
updated_topics = self.camera_config_subscriber.check_for_updates()
if "enabled" in updated_topics:
for camera in updated_topics["enabled"]:
@ -590,6 +601,17 @@ class TrackedObjectProcessor(threading.Thread):
self.camera_states[camera].prev_enabled = self.config.cameras[
camera
].enabled
elif "add" in updated_topics:
for camera in updated_topics["add"]:
self.config.cameras[camera] = (
self.camera_config_subscriber.camera_configs[camera]
)
self.create_camera_state(camera)
elif "remove" in updated_topics:
for camera in updated_topics["remove"]:
camera_state = self.camera_states[camera]
camera_state.shutdown()
self.camera_states.pop(camera)
# manage camera disabled state
for camera, config in self.config.cameras.items():
@ -698,6 +720,6 @@ class TrackedObjectProcessor(threading.Thread):
self.event_sender.stop()
self.event_end_subscriber.stop()
self.sub_label_subscriber.stop()
self.config_subscriber.stop()
self.camera_config_subscriber.stop()
logger.info("Exiting object processor...")

View File

@ -14,7 +14,7 @@ import urllib.parse
from collections.abc import Mapping
from multiprocessing.sharedctypes import Synchronized
from pathlib import Path
from typing import Any, Optional, Tuple, Union
from typing import Any, Dict, Optional, Tuple, Union
from zoneinfo import ZoneInfoNotFoundError
import numpy as np
@ -184,25 +184,12 @@ def create_mask(frame_shape, mask):
mask_img[:] = 255
def update_yaml_from_url(file_path, url):
parsed_url = urllib.parse.urlparse(url)
query_string = urllib.parse.parse_qs(parsed_url.query, keep_blank_values=True)
# Filter out empty keys but keep blank values for non-empty keys
query_string = {k: v for k, v in query_string.items() if k}
def process_config_query_string(query_string: Dict[str, list]) -> Dict[str, Any]:
updates = {}
for key_path_str, new_value_list in query_string.items():
key_path = key_path_str.split(".")
for i in range(len(key_path)):
try:
index = int(key_path[i])
key_path[i] = (key_path[i - 1], index)
key_path.pop(i - 1)
except ValueError:
pass
# use the string key as-is for updates dictionary
if len(new_value_list) > 1:
update_yaml_file(file_path, key_path, new_value_list)
updates[key_path_str] = new_value_list
else:
value = new_value_list[0]
try:
@ -210,10 +197,24 @@ def update_yaml_from_url(file_path, url):
value = ast.literal_eval(value) if "," not in value else value
except (ValueError, SyntaxError):
pass
update_yaml_file(file_path, key_path, value)
updates[key_path_str] = value
return updates
def update_yaml_file(file_path, key_path, new_value):
def flatten_config_data(
config_data: Dict[str, Any], parent_key: str = ""
) -> Dict[str, Any]:
items = []
for key, value in config_data.items():
new_key = f"{parent_key}.{key}" if parent_key else key
if isinstance(value, dict):
items.extend(flatten_config_data(value, new_key).items())
else:
items.append((new_key, value))
return dict(items)
def update_yaml_file_bulk(file_path: str, updates: Dict[str, Any]):
yaml = YAML()
yaml.indent(mapping=2, sequence=4, offset=2)
@ -226,7 +227,17 @@ def update_yaml_file(file_path, key_path, new_value):
)
return
data = update_yaml(data, key_path, new_value)
# Apply all updates
for key_path_str, new_value in updates.items():
key_path = key_path_str.split(".")
for i in range(len(key_path)):
try:
index = int(key_path[i])
key_path[i] = (key_path[i - 1], index)
key_path.pop(i - 1)
except ValueError:
pass
data = update_yaml(data, key_path, new_value)
try:
with open(file_path, "w") as f:

View File

@ -116,7 +116,7 @@ def capture_frames(
skipped_eps = EventsPerSecond()
skipped_eps.start()
config_subscriber = CameraConfigUpdateSubscriber(
{config.name: config}, [CameraConfigUpdateEnum.enabled]
None, {config.name: config}, [CameraConfigUpdateEnum.enabled]
)
def get_enabled_state():
@ -196,7 +196,7 @@ class CameraWatchdog(threading.Thread):
self.sleeptime = self.config.ffmpeg.retry_interval
self.config_subscriber = CameraConfigUpdateSubscriber(
{config.name: config}, [CameraConfigUpdateEnum.enabled]
None, {config.name: config}, [CameraConfigUpdateEnum.enabled]
)
self.was_enabled = self.config.enabled
@ -473,7 +473,6 @@ def track_camera(
model_config: ModelConfig,
labelmap: dict[int, str],
detection_queue: Queue,
result_connection: MpEvent,
detected_objects_queue,
camera_metrics: CameraMetrics,
ptz_metrics: PTZMetrics,
@ -503,7 +502,7 @@ def track_camera(
ptz_metrics=ptz_metrics,
)
object_detector = RemoteObjectDetector(
name, labelmap, detection_queue, result_connection, model_config, stop_event
name, labelmap, detection_queue, model_config, stop_event
)
object_tracker = NorfairTracker(config, ptz_metrics)
@ -597,6 +596,7 @@ def process_frames(
):
next_region_update = get_tomorrow_at_time(2)
config_subscriber = CameraConfigUpdateSubscriber(
None,
{camera_name: camera_config},
[
CameraConfigUpdateEnum.detect,