mirror of
https://github.com/blakeblackshear/frigate.git
synced 2025-09-23 17:52:05 +02:00
260 lines
9.8 KiB
Python
260 lines
9.8 KiB
Python
"""Manage camera activity and updating listeners."""
|
|
|
|
import datetime
|
|
import json
|
|
import logging
|
|
import random
|
|
import string
|
|
from collections import Counter
|
|
from typing import Any, Callable
|
|
|
|
from frigate.comms.event_metadata_updater import (
|
|
EventMetadataPublisher,
|
|
EventMetadataTypeEnum,
|
|
)
|
|
from frigate.config import CameraConfig, FrigateConfig
|
|
|
|
# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class CameraActivityManager:
    """Track object counts per camera and per zone and publish changes.

    Counts are sent through the ``publish`` callable using these topics,
    where ``name`` is either a camera name or a zone name:

    * ``{name}/{label}`` - number of objects with that label
    * ``{name}/{label}/active`` - number of non-stationary objects
    * ``{name}/all`` / ``{name}/all/active`` - totals across all labels

    A ``-verified`` suffix on an object label is stripped before counting,
    so such objects are counted under their base label.
    """

    def __init__(
        self, config: FrigateConfig, publish: Callable[[str, Any], None]
    ) -> None:
        """Create the manager and set up state for every enabled camera.

        Args:
            config: Configuration object; ``config.cameras`` is iterated and
                cameras whose ``enabled_in_config`` is falsy are skipped.
            publish: Callable invoked as ``publish(topic, payload)``.
        """
        self.config = config
        self.publish = publish
        # most recent activity passed to update_activity, keyed by camera
        self.last_camera_activity: dict[str, dict[str, Any]] = {}
        # last published counts, keyed by camera name then label
        self.camera_all_object_counts: dict[str, Counter] = {}
        self.camera_active_object_counts: dict[str, Counter] = {}
        # last published counts, keyed by zone name then label
        self.zone_all_object_counts: dict[str, Counter] = {}
        self.zone_active_object_counts: dict[str, Counter] = {}
        # labels to report for each zone
        self.all_zone_labels: dict[str, set[str]] = {}

        for camera_config in config.cameras.values():
            if not camera_config.enabled_in_config:
                continue

            self.__init_camera(camera_config)

    def __init_camera(self, camera_config: CameraConfig) -> None:
        """Reset the per-camera state and register the camera's zones.

        Zone state is keyed by zone name only, so a zone name that appears on
        several cameras shares one set of counters and accumulates the labels
        of every camera that declares it.
        """
        name = camera_config.name
        self.last_camera_activity[name] = {}
        self.camera_all_object_counts[name] = Counter()
        self.camera_active_object_counts[name] = Counter()

        for zone, zone_config in camera_config.zones.items():
            if zone not in self.all_zone_labels:
                self.zone_all_object_counts[zone] = Counter()
                self.zone_active_object_counts[zone] = Counter()
                self.all_zone_labels[zone] = set()

            # a zone without its own object list falls back to the labels
            # tracked by the camera
            self.all_zone_labels[zone].update(
                zone_config.objects
                if zone_config.objects
                else camera_config.objects.track
            )

    def update_activity(self, new_activity: dict[str, dict[str, Any]]) -> None:
        """Publish camera and zone count changes for the latest activity.

        Args:
            new_activity: Mapping of camera name to that camera's activity.
                The optional ``"objects"`` entry is a list of dicts that are
                read for the keys ``"label"``, ``"stationary"`` and
                ``"current_zones"``.

        Raises:
            KeyError: If a camera in ``new_activity`` has not been set up yet
                and is also missing from ``config.cameras``.
        """
        all_objects: list[dict[str, Any]] = []

        for camera, activity in new_activity.items():
            # handle cameras that were added dynamically
            if camera not in self.camera_all_object_counts:
                self.__init_camera(self.config.cameras[camera])

            new_objects = activity.get("objects", [])
            all_objects.extend(new_objects)

            # only recount a camera whose object list actually changed
            if self.last_camera_activity.get(camera, {}).get("objects") != new_objects:
                self.compare_camera_activity(camera, new_objects)

        # run through every zone, getting a count of objects in that zone right now
        for zone, labels in self.all_zone_labels.items():
            all_zone_objects = Counter(
                obj["label"].replace("-verified", "")
                for obj in all_objects
                if zone in obj["current_zones"]
            )
            active_zone_objects = Counter(
                obj["label"].replace("-verified", "")
                for obj in all_objects
                if zone in obj["current_zones"] and not obj["stationary"]
            )
            last_all = self.zone_all_object_counts[zone]
            last_active = self.zone_active_object_counts[zone]
            any_changed = False

            # run through each object and check what topics need to be updated for this zone
            for label in labels:
                new_count = all_zone_objects[label]
                new_active_count = active_zone_objects[label]

                # a label missing from the stored counter has never been
                # published, so it is sent even when the count is 0
                if label not in last_all or new_count != last_all[label]:
                    any_changed = True
                    self.publish(f"{zone}/{label}", new_count)
                    last_all[label] = new_count

                if label not in last_active or new_active_count != last_active[label]:
                    any_changed = True
                    self.publish(f"{zone}/{label}/active", new_active_count)
                    last_active[label] = new_active_count

            if any_changed:
                self.publish(f"{zone}/all", sum(all_zone_objects.values()))
                self.publish(f"{zone}/all/active", sum(active_zone_objects.values()))

        self.last_camera_activity = new_activity

    def compare_camera_activity(
        self, camera: str, new_activity: list[dict[str, Any]]
    ) -> None:
        """Publish per-label and total counts that changed for one camera.

        Args:
            camera: Camera name; must be a key of ``config.cameras`` and of
                the per-camera counters.
            new_activity: The camera's current objects, each a dict read for
                the keys ``"label"`` and ``"stationary"``.
        """
        all_objects = Counter(
            obj["label"].replace("-verified", "") for obj in new_activity
        )
        active_objects = Counter(
            obj["label"].replace("-verified", "")
            for obj in new_activity
            if not obj["stationary"]
        )
        last_all = self.camera_all_object_counts[camera]
        last_active = self.camera_active_object_counts[camera]
        any_changed = False

        # run through each object and check what topics need to be updated
        for label in self.config.cameras[camera].objects.track:
            # labels listed in the model's non_logo_attributes get no topics
            if label in self.config.model.non_logo_attributes:
                continue

            new_count = all_objects[label]
            new_active_count = active_objects[label]

            # a label missing from the stored counter has never been
            # published, so it is sent even when the count is 0
            if label not in last_all or new_count != last_all[label]:
                any_changed = True
                self.publish(f"{camera}/{label}", new_count)
                last_all[label] = new_count

            if label not in last_active or new_active_count != last_active[label]:
                any_changed = True
                self.publish(f"{camera}/{label}/active", new_active_count)
                last_active[label] = new_active_count

        if any_changed:
            self.publish(f"{camera}/all", sum(all_objects.values()))
            self.publish(f"{camera}/all/active", sum(active_objects.values()))
|
|
|
|
|
|
class AudioActivityManager:
|
|
def __init__(
|
|
self, config: FrigateConfig, publish: Callable[[str, Any], None]
|
|
) -> None:
|
|
self.config = config
|
|
self.publish = publish
|
|
self.current_audio_detections: dict[str, dict[str, dict[str, Any]]] = {}
|
|
self.event_metadata_publisher = EventMetadataPublisher()
|
|
|
|
for camera_config in config.cameras.values():
|
|
if not camera_config.audio.enabled_in_config:
|
|
continue
|
|
|
|
self.__init_camera(camera_config)
|
|
|
|
def __init_camera(self, camera_config: CameraConfig) -> None:
|
|
self.current_audio_detections[camera_config.name] = {}
|
|
|
|
def update_activity(self, new_activity: dict[str, dict[str, Any]]) -> None:
|
|
now = datetime.datetime.now().timestamp()
|
|
|
|
for camera in new_activity.keys():
|
|
# handle cameras that were added dynamically
|
|
if camera not in self.current_audio_detections:
|
|
self.__init_camera(self.config.cameras[camera])
|
|
|
|
new_detections = new_activity[camera].get("detections", [])
|
|
if self.compare_audio_activity(camera, new_detections, now):
|
|
logger.debug(f"Audio detections for {camera}: {new_activity}")
|
|
self.publish(
|
|
f"{camera}/audio/all",
|
|
"ON" if len(self.current_audio_detections[camera]) > 0 else "OFF",
|
|
)
|
|
self.publish(
|
|
"audio_detections",
|
|
json.dumps(self.current_audio_detections),
|
|
)
|
|
|
|
def compare_audio_activity(
|
|
self, camera: str, new_detections: list[tuple[str, float]], now: float
|
|
) -> None:
|
|
max_not_heard = self.config.cameras[camera].audio.max_not_heard
|
|
current = self.current_audio_detections[camera]
|
|
|
|
any_changed = False
|
|
|
|
for label, score in new_detections:
|
|
any_changed = True
|
|
if label in current:
|
|
current[label]["last_detection"] = now
|
|
current[label]["score"] = score
|
|
else:
|
|
rand_id = "".join(
|
|
random.choices(string.ascii_lowercase + string.digits, k=6)
|
|
)
|
|
event_id = f"{now}-{rand_id}"
|
|
self.publish(f"{camera}/audio/{label}", "ON")
|
|
|
|
self.event_metadata_publisher.publish(
|
|
(
|
|
now,
|
|
camera,
|
|
label,
|
|
event_id,
|
|
True,
|
|
score,
|
|
None,
|
|
None,
|
|
"audio",
|
|
{},
|
|
),
|
|
EventMetadataTypeEnum.manual_event_create.value,
|
|
)
|
|
current[label] = {
|
|
"id": event_id,
|
|
"score": score,
|
|
"last_detection": now,
|
|
}
|
|
|
|
# expire detections
|
|
for label in list(current.keys()):
|
|
if now - current[label]["last_detection"] > max_not_heard:
|
|
any_changed = True
|
|
self.publish(f"{camera}/audio/{label}", "OFF")
|
|
|
|
self.event_metadata_publisher.publish(
|
|
(current[label]["id"], now),
|
|
EventMetadataTypeEnum.manual_event_end.value,
|
|
)
|
|
del current[label]
|
|
|
|
return any_changed
|
|
|
|
def expire_all(self, camera: str) -> None:
|
|
now = datetime.datetime.now().timestamp()
|
|
current = self.current_audio_detections.get(camera, {})
|
|
|
|
for label in list(current.keys()):
|
|
self.publish(f"{camera}/audio/{label}", "OFF")
|
|
|
|
self.event_metadata_publisher.publish(
|
|
(current[label]["id"], now),
|
|
EventMetadataTypeEnum.manual_event_end.value,
|
|
)
|
|
del current[label]
|