From 15fa55c2230c371aaa48d8f496a6b0648b14e0ca Mon Sep 17 00:00:00 2001 From: Nicolas Mowen Date: Tue, 1 Oct 2024 07:31:03 -0600 Subject: [PATCH] Refactor attribute saving (#14090) * Refactor attribute saving * Ensure sub label is not overwritten * Formatting * Fix unused --- frigate/object_processing.py | 19 +++++++++--- frigate/test/test_obects.py | 50 +++++++++++++++++++++++++++++++ frigate/track/object_attribute.py | 44 +++++++++++++++++++++++++++ frigate/video.py | 39 +++++++++++++----------- 4 files changed, 130 insertions(+), 22 deletions(-) create mode 100644 frigate/test/test_obects.py create mode 100644 frigate/track/object_attribute.py diff --git a/frigate/object_processing.py b/frigate/object_processing.py index 19cb5ae2b..55f494d55 100644 --- a/frigate/object_processing.py +++ b/frigate/object_processing.py @@ -108,7 +108,12 @@ def is_better_thumbnail(label, current_thumb, new_obj, frame_shape) -> bool: class TrackedObject: def __init__( - self, camera, colormap, camera_config: CameraConfig, frame_cache, obj_data + self, + camera, + colormap, + camera_config: CameraConfig, + frame_cache, + obj_data: dict[str, any], ): # set the score history then remove as it is not part of object state self.score_history = obj_data["score_history"] @@ -227,8 +232,8 @@ class TrackedObject: if self.attributes[attr["label"]] < attr["score"]: self.attributes[attr["label"]] = attr["score"] - # populate the sub_label for car with highest scoring logo - if self.obj_data["label"] == "car": + # populate the sub_label for object with highest scoring logo + if self.obj_data["label"] in ["car", "package", "person"]: recognized_logos = { k: self.attributes[k] for k in ["ups", "fedex", "amazon"] @@ -236,7 +241,13 @@ class TrackedObject: } if len(recognized_logos) > 0: max_logo = max(recognized_logos, key=recognized_logos.get) - self.obj_data["sub_label"] = (max_logo, recognized_logos[max_logo]) + + # don't overwrite sub label if it is already set + if ( + self.obj_data.get("sub_label") is None + or self.obj_data["sub_label"][0] == max_logo + ): + self.obj_data["sub_label"] = (max_logo, recognized_logos[max_logo]) # check for significant change if not self.false_positive: diff --git a/frigate/test/test_obects.py b/frigate/test/test_obects.py new file mode 100644 index 000000000..f1c039ef8 --- /dev/null +++ b/frigate/test/test_obects.py @@ -0,0 +1,50 @@ +import unittest + +from frigate.track.object_attribute import ObjectAttribute + + +class TestAttribute(unittest.TestCase): + def test_overlapping_object_selection(self) -> None: + attribute = ObjectAttribute( + ( + "amazon", + 0.80078125, + (847, 242, 883, 255), + 468, + 2.769230769230769, + (702, 134, 1050, 482), + ) + ) + objects = [ + { + "label": "car", + "score": 0.98828125, + "box": (728, 223, 1266, 719), + "area": 266848, + "ratio": 1.0846774193548387, + "region": (349, 0, 1397, 1048), + "frame_time": 1727785394.498972, + "centroid": (997, 471), + "id": "1727785349.150633-408hal", + "start_time": 1727785349.150633, + "motionless_count": 362, + "position_changes": 0, + "score_history": [0.98828125, 0.95703125, 0.98828125, 0.98828125], + }, + { + "label": "person", + "score": 0.76953125, + "box": (826, 172, 939, 417), + "area": 27685, + "ratio": 0.46122448979591835, + "region": (702, 134, 1050, 482), + "frame_time": 1727785394.498972, + "centroid": (882, 294), + "id": "1727785390.499768-9fbhem", + "start_time": 1727785390.499768, + "motionless_count": 2, + "position_changes": 1, + "score_history": [0.8828125, 0.83984375, 0.91796875, 0.94140625], + }, + ] + assert attribute.find_best_object(objects) == "1727785390.499768-9fbhem" diff --git a/frigate/track/object_attribute.py b/frigate/track/object_attribute.py new file mode 100644 index 000000000..54433c5f3 --- /dev/null +++ b/frigate/track/object_attribute.py @@ -0,0 +1,44 @@ +"""Object attribute.""" + +from frigate.util.object import area, box_inside + + +class ObjectAttribute: + def __init__(self, raw_data: tuple) -> None: + self.label = raw_data[0] + self.score = raw_data[1] + self.box = raw_data[2] + self.area = raw_data[3] + self.ratio = raw_data[4] + self.region = raw_data[5] + + def get_tracking_data(self) -> dict[str, any]: + """Return data saved to the object.""" + return { + "label": self.label, + "score": self.score, + "box": self.box, + } + + def find_best_object(self, objects: list[dict[str, any]]) -> str: + """Find the best attribute for each object and return its ID.""" + best_object_area = None + best_object_id = None + + for obj in objects: + if not box_inside(obj["box"], self.box): + continue + + object_area = area(obj["box"]) + + # if multiple objects have the same attribute then they + # are overlapping, it is most likely that the smaller object + # is the one with the attribute + if best_object_area is None: + best_object_area = object_area + best_object_id = obj["id"] + elif object_area < best_object_area: + best_object_area = object_area + best_object_id = obj["id"] + + return best_object_id diff --git a/frigate/video.py b/frigate/video.py index 485764d2e..2af38540c 100755 --- a/frigate/video.py +++ b/frigate/video.py @@ -27,6 +27,7 @@ from frigate.object_detection import RemoteObjectDetector from frigate.ptz.autotrack import ptz_moving_at_frame_time from frigate.track import ObjectTracker from frigate.track.norfair_tracker import NorfairTracker +from frigate.track.object_attribute import ObjectAttribute from frigate.util.builtin import EventsPerSecond, get_tomorrow_at_time from frigate.util.image import ( FrameManager, @@ -34,7 +35,6 @@ from frigate.util.image import ( draw_box_with_label, ) from frigate.util.object import ( - box_inside, create_tensor_input, get_cluster_candidates, get_cluster_region, @@ -734,29 +734,32 @@ def process_frames( object_tracker.update_frame_times(frame_time) # group the attribute detections based on what label they apply to - attribute_detections = {} + attribute_detections: dict[str, ObjectAttribute] = {} for label, attribute_labels in model_config.attributes_map.items(): attribute_detections[label] = [ - d for d in consolidated_detections if d[0] in attribute_labels + ObjectAttribute(d) + for d in consolidated_detections + if d[0] in attribute_labels ] - # build detections and add attributes + # build detections detections = {} for obj in object_tracker.tracked_objects.values(): - attributes = [] - # if the objects label has associated attribute detections - if obj["label"] in attribute_detections.keys(): - # add them to attributes if they intersect - for attribute_detection in attribute_detections[obj["label"]]: - if box_inside(obj["box"], (attribute_detection[2])): - attributes.append( - { - "label": attribute_detection[0], - "score": attribute_detection[1], - "box": attribute_detection[2], - } - ) - detections[obj["id"]] = {**obj, "attributes": attributes} + detections[obj["id"]] = {**obj, "attributes": []} + + # find the best object for each attribute to be assigned to + all_objects: list[dict[str, any]] = object_tracker.tracked_objects.values() + for attributes in attribute_detections.values(): + for attribute in attributes: + filtered_objects = filter( + lambda o: o["label"] in attribute_detections.keys(), all_objects + ) + selected_object_id = attribute.find_best_object(filtered_objects) + + if selected_object_id is not None: + detections[selected_object_id]["attributes"].append( + attribute.get_tracking_data() + ) # debug object tracking if False: