From 5a49d1f73cbd5fbaebb00ddf4344e88c6a14ecd1 Mon Sep 17 00:00:00 2001 From: Nicolas Mowen Date: Sun, 17 Aug 2025 11:27:42 -0600 Subject: [PATCH] Enable mypy for `track` and fix typing errors (#19529) * Enable mypy for track * WIP cleaning up tracked object * Fix tracked object typing * Fix typing and imports of centroid tracker * Cleanup typing * Cleanup * Formatting * Fix imports * Don't specify callable type * Type out json setting --- frigate/camera/state.py | 4 +- frigate/comms/events_updater.py | 4 +- frigate/comms/zmq_proxy.py | 12 +-- frigate/mypy.ini | 3 + frigate/ptz/autotrack.py | 4 +- frigate/track/__init__.py | 5 +- frigate/track/centroid_tracker.py | 50 +++++++------ frigate/track/norfair_tracker.py | 68 +++++++++-------- frigate/track/object_processing.py | 89 ++++++++++++---------- frigate/track/tracked_object.py | 115 ++++++++++++++++------------- 10 files changed, 194 insertions(+), 160 deletions(-) diff --git a/frigate/camera/state.py b/frigate/camera/state.py index a1041e95b..328c7bd23 100644 --- a/frigate/camera/state.py +++ b/frigate/camera/state.py @@ -54,7 +54,7 @@ class CameraState: self.ptz_autotracker_thread = ptz_autotracker_thread self.prev_enabled = self.camera_config.enabled - def get_current_frame(self, draw_options: dict[str, Any] = {}): + def get_current_frame(self, draw_options: dict[str, Any] = {}) -> np.ndarray: with self.current_frame_lock: frame_copy = np.copy(self._current_frame) frame_time = self.current_frame_time @@ -272,7 +272,7 @@ class CameraState: def finished(self, obj_id): del self.tracked_objects[obj_id] - def on(self, event_type: str, callback: Callable[[dict], None]): + def on(self, event_type: str, callback: Callable): self.callbacks[event_type].append(callback) def update( diff --git a/frigate/comms/events_updater.py b/frigate/comms/events_updater.py index f25f760ac..cfd958d2c 100644 --- a/frigate/comms/events_updater.py +++ b/frigate/comms/events_updater.py @@ -8,7 +8,7 @@ from .zmq_proxy import Publisher, Subscriber class EventUpdatePublisher( - Publisher[tuple[EventTypeEnum, EventStateEnum, str, str, dict[str, Any]]] + Publisher[tuple[EventTypeEnum, EventStateEnum, str | None, str, dict[str, Any]]] ): """Publishes events (objects, audio, manual).""" @@ -19,7 +19,7 @@ class EventUpdatePublisher( def publish( self, - payload: tuple[EventTypeEnum, EventStateEnum, str, str, dict[str, Any]], + payload: tuple[EventTypeEnum, EventStateEnum, str | None, str, dict[str, Any]], sub_topic: str = "", ) -> None: super().publish(payload, sub_topic) diff --git a/frigate/comms/zmq_proxy.py b/frigate/comms/zmq_proxy.py index 72a61d814..29329ec59 100644 --- a/frigate/comms/zmq_proxy.py +++ b/frigate/comms/zmq_proxy.py @@ -2,7 +2,7 @@ import json import threading -from typing import Any, Generic, Optional, TypeVar +from typing import Generic, TypeVar import zmq @@ -70,7 +70,7 @@ class Publisher(Generic[T]): self.context.destroy() -class Subscriber: +class Subscriber(Generic[T]): """Receives messages.""" topic_base: str = "" @@ -82,9 +82,7 @@ class Subscriber: self.socket.setsockopt_string(zmq.SUBSCRIBE, self.topic) self.socket.connect(SOCKET_SUB) - def check_for_update( - self, timeout: float | None = FAST_QUEUE_TIMEOUT - ) -> tuple[str, Any] | tuple[None, None] | None: + def check_for_update(self, timeout: float | None = FAST_QUEUE_TIMEOUT) -> T | None: """Returns message or None if no update.""" try: has_update, _, _ = zmq.select([self.socket], [], [], timeout) @@ -101,7 +99,5 @@ class Subscriber: self.socket.close() self.context.destroy() - def _return_object( - self, topic: str, payload: Optional[tuple[str, Any]] - ) -> tuple[str, Any] | tuple[None, None] | None: + def _return_object(self, topic: str, payload: T | None) -> T | None: return payload diff --git a/frigate/mypy.ini b/frigate/mypy.ini index f99f31ac5..5bad10f49 100644 --- a/frigate/mypy.ini +++ b/frigate/mypy.ini @@ -53,6 +53,9 @@ ignore_errors = false [mypy-frigate.stats] ignore_errors = false +[mypy-frigate.track.*] +ignore_errors = false + [mypy-frigate.types] ignore_errors = false diff --git a/frigate/ptz/autotrack.py b/frigate/ptz/autotrack.py index f0d8419dd..beecc62ab 100644 --- a/frigate/ptz/autotrack.py +++ b/frigate/ptz/autotrack.py @@ -60,10 +60,10 @@ class PtzMotionEstimator: def motion_estimator( self, - detections: list[dict[str, Any]], + detections: list[tuple[Any, Any, Any, Any, Any, Any]], frame_name: str, frame_time: float, - camera: str, + camera: str | None, ): # If we've just started up or returned to our preset, reset motion estimator for new tracking session if self.ptz_metrics.reset.is_set(): diff --git a/frigate/track/__init__.py b/frigate/track/__init__.py index dc72be4f0..b5453aaeb 100644 --- a/frigate/track/__init__.py +++ b/frigate/track/__init__.py @@ -11,6 +11,9 @@ class ObjectTracker(ABC): @abstractmethod def match_and_update( - self, frame_name: str, frame_time: float, detections: list[dict[str, Any]] + self, + frame_name: str, + frame_time: float, + detections: list[tuple[Any, Any, Any, Any, Any, Any]], ) -> None: pass diff --git a/frigate/track/centroid_tracker.py b/frigate/track/centroid_tracker.py index 25d4cb860..56f20629c 100644 --- a/frigate/track/centroid_tracker.py +++ b/frigate/track/centroid_tracker.py @@ -1,25 +1,26 @@ import random import string from collections import defaultdict +from typing import Any import numpy as np from scipy.spatial import distance as dist from frigate.config import DetectConfig from frigate.track import ObjectTracker -from frigate.util import intersection_over_union +from frigate.util.image import intersection_over_union class CentroidTracker(ObjectTracker): def __init__(self, config: DetectConfig): - self.tracked_objects = {} - self.untracked_object_boxes = [] - self.disappeared = {} - self.positions = {} + self.tracked_objects: dict[str, dict[str, Any]] = {} + self.untracked_object_boxes: list[tuple[int, int, int, int]] = [] + self.disappeared: dict[str, Any] = {} + self.positions: dict[str, Any] = {} self.max_disappeared = config.max_disappeared self.detect_config = config - def register(self, index, obj): + def register(self, obj: dict[str, Any]) -> None: rand_id = "".join(random.choices(string.ascii_lowercase + string.digits, k=6)) id = f"{obj['frame_time']}-{rand_id}" obj["id"] = id @@ -39,13 +40,13 @@ class CentroidTracker(ObjectTracker): "ymax": self.detect_config.height, } - def deregister(self, id): + def deregister(self, id: str) -> None: del self.tracked_objects[id] del self.disappeared[id] # tracks the current position of the object based on the last N bounding boxes # returns False if the object has moved outside its previous position - def update_position(self, id, box): + def update_position(self, id: str, box: tuple[int, int, int, int]) -> bool: position = self.positions[id] position_box = ( position["xmin"], @@ -88,7 +89,7 @@ class CentroidTracker(ObjectTracker): return True - def is_expired(self, id): + def is_expired(self, id: str) -> bool: obj = self.tracked_objects[id] # get the max frames for this label type or the default max_frames = self.detect_config.stationary.max_frames.objects.get( @@ -108,7 +109,7 @@ class CentroidTracker(ObjectTracker): return False - def update(self, id, new_obj): + def update(self, id: str, new_obj: dict[str, Any]) -> None: self.disappeared[id] = 0 # update the motionless count if the object has not moved to a new position if self.update_position(id, new_obj["box"]): @@ -129,25 +130,30 @@ class CentroidTracker(ObjectTracker): self.tracked_objects[id].update(new_obj) - def update_frame_times(self, frame_name, frame_time): + def update_frame_times(self, frame_name: str, frame_time: float) -> None: for id in list(self.tracked_objects.keys()): self.tracked_objects[id]["frame_time"] = frame_time self.tracked_objects[id]["motionless_count"] += 1 if self.is_expired(id): self.deregister(id) - def match_and_update(self, frame_time, detections): + def match_and_update( + self, + frame_name: str, + frame_time: float, + detections: list[tuple[Any, Any, Any, Any, Any, Any]], + ) -> None: # group by name detection_groups = defaultdict(lambda: []) - for obj in detections: - detection_groups[obj[0]].append( + for det in detections: + detection_groups[det[0]].append( { - "label": obj[0], - "score": obj[1], - "box": obj[2], - "area": obj[3], - "ratio": obj[4], - "region": obj[5], + "label": det[0], + "score": det[1], + "box": det[2], + "area": det[3], + "ratio": det[4], + "region": det[5], "frame_time": frame_time, } ) @@ -180,7 +186,7 @@ class CentroidTracker(ObjectTracker): if len(current_objects) == 0: for index, obj in enumerate(group): - self.register(index, obj) + self.register(obj) continue new_centroids = np.array([o["centroid"] for o in group]) @@ -238,4 +244,4 @@ class CentroidTracker(ObjectTracker): # register each new input centroid as a trackable object else: for col in unusedCols: - self.register(col, group[col]) + self.register(group[col]) diff --git a/frigate/track/norfair_tracker.py b/frigate/track/norfair_tracker.py index 900971e0d..5bb15f94e 100644 --- a/frigate/track/norfair_tracker.py +++ b/frigate/track/norfair_tracker.py @@ -13,6 +13,7 @@ from norfair import ( draw_boxes, ) from norfair.drawing.drawer import Drawer +from norfair.tracker import TrackedObject from rich import print from rich.console import Console from rich.table import Table @@ -43,7 +44,7 @@ MAX_STATIONARY_HISTORY = 10 # - could be variable based on time since last_detection # - include estimated velocity in the distance (car driving by of a parked car) # - include some visual similarity factor in the distance for occlusions -def distance(detection: np.array, estimate: np.array) -> float: +def distance(detection: np.ndarray, estimate: np.ndarray) -> float: # ultimately, this should try and estimate distance in 3-dimensional space # consider change in location, width, and height @@ -73,14 +74,16 @@ def distance(detection: np.array, estimate: np.array) -> float: change = np.append(distance, np.array([width_ratio, height_ratio])) # calculate euclidean distance of the change vector - return np.linalg.norm(change) + return float(np.linalg.norm(change)) -def frigate_distance(detection: Detection, tracked_object) -> float: +def frigate_distance(detection: Detection, tracked_object: TrackedObject) -> float: return distance(detection.points, tracked_object.estimate) -def histogram_distance(matched_not_init_trackers, unmatched_trackers): +def histogram_distance( + matched_not_init_trackers: TrackedObject, unmatched_trackers: TrackedObject +) -> float: snd_embedding = unmatched_trackers.last_detection.embedding if snd_embedding is None: @@ -110,17 +113,17 @@ class NorfairTracker(ObjectTracker): ptz_metrics: PTZMetrics, ): self.frame_manager = SharedMemoryFrameManager() - self.tracked_objects = {} + self.tracked_objects: dict[str, dict[str, Any]] = {} self.untracked_object_boxes: list[list[int]] = [] - self.disappeared = {} - self.positions = {} - self.stationary_box_history: dict[str, list[list[int, int, int, int]]] = {} + self.disappeared: dict[str, int] = {} + self.positions: dict[str, dict[str, Any]] = {} + self.stationary_box_history: dict[str, list[list[int]]] = {} self.camera_config = config self.detect_config = config.detect self.ptz_metrics = ptz_metrics - self.ptz_motion_estimator = {} + self.ptz_motion_estimator: PtzMotionEstimator | None = None self.camera_name = config.name - self.track_id_map = {} + self.track_id_map: dict[str, str] = {} # Define tracker configurations for static camera self.object_type_configs = { @@ -169,7 +172,7 @@ class NorfairTracker(ObjectTracker): "distance_threshold": 3, } - self.trackers = {} + self.trackers: dict[str, dict[str, Tracker]] = {} # Handle static trackers for obj_type, tracker_config in self.object_type_configs.items(): if obj_type in self.camera_config.objects.track: @@ -216,7 +219,7 @@ class NorfairTracker(ObjectTracker): self.camera_config, self.ptz_metrics ) - def _create_tracker(self, obj_type, tracker_config): + def _create_tracker(self, obj_type: str, tracker_config: dict[str, Any]) -> Tracker: """Helper function to create a tracker with given configuration.""" tracker_params = { "distance_function": tracker_config["distance_function"], @@ -258,7 +261,7 @@ class NorfairTracker(ObjectTracker): return self.trackers[object_type][mode] return self.default_tracker[mode] - def register(self, track_id, obj): + def register(self, track_id: str, obj: dict[str, Any]) -> None: rand_id = "".join(random.choices(string.ascii_lowercase + string.digits, k=6)) id = f"{obj['frame_time']}-{rand_id}" self.track_id_map[track_id] = id @@ -297,7 +300,7 @@ class NorfairTracker(ObjectTracker): } self.stationary_box_history[id] = boxes - def deregister(self, id, track_id): + def deregister(self, id: str, track_id: str) -> None: obj = self.tracked_objects[id] del self.tracked_objects[id] @@ -321,7 +324,7 @@ class NorfairTracker(ObjectTracker): # tracks the current position of the object based on the last N bounding boxes # returns False if the object has moved outside its previous position - def update_position(self, id: str, box: list[int, int, int, int], stationary: bool): + def update_position(self, id: str, box: list[int], stationary: bool) -> bool: xmin, ymin, xmax, ymax = box position = self.positions[id] self.stationary_box_history[id].append(box) @@ -396,7 +399,7 @@ class NorfairTracker(ObjectTracker): return True - def is_expired(self, id): + def is_expired(self, id: str) -> bool: obj = self.tracked_objects[id] # get the max frames for this label type or the default max_frames = self.detect_config.stationary.max_frames.objects.get( @@ -416,7 +419,7 @@ class NorfairTracker(ObjectTracker): return False - def update(self, track_id, obj): + def update(self, track_id: str, obj: dict[str, Any]) -> None: id = self.track_id_map[track_id] self.disappeared[id] = 0 stationary = ( @@ -443,7 +446,7 @@ class NorfairTracker(ObjectTracker): self.tracked_objects[id].update(obj) - def update_frame_times(self, frame_name: str, frame_time: float): + def update_frame_times(self, frame_name: str, frame_time: float) -> None: # if the object was there in the last frame, assume it's still there detections = [ ( @@ -460,10 +463,13 @@ class NorfairTracker(ObjectTracker): self.match_and_update(frame_name, frame_time, detections=detections) def match_and_update( - self, frame_name: str, frame_time: float, detections: list[dict[str, Any]] - ): + self, + frame_name: str, + frame_time: float, + detections: list[tuple[Any, Any, Any, Any, Any, Any]], + ) -> None: # Group detections by object type - detections_by_type = {} + detections_by_type: dict[str, list[Detection]] = {} for obj in detections: label = obj[0] if label not in detections_by_type: @@ -551,17 +557,17 @@ class NorfairTracker(ObjectTracker): estimate = ( max(0, estimate[0]), max(0, estimate[1]), - min(self.detect_config.width - 1, estimate[2]), - min(self.detect_config.height - 1, estimate[3]), + min(self.detect_config.width - 1, estimate[2]), # type: ignore[operator] + min(self.detect_config.height - 1, estimate[3]), # type: ignore[operator] ) - obj = { + new_obj = { **t.last_detection.data, "estimate": estimate, "estimate_velocity": t.estimate_velocity, } active_ids.append(t.global_id) if t.global_id not in self.track_id_map: - self.register(t.global_id, obj) + self.register(t.global_id, new_obj) # if there wasn't a detection in this frame, increment disappeared elif t.last_detection.data["frame_time"] != frame_time: id = self.track_id_map[t.global_id] @@ -569,10 +575,10 @@ class NorfairTracker(ObjectTracker): # sometimes the estimate gets way off # only update if the upper left corner is actually upper left if estimate[0] < estimate[2] and estimate[1] < estimate[3]: - self.tracked_objects[id]["estimate"] = obj["estimate"] + self.tracked_objects[id]["estimate"] = new_obj["estimate"] # else update it else: - self.update(t.global_id, obj) + self.update(t.global_id, new_obj) # clear expired tracks expired_ids = [k for k in self.track_id_map.keys() if k not in active_ids] @@ -585,7 +591,7 @@ class NorfairTracker(ObjectTracker): o[2] for o in detections if o[2] not in tracked_object_boxes ] - def print_objects_as_table(self, tracked_objects: Sequence): + def print_objects_as_table(self, tracked_objects: Sequence) -> None: """Used for helping in debugging""" print() console = Console() @@ -605,13 +611,13 @@ class NorfairTracker(ObjectTracker): ) console.print(table) - def debug_draw(self, frame, frame_time): + def debug_draw(self, frame: np.ndarray, frame_time: float) -> None: # Collect all tracked objects from each tracker all_tracked_objects = [] # print a table to the console with norfair tracked object info if False: - if len(self.trackers["license_plate"]["static"].tracked_objects) > 0: + if len(self.trackers["license_plate"]["static"].tracked_objects) > 0: # type: ignore[unreachable] self.print_objects_as_table( self.trackers["license_plate"]["static"].tracked_objects ) @@ -662,7 +668,7 @@ class NorfairTracker(ObjectTracker): if False: # draw the current formatted time on the frame - from datetime import datetime + from datetime import datetime # type: ignore[unreachable] formatted_time = datetime.fromtimestamp(frame_time).strftime( "%m/%d/%Y %I:%M:%S %p" diff --git a/frigate/track/object_processing.py b/frigate/track/object_processing.py index 5ffd14446..25128d6df 100644 --- a/frigate/track/object_processing.py +++ b/frigate/track/object_processing.py @@ -6,6 +6,7 @@ import queue import threading from collections import defaultdict from enum import Enum +from multiprocessing import Queue as MpQueue from multiprocessing.synchronize import Event as MpEvent from typing import Any @@ -39,6 +40,7 @@ from frigate.const import ( ) from frigate.events.types import EventStateEnum, EventTypeEnum from frigate.models import Event, ReviewSegment, Timeline +from frigate.ptz.autotrack import PtzAutoTrackerThread from frigate.track.tracked_object import TrackedObject from frigate.util.image import SharedMemoryFrameManager @@ -56,10 +58,10 @@ class TrackedObjectProcessor(threading.Thread): self, config: FrigateConfig, dispatcher: Dispatcher, - tracked_objects_queue, - ptz_autotracker_thread, - stop_event, - ): + tracked_objects_queue: MpQueue, + ptz_autotracker_thread: PtzAutoTrackerThread, + stop_event: MpEvent, + ) -> None: super().__init__(name="detected_frames_processor") self.config = config self.dispatcher = dispatcher @@ -98,8 +100,12 @@ class TrackedObjectProcessor(threading.Thread): # } # } # } - self.zone_data = defaultdict(lambda: defaultdict(dict)) - self.active_zone_data = defaultdict(lambda: defaultdict(dict)) + self.zone_data: dict[str, dict[str, Any]] = defaultdict( + lambda: defaultdict(dict) + ) + self.active_zone_data: dict[str, dict[str, Any]] = defaultdict( + lambda: defaultdict(dict) + ) for camera in self.config.cameras.keys(): self.create_camera_state(camera) @@ -107,7 +113,7 @@ class TrackedObjectProcessor(threading.Thread): def create_camera_state(self, camera: str) -> None: """Creates a new camera state.""" - def start(camera: str, obj: TrackedObject, frame_name: str): + def start(camera: str, obj: TrackedObject, frame_name: str) -> None: self.event_sender.publish( ( EventTypeEnum.tracked_object, @@ -118,7 +124,7 @@ class TrackedObjectProcessor(threading.Thread): ) ) - def update(camera: str, obj: TrackedObject, frame_name: str): + def update(camera: str, obj: TrackedObject, frame_name: str) -> None: obj.has_snapshot = self.should_save_snapshot(camera, obj) obj.has_clip = self.should_retain_recording(camera, obj) after = obj.to_dict() @@ -139,10 +145,10 @@ class TrackedObjectProcessor(threading.Thread): ) ) - def autotrack(camera: str, obj: TrackedObject, frame_name: str): + def autotrack(camera: str, obj: TrackedObject, frame_name: str) -> None: self.ptz_autotracker_thread.ptz_autotracker.autotrack_object(camera, obj) - def end(camera: str, obj: TrackedObject, frame_name: str): + def end(camera: str, obj: TrackedObject, frame_name: str) -> None: # populate has_snapshot obj.has_snapshot = self.should_save_snapshot(camera, obj) obj.has_clip = self.should_retain_recording(camera, obj) @@ -211,7 +217,7 @@ class TrackedObjectProcessor(threading.Thread): return False - def camera_activity(camera, activity): + def camera_activity(camera: str, activity: dict[str, Any]) -> None: last_activity = self.camera_activity.get(camera) if not last_activity or activity != last_activity: @@ -229,7 +235,7 @@ class TrackedObjectProcessor(threading.Thread): camera_state.on("camera_activity", camera_activity) self.camera_states[camera] = camera_state - def should_save_snapshot(self, camera, obj: TrackedObject): + def should_save_snapshot(self, camera: str, obj: TrackedObject) -> bool: if obj.false_positive: return False @@ -252,7 +258,7 @@ class TrackedObjectProcessor(threading.Thread): return True - def should_retain_recording(self, camera: str, obj: TrackedObject): + def should_retain_recording(self, camera: str, obj: TrackedObject) -> bool: if obj.false_positive: return False @@ -272,7 +278,7 @@ class TrackedObjectProcessor(threading.Thread): return True - def should_mqtt_snapshot(self, camera, obj: TrackedObject): + def should_mqtt_snapshot(self, camera: str, obj: TrackedObject) -> bool: # object never changed position if obj.is_stationary(): return False @@ -287,7 +293,9 @@ class TrackedObjectProcessor(threading.Thread): return True - def update_mqtt_motion(self, camera, frame_time, motion_boxes): + def update_mqtt_motion( + self, camera: str, frame_time: float, motion_boxes: list + ) -> None: # publish if motion is currently being detected if motion_boxes: # only send ON if motion isn't already active @@ -313,11 +321,15 @@ class TrackedObjectProcessor(threading.Thread): # reset the last_motion so redundant `off` commands aren't sent self.last_motion_detected[camera] = 0 - def get_best(self, camera, label): + def get_best(self, camera: str, label: str) -> dict[str, Any]: # TODO: need a lock here camera_state = self.camera_states[camera] if label in camera_state.best_objects: best_obj = camera_state.best_objects[label] + + if not best_obj.thumbnail_data: + return {} + best = best_obj.thumbnail_data.copy() best["frame"] = camera_state.frame_cache.get( best_obj.thumbnail_data["frame_time"] @@ -340,7 +352,7 @@ class TrackedObjectProcessor(threading.Thread): return self.camera_states[camera].get_current_frame(draw_options) - def get_current_frame_time(self, camera) -> int: + def get_current_frame_time(self, camera: str) -> float: """Returns the latest frame time for a given camera.""" return self.camera_states[camera].current_frame_time @@ -348,7 +360,7 @@ class TrackedObjectProcessor(threading.Thread): self, event_id: str, sub_label: str | None, score: float | None ) -> None: """Update sub label for given event id.""" - tracked_obj: TrackedObject = None + tracked_obj: TrackedObject | None = None for state in self.camera_states.values(): tracked_obj = state.tracked_objects.get(event_id) @@ -357,7 +369,7 @@ class TrackedObjectProcessor(threading.Thread): break try: - event: Event = Event.get(Event.id == event_id) + event: Event | None = Event.get(Event.id == event_id) except DoesNotExist: event = None @@ -368,12 +380,12 @@ class TrackedObjectProcessor(threading.Thread): tracked_obj.obj_data["sub_label"] = (sub_label, score) if event: - event.sub_label = sub_label + event.sub_label = sub_label # type: ignore[assignment] data = event.data if sub_label is None: - data["sub_label_score"] = None + data["sub_label_score"] = None # type: ignore[index] elif score is not None: - data["sub_label_score"] = score + data["sub_label_score"] = score # type: ignore[index] event.data = data event.save() @@ -402,7 +414,7 @@ class TrackedObjectProcessor(threading.Thread): objects_list = [] sub_labels = set() events = Event.select(Event.id, Event.label, Event.sub_label).where( - Event.id.in_(detection_ids) + Event.id.in_(detection_ids) # type: ignore[call-arg, misc] ) for det_event in events: if det_event.sub_label: @@ -431,13 +443,11 @@ class TrackedObjectProcessor(threading.Thread): f"Updated sub_label for event {event_id} in review segment {review_segment.id}" ) - except ReviewSegment.DoesNotExist: + except DoesNotExist: logger.debug( f"No review segment found with event ID {event_id} when updating sub_label" ) - return True - def set_object_attribute( self, event_id: str, @@ -446,7 +456,7 @@ class TrackedObjectProcessor(threading.Thread): score: float | None, ) -> None: """Update attribute for given event id.""" - tracked_obj: TrackedObject = None + tracked_obj: TrackedObject | None = None for state in self.camera_states.values(): tracked_obj = state.tracked_objects.get(event_id) @@ -455,7 +465,7 @@ class TrackedObjectProcessor(threading.Thread): break try: - event: Event = Event.get(Event.id == event_id) + event: Event | None = Event.get(Event.id == event_id) except DoesNotExist: event = None @@ -470,16 +480,14 @@ class TrackedObjectProcessor(threading.Thread): if event: data = event.data - data[field_name] = field_value + data[field_name] = field_value # type: ignore[index] if field_value is None: - data[f"{field_name}_score"] = None + data[f"{field_name}_score"] = None # type: ignore[index] elif score is not None: - data[f"{field_name}_score"] = score + data[f"{field_name}_score"] = score # type: ignore[index] event.data = data event.save() - return True - def save_lpr_snapshot(self, payload: tuple) -> None: # save the snapshot image (frame, event_id, camera) = payload @@ -638,7 +646,7 @@ class TrackedObjectProcessor(threading.Thread): ) self.ongoing_manual_events.pop(event_id) - def force_end_all_events(self, camera: str, camera_state: CameraState): + def force_end_all_events(self, camera: str, camera_state: CameraState) -> None: """Ends all active events on camera when disabling.""" last_frame_name = camera_state.previous_frame_id for obj_id, obj in list(camera_state.tracked_objects.items()): @@ -656,7 +664,7 @@ class TrackedObjectProcessor(threading.Thread): {"enabled": False, "motion": 0, "objects": []}, ) - def run(self): + def run(self) -> None: while not self.stop_event.is_set(): # check for config updates updated_topics = self.camera_config_subscriber.check_for_updates() @@ -698,11 +706,14 @@ class TrackedObjectProcessor(threading.Thread): # check for sub label updates while True: - (raw_topic, payload) = self.sub_label_subscriber.check_for_update( - timeout=0 - ) + update = self.sub_label_subscriber.check_for_update(timeout=0) - if not raw_topic: + if not update: + break + + (raw_topic, payload) = update + + if not raw_topic or not payload: break topic = str(raw_topic) diff --git a/frigate/track/tracked_object.py b/frigate/track/tracked_object.py index afe85228e..111dd2c40 100644 --- a/frigate/track/tracked_object.py +++ b/frigate/track/tracked_object.py @@ -5,18 +5,19 @@ import math import os from collections import defaultdict from statistics import median -from typing import Any, Optional +from typing import Any, Optional, cast import cv2 import numpy as np from frigate.config import ( CameraConfig, - ModelConfig, + FilterConfig, SnapshotsConfig, UIConfig, ) from frigate.const import CLIPS_DIR, THUMB_DIR +from frigate.detectors.detector_config import ModelConfig from frigate.review.types import SeverityEnum from frigate.util.builtin import sanitize_float from frigate.util.image import ( @@ -46,11 +47,11 @@ class TrackedObject: model_config: ModelConfig, camera_config: CameraConfig, ui_config: UIConfig, - frame_cache, + frame_cache: dict[float, dict[str, Any]], obj_data: dict[str, Any], - ): + ) -> None: # set the score history then remove as it is not part of object state - self.score_history = obj_data["score_history"] + self.score_history: list[float] = obj_data["score_history"] del obj_data["score_history"] self.obj_data = obj_data @@ -61,24 +62,24 @@ class TrackedObject: self.frame_cache = frame_cache self.zone_presence: dict[str, int] = {} self.zone_loitering: dict[str, int] = {} - self.current_zones = [] - self.entered_zones = [] - self.attributes = defaultdict(float) + self.current_zones: list[str] = [] + self.entered_zones: list[str] = [] + self.attributes: dict[str, float] = defaultdict(float) self.false_positive = True self.has_clip = False self.has_snapshot = False self.top_score = self.computed_score = 0.0 - self.thumbnail_data = None + self.thumbnail_data: dict[str, Any] | None = None self.last_updated = 0 self.last_published = 0 self.frame = None self.active = True self.pending_loitering = False - self.speed_history = [] - self.current_estimated_speed = 0 - self.average_estimated_speed = 0 + self.speed_history: list[float] = [] + self.current_estimated_speed: float = 0 + self.average_estimated_speed: float = 0 self.velocity_angle = 0 - self.path_data = [] + self.path_data: list[tuple[Any, float]] = [] self.previous = self.to_dict() @property @@ -111,7 +112,7 @@ class TrackedObject: return None - def _is_false_positive(self): + def _is_false_positive(self) -> bool: # once a true positive, always a true positive if not self.false_positive: return False @@ -119,11 +120,13 @@ class TrackedObject: threshold = self.camera_config.objects.filters[self.obj_data["label"]].threshold return self.computed_score < threshold - def compute_score(self): + def compute_score(self) -> float: """get median of scores for object.""" return median(self.score_history) - def update(self, current_frame_time: float, obj_data, has_valid_frame: bool): + def update( + self, current_frame_time: float, obj_data: dict[str, Any], has_valid_frame: bool + ) -> tuple[bool, bool, bool, bool]: thumb_update = False significant_change = False path_update = False @@ -305,7 +308,7 @@ class TrackedObject: k: self.attributes[k] for k in self.logos if k in self.attributes } if len(recognized_logos) > 0: - max_logo = max(recognized_logos, key=recognized_logos.get) + max_logo = max(recognized_logos, key=recognized_logos.get) # type: ignore[arg-type] # don't overwrite sub label if it is already set if ( @@ -342,28 +345,30 @@ class TrackedObject: # update path width = self.camera_config.detect.width height = self.camera_config.detect.height - bottom_center = ( - round(obj_data["centroid"][0] / width, 4), - round(obj_data["box"][3] / height, 4), - ) - # calculate a reasonable movement threshold (e.g., 5% of the frame diagonal) - threshold = 0.05 * math.sqrt(width**2 + height**2) / max(width, height) - - if not self.path_data: - self.path_data.append((bottom_center, obj_data["frame_time"])) - path_update = True - elif ( - math.dist(self.path_data[-1][0], bottom_center) >= threshold - or len(self.path_data) == 1 - ): - # check Euclidean distance before appending - self.path_data.append((bottom_center, obj_data["frame_time"])) - path_update = True - logger.debug( - f"Point tracking: {obj_data['id']}, {bottom_center}, {obj_data['frame_time']}" + if width is not None and height is not None: + bottom_center = ( + round(obj_data["centroid"][0] / width, 4), + round(obj_data["box"][3] / height, 4), ) + # calculate a reasonable movement threshold (e.g., 5% of the frame diagonal) + threshold = 0.05 * math.sqrt(width**2 + height**2) / max(width, height) + + if not self.path_data: + self.path_data.append((bottom_center, obj_data["frame_time"])) + path_update = True + elif ( + math.dist(self.path_data[-1][0], bottom_center) >= threshold + or len(self.path_data) == 1 + ): + # check Euclidean distance before appending + self.path_data.append((bottom_center, obj_data["frame_time"])) + path_update = True + logger.debug( + f"Point tracking: {obj_data['id']}, {bottom_center}, {obj_data['frame_time']}" + ) + self.obj_data.update(obj_data) self.current_zones = current_zones logger.debug( @@ -371,7 +376,7 @@ class TrackedObject: ) return (thumb_update, significant_change, path_update, autotracker_update) - def to_dict(self): + def to_dict(self) -> dict[str, Any]: event = { "id": self.obj_data["id"], "camera": self.camera_config.name, @@ -413,10 +418,8 @@ class TrackedObject: return not self.is_stationary() def is_stationary(self) -> bool: - return ( - self.obj_data["motionless_count"] - > self.camera_config.detect.stationary.threshold - ) + count = cast(int | float, self.obj_data["motionless_count"]) + return count > (self.camera_config.detect.stationary.threshold or 50) def get_thumbnail(self, ext: str) -> bytes | None: img_bytes = self.get_img_bytes( @@ -453,9 +456,9 @@ class TrackedObject: def get_img_bytes( self, ext: str, - timestamp=False, - bounding_box=False, - crop=False, + timestamp: bool = False, + bounding_box: bool = False, + crop: bool = False, height: int | None = None, quality: int | None = None, ) -> bytes | None: @@ -532,18 +535,18 @@ class TrackedObject: best_frame, dsize=(width, height), interpolation=cv2.INTER_AREA ) if timestamp: - color = self.camera_config.timestamp_style.color + colors = self.camera_config.timestamp_style.color draw_timestamp( best_frame, self.thumbnail_data["frame_time"], self.camera_config.timestamp_style.format, font_effect=self.camera_config.timestamp_style.effect, font_thickness=self.camera_config.timestamp_style.thickness, - font_color=(color.blue, color.green, color.red), + font_color=(colors.blue, colors.green, colors.red), position=self.camera_config.timestamp_style.position, ) - quality_params = None + quality_params = [] if ext == "jpg": quality_params = [int(cv2.IMWRITE_JPEG_QUALITY), quality or 70] @@ -596,6 +599,9 @@ class TrackedObject: p.write(png_bytes) def write_thumbnail_to_disk(self) -> None: + if not self.camera_config.name: + return + directory = os.path.join(THUMB_DIR, self.camera_config.name) if not os.path.exists(directory): @@ -603,11 +609,14 @@ class TrackedObject: thumb_bytes = self.get_thumbnail("webp") - with open(os.path.join(directory, f"{self.obj_data['id']}.webp"), "wb") as f: - f.write(thumb_bytes) + if thumb_bytes: + with open( + os.path.join(directory, f"{self.obj_data['id']}.webp"), "wb" + ) as f: + f.write(thumb_bytes) -def zone_filtered(obj: TrackedObject, object_config): +def zone_filtered(obj: TrackedObject, object_config: dict[str, FilterConfig]) -> bool: object_name = obj.obj_data["label"] if object_name in object_config: @@ -657,9 +666,9 @@ class TrackedObjectAttribute: def find_best_object(self, objects: list[dict[str, Any]]) -> Optional[str]: """Find the best attribute for each object and return its ID.""" - best_object_area = None - best_object_id = None - best_object_label = None + best_object_area: float | None = None + best_object_id: str | None = None + best_object_label: str | None = None for obj in objects: if not box_inside(obj["box"], self.box):