Refactor camera activity processing (#15803)

* Replace object label sensors with new manager

* Implement zone topics

* remove unused
This commit is contained in:
Nicolas Mowen 2025-01-03 22:11:53 -06:00
parent 7c2ff818a5
commit f4501a2094
3 changed files with 136 additions and 206 deletions

View File

@ -0,0 +1,130 @@
"""Manage camera activity and updating listeners."""
from collections import Counter
from typing import Callable
from frigate.config.config import FrigateConfig
class CameraActivityManager:
def __init__(
self, config: FrigateConfig, publish: Callable[[str, any], None]
) -> None:
self.config = config
self.publish = publish
self.last_camera_activity: dict[str, dict[str, any]] = {}
self.camera_all_object_counts: dict[str, Counter] = {}
self.camera_active_object_counts: dict[str, Counter] = {}
self.zone_all_object_counts: dict[str, Counter] = {}
self.zone_active_object_counts: dict[str, Counter] = {}
self.all_zone_labels: dict[str, set[str]] = {}
for camera_config in config.cameras.values():
if not camera_config.enabled:
continue
self.last_camera_activity[camera_config.name] = {}
self.camera_all_object_counts[camera_config.name] = Counter()
self.camera_active_object_counts[camera_config.name] = Counter()
for zone, zone_config in camera_config.zones.items():
if zone not in self.all_zone_labels:
self.zone_all_object_counts[zone] = Counter()
self.zone_active_object_counts[zone] = Counter()
self.all_zone_labels[zone] = set()
self.all_zone_labels[zone].update(zone_config.objects)
def update_activity(self, new_activity: dict[str, dict[str, any]]) -> None:
all_objects: list[dict[str, any]] = []
for camera in new_activity.keys():
new_objects = new_activity[camera].get("objects", [])
all_objects.extend(new_objects)
if self.last_camera_activity.get(camera, {}).get("objects") != new_objects:
self.compare_camera_activity(camera, new_objects)
# run through every zone, getting a count of objects in that zone right now
for zone, labels in self.all_zone_labels.items():
all_zone_objects = Counter(
obj["label"].replace("-verified", "")
for obj in all_objects
if zone in obj["current_zones"]
)
active_zone_objects = Counter(
obj["label"].replace("-verified", "")
for obj in all_objects
if zone in obj["current_zones"] and not obj["stationary"]
)
any_changed = False
# run through each object and check what topics need to be updated for this zone
for label in labels:
new_count = all_zone_objects[label]
new_active_count = active_zone_objects[label]
if (
new_count != self.zone_all_object_counts[zone][label]
or label not in self.zone_all_object_counts[zone]
):
any_changed = True
self.publish(f"{zone}/{label}", new_count)
self.zone_all_object_counts[zone][label] = new_count
if (
new_active_count != self.zone_active_object_counts[zone][label]
or label not in self.zone_active_object_counts[zone]
):
any_changed = True
self.publish(f"{zone}/{label}/active", new_active_count)
self.zone_active_object_counts[zone][label] = new_active_count
if any_changed:
self.publish(f"{zone}/all", sum(list(all_zone_objects.values())))
self.publish(
f"{zone}/all/active", sum(list(active_zone_objects.values()))
)
self.last_camera_activity = new_activity
def compare_camera_activity(
self, camera: str, new_activity: dict[str, any]
) -> None:
all_objects = Counter(
obj["label"].replace("-verified", "") for obj in new_activity
)
active_objects = Counter(
obj["label"].replace("-verified", "")
for obj in new_activity
if not obj["stationary"]
)
any_changed = False
# run through each object and check what topics need to be updated
for label in self.config.cameras[camera].objects.track:
if label in self.config.model.all_attributes:
continue
new_count = all_objects[label]
new_active_count = active_objects[label]
if (
new_count != self.camera_all_object_counts[camera][label]
or label not in self.camera_all_object_counts[camera]
):
any_changed = True
self.publish(f"{camera}/{label}", new_count)
self.camera_all_object_counts[camera][label] = new_count
if (
new_active_count != self.camera_active_object_counts[camera][label]
or label not in self.camera_active_object_counts[camera]
):
any_changed = True
self.publish(f"{camera}/{label}/active", new_active_count)
self.camera_active_object_counts[camera][label] = new_active_count
if any_changed:
self.publish(f"{camera}/all", sum(list(all_objects.values())))
self.publish(f"{camera}/all/active", sum(list(active_objects.values())))

View File

@ -7,6 +7,7 @@ from abc import ABC, abstractmethod
from typing import Any, Callable, Optional from typing import Any, Callable, Optional
from frigate.camera import PTZMetrics from frigate.camera import PTZMetrics
from frigate.camera.activity_manager import CameraActivityManager
from frigate.comms.config_updater import ConfigPublisher from frigate.comms.config_updater import ConfigPublisher
from frigate.config import BirdseyeModeEnum, FrigateConfig from frigate.config import BirdseyeModeEnum, FrigateConfig
from frigate.const import ( from frigate.const import (
@ -64,7 +65,7 @@ class Dispatcher:
self.onvif = onvif self.onvif = onvif
self.ptz_metrics = ptz_metrics self.ptz_metrics = ptz_metrics
self.comms = communicators self.comms = communicators
self.camera_activity = {} self.camera_activity = CameraActivityManager(config, self.publish)
self.model_state = {} self.model_state = {}
self.embeddings_reindex = {} self.embeddings_reindex = {}
@ -130,7 +131,7 @@ class Dispatcher:
).execute() ).execute()
def handle_update_camera_activity(): def handle_update_camera_activity():
self.camera_activity = payload self.camera_activity.update_activity(payload)
def handle_update_event_description(): def handle_update_event_description():
event: Event = Event.get(Event.id == payload["id"]) event: Event = Event.get(Event.id == payload["id"])
@ -171,7 +172,7 @@ class Dispatcher:
) )
def handle_on_connect(): def handle_on_connect():
camera_status = self.camera_activity.copy() camera_status = self.camera_activity.last_camera_activity.copy()
for camera in camera_status.keys(): for camera in camera_status.keys():
camera_status[camera]["config"] = { camera_status[camera]["config"] = {

View File

@ -4,7 +4,7 @@ import logging
import os import os
import queue import queue
import threading import threading
from collections import Counter, defaultdict from collections import defaultdict
from multiprocessing.synchronize import Event as MpEvent from multiprocessing.synchronize import Event as MpEvent
from typing import Callable, Optional from typing import Callable, Optional
@ -51,8 +51,6 @@ class CameraState:
self.camera_config = config.cameras[name] self.camera_config = config.cameras[name]
self.frame_manager = frame_manager self.frame_manager = frame_manager
self.best_objects: dict[str, TrackedObject] = {} self.best_objects: dict[str, TrackedObject] = {}
self.object_counts = defaultdict(int)
self.active_object_counts = defaultdict(int)
self.tracked_objects: dict[str, TrackedObject] = {} self.tracked_objects: dict[str, TrackedObject] = {}
self.frame_cache = {} self.frame_cache = {}
self.zone_objects = defaultdict(list) self.zone_objects = defaultdict(list)
@ -338,6 +336,7 @@ class CameraState:
"ratio": obj.obj_data["ratio"], "ratio": obj.obj_data["ratio"],
"score": obj.obj_data["score"], "score": obj.obj_data["score"],
"sub_label": sub_label, "sub_label": sub_label,
"current_zones": obj.current_zones,
} }
) )
@ -377,78 +376,6 @@ class CameraState:
for c in self.callbacks["camera_activity"]: for c in self.callbacks["camera_activity"]:
c(self.name, camera_activity) c(self.name, camera_activity)
# update overall camera state for each object type
obj_counter = Counter(
obj.obj_data["label"]
for obj in tracked_objects.values()
if not obj.false_positive
)
active_obj_counter = Counter(
obj.obj_data["label"]
for obj in tracked_objects.values()
if not obj.false_positive and obj.active
)
# keep track of all labels detected for this camera
total_label_count = 0
total_active_label_count = 0
# report on all detected objects
for obj_name, count in obj_counter.items():
total_label_count += count
if count != self.object_counts[obj_name]:
self.object_counts[obj_name] = count
for c in self.callbacks["object_status"]:
c(self.name, obj_name, count)
# update the active count on all detected objects
# To ensure we emit 0's if all objects are stationary, we need to loop
# over the set of all objects, not just active ones.
for obj_name in set(obj_counter):
count = active_obj_counter[obj_name]
total_active_label_count += count
if count != self.active_object_counts[obj_name]:
self.active_object_counts[obj_name] = count
for c in self.callbacks["active_object_status"]:
c(self.name, obj_name, count)
# publish for all labels detected for this camera
if total_label_count != self.object_counts.get("all"):
self.object_counts["all"] = total_label_count
for c in self.callbacks["object_status"]:
c(self.name, "all", total_label_count)
# publish active label counts for this camera
if total_active_label_count != self.active_object_counts.get("all"):
self.active_object_counts["all"] = total_active_label_count
for c in self.callbacks["active_object_status"]:
c(self.name, "all", total_active_label_count)
# expire any objects that are >0 and no longer detected
expired_objects = [
obj_name
for obj_name, count in self.object_counts.items()
if count > 0 and obj_name not in obj_counter
]
for obj_name in expired_objects:
# Ignore the artificial all label
if obj_name == "all":
continue
self.object_counts[obj_name] = 0
for c in self.callbacks["object_status"]:
c(self.name, obj_name, 0)
# Only publish if the object was previously active.
if self.active_object_counts[obj_name] > 0:
for c in self.callbacks["active_object_status"]:
c(self.name, obj_name, 0)
self.active_object_counts[obj_name] = 0
for c in self.callbacks["snapshot"]:
c(self.name, self.best_objects[obj_name], frame_name)
# cleanup thumbnail frame cache # cleanup thumbnail frame cache
current_thumb_frames = { current_thumb_frames = {
obj.thumbnail_data["frame_time"] obj.thumbnail_data["frame_time"]
@ -635,14 +562,6 @@ class TrackedObjectProcessor(threading.Thread):
retain=True, retain=True,
) )
def object_status(camera, object_name, status):
self.dispatcher.publish(f"{camera}/{object_name}", status, retain=False)
def active_object_status(camera, object_name, status):
self.dispatcher.publish(
f"{camera}/{object_name}/active", status, retain=False
)
def camera_activity(camera, activity): def camera_activity(camera, activity):
last_activity = self.camera_activity.get(camera) last_activity = self.camera_activity.get(camera)
@ -659,8 +578,6 @@ class TrackedObjectProcessor(threading.Thread):
camera_state.on("update", update) camera_state.on("update", update)
camera_state.on("end", end) camera_state.on("end", end)
camera_state.on("snapshot", snapshot) camera_state.on("snapshot", snapshot)
camera_state.on("object_status", object_status)
camera_state.on("active_object_status", active_object_status)
camera_state.on("camera_activity", camera_activity) camera_state.on("camera_activity", camera_activity)
self.camera_states[camera] = camera_state self.camera_states[camera] = camera_state
@ -817,124 +734,6 @@ class TrackedObjectProcessor(threading.Thread):
) )
) )
# update zone counts for each label
# for each zone in the current camera
for zone in self.config.cameras[camera].zones.keys():
# count labels for the camera in the zone
obj_counter = Counter(
obj.obj_data["label"]
for obj in camera_state.tracked_objects.values()
if zone in obj.current_zones and not obj.false_positive
)
active_obj_counter = Counter(
obj.obj_data["label"]
for obj in camera_state.tracked_objects.values()
if (
zone in obj.current_zones
and not obj.false_positive
and obj.active
)
)
total_label_count = 0
total_active_label_count = 0
# update counts and publish status
for label in set(self.zone_data[zone].keys()) | set(obj_counter.keys()):
# Ignore the artificial all label
if label == "all":
continue
# if we have previously published a count for this zone/label
zone_label = self.zone_data[zone][label]
active_zone_label = self.active_zone_data[zone][label]
if camera in zone_label:
current_count = sum(zone_label.values())
current_active_count = sum(active_zone_label.values())
zone_label[camera] = (
obj_counter[label] if label in obj_counter else 0
)
active_zone_label[camera] = (
active_obj_counter[label]
if label in active_obj_counter
else 0
)
new_count = sum(zone_label.values())
new_active_count = sum(active_zone_label.values())
if new_count != current_count:
self.dispatcher.publish(
f"{zone}/{label}",
new_count,
retain=False,
)
if new_active_count != current_active_count:
self.dispatcher.publish(
f"{zone}/{label}/active",
new_active_count,
retain=False,
)
# Set the count for the /zone/all topic.
total_label_count += new_count
total_active_label_count += new_active_count
# if this is a new zone/label combo for this camera
else:
if label in obj_counter:
zone_label[camera] = obj_counter[label]
active_zone_label[camera] = active_obj_counter[label]
self.dispatcher.publish(
f"{zone}/{label}",
obj_counter[label],
retain=False,
)
self.dispatcher.publish(
f"{zone}/{label}/active",
active_obj_counter[label],
retain=False,
)
# Set the count for the /zone/all topic.
total_label_count += obj_counter[label]
total_active_label_count += active_obj_counter[label]
# if we have previously published a count for this zone all labels
zone_label = self.zone_data[zone]["all"]
active_zone_label = self.active_zone_data[zone]["all"]
if camera in zone_label:
current_count = sum(zone_label.values())
current_active_count = sum(active_zone_label.values())
zone_label[camera] = total_label_count
active_zone_label[camera] = total_active_label_count
new_count = sum(zone_label.values())
new_active_count = sum(active_zone_label.values())
if new_count != current_count:
self.dispatcher.publish(
f"{zone}/all",
new_count,
retain=False,
)
if new_active_count != current_active_count:
self.dispatcher.publish(
f"{zone}/all/active",
new_active_count,
retain=False,
)
# if this is a new zone all label for this camera
else:
zone_label[camera] = total_label_count
active_zone_label[camera] = total_active_label_count
self.dispatcher.publish(
f"{zone}/all",
total_label_count,
retain=False,
)
self.dispatcher.publish(
f"{zone}/all/active",
total_active_label_count,
retain=False,
)
# cleanup event finished queue # cleanup event finished queue
while not self.stop_event.is_set(): while not self.stop_event.is_set():
update = self.event_end_subscriber.check_for_update(timeout=0.01) update = self.event_end_subscriber.check_for_update(timeout=0.01)