Enable mypy for track and fix typing errors (#19529)

* Enable mypy for track

* WIP cleaning up tracked object

* Fix tracked object typing

* Fix typing and imports of centroid tracker

* Cleanup typing

* Cleanup

* Formatting

* Fix imports

* Don't specify callable type

* Type out json setting
This commit is contained in:
Nicolas Mowen 2025-08-17 11:27:42 -06:00 committed by GitHub
parent 856aab8e6e
commit 5a49d1f73c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 194 additions and 160 deletions

View File

@ -54,7 +54,7 @@ class CameraState:
self.ptz_autotracker_thread = ptz_autotracker_thread
self.prev_enabled = self.camera_config.enabled
def get_current_frame(self, draw_options: dict[str, Any] = {}):
def get_current_frame(self, draw_options: dict[str, Any] = {}) -> np.ndarray:
with self.current_frame_lock:
frame_copy = np.copy(self._current_frame)
frame_time = self.current_frame_time
@ -272,7 +272,7 @@ class CameraState:
def finished(self, obj_id):
del self.tracked_objects[obj_id]
def on(self, event_type: str, callback: Callable[[dict], None]):
def on(self, event_type: str, callback: Callable):
self.callbacks[event_type].append(callback)
def update(

View File

@ -8,7 +8,7 @@ from .zmq_proxy import Publisher, Subscriber
class EventUpdatePublisher(
Publisher[tuple[EventTypeEnum, EventStateEnum, str, str, dict[str, Any]]]
Publisher[tuple[EventTypeEnum, EventStateEnum, str | None, str, dict[str, Any]]]
):
"""Publishes events (objects, audio, manual)."""
@ -19,7 +19,7 @@ class EventUpdatePublisher(
def publish(
self,
payload: tuple[EventTypeEnum, EventStateEnum, str, str, dict[str, Any]],
payload: tuple[EventTypeEnum, EventStateEnum, str | None, str, dict[str, Any]],
sub_topic: str = "",
) -> None:
super().publish(payload, sub_topic)

View File

@ -2,7 +2,7 @@
import json
import threading
from typing import Any, Generic, Optional, TypeVar
from typing import Generic, TypeVar
import zmq
@ -70,7 +70,7 @@ class Publisher(Generic[T]):
self.context.destroy()
class Subscriber:
class Subscriber(Generic[T]):
"""Receives messages."""
topic_base: str = ""
@ -82,9 +82,7 @@ class Subscriber:
self.socket.setsockopt_string(zmq.SUBSCRIBE, self.topic)
self.socket.connect(SOCKET_SUB)
def check_for_update(
self, timeout: float | None = FAST_QUEUE_TIMEOUT
) -> tuple[str, Any] | tuple[None, None] | None:
def check_for_update(self, timeout: float | None = FAST_QUEUE_TIMEOUT) -> T | None:
"""Returns message or None if no update."""
try:
has_update, _, _ = zmq.select([self.socket], [], [], timeout)
@ -101,7 +99,5 @@ class Subscriber:
self.socket.close()
self.context.destroy()
def _return_object(
self, topic: str, payload: Optional[tuple[str, Any]]
) -> tuple[str, Any] | tuple[None, None] | None:
def _return_object(self, topic: str, payload: T | None) -> T | None:
return payload

View File

@ -53,6 +53,9 @@ ignore_errors = false
[mypy-frigate.stats]
ignore_errors = false
[mypy-frigate.track.*]
ignore_errors = false
[mypy-frigate.types]
ignore_errors = false

View File

@ -60,10 +60,10 @@ class PtzMotionEstimator:
def motion_estimator(
self,
detections: list[dict[str, Any]],
detections: list[tuple[Any, Any, Any, Any, Any, Any]],
frame_name: str,
frame_time: float,
camera: str,
camera: str | None,
):
# If we've just started up or returned to our preset, reset motion estimator for new tracking session
if self.ptz_metrics.reset.is_set():

View File

@ -11,6 +11,9 @@ class ObjectTracker(ABC):
@abstractmethod
def match_and_update(
self, frame_name: str, frame_time: float, detections: list[dict[str, Any]]
self,
frame_name: str,
frame_time: float,
detections: list[tuple[Any, Any, Any, Any, Any, Any]],
) -> None:
pass

View File

@ -1,25 +1,26 @@
import random
import string
from collections import defaultdict
from typing import Any
import numpy as np
from scipy.spatial import distance as dist
from frigate.config import DetectConfig
from frigate.track import ObjectTracker
from frigate.util import intersection_over_union
from frigate.util.image import intersection_over_union
class CentroidTracker(ObjectTracker):
def __init__(self, config: DetectConfig):
self.tracked_objects = {}
self.untracked_object_boxes = []
self.disappeared = {}
self.positions = {}
self.tracked_objects: dict[str, dict[str, Any]] = {}
self.untracked_object_boxes: list[tuple[int, int, int, int]] = []
self.disappeared: dict[str, Any] = {}
self.positions: dict[str, Any] = {}
self.max_disappeared = config.max_disappeared
self.detect_config = config
def register(self, index, obj):
def register(self, obj: dict[str, Any]) -> None:
rand_id = "".join(random.choices(string.ascii_lowercase + string.digits, k=6))
id = f"{obj['frame_time']}-{rand_id}"
obj["id"] = id
@ -39,13 +40,13 @@ class CentroidTracker(ObjectTracker):
"ymax": self.detect_config.height,
}
def deregister(self, id):
def deregister(self, id: str) -> None:
del self.tracked_objects[id]
del self.disappeared[id]
# tracks the current position of the object based on the last N bounding boxes
# returns False if the object has moved outside its previous position
def update_position(self, id, box):
def update_position(self, id: str, box: tuple[int, int, int, int]) -> bool:
position = self.positions[id]
position_box = (
position["xmin"],
@ -88,7 +89,7 @@ class CentroidTracker(ObjectTracker):
return True
def is_expired(self, id):
def is_expired(self, id: str) -> bool:
obj = self.tracked_objects[id]
# get the max frames for this label type or the default
max_frames = self.detect_config.stationary.max_frames.objects.get(
@ -108,7 +109,7 @@ class CentroidTracker(ObjectTracker):
return False
def update(self, id, new_obj):
def update(self, id: str, new_obj: dict[str, Any]) -> None:
self.disappeared[id] = 0
# update the motionless count if the object has not moved to a new position
if self.update_position(id, new_obj["box"]):
@ -129,25 +130,30 @@ class CentroidTracker(ObjectTracker):
self.tracked_objects[id].update(new_obj)
def update_frame_times(self, frame_name, frame_time):
def update_frame_times(self, frame_name: str, frame_time: float) -> None:
for id in list(self.tracked_objects.keys()):
self.tracked_objects[id]["frame_time"] = frame_time
self.tracked_objects[id]["motionless_count"] += 1
if self.is_expired(id):
self.deregister(id)
def match_and_update(self, frame_time, detections):
def match_and_update(
self,
frame_name: str,
frame_time: float,
detections: list[tuple[Any, Any, Any, Any, Any, Any]],
) -> None:
# group by name
detection_groups = defaultdict(lambda: [])
for obj in detections:
detection_groups[obj[0]].append(
for det in detections:
detection_groups[det[0]].append(
{
"label": obj[0],
"score": obj[1],
"box": obj[2],
"area": obj[3],
"ratio": obj[4],
"region": obj[5],
"label": det[0],
"score": det[1],
"box": det[2],
"area": det[3],
"ratio": det[4],
"region": det[5],
"frame_time": frame_time,
}
)
@ -180,7 +186,7 @@ class CentroidTracker(ObjectTracker):
if len(current_objects) == 0:
for index, obj in enumerate(group):
self.register(index, obj)
self.register(obj)
continue
new_centroids = np.array([o["centroid"] for o in group])
@ -238,4 +244,4 @@ class CentroidTracker(ObjectTracker):
# register each new input centroid as a trackable object
else:
for col in unusedCols:
self.register(col, group[col])
self.register(group[col])

View File

@ -13,6 +13,7 @@ from norfair import (
draw_boxes,
)
from norfair.drawing.drawer import Drawer
from norfair.tracker import TrackedObject
from rich import print
from rich.console import Console
from rich.table import Table
@ -43,7 +44,7 @@ MAX_STATIONARY_HISTORY = 10
# - could be variable based on time since last_detection
# - include estimated velocity in the distance (car driving by of a parked car)
# - include some visual similarity factor in the distance for occlusions
def distance(detection: np.array, estimate: np.array) -> float:
def distance(detection: np.ndarray, estimate: np.ndarray) -> float:
# ultimately, this should try and estimate distance in 3-dimensional space
# consider change in location, width, and height
@ -73,14 +74,16 @@ def distance(detection: np.array, estimate: np.array) -> float:
change = np.append(distance, np.array([width_ratio, height_ratio]))
# calculate euclidean distance of the change vector
return np.linalg.norm(change)
return float(np.linalg.norm(change))
def frigate_distance(detection: Detection, tracked_object) -> float:
def frigate_distance(detection: Detection, tracked_object: TrackedObject) -> float:
return distance(detection.points, tracked_object.estimate)
def histogram_distance(matched_not_init_trackers, unmatched_trackers):
def histogram_distance(
matched_not_init_trackers: TrackedObject, unmatched_trackers: TrackedObject
) -> float:
snd_embedding = unmatched_trackers.last_detection.embedding
if snd_embedding is None:
@ -110,17 +113,17 @@ class NorfairTracker(ObjectTracker):
ptz_metrics: PTZMetrics,
):
self.frame_manager = SharedMemoryFrameManager()
self.tracked_objects = {}
self.tracked_objects: dict[str, dict[str, Any]] = {}
self.untracked_object_boxes: list[list[int]] = []
self.disappeared = {}
self.positions = {}
self.stationary_box_history: dict[str, list[list[int, int, int, int]]] = {}
self.disappeared: dict[str, int] = {}
self.positions: dict[str, dict[str, Any]] = {}
self.stationary_box_history: dict[str, list[list[int]]] = {}
self.camera_config = config
self.detect_config = config.detect
self.ptz_metrics = ptz_metrics
self.ptz_motion_estimator = {}
self.ptz_motion_estimator: PtzMotionEstimator | None = None
self.camera_name = config.name
self.track_id_map = {}
self.track_id_map: dict[str, str] = {}
# Define tracker configurations for static camera
self.object_type_configs = {
@ -169,7 +172,7 @@ class NorfairTracker(ObjectTracker):
"distance_threshold": 3,
}
self.trackers = {}
self.trackers: dict[str, dict[str, Tracker]] = {}
# Handle static trackers
for obj_type, tracker_config in self.object_type_configs.items():
if obj_type in self.camera_config.objects.track:
@ -216,7 +219,7 @@ class NorfairTracker(ObjectTracker):
self.camera_config, self.ptz_metrics
)
def _create_tracker(self, obj_type, tracker_config):
def _create_tracker(self, obj_type: str, tracker_config: dict[str, Any]) -> Tracker:
"""Helper function to create a tracker with given configuration."""
tracker_params = {
"distance_function": tracker_config["distance_function"],
@ -258,7 +261,7 @@ class NorfairTracker(ObjectTracker):
return self.trackers[object_type][mode]
return self.default_tracker[mode]
def register(self, track_id, obj):
def register(self, track_id: str, obj: dict[str, Any]) -> None:
rand_id = "".join(random.choices(string.ascii_lowercase + string.digits, k=6))
id = f"{obj['frame_time']}-{rand_id}"
self.track_id_map[track_id] = id
@ -297,7 +300,7 @@ class NorfairTracker(ObjectTracker):
}
self.stationary_box_history[id] = boxes
def deregister(self, id, track_id):
def deregister(self, id: str, track_id: str) -> None:
obj = self.tracked_objects[id]
del self.tracked_objects[id]
@ -321,7 +324,7 @@ class NorfairTracker(ObjectTracker):
# tracks the current position of the object based on the last N bounding boxes
# returns False if the object has moved outside its previous position
def update_position(self, id: str, box: list[int, int, int, int], stationary: bool):
def update_position(self, id: str, box: list[int], stationary: bool) -> bool:
xmin, ymin, xmax, ymax = box
position = self.positions[id]
self.stationary_box_history[id].append(box)
@ -396,7 +399,7 @@ class NorfairTracker(ObjectTracker):
return True
def is_expired(self, id):
def is_expired(self, id: str) -> bool:
obj = self.tracked_objects[id]
# get the max frames for this label type or the default
max_frames = self.detect_config.stationary.max_frames.objects.get(
@ -416,7 +419,7 @@ class NorfairTracker(ObjectTracker):
return False
def update(self, track_id, obj):
def update(self, track_id: str, obj: dict[str, Any]) -> None:
id = self.track_id_map[track_id]
self.disappeared[id] = 0
stationary = (
@ -443,7 +446,7 @@ class NorfairTracker(ObjectTracker):
self.tracked_objects[id].update(obj)
def update_frame_times(self, frame_name: str, frame_time: float):
def update_frame_times(self, frame_name: str, frame_time: float) -> None:
# if the object was there in the last frame, assume it's still there
detections = [
(
@ -460,10 +463,13 @@ class NorfairTracker(ObjectTracker):
self.match_and_update(frame_name, frame_time, detections=detections)
def match_and_update(
self, frame_name: str, frame_time: float, detections: list[dict[str, Any]]
):
self,
frame_name: str,
frame_time: float,
detections: list[tuple[Any, Any, Any, Any, Any, Any]],
) -> None:
# Group detections by object type
detections_by_type = {}
detections_by_type: dict[str, list[Detection]] = {}
for obj in detections:
label = obj[0]
if label not in detections_by_type:
@ -551,17 +557,17 @@ class NorfairTracker(ObjectTracker):
estimate = (
max(0, estimate[0]),
max(0, estimate[1]),
min(self.detect_config.width - 1, estimate[2]),
min(self.detect_config.height - 1, estimate[3]),
min(self.detect_config.width - 1, estimate[2]), # type: ignore[operator]
min(self.detect_config.height - 1, estimate[3]), # type: ignore[operator]
)
obj = {
new_obj = {
**t.last_detection.data,
"estimate": estimate,
"estimate_velocity": t.estimate_velocity,
}
active_ids.append(t.global_id)
if t.global_id not in self.track_id_map:
self.register(t.global_id, obj)
self.register(t.global_id, new_obj)
# if there wasn't a detection in this frame, increment disappeared
elif t.last_detection.data["frame_time"] != frame_time:
id = self.track_id_map[t.global_id]
@ -569,10 +575,10 @@ class NorfairTracker(ObjectTracker):
# sometimes the estimate gets way off
# only update if the upper left corner is actually upper left
if estimate[0] < estimate[2] and estimate[1] < estimate[3]:
self.tracked_objects[id]["estimate"] = obj["estimate"]
self.tracked_objects[id]["estimate"] = new_obj["estimate"]
# else update it
else:
self.update(t.global_id, obj)
self.update(t.global_id, new_obj)
# clear expired tracks
expired_ids = [k for k in self.track_id_map.keys() if k not in active_ids]
@ -585,7 +591,7 @@ class NorfairTracker(ObjectTracker):
o[2] for o in detections if o[2] not in tracked_object_boxes
]
def print_objects_as_table(self, tracked_objects: Sequence):
def print_objects_as_table(self, tracked_objects: Sequence) -> None:
"""Used for helping in debugging"""
print()
console = Console()
@ -605,13 +611,13 @@ class NorfairTracker(ObjectTracker):
)
console.print(table)
def debug_draw(self, frame, frame_time):
def debug_draw(self, frame: np.ndarray, frame_time: float) -> None:
# Collect all tracked objects from each tracker
all_tracked_objects = []
# print a table to the console with norfair tracked object info
if False:
if len(self.trackers["license_plate"]["static"].tracked_objects) > 0:
if len(self.trackers["license_plate"]["static"].tracked_objects) > 0: # type: ignore[unreachable]
self.print_objects_as_table(
self.trackers["license_plate"]["static"].tracked_objects
)
@ -662,7 +668,7 @@ class NorfairTracker(ObjectTracker):
if False:
# draw the current formatted time on the frame
from datetime import datetime
from datetime import datetime # type: ignore[unreachable]
formatted_time = datetime.fromtimestamp(frame_time).strftime(
"%m/%d/%Y %I:%M:%S %p"

View File

@ -6,6 +6,7 @@ import queue
import threading
from collections import defaultdict
from enum import Enum
from multiprocessing import Queue as MpQueue
from multiprocessing.synchronize import Event as MpEvent
from typing import Any
@ -39,6 +40,7 @@ from frigate.const import (
)
from frigate.events.types import EventStateEnum, EventTypeEnum
from frigate.models import Event, ReviewSegment, Timeline
from frigate.ptz.autotrack import PtzAutoTrackerThread
from frigate.track.tracked_object import TrackedObject
from frigate.util.image import SharedMemoryFrameManager
@ -56,10 +58,10 @@ class TrackedObjectProcessor(threading.Thread):
self,
config: FrigateConfig,
dispatcher: Dispatcher,
tracked_objects_queue,
ptz_autotracker_thread,
stop_event,
):
tracked_objects_queue: MpQueue,
ptz_autotracker_thread: PtzAutoTrackerThread,
stop_event: MpEvent,
) -> None:
super().__init__(name="detected_frames_processor")
self.config = config
self.dispatcher = dispatcher
@ -98,8 +100,12 @@ class TrackedObjectProcessor(threading.Thread):
# }
# }
# }
self.zone_data = defaultdict(lambda: defaultdict(dict))
self.active_zone_data = defaultdict(lambda: defaultdict(dict))
self.zone_data: dict[str, dict[str, Any]] = defaultdict(
lambda: defaultdict(dict)
)
self.active_zone_data: dict[str, dict[str, Any]] = defaultdict(
lambda: defaultdict(dict)
)
for camera in self.config.cameras.keys():
self.create_camera_state(camera)
@ -107,7 +113,7 @@ class TrackedObjectProcessor(threading.Thread):
def create_camera_state(self, camera: str) -> None:
"""Creates a new camera state."""
def start(camera: str, obj: TrackedObject, frame_name: str):
def start(camera: str, obj: TrackedObject, frame_name: str) -> None:
self.event_sender.publish(
(
EventTypeEnum.tracked_object,
@ -118,7 +124,7 @@ class TrackedObjectProcessor(threading.Thread):
)
)
def update(camera: str, obj: TrackedObject, frame_name: str):
def update(camera: str, obj: TrackedObject, frame_name: str) -> None:
obj.has_snapshot = self.should_save_snapshot(camera, obj)
obj.has_clip = self.should_retain_recording(camera, obj)
after = obj.to_dict()
@ -139,10 +145,10 @@ class TrackedObjectProcessor(threading.Thread):
)
)
def autotrack(camera: str, obj: TrackedObject, frame_name: str):
def autotrack(camera: str, obj: TrackedObject, frame_name: str) -> None:
self.ptz_autotracker_thread.ptz_autotracker.autotrack_object(camera, obj)
def end(camera: str, obj: TrackedObject, frame_name: str):
def end(camera: str, obj: TrackedObject, frame_name: str) -> None:
# populate has_snapshot
obj.has_snapshot = self.should_save_snapshot(camera, obj)
obj.has_clip = self.should_retain_recording(camera, obj)
@ -211,7 +217,7 @@ class TrackedObjectProcessor(threading.Thread):
return False
def camera_activity(camera, activity):
def camera_activity(camera: str, activity: dict[str, Any]) -> None:
last_activity = self.camera_activity.get(camera)
if not last_activity or activity != last_activity:
@ -229,7 +235,7 @@ class TrackedObjectProcessor(threading.Thread):
camera_state.on("camera_activity", camera_activity)
self.camera_states[camera] = camera_state
def should_save_snapshot(self, camera, obj: TrackedObject):
def should_save_snapshot(self, camera: str, obj: TrackedObject) -> bool:
if obj.false_positive:
return False
@ -252,7 +258,7 @@ class TrackedObjectProcessor(threading.Thread):
return True
def should_retain_recording(self, camera: str, obj: TrackedObject):
def should_retain_recording(self, camera: str, obj: TrackedObject) -> bool:
if obj.false_positive:
return False
@ -272,7 +278,7 @@ class TrackedObjectProcessor(threading.Thread):
return True
def should_mqtt_snapshot(self, camera, obj: TrackedObject):
def should_mqtt_snapshot(self, camera: str, obj: TrackedObject) -> bool:
# object never changed position
if obj.is_stationary():
return False
@ -287,7 +293,9 @@ class TrackedObjectProcessor(threading.Thread):
return True
def update_mqtt_motion(self, camera, frame_time, motion_boxes):
def update_mqtt_motion(
self, camera: str, frame_time: float, motion_boxes: list
) -> None:
# publish if motion is currently being detected
if motion_boxes:
# only send ON if motion isn't already active
@ -313,11 +321,15 @@ class TrackedObjectProcessor(threading.Thread):
# reset the last_motion so redundant `off` commands aren't sent
self.last_motion_detected[camera] = 0
def get_best(self, camera, label):
def get_best(self, camera: str, label: str) -> dict[str, Any]:
# TODO: need a lock here
camera_state = self.camera_states[camera]
if label in camera_state.best_objects:
best_obj = camera_state.best_objects[label]
if not best_obj.thumbnail_data:
return {}
best = best_obj.thumbnail_data.copy()
best["frame"] = camera_state.frame_cache.get(
best_obj.thumbnail_data["frame_time"]
@ -340,7 +352,7 @@ class TrackedObjectProcessor(threading.Thread):
return self.camera_states[camera].get_current_frame(draw_options)
def get_current_frame_time(self, camera) -> int:
def get_current_frame_time(self, camera: str) -> float:
"""Returns the latest frame time for a given camera."""
return self.camera_states[camera].current_frame_time
@ -348,7 +360,7 @@ class TrackedObjectProcessor(threading.Thread):
self, event_id: str, sub_label: str | None, score: float | None
) -> None:
"""Update sub label for given event id."""
tracked_obj: TrackedObject = None
tracked_obj: TrackedObject | None = None
for state in self.camera_states.values():
tracked_obj = state.tracked_objects.get(event_id)
@ -357,7 +369,7 @@ class TrackedObjectProcessor(threading.Thread):
break
try:
event: Event = Event.get(Event.id == event_id)
event: Event | None = Event.get(Event.id == event_id)
except DoesNotExist:
event = None
@ -368,12 +380,12 @@ class TrackedObjectProcessor(threading.Thread):
tracked_obj.obj_data["sub_label"] = (sub_label, score)
if event:
event.sub_label = sub_label
event.sub_label = sub_label # type: ignore[assignment]
data = event.data
if sub_label is None:
data["sub_label_score"] = None
data["sub_label_score"] = None # type: ignore[index]
elif score is not None:
data["sub_label_score"] = score
data["sub_label_score"] = score # type: ignore[index]
event.data = data
event.save()
@ -402,7 +414,7 @@ class TrackedObjectProcessor(threading.Thread):
objects_list = []
sub_labels = set()
events = Event.select(Event.id, Event.label, Event.sub_label).where(
Event.id.in_(detection_ids)
Event.id.in_(detection_ids) # type: ignore[call-arg, misc]
)
for det_event in events:
if det_event.sub_label:
@ -431,13 +443,11 @@ class TrackedObjectProcessor(threading.Thread):
f"Updated sub_label for event {event_id} in review segment {review_segment.id}"
)
except ReviewSegment.DoesNotExist:
except DoesNotExist:
logger.debug(
f"No review segment found with event ID {event_id} when updating sub_label"
)
return True
def set_object_attribute(
self,
event_id: str,
@ -446,7 +456,7 @@ class TrackedObjectProcessor(threading.Thread):
score: float | None,
) -> None:
"""Update attribute for given event id."""
tracked_obj: TrackedObject = None
tracked_obj: TrackedObject | None = None
for state in self.camera_states.values():
tracked_obj = state.tracked_objects.get(event_id)
@ -455,7 +465,7 @@ class TrackedObjectProcessor(threading.Thread):
break
try:
event: Event = Event.get(Event.id == event_id)
event: Event | None = Event.get(Event.id == event_id)
except DoesNotExist:
event = None
@ -470,16 +480,14 @@ class TrackedObjectProcessor(threading.Thread):
if event:
data = event.data
data[field_name] = field_value
data[field_name] = field_value # type: ignore[index]
if field_value is None:
data[f"{field_name}_score"] = None
data[f"{field_name}_score"] = None # type: ignore[index]
elif score is not None:
data[f"{field_name}_score"] = score
data[f"{field_name}_score"] = score # type: ignore[index]
event.data = data
event.save()
return True
def save_lpr_snapshot(self, payload: tuple) -> None:
# save the snapshot image
(frame, event_id, camera) = payload
@ -638,7 +646,7 @@ class TrackedObjectProcessor(threading.Thread):
)
self.ongoing_manual_events.pop(event_id)
def force_end_all_events(self, camera: str, camera_state: CameraState):
def force_end_all_events(self, camera: str, camera_state: CameraState) -> None:
"""Ends all active events on camera when disabling."""
last_frame_name = camera_state.previous_frame_id
for obj_id, obj in list(camera_state.tracked_objects.items()):
@ -656,7 +664,7 @@ class TrackedObjectProcessor(threading.Thread):
{"enabled": False, "motion": 0, "objects": []},
)
def run(self):
def run(self) -> None:
while not self.stop_event.is_set():
# check for config updates
updated_topics = self.camera_config_subscriber.check_for_updates()
@ -698,11 +706,14 @@ class TrackedObjectProcessor(threading.Thread):
# check for sub label updates
while True:
(raw_topic, payload) = self.sub_label_subscriber.check_for_update(
timeout=0
)
update = self.sub_label_subscriber.check_for_update(timeout=0)
if not raw_topic:
if not update:
break
(raw_topic, payload) = update
if not raw_topic or not payload:
break
topic = str(raw_topic)

View File

@ -5,18 +5,19 @@ import math
import os
from collections import defaultdict
from statistics import median
from typing import Any, Optional
from typing import Any, Optional, cast
import cv2
import numpy as np
from frigate.config import (
CameraConfig,
ModelConfig,
FilterConfig,
SnapshotsConfig,
UIConfig,
)
from frigate.const import CLIPS_DIR, THUMB_DIR
from frigate.detectors.detector_config import ModelConfig
from frigate.review.types import SeverityEnum
from frigate.util.builtin import sanitize_float
from frigate.util.image import (
@ -46,11 +47,11 @@ class TrackedObject:
model_config: ModelConfig,
camera_config: CameraConfig,
ui_config: UIConfig,
frame_cache,
frame_cache: dict[float, dict[str, Any]],
obj_data: dict[str, Any],
):
) -> None:
# set the score history then remove as it is not part of object state
self.score_history = obj_data["score_history"]
self.score_history: list[float] = obj_data["score_history"]
del obj_data["score_history"]
self.obj_data = obj_data
@ -61,24 +62,24 @@ class TrackedObject:
self.frame_cache = frame_cache
self.zone_presence: dict[str, int] = {}
self.zone_loitering: dict[str, int] = {}
self.current_zones = []
self.entered_zones = []
self.attributes = defaultdict(float)
self.current_zones: list[str] = []
self.entered_zones: list[str] = []
self.attributes: dict[str, float] = defaultdict(float)
self.false_positive = True
self.has_clip = False
self.has_snapshot = False
self.top_score = self.computed_score = 0.0
self.thumbnail_data = None
self.thumbnail_data: dict[str, Any] | None = None
self.last_updated = 0
self.last_published = 0
self.frame = None
self.active = True
self.pending_loitering = False
self.speed_history = []
self.current_estimated_speed = 0
self.average_estimated_speed = 0
self.speed_history: list[float] = []
self.current_estimated_speed: float = 0
self.average_estimated_speed: float = 0
self.velocity_angle = 0
self.path_data = []
self.path_data: list[tuple[Any, float]] = []
self.previous = self.to_dict()
@property
@ -111,7 +112,7 @@ class TrackedObject:
return None
def _is_false_positive(self):
def _is_false_positive(self) -> bool:
# once a true positive, always a true positive
if not self.false_positive:
return False
@ -119,11 +120,13 @@ class TrackedObject:
threshold = self.camera_config.objects.filters[self.obj_data["label"]].threshold
return self.computed_score < threshold
def compute_score(self):
def compute_score(self) -> float:
"""get median of scores for object."""
return median(self.score_history)
def update(self, current_frame_time: float, obj_data, has_valid_frame: bool):
def update(
self, current_frame_time: float, obj_data: dict[str, Any], has_valid_frame: bool
) -> tuple[bool, bool, bool, bool]:
thumb_update = False
significant_change = False
path_update = False
@ -305,7 +308,7 @@ class TrackedObject:
k: self.attributes[k] for k in self.logos if k in self.attributes
}
if len(recognized_logos) > 0:
max_logo = max(recognized_logos, key=recognized_logos.get)
max_logo = max(recognized_logos, key=recognized_logos.get) # type: ignore[arg-type]
# don't overwrite sub label if it is already set
if (
@ -342,28 +345,30 @@ class TrackedObject:
# update path
width = self.camera_config.detect.width
height = self.camera_config.detect.height
bottom_center = (
round(obj_data["centroid"][0] / width, 4),
round(obj_data["box"][3] / height, 4),
)
# calculate a reasonable movement threshold (e.g., 5% of the frame diagonal)
threshold = 0.05 * math.sqrt(width**2 + height**2) / max(width, height)
if not self.path_data:
self.path_data.append((bottom_center, obj_data["frame_time"]))
path_update = True
elif (
math.dist(self.path_data[-1][0], bottom_center) >= threshold
or len(self.path_data) == 1
):
# check Euclidean distance before appending
self.path_data.append((bottom_center, obj_data["frame_time"]))
path_update = True
logger.debug(
f"Point tracking: {obj_data['id']}, {bottom_center}, {obj_data['frame_time']}"
if width is not None and height is not None:
bottom_center = (
round(obj_data["centroid"][0] / width, 4),
round(obj_data["box"][3] / height, 4),
)
# calculate a reasonable movement threshold (e.g., 5% of the frame diagonal)
threshold = 0.05 * math.sqrt(width**2 + height**2) / max(width, height)
if not self.path_data:
self.path_data.append((bottom_center, obj_data["frame_time"]))
path_update = True
elif (
math.dist(self.path_data[-1][0], bottom_center) >= threshold
or len(self.path_data) == 1
):
# check Euclidean distance before appending
self.path_data.append((bottom_center, obj_data["frame_time"]))
path_update = True
logger.debug(
f"Point tracking: {obj_data['id']}, {bottom_center}, {obj_data['frame_time']}"
)
self.obj_data.update(obj_data)
self.current_zones = current_zones
logger.debug(
@ -371,7 +376,7 @@ class TrackedObject:
)
return (thumb_update, significant_change, path_update, autotracker_update)
def to_dict(self):
def to_dict(self) -> dict[str, Any]:
event = {
"id": self.obj_data["id"],
"camera": self.camera_config.name,
@ -413,10 +418,8 @@ class TrackedObject:
return not self.is_stationary()
def is_stationary(self) -> bool:
return (
self.obj_data["motionless_count"]
> self.camera_config.detect.stationary.threshold
)
count = cast(int | float, self.obj_data["motionless_count"])
return count > (self.camera_config.detect.stationary.threshold or 50)
def get_thumbnail(self, ext: str) -> bytes | None:
img_bytes = self.get_img_bytes(
@ -453,9 +456,9 @@ class TrackedObject:
def get_img_bytes(
self,
ext: str,
timestamp=False,
bounding_box=False,
crop=False,
timestamp: bool = False,
bounding_box: bool = False,
crop: bool = False,
height: int | None = None,
quality: int | None = None,
) -> bytes | None:
@ -532,18 +535,18 @@ class TrackedObject:
best_frame, dsize=(width, height), interpolation=cv2.INTER_AREA
)
if timestamp:
color = self.camera_config.timestamp_style.color
colors = self.camera_config.timestamp_style.color
draw_timestamp(
best_frame,
self.thumbnail_data["frame_time"],
self.camera_config.timestamp_style.format,
font_effect=self.camera_config.timestamp_style.effect,
font_thickness=self.camera_config.timestamp_style.thickness,
font_color=(color.blue, color.green, color.red),
font_color=(colors.blue, colors.green, colors.red),
position=self.camera_config.timestamp_style.position,
)
quality_params = None
quality_params = []
if ext == "jpg":
quality_params = [int(cv2.IMWRITE_JPEG_QUALITY), quality or 70]
@ -596,6 +599,9 @@ class TrackedObject:
p.write(png_bytes)
def write_thumbnail_to_disk(self) -> None:
if not self.camera_config.name:
return
directory = os.path.join(THUMB_DIR, self.camera_config.name)
if not os.path.exists(directory):
@ -603,11 +609,14 @@ class TrackedObject:
thumb_bytes = self.get_thumbnail("webp")
with open(os.path.join(directory, f"{self.obj_data['id']}.webp"), "wb") as f:
f.write(thumb_bytes)
if thumb_bytes:
with open(
os.path.join(directory, f"{self.obj_data['id']}.webp"), "wb"
) as f:
f.write(thumb_bytes)
def zone_filtered(obj: TrackedObject, object_config):
def zone_filtered(obj: TrackedObject, object_config: dict[str, FilterConfig]) -> bool:
object_name = obj.obj_data["label"]
if object_name in object_config:
@ -657,9 +666,9 @@ class TrackedObjectAttribute:
def find_best_object(self, objects: list[dict[str, Any]]) -> Optional[str]:
"""Find the best attribute for each object and return its ID."""
best_object_area = None
best_object_id = None
best_object_label = None
best_object_area: float | None = None
best_object_id: str | None = None
best_object_label: str | None = None
for obj in objects:
if not box_inside(obj["box"], self.box):