From 2f209b2cf4acdb7bf71d8a42befce36d679815a6 Mon Sep 17 00:00:00 2001 From: Nicolas Mowen Date: Thu, 25 Sep 2025 09:18:45 -0600 Subject: [PATCH] Implement stationary car classifier to improve parked car management (#20206) * Implement stationary car classifier to base stationary state on visual changes and not just bounding box stability * Cleanup * Fix mypy * Move to new file and add config to disable if needed * Cleanup * Undo --- docs/docs/configuration/reference.md | 3 + frigate/config/camera/detect.py | 4 + frigate/track/norfair_tracker.py | 109 ++++++++----- frigate/track/stationary_classifier.py | 202 +++++++++++++++++++++++++ 4 files changed, 283 insertions(+), 35 deletions(-) create mode 100644 frigate/track/stationary_classifier.py diff --git a/docs/docs/configuration/reference.md b/docs/docs/configuration/reference.md index 5b741ec18..5f4cff4e7 100644 --- a/docs/docs/configuration/reference.md +++ b/docs/docs/configuration/reference.md @@ -287,6 +287,9 @@ detect: max_disappeared: 25 # Optional: Configuration for stationary object tracking stationary: + # Optional: Stationary classifier that uses visual characteristics to determine if an object + # is stationary even if the box changes enough to be considered motion (default: shown below). + classifier: True # Optional: Frequency for confirming stationary objects (default: same as threshold) # When set to 1, object detection will run to confirm the object still exists on every frame. # If set to 10, object detection will run to confirm the object still exists on every 10th frame. diff --git a/frigate/config/camera/detect.py b/frigate/config/camera/detect.py index 99e02c2c8..1926f3254 100644 --- a/frigate/config/camera/detect.py +++ b/frigate/config/camera/detect.py @@ -29,6 +29,10 @@ class StationaryConfig(FrigateBaseModel): default_factory=StationaryMaxFramesConfig, title="Max frames for stationary objects.", ) + classifier: bool = Field( + default=True, + title="Enable visual classifier for determing if objects with jittery bounding boxes are stationary.", + ) class DetectConfig(FrigateBaseModel): diff --git a/frigate/track/norfair_tracker.py b/frigate/track/norfair_tracker.py index cdbd35f10..9cddb17ed 100644 --- a/frigate/track/norfair_tracker.py +++ b/frigate/track/norfair_tracker.py @@ -1,7 +1,7 @@ import logging import random import string -from typing import Any, Sequence +from typing import Any, Sequence, cast import cv2 import numpy as np @@ -17,6 +17,7 @@ from frigate.camera import PTZMetrics from frigate.config import CameraConfig from frigate.ptz.autotrack import PtzMotionEstimator from frigate.track import ObjectTracker +from frigate.track.stationary_classifier import StationaryMotionClassifier from frigate.util.image import ( SharedMemoryFrameManager, get_histogram, @@ -119,6 +120,7 @@ class NorfairTracker(ObjectTracker): self.ptz_motion_estimator: PtzMotionEstimator | None = None self.camera_name = config.name self.track_id_map: dict[str, str] = {} + self.stationary_classifier = StationaryMotionClassifier() # Define tracker configurations for static camera self.object_type_configs = { @@ -321,7 +323,13 @@ class NorfairTracker(ObjectTracker): # tracks the current position of the object based on the last N bounding boxes # returns False if the object has moved outside its previous position - def update_position(self, id: str, box: list[int], stationary: bool) -> bool: + def update_position( + self, + id: str, + box: list[int], + stationary: bool, + yuv_frame: np.ndarray | None, + ) -> bool: xmin, ymin, xmax, ymax = box position = self.positions[id] self.stationary_box_history[id].append(box) @@ -331,30 +339,44 @@ class NorfairTracker(ObjectTracker): -MAX_STATIONARY_HISTORY: ] - avg_iou = intersection_over_union( - box, average_boxes(self.stationary_box_history[id]) - ) + avg_box = average_boxes(self.stationary_box_history[id]) + avg_iou = intersection_over_union(box, avg_box) + median_box = median_of_boxes(self.stationary_box_history[id]) + + # Establish anchor early when stationary and stable + if stationary and yuv_frame is not None: + history = self.stationary_box_history[id] + if id not in self.stationary_classifier.anchor_crops and len(history) >= 5: + stability_iou = intersection_over_union(avg_box, median_box) + if stability_iou >= 0.7: + self.stationary_classifier.ensure_anchor( + id, yuv_frame, cast(tuple[int, int, int, int], median_box) + ) # object has minimal or zero iou # assume object is active if avg_iou < THRESHOLD_KNOWN_ACTIVE_IOU: - self.positions[id] = { - "xmins": [xmin], - "ymins": [ymin], - "xmaxs": [xmax], - "ymaxs": [ymax], - "xmin": xmin, - "ymin": ymin, - "xmax": xmax, - "ymax": ymax, - } - return False + if stationary and yuv_frame is not None: + if not self.stationary_classifier.evaluate( + id, yuv_frame, cast(tuple[int, int, int, int], tuple(box)) + ): + self.positions[id] = { + "xmins": [xmin], + "ymins": [ymin], + "xmaxs": [xmax], + "ymaxs": [ymax], + "xmin": xmin, + "ymin": ymin, + "xmax": xmax, + "ymax": ymax, + } + return False threshold = ( THRESHOLD_STATIONARY_CHECK_IOU if stationary else THRESHOLD_ACTIVE_CHECK_IOU ) - # object has iou below threshold, check median to reduce outliers + # object has iou below threshold, check median and optionally crop similarity if avg_iou < threshold: median_iou = intersection_over_union( ( @@ -363,23 +385,28 @@ class NorfairTracker(ObjectTracker): position["xmax"], position["ymax"], ), - median_of_boxes(self.stationary_box_history[id]), + median_box, ) # if the median iou drops below the threshold # assume object is no longer stationary if median_iou < threshold: - self.positions[id] = { - "xmins": [xmin], - "ymins": [ymin], - "xmaxs": [xmax], - "ymaxs": [ymax], - "xmin": xmin, - "ymin": ymin, - "xmax": xmax, - "ymax": ymax, - } - return False + # Before flipping to active, check with classifier if we have YUV frame + if stationary and yuv_frame is not None: + if not self.stationary_classifier.evaluate( + id, yuv_frame, cast(tuple[int, int, int, int], tuple(box)) + ): + self.positions[id] = { + "xmins": [xmin], + "ymins": [ymin], + "xmaxs": [xmax], + "ymaxs": [ymax], + "xmin": xmin, + "ymin": ymin, + "xmax": xmax, + "ymax": ymax, + } + return False # if there are more than 5 and less than 10 entries for the position, add the bounding box # and recompute the position box @@ -416,7 +443,12 @@ class NorfairTracker(ObjectTracker): return False - def update(self, track_id: str, obj: dict[str, Any]) -> None: + def update( + self, + track_id: str, + obj: dict[str, Any], + yuv_frame: np.ndarray | None, + ) -> None: id = self.track_id_map[track_id] self.disappeared[id] = 0 stationary = ( @@ -424,7 +456,7 @@ class NorfairTracker(ObjectTracker): >= self.detect_config.stationary.threshold ) # update the motionless count if the object has not moved to a new position - if self.update_position(id, obj["box"], stationary): + if self.update_position(id, obj["box"], stationary, yuv_frame): self.tracked_objects[id]["motionless_count"] += 1 if self.is_expired(id): self.deregister(id, track_id) @@ -440,6 +472,7 @@ class NorfairTracker(ObjectTracker): self.tracked_objects[id]["position_changes"] += 1 self.tracked_objects[id]["motionless_count"] = 0 self.stationary_box_history[id] = [] + self.stationary_classifier.on_active(id) self.tracked_objects[id].update(obj) @@ -467,6 +500,15 @@ class NorfairTracker(ObjectTracker): ) -> None: # Group detections by object type detections_by_type: dict[str, list[Detection]] = {} + yuv_frame: np.ndarray | None = None + + if self.ptz_metrics.autotracker_enabled.value or ( + self.detect_config.stationary.classifier + and any(obj[0] == "car" for obj in detections) + ): + yuv_frame = self.frame_manager.get( + frame_name, self.camera_config.frame_shape_yuv + ) for obj in detections: label = obj[0] if label not in detections_by_type: @@ -481,9 +523,6 @@ class NorfairTracker(ObjectTracker): embedding = None if self.ptz_metrics.autotracker_enabled.value: - yuv_frame = self.frame_manager.get( - frame_name, self.camera_config.frame_shape_yuv - ) embedding = get_histogram( yuv_frame, obj[2][0], obj[2][1], obj[2][2], obj[2][3] ) @@ -575,7 +614,7 @@ class NorfairTracker(ObjectTracker): self.tracked_objects[id]["estimate"] = new_obj["estimate"] # else update it else: - self.update(str(t.global_id), new_obj) + self.update(str(t.global_id), new_obj, yuv_frame) # clear expired tracks expired_ids = [k for k in self.track_id_map.keys() if k not in active_ids] diff --git a/frigate/track/stationary_classifier.py b/frigate/track/stationary_classifier.py new file mode 100644 index 000000000..45e51b839 --- /dev/null +++ b/frigate/track/stationary_classifier.py @@ -0,0 +1,202 @@ +"""Tools for determining if an object is stationary.""" + +import logging +from typing import Any, cast + +import cv2 +import numpy as np +from scipy.ndimage import gaussian_filter + +logger = logging.getLogger(__name__) + + +THRESHOLD_KNOWN_ACTIVE_IOU = 0.2 +THRESHOLD_STATIONARY_CHECK_IOU = 0.6 +THRESHOLD_ACTIVE_CHECK_IOU = 0.9 +MAX_STATIONARY_HISTORY = 10 + + +class StationaryMotionClassifier: + """Fallback classifier to prevent false flips from stationary to active. + + Uses appearance consistency on a fixed spatial region (historical median box) + to detect actual movement, ignoring bounding box detection variations. + """ + + CROP_SIZE = 96 + NCC_KEEP_THRESHOLD = 0.90 # High correlation = keep stationary + NCC_ACTIVE_THRESHOLD = 0.85 # Low correlation = consider active + SHIFT_KEEP_THRESHOLD = 0.02 # Small shift = keep stationary + SHIFT_ACTIVE_THRESHOLD = 0.04 # Large shift = consider active + DRIFT_ACTIVE_THRESHOLD = 0.12 # Cumulative drift over 5 frames + CHANGED_FRAMES_TO_FLIP = 2 + + def __init__(self) -> None: + self.anchor_crops: dict[str, np.ndarray] = {} + self.anchor_boxes: dict[str, tuple[int, int, int, int]] = {} + self.changed_counts: dict[str, int] = {} + self.shift_histories: dict[str, list[float]] = {} + + # Pre-compute Hanning window for phase correlation + hann = np.hanning(self.CROP_SIZE).astype(np.float64) + self._hann2d = np.outer(hann, hann) + + def reset(self, id: str) -> None: + logger.debug("StationaryMotionClassifier.reset: id=%s", id) + if id in self.anchor_crops: + del self.anchor_crops[id] + if id in self.anchor_boxes: + del self.anchor_boxes[id] + self.changed_counts[id] = 0 + self.shift_histories[id] = [] + + def _extract_y_crop( + self, yuv_frame: np.ndarray, box: tuple[int, int, int, int] + ) -> np.ndarray: + """Extract and normalize Y-plane crop from bounding box.""" + y_height = yuv_frame.shape[0] // 3 * 2 + width = yuv_frame.shape[1] + x1 = max(0, min(width - 1, box[0])) + y1 = max(0, min(y_height - 1, box[1])) + x2 = max(0, min(width - 1, box[2])) + y2 = max(0, min(y_height - 1, box[3])) + + if x2 <= x1: + x2 = min(width - 1, x1 + 1) + if y2 <= y1: + y2 = min(y_height - 1, y1 + 1) + + # Extract Y-plane crop, resize, and blur + y_plane = yuv_frame[0:y_height, 0:width] + crop = y_plane[y1:y2, x1:x2] + crop_resized = cv2.resize( + crop, (self.CROP_SIZE, self.CROP_SIZE), interpolation=cv2.INTER_AREA + ) + result = cast(np.ndarray[Any, Any], gaussian_filter(crop_resized, sigma=0.5)) + logger.debug( + "_extract_y_crop: box=%s clamped=(%d,%d,%d,%d) crop_shape=%s", + box, + x1, + y1, + x2, + y2, + crop.shape if "crop" in locals() else None, + ) + return result + + def ensure_anchor( + self, id: str, yuv_frame: np.ndarray, median_box: tuple[int, int, int, int] + ) -> None: + """Initialize anchor crop from stable median box when object becomes stationary.""" + if id not in self.anchor_crops: + self.anchor_boxes[id] = median_box + self.anchor_crops[id] = self._extract_y_crop(yuv_frame, median_box) + self.changed_counts[id] = 0 + self.shift_histories[id] = [] + logger.debug( + "ensure_anchor: initialized id=%s median_box=%s crop_shape=%s", + id, + median_box, + self.anchor_crops[id].shape, + ) + + def on_active(self, id: str) -> None: + """Reset state when object becomes active to allow re-anchoring.""" + logger.debug("on_active: id=%s became active; resetting state", id) + self.reset(id) + + def evaluate( + self, id: str, yuv_frame: np.ndarray, current_box: tuple[int, int, int, int] + ) -> bool: + """Return True to keep stationary, False to flip to active. + + Compares the same spatial region (historical median box) across frames + to detect actual movement, ignoring bounding box variations. + """ + + if id not in self.anchor_crops or id not in self.anchor_boxes: + logger.debug("evaluate: id=%s has no anchor; default keep stationary", id) + return True + + # Compare same spatial region across frames + anchor_box = self.anchor_boxes[id] + anchor_crop = self.anchor_crops[id] + curr_crop = self._extract_y_crop(yuv_frame, anchor_box) + + # Compute appearance and motion metrics + ncc = cv2.matchTemplate(curr_crop, anchor_crop, cv2.TM_CCOEFF_NORMED)[0, 0] + a64 = anchor_crop.astype(np.float64) * self._hann2d + c64 = curr_crop.astype(np.float64) * self._hann2d + (shift_x, shift_y), _ = cv2.phaseCorrelate(a64, c64) + shift_norm = float(np.hypot(shift_x, shift_y)) / float(self.CROP_SIZE) + + logger.debug( + "evaluate: id=%s metrics ncc=%.4f shift_norm=%.4f (shift_x=%.3f, shift_y=%.3f)", + id, + float(ncc), + shift_norm, + float(shift_x), + float(shift_y), + ) + + # Update rolling shift history + history = self.shift_histories.get(id, []) + history.append(shift_norm) + if len(history) > 5: + history = history[-5:] + self.shift_histories[id] = history + drift_sum = float(sum(history)) + + logger.debug( + "evaluate: id=%s history_len=%d last_shift=%.4f drift_sum=%.4f", + id, + len(history), + history[-1] if history else -1.0, + drift_sum, + ) + + # Early exit for clear stationary case + if ncc >= self.NCC_KEEP_THRESHOLD and shift_norm < self.SHIFT_KEEP_THRESHOLD: + self.changed_counts[id] = 0 + logger.debug( + "evaluate: id=%s early-stationary keep=True (ncc>=%.2f and shift<%.2f)", + id, + self.NCC_KEEP_THRESHOLD, + self.SHIFT_KEEP_THRESHOLD, + ) + return True + + # Check for movement indicators + movement_detected = ( + ncc < self.NCC_ACTIVE_THRESHOLD + or shift_norm >= self.SHIFT_ACTIVE_THRESHOLD + or drift_sum >= self.DRIFT_ACTIVE_THRESHOLD + ) + + if movement_detected: + cnt = self.changed_counts.get(id, 0) + 1 + self.changed_counts[id] = cnt + if ( + cnt >= self.CHANGED_FRAMES_TO_FLIP + or drift_sum >= self.DRIFT_ACTIVE_THRESHOLD + ): + logger.debug( + "evaluate: id=%s flip_to_active=True cnt=%d drift_sum=%.4f thresholds(changed>=%d drift>=%.2f)", + id, + cnt, + drift_sum, + self.CHANGED_FRAMES_TO_FLIP, + self.DRIFT_ACTIVE_THRESHOLD, + ) + return False + logger.debug( + "evaluate: id=%s movement_detected cnt=%d keep_until_cnt>=%d", + id, + cnt, + self.CHANGED_FRAMES_TO_FLIP, + ) + else: + self.changed_counts[id] = 0 + logger.debug("evaluate: id=%s no_movement keep=True", id) + + return True