mirror of
https://github.com/blakeblackshear/frigate.git
synced 2024-11-21 19:07:46 +01:00
Refactor attribute saving (#14090)
* Refactor attribute saving * Ensure sub label is not overwritten * Formatting * Fix unused
This commit is contained in:
parent
594ca3a04b
commit
15fa55c223
@ -108,7 +108,12 @@ def is_better_thumbnail(label, current_thumb, new_obj, frame_shape) -> bool:
|
||||
|
||||
class TrackedObject:
|
||||
def __init__(
|
||||
self, camera, colormap, camera_config: CameraConfig, frame_cache, obj_data
|
||||
self,
|
||||
camera,
|
||||
colormap,
|
||||
camera_config: CameraConfig,
|
||||
frame_cache,
|
||||
obj_data: dict[str, any],
|
||||
):
|
||||
# set the score history then remove as it is not part of object state
|
||||
self.score_history = obj_data["score_history"]
|
||||
@ -227,8 +232,8 @@ class TrackedObject:
|
||||
if self.attributes[attr["label"]] < attr["score"]:
|
||||
self.attributes[attr["label"]] = attr["score"]
|
||||
|
||||
# populate the sub_label for car with highest scoring logo
|
||||
if self.obj_data["label"] == "car":
|
||||
# populate the sub_label for object with highest scoring logo
|
||||
if self.obj_data["label"] in ["car", "package", "person"]:
|
||||
recognized_logos = {
|
||||
k: self.attributes[k]
|
||||
for k in ["ups", "fedex", "amazon"]
|
||||
@ -236,7 +241,13 @@ class TrackedObject:
|
||||
}
|
||||
if len(recognized_logos) > 0:
|
||||
max_logo = max(recognized_logos, key=recognized_logos.get)
|
||||
self.obj_data["sub_label"] = (max_logo, recognized_logos[max_logo])
|
||||
|
||||
# don't overwrite sub label if it is already set
|
||||
if (
|
||||
self.obj_data.get("sub_label") is None
|
||||
or self.obj_data["sub_label"][0] == max_logo
|
||||
):
|
||||
self.obj_data["sub_label"] = (max_logo, recognized_logos[max_logo])
|
||||
|
||||
# check for significant change
|
||||
if not self.false_positive:
|
||||
|
50
frigate/test/test_obects.py
Normal file
50
frigate/test/test_obects.py
Normal file
@ -0,0 +1,50 @@
|
||||
import unittest
|
||||
|
||||
from frigate.track.object_attribute import ObjectAttribute
|
||||
|
||||
|
||||
class TestAttribute(unittest.TestCase):
|
||||
def test_overlapping_object_selection(self) -> None:
|
||||
attribute = ObjectAttribute(
|
||||
(
|
||||
"amazon",
|
||||
0.80078125,
|
||||
(847, 242, 883, 255),
|
||||
468,
|
||||
2.769230769230769,
|
||||
(702, 134, 1050, 482),
|
||||
)
|
||||
)
|
||||
objects = [
|
||||
{
|
||||
"label": "car",
|
||||
"score": 0.98828125,
|
||||
"box": (728, 223, 1266, 719),
|
||||
"area": 266848,
|
||||
"ratio": 1.0846774193548387,
|
||||
"region": (349, 0, 1397, 1048),
|
||||
"frame_time": 1727785394.498972,
|
||||
"centroid": (997, 471),
|
||||
"id": "1727785349.150633-408hal",
|
||||
"start_time": 1727785349.150633,
|
||||
"motionless_count": 362,
|
||||
"position_changes": 0,
|
||||
"score_history": [0.98828125, 0.95703125, 0.98828125, 0.98828125],
|
||||
},
|
||||
{
|
||||
"label": "person",
|
||||
"score": 0.76953125,
|
||||
"box": (826, 172, 939, 417),
|
||||
"area": 27685,
|
||||
"ratio": 0.46122448979591835,
|
||||
"region": (702, 134, 1050, 482),
|
||||
"frame_time": 1727785394.498972,
|
||||
"centroid": (882, 294),
|
||||
"id": "1727785390.499768-9fbhem",
|
||||
"start_time": 1727785390.499768,
|
||||
"motionless_count": 2,
|
||||
"position_changes": 1,
|
||||
"score_history": [0.8828125, 0.83984375, 0.91796875, 0.94140625],
|
||||
},
|
||||
]
|
||||
assert attribute.find_best_object(objects) == "1727785390.499768-9fbhem"
|
44
frigate/track/object_attribute.py
Normal file
44
frigate/track/object_attribute.py
Normal file
@ -0,0 +1,44 @@
|
||||
"""Object attribute."""
|
||||
|
||||
from frigate.util.object import area, box_inside
|
||||
|
||||
|
||||
class ObjectAttribute:
|
||||
def __init__(self, raw_data: tuple) -> None:
|
||||
self.label = raw_data[0]
|
||||
self.score = raw_data[1]
|
||||
self.box = raw_data[2]
|
||||
self.area = raw_data[3]
|
||||
self.ratio = raw_data[4]
|
||||
self.region = raw_data[5]
|
||||
|
||||
def get_tracking_data(self) -> dict[str, any]:
|
||||
"""Return data saved to the object."""
|
||||
return {
|
||||
"label": self.label,
|
||||
"score": self.score,
|
||||
"box": self.box,
|
||||
}
|
||||
|
||||
def find_best_object(self, objects: list[dict[str, any]]) -> str:
|
||||
"""Find the best attribute for each object and return its ID."""
|
||||
best_object_area = None
|
||||
best_object_id = None
|
||||
|
||||
for obj in objects:
|
||||
if not box_inside(obj["box"], self.box):
|
||||
continue
|
||||
|
||||
object_area = area(obj["box"])
|
||||
|
||||
# if multiple objects have the same attribute then they
|
||||
# are overlapping, it is most likely that the smaller object
|
||||
# is the one with the attribute
|
||||
if best_object_area is None:
|
||||
best_object_area = object_area
|
||||
best_object_id = obj["id"]
|
||||
elif object_area < best_object_area:
|
||||
best_object_area = object_area
|
||||
best_object_id = obj["id"]
|
||||
|
||||
return best_object_id
|
@ -27,6 +27,7 @@ from frigate.object_detection import RemoteObjectDetector
|
||||
from frigate.ptz.autotrack import ptz_moving_at_frame_time
|
||||
from frigate.track import ObjectTracker
|
||||
from frigate.track.norfair_tracker import NorfairTracker
|
||||
from frigate.track.object_attribute import ObjectAttribute
|
||||
from frigate.util.builtin import EventsPerSecond, get_tomorrow_at_time
|
||||
from frigate.util.image import (
|
||||
FrameManager,
|
||||
@ -34,7 +35,6 @@ from frigate.util.image import (
|
||||
draw_box_with_label,
|
||||
)
|
||||
from frigate.util.object import (
|
||||
box_inside,
|
||||
create_tensor_input,
|
||||
get_cluster_candidates,
|
||||
get_cluster_region,
|
||||
@ -734,29 +734,32 @@ def process_frames(
|
||||
object_tracker.update_frame_times(frame_time)
|
||||
|
||||
# group the attribute detections based on what label they apply to
|
||||
attribute_detections = {}
|
||||
attribute_detections: dict[str, ObjectAttribute] = {}
|
||||
for label, attribute_labels in model_config.attributes_map.items():
|
||||
attribute_detections[label] = [
|
||||
d for d in consolidated_detections if d[0] in attribute_labels
|
||||
ObjectAttribute(d)
|
||||
for d in consolidated_detections
|
||||
if d[0] in attribute_labels
|
||||
]
|
||||
|
||||
# build detections and add attributes
|
||||
# build detections
|
||||
detections = {}
|
||||
for obj in object_tracker.tracked_objects.values():
|
||||
attributes = []
|
||||
# if the objects label has associated attribute detections
|
||||
if obj["label"] in attribute_detections.keys():
|
||||
# add them to attributes if they intersect
|
||||
for attribute_detection in attribute_detections[obj["label"]]:
|
||||
if box_inside(obj["box"], (attribute_detection[2])):
|
||||
attributes.append(
|
||||
{
|
||||
"label": attribute_detection[0],
|
||||
"score": attribute_detection[1],
|
||||
"box": attribute_detection[2],
|
||||
}
|
||||
)
|
||||
detections[obj["id"]] = {**obj, "attributes": attributes}
|
||||
detections[obj["id"]] = {**obj, "attributes": []}
|
||||
|
||||
# find the best object for each attribute to be assigned to
|
||||
all_objects: list[dict[str, any]] = object_tracker.tracked_objects.values()
|
||||
for attributes in attribute_detections.values():
|
||||
for attribute in attributes:
|
||||
filtered_objects = filter(
|
||||
lambda o: o["label"] in attribute_detections.keys(), all_objects
|
||||
)
|
||||
selected_object_id = attribute.find_best_object(filtered_objects)
|
||||
|
||||
if selected_object_id is not None:
|
||||
detections[selected_object_id]["attributes"].append(
|
||||
attribute.get_tracking_data()
|
||||
)
|
||||
|
||||
# debug object tracking
|
||||
if False:
|
||||
|
Loading…
Reference in New Issue
Block a user