blakeblackshear.frigate/frigate/camera/activity_manager.py
Nicolas Mowen 195f705616
Support all audio type in MQTT (#19768)
* Support all audio type in MQTT

* Formatting
2025-08-26 09:50:50 -05:00

260 lines
9.8 KiB
Python

"""Manage camera activity and updating listeners."""
import datetime
import json
import logging
import random
import string
from collections import Counter
from typing import Any, Callable
from frigate.comms.event_metadata_updater import (
EventMetadataPublisher,
EventMetadataTypeEnum,
)
from frigate.config import CameraConfig, FrigateConfig
logger = logging.getLogger(__name__)
class CameraActivityManager:
def __init__(
self, config: FrigateConfig, publish: Callable[[str, Any], None]
) -> None:
self.config = config
self.publish = publish
self.last_camera_activity: dict[str, dict[str, Any]] = {}
self.camera_all_object_counts: dict[str, Counter] = {}
self.camera_active_object_counts: dict[str, Counter] = {}
self.zone_all_object_counts: dict[str, Counter] = {}
self.zone_active_object_counts: dict[str, Counter] = {}
self.all_zone_labels: dict[str, set[str]] = {}
for camera_config in config.cameras.values():
if not camera_config.enabled_in_config:
continue
self.__init_camera(camera_config)
def __init_camera(self, camera_config: CameraConfig) -> None:
self.last_camera_activity[camera_config.name] = {}
self.camera_all_object_counts[camera_config.name] = Counter()
self.camera_active_object_counts[camera_config.name] = Counter()
for zone, zone_config in camera_config.zones.items():
if zone not in self.all_zone_labels:
self.zone_all_object_counts[zone] = Counter()
self.zone_active_object_counts[zone] = Counter()
self.all_zone_labels[zone] = set()
self.all_zone_labels[zone].update(
zone_config.objects
if zone_config.objects
else camera_config.objects.track
)
def update_activity(self, new_activity: dict[str, dict[str, Any]]) -> None:
all_objects: list[dict[str, Any]] = []
for camera in new_activity.keys():
# handle cameras that were added dynamically
if camera not in self.camera_all_object_counts:
self.__init_camera(self.config.cameras[camera])
new_objects = new_activity[camera].get("objects", [])
all_objects.extend(new_objects)
if self.last_camera_activity.get(camera, {}).get("objects") != new_objects:
self.compare_camera_activity(camera, new_objects)
# run through every zone, getting a count of objects in that zone right now
for zone, labels in self.all_zone_labels.items():
all_zone_objects = Counter(
obj["label"].replace("-verified", "")
for obj in all_objects
if zone in obj["current_zones"]
)
active_zone_objects = Counter(
obj["label"].replace("-verified", "")
for obj in all_objects
if zone in obj["current_zones"] and not obj["stationary"]
)
any_changed = False
# run through each object and check what topics need to be updated for this zone
for label in labels:
new_count = all_zone_objects[label]
new_active_count = active_zone_objects[label]
if (
new_count != self.zone_all_object_counts[zone][label]
or label not in self.zone_all_object_counts[zone]
):
any_changed = True
self.publish(f"{zone}/{label}", new_count)
self.zone_all_object_counts[zone][label] = new_count
if (
new_active_count != self.zone_active_object_counts[zone][label]
or label not in self.zone_active_object_counts[zone]
):
any_changed = True
self.publish(f"{zone}/{label}/active", new_active_count)
self.zone_active_object_counts[zone][label] = new_active_count
if any_changed:
self.publish(f"{zone}/all", sum(list(all_zone_objects.values())))
self.publish(
f"{zone}/all/active", sum(list(active_zone_objects.values()))
)
self.last_camera_activity = new_activity
def compare_camera_activity(
self, camera: str, new_activity: dict[str, Any]
) -> None:
all_objects = Counter(
obj["label"].replace("-verified", "") for obj in new_activity
)
active_objects = Counter(
obj["label"].replace("-verified", "")
for obj in new_activity
if not obj["stationary"]
)
any_changed = False
# run through each object and check what topics need to be updated
for label in self.config.cameras[camera].objects.track:
if label in self.config.model.non_logo_attributes:
continue
new_count = all_objects[label]
new_active_count = active_objects[label]
if (
new_count != self.camera_all_object_counts[camera][label]
or label not in self.camera_all_object_counts[camera]
):
any_changed = True
self.publish(f"{camera}/{label}", new_count)
self.camera_all_object_counts[camera][label] = new_count
if (
new_active_count != self.camera_active_object_counts[camera][label]
or label not in self.camera_active_object_counts[camera]
):
any_changed = True
self.publish(f"{camera}/{label}/active", new_active_count)
self.camera_active_object_counts[camera][label] = new_active_count
if any_changed:
self.publish(f"{camera}/all", sum(list(all_objects.values())))
self.publish(f"{camera}/all/active", sum(list(active_objects.values())))
class AudioActivityManager:
def __init__(
self, config: FrigateConfig, publish: Callable[[str, Any], None]
) -> None:
self.config = config
self.publish = publish
self.current_audio_detections: dict[str, dict[str, dict[str, Any]]] = {}
self.event_metadata_publisher = EventMetadataPublisher()
for camera_config in config.cameras.values():
if not camera_config.audio.enabled_in_config:
continue
self.__init_camera(camera_config)
def __init_camera(self, camera_config: CameraConfig) -> None:
self.current_audio_detections[camera_config.name] = {}
def update_activity(self, new_activity: dict[str, dict[str, Any]]) -> None:
now = datetime.datetime.now().timestamp()
for camera in new_activity.keys():
# handle cameras that were added dynamically
if camera not in self.current_audio_detections:
self.__init_camera(self.config.cameras[camera])
new_detections = new_activity[camera].get("detections", [])
if self.compare_audio_activity(camera, new_detections, now):
logger.debug(f"Audio detections for {camera}: {new_activity}")
self.publish(
f"{camera}/audio/all",
"ON" if len(self.current_audio_detections[camera]) > 0 else "OFF",
)
self.publish(
"audio_detections",
json.dumps(self.current_audio_detections),
)
def compare_audio_activity(
self, camera: str, new_detections: list[tuple[str, float]], now: float
) -> None:
max_not_heard = self.config.cameras[camera].audio.max_not_heard
current = self.current_audio_detections[camera]
any_changed = False
for label, score in new_detections:
any_changed = True
if label in current:
current[label]["last_detection"] = now
current[label]["score"] = score
else:
rand_id = "".join(
random.choices(string.ascii_lowercase + string.digits, k=6)
)
event_id = f"{now}-{rand_id}"
self.publish(f"{camera}/audio/{label}", "ON")
self.event_metadata_publisher.publish(
(
now,
camera,
label,
event_id,
True,
score,
None,
None,
"audio",
{},
),
EventMetadataTypeEnum.manual_event_create.value,
)
current[label] = {
"id": event_id,
"score": score,
"last_detection": now,
}
# expire detections
for label in list(current.keys()):
if now - current[label]["last_detection"] > max_not_heard:
any_changed = True
self.publish(f"{camera}/audio/{label}", "OFF")
self.event_metadata_publisher.publish(
(current[label]["id"], now),
EventMetadataTypeEnum.manual_event_end.value,
)
del current[label]
return any_changed
def expire_all(self, camera: str) -> None:
now = datetime.datetime.now().timestamp()
current = self.current_audio_detections.get(camera, {})
for label in list(current.keys()):
self.publish(f"{camera}/audio/{label}", "OFF")
self.event_metadata_publisher.publish(
(current[label]["id"], now),
EventMetadataTypeEnum.manual_event_end.value,
)
del current[label]