diff --git a/frigate/camera/activity_manager.py b/frigate/camera/activity_manager.py new file mode 100644 index 000000000..381e295b9 --- /dev/null +++ b/frigate/camera/activity_manager.py @@ -0,0 +1,130 @@ +"""Manage camera activity and updating listeners.""" + +from collections import Counter +from typing import Callable + +from frigate.config.config import FrigateConfig + + +class CameraActivityManager: + def __init__( + self, config: FrigateConfig, publish: Callable[[str, any], None] + ) -> None: + self.config = config + self.publish = publish + self.last_camera_activity: dict[str, dict[str, any]] = {} + self.camera_all_object_counts: dict[str, Counter] = {} + self.camera_active_object_counts: dict[str, Counter] = {} + self.zone_all_object_counts: dict[str, Counter] = {} + self.zone_active_object_counts: dict[str, Counter] = {} + self.all_zone_labels: dict[str, set[str]] = {} + + for camera_config in config.cameras.values(): + if not camera_config.enabled: + continue + + self.last_camera_activity[camera_config.name] = {} + self.camera_all_object_counts[camera_config.name] = Counter() + self.camera_active_object_counts[camera_config.name] = Counter() + + for zone, zone_config in camera_config.zones.items(): + if zone not in self.all_zone_labels: + self.zone_all_object_counts[zone] = Counter() + self.zone_active_object_counts[zone] = Counter() + self.all_zone_labels[zone] = set() + + self.all_zone_labels[zone].update(zone_config.objects) + + def update_activity(self, new_activity: dict[str, dict[str, any]]) -> None: + all_objects: list[dict[str, any]] = [] + + for camera in new_activity.keys(): + new_objects = new_activity[camera].get("objects", []) + all_objects.extend(new_objects) + + if self.last_camera_activity.get(camera, {}).get("objects") != new_objects: + self.compare_camera_activity(camera, new_objects) + + # run through every zone, getting a count of objects in that zone right now + for zone, labels in self.all_zone_labels.items(): + all_zone_objects = Counter( + obj["label"].replace("-verified", "") + for obj in all_objects + if zone in obj["current_zones"] + ) + active_zone_objects = Counter( + obj["label"].replace("-verified", "") + for obj in all_objects + if zone in obj["current_zones"] and not obj["stationary"] + ) + any_changed = False + + # run through each object and check what topics need to be updated for this zone + for label in labels: + new_count = all_zone_objects[label] + new_active_count = active_zone_objects[label] + + if ( + new_count != self.zone_all_object_counts[zone][label] + or label not in self.zone_all_object_counts[zone] + ): + any_changed = True + self.publish(f"{zone}/{label}", new_count) + self.zone_all_object_counts[zone][label] = new_count + + if ( + new_active_count != self.zone_active_object_counts[zone][label] + or label not in self.zone_active_object_counts[zone] + ): + any_changed = True + self.publish(f"{zone}/{label}/active", new_active_count) + self.zone_active_object_counts[zone][label] = new_active_count + + if any_changed: + self.publish(f"{zone}/all", sum(list(all_zone_objects.values()))) + self.publish( + f"{zone}/all/active", sum(list(active_zone_objects.values())) + ) + + self.last_camera_activity = new_activity + + def compare_camera_activity( + self, camera: str, new_activity: dict[str, any] + ) -> None: + all_objects = Counter( + obj["label"].replace("-verified", "") for obj in new_activity + ) + active_objects = Counter( + obj["label"].replace("-verified", "") + for obj in new_activity + if not obj["stationary"] + ) + any_changed = False + + # run through each object and check what topics need to be updated + for label in self.config.cameras[camera].objects.track: + if label in self.config.model.all_attributes: + continue + + new_count = all_objects[label] + new_active_count = active_objects[label] + + if ( + new_count != self.camera_all_object_counts[camera][label] + or label not in self.camera_all_object_counts[camera] + ): + any_changed = True + self.publish(f"{camera}/{label}", new_count) + self.camera_all_object_counts[camera][label] = new_count + + if ( + new_active_count != self.camera_active_object_counts[camera][label] + or label not in self.camera_active_object_counts[camera] + ): + any_changed = True + self.publish(f"{camera}/{label}/active", new_active_count) + self.camera_active_object_counts[camera][label] = new_active_count + + if any_changed: + self.publish(f"{camera}/all", sum(list(all_objects.values()))) + self.publish(f"{camera}/all/active", sum(list(active_objects.values()))) diff --git a/frigate/comms/dispatcher.py b/frigate/comms/dispatcher.py index 2bddc97a5..445147df8 100644 --- a/frigate/comms/dispatcher.py +++ b/frigate/comms/dispatcher.py @@ -7,6 +7,7 @@ from abc import ABC, abstractmethod from typing import Any, Callable, Optional from frigate.camera import PTZMetrics +from frigate.camera.activity_manager import CameraActivityManager from frigate.comms.config_updater import ConfigPublisher from frigate.config import BirdseyeModeEnum, FrigateConfig from frigate.const import ( @@ -64,7 +65,7 @@ class Dispatcher: self.onvif = onvif self.ptz_metrics = ptz_metrics self.comms = communicators - self.camera_activity = {} + self.camera_activity = CameraActivityManager(config, self.publish) self.model_state = {} self.embeddings_reindex = {} @@ -130,7 +131,7 @@ class Dispatcher: ).execute() def handle_update_camera_activity(): - self.camera_activity = payload + self.camera_activity.update_activity(payload) def handle_update_event_description(): event: Event = Event.get(Event.id == payload["id"]) @@ -171,7 +172,7 @@ class Dispatcher: ) def handle_on_connect(): - camera_status = self.camera_activity.copy() + camera_status = self.camera_activity.last_camera_activity.copy() for camera in camera_status.keys(): camera_status[camera]["config"] = { diff --git a/frigate/object_processing.py b/frigate/object_processing.py index b5196e686..ba2e15b20 100644 --- a/frigate/object_processing.py +++ b/frigate/object_processing.py @@ -4,7 +4,7 @@ import logging import os import queue import threading -from collections import Counter, defaultdict +from collections import defaultdict from multiprocessing.synchronize import Event as MpEvent from typing import Callable, Optional @@ -51,8 +51,6 @@ class CameraState: self.camera_config = config.cameras[name] self.frame_manager = frame_manager self.best_objects: dict[str, TrackedObject] = {} - self.object_counts = defaultdict(int) - self.active_object_counts = defaultdict(int) self.tracked_objects: dict[str, TrackedObject] = {} self.frame_cache = {} self.zone_objects = defaultdict(list) @@ -338,6 +336,7 @@ class CameraState: "ratio": obj.obj_data["ratio"], "score": obj.obj_data["score"], "sub_label": sub_label, + "current_zones": obj.current_zones, } ) @@ -377,78 +376,6 @@ class CameraState: for c in self.callbacks["camera_activity"]: c(self.name, camera_activity) - # update overall camera state for each object type - obj_counter = Counter( - obj.obj_data["label"] - for obj in tracked_objects.values() - if not obj.false_positive - ) - - active_obj_counter = Counter( - obj.obj_data["label"] - for obj in tracked_objects.values() - if not obj.false_positive and obj.active - ) - - # keep track of all labels detected for this camera - total_label_count = 0 - total_active_label_count = 0 - - # report on all detected objects - for obj_name, count in obj_counter.items(): - total_label_count += count - - if count != self.object_counts[obj_name]: - self.object_counts[obj_name] = count - for c in self.callbacks["object_status"]: - c(self.name, obj_name, count) - - # update the active count on all detected objects - # To ensure we emit 0's if all objects are stationary, we need to loop - # over the set of all objects, not just active ones. - for obj_name in set(obj_counter): - count = active_obj_counter[obj_name] - total_active_label_count += count - - if count != self.active_object_counts[obj_name]: - self.active_object_counts[obj_name] = count - for c in self.callbacks["active_object_status"]: - c(self.name, obj_name, count) - - # publish for all labels detected for this camera - if total_label_count != self.object_counts.get("all"): - self.object_counts["all"] = total_label_count - for c in self.callbacks["object_status"]: - c(self.name, "all", total_label_count) - - # publish active label counts for this camera - if total_active_label_count != self.active_object_counts.get("all"): - self.active_object_counts["all"] = total_active_label_count - for c in self.callbacks["active_object_status"]: - c(self.name, "all", total_active_label_count) - - # expire any objects that are >0 and no longer detected - expired_objects = [ - obj_name - for obj_name, count in self.object_counts.items() - if count > 0 and obj_name not in obj_counter - ] - for obj_name in expired_objects: - # Ignore the artificial all label - if obj_name == "all": - continue - - self.object_counts[obj_name] = 0 - for c in self.callbacks["object_status"]: - c(self.name, obj_name, 0) - # Only publish if the object was previously active. - if self.active_object_counts[obj_name] > 0: - for c in self.callbacks["active_object_status"]: - c(self.name, obj_name, 0) - self.active_object_counts[obj_name] = 0 - for c in self.callbacks["snapshot"]: - c(self.name, self.best_objects[obj_name], frame_name) - # cleanup thumbnail frame cache current_thumb_frames = { obj.thumbnail_data["frame_time"] @@ -635,14 +562,6 @@ class TrackedObjectProcessor(threading.Thread): retain=True, ) - def object_status(camera, object_name, status): - self.dispatcher.publish(f"{camera}/{object_name}", status, retain=False) - - def active_object_status(camera, object_name, status): - self.dispatcher.publish( - f"{camera}/{object_name}/active", status, retain=False - ) - def camera_activity(camera, activity): last_activity = self.camera_activity.get(camera) @@ -659,8 +578,6 @@ class TrackedObjectProcessor(threading.Thread): camera_state.on("update", update) camera_state.on("end", end) camera_state.on("snapshot", snapshot) - camera_state.on("object_status", object_status) - camera_state.on("active_object_status", active_object_status) camera_state.on("camera_activity", camera_activity) self.camera_states[camera] = camera_state @@ -817,124 +734,6 @@ class TrackedObjectProcessor(threading.Thread): ) ) - # update zone counts for each label - # for each zone in the current camera - for zone in self.config.cameras[camera].zones.keys(): - # count labels for the camera in the zone - obj_counter = Counter( - obj.obj_data["label"] - for obj in camera_state.tracked_objects.values() - if zone in obj.current_zones and not obj.false_positive - ) - active_obj_counter = Counter( - obj.obj_data["label"] - for obj in camera_state.tracked_objects.values() - if ( - zone in obj.current_zones - and not obj.false_positive - and obj.active - ) - ) - total_label_count = 0 - total_active_label_count = 0 - - # update counts and publish status - for label in set(self.zone_data[zone].keys()) | set(obj_counter.keys()): - # Ignore the artificial all label - if label == "all": - continue - - # if we have previously published a count for this zone/label - zone_label = self.zone_data[zone][label] - active_zone_label = self.active_zone_data[zone][label] - if camera in zone_label: - current_count = sum(zone_label.values()) - current_active_count = sum(active_zone_label.values()) - zone_label[camera] = ( - obj_counter[label] if label in obj_counter else 0 - ) - active_zone_label[camera] = ( - active_obj_counter[label] - if label in active_obj_counter - else 0 - ) - new_count = sum(zone_label.values()) - new_active_count = sum(active_zone_label.values()) - if new_count != current_count: - self.dispatcher.publish( - f"{zone}/{label}", - new_count, - retain=False, - ) - if new_active_count != current_active_count: - self.dispatcher.publish( - f"{zone}/{label}/active", - new_active_count, - retain=False, - ) - - # Set the count for the /zone/all topic. - total_label_count += new_count - total_active_label_count += new_active_count - - # if this is a new zone/label combo for this camera - else: - if label in obj_counter: - zone_label[camera] = obj_counter[label] - active_zone_label[camera] = active_obj_counter[label] - self.dispatcher.publish( - f"{zone}/{label}", - obj_counter[label], - retain=False, - ) - self.dispatcher.publish( - f"{zone}/{label}/active", - active_obj_counter[label], - retain=False, - ) - - # Set the count for the /zone/all topic. - total_label_count += obj_counter[label] - total_active_label_count += active_obj_counter[label] - - # if we have previously published a count for this zone all labels - zone_label = self.zone_data[zone]["all"] - active_zone_label = self.active_zone_data[zone]["all"] - if camera in zone_label: - current_count = sum(zone_label.values()) - current_active_count = sum(active_zone_label.values()) - zone_label[camera] = total_label_count - active_zone_label[camera] = total_active_label_count - new_count = sum(zone_label.values()) - new_active_count = sum(active_zone_label.values()) - - if new_count != current_count: - self.dispatcher.publish( - f"{zone}/all", - new_count, - retain=False, - ) - if new_active_count != current_active_count: - self.dispatcher.publish( - f"{zone}/all/active", - new_active_count, - retain=False, - ) - # if this is a new zone all label for this camera - else: - zone_label[camera] = total_label_count - active_zone_label[camera] = total_active_label_count - self.dispatcher.publish( - f"{zone}/all", - total_label_count, - retain=False, - ) - self.dispatcher.publish( - f"{zone}/all/active", - total_active_label_count, - retain=False, - ) - # cleanup event finished queue while not self.stop_event.is_set(): update = self.event_end_subscriber.check_for_update(timeout=0.01)