Tracking improvements (#16484)

* norfair tracker config per object type

* change default R back to 3.4

* separate trackers for static and autotracking cameras

* tweak params and fix debug draw

* ensure all trackers are correctly updated even when there are no detections

* basic reid with histograms

* check mp value

* check mp value again

* stationary objects won't have embeddings

* don't switch trackers when autotracking is toggled after startup

* improve motion detection during autotracking

* use helper function

* get histogram in tracker instead of detect
This commit is contained in:
Josh Hawkins 2025-02-11 09:37:58 -06:00 committed by GitHub
parent 82f8694464
commit 4ef6214029
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 327 additions and 53 deletions

View File

@ -5,6 +5,7 @@ import imutils
import numpy as np
from scipy.ndimage import gaussian_filter
from frigate.camera import PTZMetrics
from frigate.comms.config_updater import ConfigSubscriber
from frigate.config import MotionConfig
from frigate.motion import MotionDetector
@ -18,6 +19,7 @@ class ImprovedMotionDetector(MotionDetector):
frame_shape,
config: MotionConfig,
fps: int,
ptz_metrics: PTZMetrics = None,
name="improved",
blur_radius=1,
interpolation=cv2.INTER_NEAREST,
@ -48,6 +50,8 @@ class ImprovedMotionDetector(MotionDetector):
self.contrast_values[:, 1:2] = 255
self.contrast_values_index = 0
self.config_subscriber = ConfigSubscriber(f"config/motion/{name}")
self.ptz_metrics = ptz_metrics
self.last_stop_time = None
def is_calibrating(self):
return self.calibrating
@ -64,6 +68,21 @@ class ImprovedMotionDetector(MotionDetector):
if not self.config.enabled:
return motion_boxes
# if ptz motor is moving from autotracking, quickly return
# a single box that is 80% of the frame
if (
self.ptz_metrics.autotracker_enabled.value
and not self.ptz_metrics.motor_stopped.is_set()
):
return [
(
int(self.frame_shape[1] * 0.1),
int(self.frame_shape[0] * 0.1),
int(self.frame_shape[1] * 0.9),
int(self.frame_shape[0] * 0.9),
)
]
gray = frame[0 : self.frame_shape[0], 0 : self.frame_shape[1]]
# resize frame
@ -151,6 +170,25 @@ class ImprovedMotionDetector(MotionDetector):
self.motion_frame_size[0] * self.motion_frame_size[1]
)
# check if the motor has just stopped from autotracking
# if so, reassign the average to the current frame so we begin with a new baseline
if (
# ensure we only do this for cameras with autotracking enabled
self.ptz_metrics.autotracker_enabled.value
and self.ptz_metrics.motor_stopped.is_set()
and (
self.last_stop_time is None
or self.ptz_metrics.stop_time.value != self.last_stop_time
)
# value is 0 on startup or when motor is moving
and self.ptz_metrics.stop_time.value != 0
):
self.last_stop_time = self.ptz_metrics.stop_time.value
self.avg_frame = resized_frame.astype(np.float32)
motion_boxes = []
pct_motion = 0
# once the motion is less than 5% and the number of contours is < 4, assume its calibrated
if pct_motion < 0.05 and len(motion_boxes) <= 4:
self.calibrating = False

View File

@ -465,7 +465,6 @@ class OnvifController:
return
self.cams[camera_name]["active"] = True
self.ptz_metrics[camera_name].motor_stopped.clear()
self.ptz_metrics[camera_name].start_time.value = 0
self.ptz_metrics[camera_name].stop_time.value = 0
move_request = self.cams[camera_name]["move_request"]

View File

@ -1,7 +1,9 @@
import logging
import random
import string
from typing import Sequence
import cv2
import numpy as np
from norfair import (
Detection,
@ -11,12 +13,19 @@ from norfair import (
draw_boxes,
)
from norfair.drawing.drawer import Drawer
from rich import print
from rich.console import Console
from rich.table import Table
from frigate.camera import PTZMetrics
from frigate.config import CameraConfig
from frigate.ptz.autotrack import PtzMotionEstimator
from frigate.track import ObjectTracker
from frigate.util.image import intersection_over_union
from frigate.util.image import (
SharedMemoryFrameManager,
get_histogram,
intersection_over_union,
)
from frigate.util.object import average_boxes, median_of_boxes
logger = logging.getLogger(__name__)
@ -71,12 +80,36 @@ def frigate_distance(detection: Detection, tracked_object) -> float:
return distance(detection.points, tracked_object.estimate)
def histogram_distance(matched_not_init_trackers, unmatched_trackers):
snd_embedding = unmatched_trackers.last_detection.embedding
if snd_embedding is None:
for detection in reversed(unmatched_trackers.past_detections):
if detection.embedding is not None:
snd_embedding = detection.embedding
break
else:
return 1
for detection_fst in matched_not_init_trackers.past_detections:
if detection_fst.embedding is None:
continue
distance = 1 - cv2.compareHist(
snd_embedding, detection_fst.embedding, cv2.HISTCMP_CORREL
)
if distance < 0.5:
return distance
return 1
class NorfairTracker(ObjectTracker):
def __init__(
self,
config: CameraConfig,
ptz_metrics: PTZMetrics,
):
self.frame_manager = SharedMemoryFrameManager()
self.tracked_objects = {}
self.untracked_object_boxes: list[list[int]] = []
self.disappeared = {}
@ -88,26 +121,137 @@ class NorfairTracker(ObjectTracker):
self.ptz_motion_estimator = {}
self.camera_name = config.name
self.track_id_map = {}
# TODO: could also initialize a tracker per object class if there
# was a good reason to have different distance calculations
self.tracker = Tracker(
distance_function=frigate_distance,
distance_threshold=2.5,
initialization_delay=self.detect_config.min_initialized,
hit_counter_max=self.detect_config.max_disappeared,
# use default filter factory with custom values
# R is the multiplier for the sensor measurement noise matrix, default of 4.0
# lowering R means that we trust the position of the bounding boxes more
# testing shows that the prediction was being relied on a bit too much
# TODO: could use different kalman filter values along with
# the different tracker per object class
filter_factory=OptimizedKalmanFilterFactory(R=3.4),
)
# Define tracker configurations for static camera
self.object_type_configs = {
"car": {
"filter_factory": OptimizedKalmanFilterFactory(R=3.4, Q=0.03),
"distance_function": frigate_distance,
"distance_threshold": 2.5,
},
}
# Define autotracking PTZ-specific configurations
self.ptz_object_type_configs = {
"person": {
"filter_factory": OptimizedKalmanFilterFactory(
R=4.5,
Q=0.25,
),
"distance_function": frigate_distance,
"distance_threshold": 2,
"past_detections_length": 5,
"reid_distance_function": histogram_distance,
"reid_distance_threshold": 0.5,
"reid_hit_counter_max": 10,
},
}
# Default tracker configuration
# use default filter factory with custom values
# R is the multiplier for the sensor measurement noise matrix, default of 4.0
# lowering R means that we trust the position of the bounding boxes more
# testing shows that the prediction was being relied on a bit too much
self.default_tracker_config = {
"filter_factory": OptimizedKalmanFilterFactory(R=3.4),
"distance_function": frigate_distance,
"distance_threshold": 2.5,
}
self.default_ptz_tracker_config = {
"filter_factory": OptimizedKalmanFilterFactory(R=4, Q=0.2),
"distance_function": frigate_distance,
"distance_threshold": 3,
}
self.trackers = {}
# Handle static trackers
for obj_type, tracker_config in self.object_type_configs.items():
if obj_type in self.camera_config.objects.track:
if obj_type not in self.trackers:
self.trackers[obj_type] = {}
self.trackers[obj_type]["static"] = self._create_tracker(
obj_type, tracker_config
)
# Handle PTZ trackers
for obj_type, tracker_config in self.ptz_object_type_configs.items():
if (
obj_type in self.camera_config.onvif.autotracking.track
and self.camera_config.onvif.autotracking.enabled_in_config
):
if obj_type not in self.trackers:
self.trackers[obj_type] = {}
self.trackers[obj_type]["ptz"] = self._create_tracker(
obj_type, tracker_config
)
# Initialize default trackers
self.default_tracker = {
"static": Tracker(
distance_function=frigate_distance,
distance_threshold=self.default_tracker_config["distance_threshold"],
initialization_delay=self.detect_config.min_initialized,
hit_counter_max=self.detect_config.max_disappeared,
filter_factory=self.default_tracker_config["filter_factory"],
),
"ptz": Tracker(
distance_function=frigate_distance,
distance_threshold=self.default_ptz_tracker_config[
"distance_threshold"
],
initialization_delay=self.detect_config.min_initialized,
hit_counter_max=self.detect_config.max_disappeared,
filter_factory=self.default_ptz_tracker_config["filter_factory"],
),
}
if self.ptz_metrics.autotracker_enabled.value:
self.ptz_motion_estimator = PtzMotionEstimator(
self.camera_config, self.ptz_metrics
)
def _create_tracker(self, obj_type, tracker_config):
"""Helper function to create a tracker with given configuration."""
tracker_params = {
"distance_function": tracker_config["distance_function"],
"distance_threshold": tracker_config["distance_threshold"],
"initialization_delay": self.detect_config.min_initialized,
"hit_counter_max": self.detect_config.max_disappeared,
"filter_factory": tracker_config["filter_factory"],
}
# Add reid parameters if max_frames is None
if (
self.detect_config.stationary.max_frames.objects.get(
obj_type, self.detect_config.stationary.max_frames.default
)
is None
):
reid_keys = [
"past_detections_length",
"reid_distance_function",
"reid_distance_threshold",
"reid_hit_counter_max",
]
tracker_params.update(
{key: tracker_config[key] for key in reid_keys if key in tracker_config}
)
return Tracker(**tracker_params)
def get_tracker(self, object_type: str) -> Tracker:
"""Get the appropriate tracker based on object type and camera mode."""
mode = (
"ptz"
if self.camera_config.onvif.autotracking.enabled_in_config
and object_type in self.camera_config.onvif.autotracking.track
else "static"
)
if object_type in self.trackers:
return self.trackers[object_type][mode]
return self.default_tracker[mode]
def register(self, track_id, obj):
rand_id = "".join(random.choices(string.ascii_lowercase + string.digits, k=6))
id = f"{obj['frame_time']}-{rand_id}"
@ -116,10 +260,13 @@ class NorfairTracker(ObjectTracker):
obj["start_time"] = obj["frame_time"]
obj["motionless_count"] = 0
obj["position_changes"] = 0
# Get the correct tracker for this object's label
tracker = self.get_tracker(obj["label"])
obj["score_history"] = [
p.data["score"]
for p in next(
(o for o in self.tracker.tracked_objects if o.global_id == track_id)
(o for o in tracker.tracked_objects if o.global_id == track_id)
).past_detections
]
self.tracked_objects[id] = obj
@ -137,11 +284,25 @@ class NorfairTracker(ObjectTracker):
self.stationary_box_history[id] = []
def deregister(self, id, track_id):
obj = self.tracked_objects[id]
del self.tracked_objects[id]
del self.disappeared[id]
self.tracker.tracked_objects = [
o for o in self.tracker.tracked_objects if o.global_id != track_id
]
# only manually deregister objects from norfair's list if max_frames is defined
if (
self.detect_config.stationary.max_frames.objects.get(
obj["label"], self.detect_config.stationary.max_frames.default
)
is not None
):
tracker = self.get_tracker(obj["label"])
tracker.tracked_objects = [
o
for o in tracker.tracked_objects
if o.global_id != track_id and o.hit_counter < 0
]
del self.track_id_map[track_id]
# tracks the current position of the object based on the last N bounding boxes
@ -287,9 +448,13 @@ class NorfairTracker(ObjectTracker):
def match_and_update(
self, frame_name: str, frame_time: float, detections: list[dict[str, any]]
):
norfair_detections = []
# Group detections by object type
detections_by_type = {}
for obj in detections:
label = obj[0]
if label not in detections_by_type:
detections_by_type[label] = []
# centroid is used for other things downstream
centroid_x = int((obj[2][0] + obj[2][2]) / 2.0)
centroid_y = int((obj[2][1] + obj[2][3]) / 2.0)
@ -297,22 +462,32 @@ class NorfairTracker(ObjectTracker):
# track based on top,left and bottom,right corners instead of centroid
points = np.array([[obj[2][0], obj[2][1]], [obj[2][2], obj[2][3]]])
norfair_detections.append(
Detection(
points=points,
label=obj[0],
data={
"label": obj[0],
"score": obj[1],
"box": obj[2],
"area": obj[3],
"ratio": obj[4],
"region": obj[5],
"frame_time": frame_time,
"centroid": (centroid_x, centroid_y),
},
embedding = None
if self.ptz_metrics.autotracker_enabled.value:
yuv_frame = self.frame_manager.get(
frame_name, self.camera_config.frame_shape_yuv
)
embedding = get_histogram(
yuv_frame, obj[2][0], obj[2][1], obj[2][2], obj[2][3]
)
detection = Detection(
points=points,
label=label,
# TODO: stationary objects won't have embeddings
embedding=embedding,
data={
"label": label,
"score": obj[1],
"box": obj[2],
"area": obj[3],
"ratio": obj[4],
"region": obj[5],
"frame_time": frame_time,
"centroid": (centroid_x, centroid_y),
},
)
detections_by_type[label].append(detection)
coord_transformations = None
@ -327,13 +502,32 @@ class NorfairTracker(ObjectTracker):
detections, frame_name, frame_time, self.camera_name
)
tracked_objects = self.tracker.update(
detections=norfair_detections, coord_transformations=coord_transformations
# Update all configured trackers
all_tracked_objects = []
for label in self.trackers:
tracker = self.get_tracker(label)
tracked_objects = tracker.update(
detections=detections_by_type.get(label, []),
coord_transformations=coord_transformations,
)
all_tracked_objects.extend(tracked_objects)
# Collect detections for objects without specific trackers
default_detections = []
for label, dets in detections_by_type.items():
if label not in self.trackers:
default_detections.extend(dets)
# Update default tracker with untracked detections
mode = "ptz" if self.ptz_metrics.autotracker_enabled.value else "static"
tracked_objects = self.default_tracker[mode].update(
detections=default_detections, coord_transformations=coord_transformations
)
all_tracked_objects.extend(tracked_objects)
# update or create new tracks
active_ids = []
for t in tracked_objects:
for t in all_tracked_objects:
estimate = tuple(t.estimate.flatten().astype(int))
# keep the estimate within the bounds of the image
estimate = (
@ -373,19 +567,55 @@ class NorfairTracker(ObjectTracker):
o[2] for o in detections if o[2] not in tracked_object_boxes
]
def print_objects_as_table(self, tracked_objects: Sequence):
"""Used for helping in debugging"""
print()
console = Console()
table = Table(show_header=True, header_style="bold magenta")
table.add_column("Id", style="yellow", justify="center")
table.add_column("Age", justify="right")
table.add_column("Hit Counter", justify="right")
table.add_column("Last distance", justify="right")
table.add_column("Init Id", justify="center")
for obj in tracked_objects:
table.add_row(
str(obj.id),
str(obj.age),
str(obj.hit_counter),
f"{obj.last_distance:.4f}" if obj.last_distance is not None else "N/A",
str(obj.initializing_id),
)
console.print(table)
def debug_draw(self, frame, frame_time):
# Collect all tracked objects from each tracker
all_tracked_objects = []
# print a table to the console with norfair tracked object info
if False:
self.print_objects_as_table(self.trackers["person"]["ptz"].tracked_objects)
# Get tracked objects from type-specific trackers
for object_trackers in self.trackers.values():
for tracker in object_trackers.values():
all_tracked_objects.extend(tracker.tracked_objects)
# Get tracked objects from default trackers
for tracker in self.default_tracker.values():
all_tracked_objects.extend(tracker.tracked_objects)
active_detections = [
Drawable(id=obj.id, points=obj.last_detection.points, label=obj.label)
for obj in self.tracker.tracked_objects
for obj in all_tracked_objects
if obj.last_detection.data["frame_time"] == frame_time
]
missing_detections = [
Drawable(id=obj.id, points=obj.last_detection.points, label=obj.label)
for obj in self.tracker.tracked_objects
for obj in all_tracked_objects
if obj.last_detection.data["frame_time"] != frame_time
]
# draw the estimated bounding box
draw_boxes(frame, self.tracker.tracked_objects, color="green", draw_ids=True)
draw_boxes(frame, all_tracked_objects, color="green", draw_ids=True)
# draw the detections that were detected in the current frame
draw_boxes(frame, active_detections, color="blue", draw_ids=True)
# draw the detections that are missing in the current frame
@ -393,7 +623,7 @@ class NorfairTracker(ObjectTracker):
# draw the distance calculation for the last detection
# estimate vs detection
for obj in self.tracker.tracked_objects:
for obj in all_tracked_objects:
ld = obj.last_detection
# bottom right
text_anchor = (

View File

@ -949,3 +949,13 @@ def get_image_from_recording(
return process.stdout
else:
return None
def get_histogram(image, x_min, y_min, x_max, y_max):
image_bgr = cv2.cvtColor(image, cv2.COLOR_YUV2BGR_I420)
image_bgr = image_bgr[y_min:y_max, x_min:x_max]
hist = cv2.calcHist(
[image_bgr], [0, 1, 2], None, [8, 8, 8], [0, 256, 0, 256, 0, 256]
)
return cv2.normalize(hist, hist).flatten()

View File

@ -435,7 +435,11 @@ def track_camera(
object_filters = config.objects.filters
motion_detector = ImprovedMotionDetector(
frame_shape, config.motion, config.detect.fps, name=config.name
frame_shape,
config.motion,
config.detect.fps,
name=config.name,
ptz_metrics=ptz_metrics,
)
object_detector = RemoteObjectDetector(
name, labelmap, detection_queue, result_connection, model_config, stop_event
@ -506,14 +510,7 @@ def detect(
height = y_max - y_min
area = width * height
ratio = width / max(1, height)
det = (
d[0],
d[1],
(x_min, y_min, x_max, y_max),
area,
ratio,
region,
)
det = (d[0], d[1], (x_min, y_min, x_max, y_max), area, ratio, region)
# apply object filters
if is_object_filtered(det, objects_to_track, object_filters):
continue