From 4d4d54d030b662371d8b864c667cc2ec311ee754 Mon Sep 17 00:00:00 2001 From: Martin Weinelt Date: Tue, 13 May 2025 16:27:20 +0200 Subject: [PATCH] Fix various typing issues (#18187) * Fix the `Any` typing hint treewide There has been confusion between the Any type[1] and the any function[2] in typing hints. [1] https://docs.python.org/3/library/typing.html#typing.Any [2] https://docs.python.org/3/library/functions.html#any * Fix typing for various frame_shape members Frame shapes are most likely defined by height and width, so a single int cannot express that. * Wrap gpu stats functions in Optional[] These can return `None`, so they need to be `Type | None`, which is what `Optional` expresses very nicely. * Fix return type in get_latest_segment_datetime Returns a datetime object, not an integer. * Make the return type of FrameManager.write optional This is necessary since the SharedMemoryFrameManager.write function can return None. * Fix total_seconds() return type in get_tz_modifiers The function returns a float, not an int. https://docs.python.org/3/library/datetime.html#datetime.timedelta.total_seconds * Account for floating point results in to_relative_box Because the function uses division the return types may either be int or float. * Resolve ruff deprecation warning The config has been split into formatter and linter, and the global options are deprecated. --- .../usr/local/ffmpeg/get_ffmpeg_path.py | 7 ++++--- .../rootfs/usr/local/go2rtc/create_config.py | 11 +++++----- .../rootfs/usr/local/nginx/get_base_path.py | 3 ++- .../usr/local/nginx/get_tls_settings.py | 9 ++++---- frigate/api/app.py | 8 +++---- frigate/api/classification.py | 7 ++++--- frigate/api/media.py | 3 ++- frigate/api/notification.py | 3 ++- frigate/camera/activity_manager.py | 12 +++++------ frigate/camera/state.py | 4 ++-- frigate/comms/config_updater.py | 6 +++--- frigate/comms/detections_updater.py | 6 +++--- frigate/comms/embeddings_updater.py | 4 ++-- frigate/comms/event_metadata_updater.py | 3 ++- frigate/comms/events_updater.py | 6 ++++-- frigate/comms/inter_process.py | 4 ++-- frigate/comms/zmq_proxy.py | 8 +++---- .../common/license_plate/mixin.py | 10 ++++----- frigate/data_processing/post/api.py | 5 +++-- frigate/data_processing/post/license_plate.py | 7 ++++--- frigate/data_processing/real_time/api.py | 7 ++++--- frigate/data_processing/real_time/bird.py | 5 +++-- frigate/data_processing/real_time/face.py | 10 ++++----- .../real_time/license_plate.py | 7 ++++--- frigate/detectors/detector_config.py | 4 ++-- frigate/embeddings/__init__.py | 12 +++++------ frigate/embeddings/maintainer.py | 8 +++---- frigate/embeddings/onnx/base_embedding.py | 7 ++++--- frigate/events/audio.py | 4 ++-- frigate/events/cleanup.py | 3 ++- frigate/output/birdseye.py | 10 ++++----- frigate/output/preview.py | 5 +++-- frigate/ptz/autotrack.py | 3 ++- frigate/ptz/onvif.py | 3 ++- frigate/record/maintainer.py | 2 +- frigate/review/maintainer.py | 8 +++---- frigate/stats/emitter.py | 10 ++++----- frigate/timeline.py | 15 ++++++------- frigate/track/__init__.py | 3 ++- frigate/track/norfair_tracker.py | 4 ++-- frigate/track/object_processing.py | 5 +++-- frigate/track/tracked_object.py | 8 +++---- frigate/util/builtin.py | 4 ++-- frigate/util/config.py | 20 +++++++++--------- frigate/util/image.py | 8 +++---- frigate/util/model.py | 3 ++- frigate/util/object.py | 21 ++++++++++--------- frigate/util/services.py | 18 ++++++++-------- frigate/video.py | 8 ++++--- pyproject.toml | 4 ++-- 50 files changed, 191 insertions(+), 164 deletions(-) diff --git a/docker/main/rootfs/usr/local/ffmpeg/get_ffmpeg_path.py b/docker/main/rootfs/usr/local/ffmpeg/get_ffmpeg_path.py index 3de7d9f4a..0f492cc5c 100644 --- a/docker/main/rootfs/usr/local/ffmpeg/get_ffmpeg_path.py +++ b/docker/main/rootfs/usr/local/ffmpeg/get_ffmpeg_path.py @@ -1,5 +1,6 @@ import json import sys +from typing import Any from ruamel.yaml import YAML @@ -21,11 +22,11 @@ try: raw_config = f.read() if config_file.endswith((".yaml", ".yml")): - config: dict[str, any] = yaml.load(raw_config) + config: dict[str, Any] = yaml.load(raw_config) elif config_file.endswith(".json"): - config: dict[str, any] = json.loads(raw_config) + config: dict[str, Any] = json.loads(raw_config) except FileNotFoundError: - config: dict[str, any] = {} + config: dict[str, Any] = {} path = config.get("ffmpeg", {}).get("path", "default") if path == "default": diff --git a/docker/main/rootfs/usr/local/go2rtc/create_config.py b/docker/main/rootfs/usr/local/go2rtc/create_config.py index 21da66f3e..1b44a8067 100644 --- a/docker/main/rootfs/usr/local/go2rtc/create_config.py +++ b/docker/main/rootfs/usr/local/go2rtc/create_config.py @@ -4,6 +4,7 @@ import json import os import sys from pathlib import Path +from typing import Any from ruamel.yaml import YAML @@ -37,13 +38,13 @@ try: raw_config = f.read() if config_file.endswith((".yaml", ".yml")): - config: dict[str, any] = yaml.load(raw_config) + config: dict[str, Any] = yaml.load(raw_config) elif config_file.endswith(".json"): - config: dict[str, any] = json.loads(raw_config) + config: dict[str, Any] = json.loads(raw_config) except FileNotFoundError: - config: dict[str, any] = {} + config: dict[str, Any] = {} -go2rtc_config: dict[str, any] = config.get("go2rtc", {}) +go2rtc_config: dict[str, Any] = config.get("go2rtc", {}) # Need to enable CORS for go2rtc so the frigate integration / card work automatically if go2rtc_config.get("api") is None: @@ -134,7 +135,7 @@ for name in go2rtc_config.get("streams", {}): # add birdseye restream stream if enabled if config.get("birdseye", {}).get("restream", False): - birdseye: dict[str, any] = config.get("birdseye") + birdseye: dict[str, Any] = config.get("birdseye") input = f"-f rawvideo -pix_fmt yuv420p -video_size {birdseye.get('width', 1280)}x{birdseye.get('height', 720)} -r 10 -i {BIRDSEYE_PIPE}" ffmpeg_cmd = f"exec:{parse_preset_hardware_acceleration_encode(ffmpeg_path, config.get('ffmpeg', {}).get('hwaccel_args', ''), input, '-rtsp_transport tcp -f rtsp {output}')}" diff --git a/docker/main/rootfs/usr/local/nginx/get_base_path.py b/docker/main/rootfs/usr/local/nginx/get_base_path.py index e6fc8cfc6..2e78a7de9 100644 --- a/docker/main/rootfs/usr/local/nginx/get_base_path.py +++ b/docker/main/rootfs/usr/local/nginx/get_base_path.py @@ -2,9 +2,10 @@ import json import os +from typing import Any base_path = os.environ.get("FRIGATE_BASE_PATH", "") -result: dict[str, any] = {"base_path": base_path} +result: dict[str, Any] = {"base_path": base_path} print(json.dumps(result)) diff --git a/docker/main/rootfs/usr/local/nginx/get_tls_settings.py b/docker/main/rootfs/usr/local/nginx/get_tls_settings.py index 2ababa282..d2e704056 100644 --- a/docker/main/rootfs/usr/local/nginx/get_tls_settings.py +++ b/docker/main/rootfs/usr/local/nginx/get_tls_settings.py @@ -2,6 +2,7 @@ import json import sys +from typing import Any from ruamel.yaml import YAML @@ -19,12 +20,12 @@ try: raw_config = f.read() if config_file.endswith((".yaml", ".yml")): - config: dict[str, any] = yaml.load(raw_config) + config: dict[str, Any] = yaml.load(raw_config) elif config_file.endswith(".json"): - config: dict[str, any] = json.loads(raw_config) + config: dict[str, Any] = json.loads(raw_config) except FileNotFoundError: - config: dict[str, any] = {} + config: dict[str, Any] = {} -tls_config: dict[str, any] = config.get("tls", {"enabled": True}) +tls_config: dict[str, Any] = config.get("tls", {"enabled": True}) print(json.dumps(tls_config)) diff --git a/frigate/api/app.py b/frigate/api/app.py index 2301d1be1..5860377e7 100644 --- a/frigate/api/app.py +++ b/frigate/api/app.py @@ -131,7 +131,7 @@ def metrics(request: Request): @router.get("/config") def config(request: Request): config_obj: FrigateConfig = request.app.frigate_config - config: dict[str, dict[str, any]] = config_obj.model_dump( + config: dict[str, dict[str, Any]] = config_obj.model_dump( mode="json", warnings="none", exclude_none=True ) @@ -158,7 +158,7 @@ def config(request: Request): camera_dict["zones"][zone_name]["color"] = zone.color # remove go2rtc stream passwords - go2rtc: dict[str, any] = config_obj.go2rtc.model_dump( + go2rtc: dict[str, Any] = config_obj.go2rtc.model_dump( mode="json", warnings="none", exclude_none=True ) for stream_name, stream in go2rtc.get("streams", {}).items(): @@ -648,7 +648,7 @@ def plusModels(request: Request, filterByCurrentModelDetector: bool = False): status_code=400, ) - models: dict[any, any] = request.app.frigate_config.plus_api.get_models() + models: dict[Any, Any] = request.app.frigate_config.plus_api.get_models() if not models["list"]: return JSONResponse( @@ -801,7 +801,7 @@ def hourly_timeline(params: AppTimelineHourlyQueryParameters = Depends()): count = 0 start = 0 end = 0 - hours: dict[str, list[dict[str, any]]] = {} + hours: dict[str, list[dict[str, Any]]] = {} for t in timeline: if count == 0: diff --git a/frigate/api/classification.py b/frigate/api/classification.py index d0fcf775c..75ca13735 100644 --- a/frigate/api/classification.py +++ b/frigate/api/classification.py @@ -4,6 +4,7 @@ import datetime import logging import os import shutil +from typing import Any import cv2 from fastapi import APIRouter, Depends, Request, UploadFile @@ -58,7 +59,7 @@ def reclassify_face(request: Request, body: dict = None): content={"message": "Face recognition is not enabled.", "success": False}, ) - json: dict[str, any] = body or {} + json: dict[str, Any] = body or {} training_file = os.path.join( FACE_DIR, f"train/{sanitize_filename(json.get('training_file', ''))}" ) @@ -91,7 +92,7 @@ def train_face(request: Request, name: str, body: dict = None): content={"message": "Face recognition is not enabled.", "success": False}, ) - json: dict[str, any] = body or {} + json: dict[str, Any] = body or {} training_file_name = sanitize_filename(json.get("training_file", "")) training_file = os.path.join(FACE_DIR, f"train/{training_file_name}") event_id = json.get("event_id") @@ -246,7 +247,7 @@ def deregister_faces(request: Request, name: str, body: dict = None): content={"message": "Face recognition is not enabled.", "success": False}, ) - json: dict[str, any] = body or {} + json: dict[str, Any] = body or {} list_of_ids = json.get("ids", "") context: EmbeddingsContext = request.app.embeddings diff --git a/frigate/api/media.py b/frigate/api/media.py index 9aac3d7e6..9c56e363d 100644 --- a/frigate/api/media.py +++ b/frigate/api/media.py @@ -9,6 +9,7 @@ import subprocess as sp import time from datetime import datetime, timedelta, timezone from pathlib import Path as FilePath +from typing import Any from urllib.parse import unquote import cv2 @@ -89,7 +90,7 @@ def imagestream( camera_name: str, fps: int, height: int, - draw_options: dict[str, any], + draw_options: dict[str, Any], ): while True: # max out at specified FPS diff --git a/frigate/api/notification.py b/frigate/api/notification.py index 0ae1cc9d0..7858ec1a7 100644 --- a/frigate/api/notification.py +++ b/frigate/api/notification.py @@ -2,6 +2,7 @@ import logging import os +from typing import Any from cryptography.hazmat.primitives import serialization from fastapi import APIRouter, Request @@ -41,7 +42,7 @@ def register_notifications(request: Request, body: dict = None): else: username = "admin" - json: dict[str, any] = body or {} + json: dict[str, Any] = body or {} sub = json.get("sub") if not sub: diff --git a/frigate/camera/activity_manager.py b/frigate/camera/activity_manager.py index 7f6354641..6039a07f6 100644 --- a/frigate/camera/activity_manager.py +++ b/frigate/camera/activity_manager.py @@ -1,18 +1,18 @@ """Manage camera activity and updating listeners.""" from collections import Counter -from typing import Callable +from typing import Any, Callable from frigate.config.config import FrigateConfig class CameraActivityManager: def __init__( - self, config: FrigateConfig, publish: Callable[[str, any], None] + self, config: FrigateConfig, publish: Callable[[str, Any], None] ) -> None: self.config = config self.publish = publish - self.last_camera_activity: dict[str, dict[str, any]] = {} + self.last_camera_activity: dict[str, dict[str, Any]] = {} self.camera_all_object_counts: dict[str, Counter] = {} self.camera_active_object_counts: dict[str, Counter] = {} self.zone_all_object_counts: dict[str, Counter] = {} @@ -39,8 +39,8 @@ class CameraActivityManager: else camera_config.objects.track ) - def update_activity(self, new_activity: dict[str, dict[str, any]]) -> None: - all_objects: list[dict[str, any]] = [] + def update_activity(self, new_activity: dict[str, dict[str, Any]]) -> None: + all_objects: list[dict[str, Any]] = [] for camera in new_activity.keys(): new_objects = new_activity[camera].get("objects", []) @@ -93,7 +93,7 @@ class CameraActivityManager: self.last_camera_activity = new_activity def compare_camera_activity( - self, camera: str, new_activity: dict[str, any] + self, camera: str, new_activity: dict[str, Any] ) -> None: all_objects = Counter( obj["label"].replace("-verified", "") for obj in new_activity diff --git a/frigate/camera/state.py b/frigate/camera/state.py index c307bea1e..f7b60ed68 100644 --- a/frigate/camera/state.py +++ b/frigate/camera/state.py @@ -239,7 +239,7 @@ class CameraState: self, frame_name: str, frame_time: float, - current_detections: dict[str, dict[str, any]], + current_detections: dict[str, dict[str, Any]], motion_boxes: list[tuple[int, int, int, int]], regions: list[tuple[int, int, int, int]], ): @@ -337,7 +337,7 @@ class CameraState: # TODO: can i switch to looking this up and only changing when an event ends? # maintain best objects - camera_activity: dict[str, list[any]] = { + camera_activity: dict[str, list[Any]] = { "motion": len(motion_boxes) > 0, "objects": [], } diff --git a/frigate/comms/config_updater.py b/frigate/comms/config_updater.py index 49be36c1e..06b870c62 100644 --- a/frigate/comms/config_updater.py +++ b/frigate/comms/config_updater.py @@ -2,7 +2,7 @@ import multiprocessing as mp from multiprocessing.synchronize import Event as MpEvent -from typing import Optional +from typing import Any, Optional import zmq @@ -18,7 +18,7 @@ class ConfigPublisher: self.socket.bind(SOCKET_PUB_SUB) self.stop_event: MpEvent = mp.Event() - def publish(self, topic: str, payload: any) -> None: + def publish(self, topic: str, payload: Any) -> None: """There is no communication back to the processes.""" self.socket.send_string(topic, flags=zmq.SNDMORE) self.socket.send_pyobj(payload) @@ -40,7 +40,7 @@ class ConfigSubscriber: self.socket.setsockopt_string(zmq.SUBSCRIBE, topic) self.socket.connect(SOCKET_PUB_SUB) - def check_for_update(self) -> Optional[tuple[str, any]]: + def check_for_update(self) -> Optional[tuple[str, Any]]: """Returns updated config or None if no update.""" try: topic = self.socket.recv_string(flags=zmq.NOBLOCK) diff --git a/frigate/comms/detections_updater.py b/frigate/comms/detections_updater.py index f585b570d..1718d1347 100644 --- a/frigate/comms/detections_updater.py +++ b/frigate/comms/detections_updater.py @@ -1,7 +1,7 @@ """Facilitates communication between processes.""" from enum import Enum -from typing import Optional +from typing import Any, Optional from .zmq_proxy import Publisher, Subscriber @@ -35,10 +35,10 @@ class DetectionSubscriber(Subscriber): def check_for_update( self, timeout: float = None - ) -> Optional[tuple[DetectionTypeEnum, any]]: + ) -> Optional[tuple[DetectionTypeEnum, Any]]: return super().check_for_update(timeout) - def _return_object(self, topic: str, payload: any) -> any: + def _return_object(self, topic: str, payload: Any) -> Any: if payload is None: return (None, None) return (DetectionTypeEnum[topic[len(self.topic_base) :]], payload) diff --git a/frigate/comms/embeddings_updater.py b/frigate/comms/embeddings_updater.py index 6c26af3d1..74a87e60f 100644 --- a/frigate/comms/embeddings_updater.py +++ b/frigate/comms/embeddings_updater.py @@ -1,7 +1,7 @@ """Facilitates communication between processes.""" from enum import Enum -from typing import Callable +from typing import Any, Callable import zmq @@ -58,7 +58,7 @@ class EmbeddingsRequestor: self.socket = self.context.socket(zmq.REQ) self.socket.connect(SOCKET_REP_REQ) - def send_data(self, topic: str, data: any) -> str: + def send_data(self, topic: str, data: Any) -> str: """Sends data and then waits for reply.""" try: self.socket.send_json((topic, data)) diff --git a/frigate/comms/event_metadata_updater.py b/frigate/comms/event_metadata_updater.py index 6adcaf4be..6305de5a1 100644 --- a/frigate/comms/event_metadata_updater.py +++ b/frigate/comms/event_metadata_updater.py @@ -2,6 +2,7 @@ import logging from enum import Enum +from typing import Any from .zmq_proxy import Publisher, Subscriber @@ -27,7 +28,7 @@ class EventMetadataPublisher(Publisher): def __init__(self) -> None: super().__init__() - def publish(self, topic: EventMetadataTypeEnum, payload: any) -> None: + def publish(self, topic: EventMetadataTypeEnum, payload: Any) -> None: super().publish(payload, topic.value) diff --git a/frigate/comms/events_updater.py b/frigate/comms/events_updater.py index 98b6ccb7a..b1d7a6328 100644 --- a/frigate/comms/events_updater.py +++ b/frigate/comms/events_updater.py @@ -1,5 +1,7 @@ """Facilitates communication between processes.""" +from typing import Any + from frigate.events.types import EventStateEnum, EventTypeEnum from .zmq_proxy import Publisher, Subscriber @@ -14,7 +16,7 @@ class EventUpdatePublisher(Publisher): super().__init__("update") def publish( - self, payload: tuple[EventTypeEnum, EventStateEnum, str, str, dict[str, any]] + self, payload: tuple[EventTypeEnum, EventStateEnum, str, str, dict[str, Any]] ) -> None: super().publish(payload) @@ -37,7 +39,7 @@ class EventEndPublisher(Publisher): super().__init__("finalized") def publish( - self, payload: tuple[EventTypeEnum, EventStateEnum, str, dict[str, any]] + self, payload: tuple[EventTypeEnum, EventStateEnum, str, dict[str, Any]] ) -> None: super().publish(payload) diff --git a/frigate/comms/inter_process.py b/frigate/comms/inter_process.py index 36a6857a4..ee1a78efc 100644 --- a/frigate/comms/inter_process.py +++ b/frigate/comms/inter_process.py @@ -3,7 +3,7 @@ import multiprocessing as mp import threading from multiprocessing.synchronize import Event as MpEvent -from typing import Callable +from typing import Any, Callable import zmq @@ -63,7 +63,7 @@ class InterProcessRequestor: self.socket = self.context.socket(zmq.REQ) self.socket.connect(SOCKET_REP_REQ) - def send_data(self, topic: str, data: any) -> any: + def send_data(self, topic: str, data: Any) -> Any: """Sends data and then waits for reply.""" try: self.socket.send_json((topic, data)) diff --git a/frigate/comms/zmq_proxy.py b/frigate/comms/zmq_proxy.py index 6a90d6887..d26da3312 100644 --- a/frigate/comms/zmq_proxy.py +++ b/frigate/comms/zmq_proxy.py @@ -2,7 +2,7 @@ import json import threading -from typing import Optional +from typing import Any, Optional import zmq @@ -58,7 +58,7 @@ class Publisher: self.socket = self.context.socket(zmq.PUB) self.socket.connect(SOCKET_PUB) - def publish(self, payload: any, sub_topic: str = "") -> None: + def publish(self, payload: Any, sub_topic: str = "") -> None: """Publish message.""" self.socket.send_string(f"{self.topic}{sub_topic} {json.dumps(payload)}") @@ -81,7 +81,7 @@ class Subscriber: def check_for_update( self, timeout: float = FAST_QUEUE_TIMEOUT - ) -> Optional[tuple[str, any]]: + ) -> Optional[tuple[str, Any]]: """Returns message or None if no update.""" try: has_update, _, _ = zmq.select([self.socket], [], [], timeout) @@ -98,5 +98,5 @@ class Subscriber: self.socket.close() self.context.destroy() - def _return_object(self, topic: str, payload: any) -> any: + def _return_object(self, topic: str, payload: Any) -> Any: return payload diff --git a/frigate/data_processing/common/license_plate/mixin.py b/frigate/data_processing/common/license_plate/mixin.py index e94c1b564..62a13cec0 100644 --- a/frigate/data_processing/common/license_plate/mixin.py +++ b/frigate/data_processing/common/license_plate/mixin.py @@ -10,7 +10,7 @@ import random import re import string from pathlib import Path -from typing import List, Optional, Tuple +from typing import Any, List, Optional, Tuple import cv2 import numpy as np @@ -1181,7 +1181,7 @@ class LicensePlateProcessingMixin: return event_id def lpr_process( - self, obj_data: dict[str, any], frame: np.ndarray, dedicated_lpr: bool = False + self, obj_data: dict[str, Any], frame: np.ndarray, dedicated_lpr: bool = False ): """Look for license plates in image.""" self.metrics.alpr_pps.value = self.plates_rec_second.eps() @@ -1272,7 +1272,7 @@ class LicensePlateProcessingMixin: ) return - license_plate: Optional[dict[str, any]] = None + license_plate: Optional[dict[str, Any]] = None if "license_plate" not in self.config.cameras[camera].objects.track: logger.debug(f"{camera}: Running manual license_plate detection.") @@ -1341,7 +1341,7 @@ class LicensePlateProcessingMixin: return if obj_data.get("label") in ["car", "motorcycle"]: - attributes: list[dict[str, any]] = obj_data.get( + attributes: list[dict[str, Any]] = obj_data.get( "current_attributes", [] ) for attr in attributes: @@ -1567,7 +1567,7 @@ class LicensePlateProcessingMixin: "last_seen": current_time if dedicated_lpr else None, } - def handle_request(self, topic, request_data) -> dict[str, any] | None: + def handle_request(self, topic, request_data) -> dict[str, Any] | None: return def expire_object(self, object_id: str, camera: str): diff --git a/frigate/data_processing/post/api.py b/frigate/data_processing/post/api.py index c40caef71..cd6dda128 100644 --- a/frigate/data_processing/post/api.py +++ b/frigate/data_processing/post/api.py @@ -2,6 +2,7 @@ import logging from abc import ABC, abstractmethod +from typing import Any from frigate.config import FrigateConfig @@ -25,7 +26,7 @@ class PostProcessorApi(ABC): @abstractmethod def process_data( - self, data: dict[str, any], data_type: PostProcessDataEnum + self, data: dict[str, Any], data_type: PostProcessDataEnum ) -> None: """Processes the data of data type. Args: @@ -38,7 +39,7 @@ class PostProcessorApi(ABC): pass @abstractmethod - def handle_request(self, request_data: dict[str, any]) -> dict[str, any] | None: + def handle_request(self, request_data: dict[str, Any]) -> dict[str, Any] | None: """Handle metadata requests. Args: request_data (dict): containing data about requested change to process. diff --git a/frigate/data_processing/post/license_plate.py b/frigate/data_processing/post/license_plate.py index 6bd3edf60..e95cf234e 100644 --- a/frigate/data_processing/post/license_plate.py +++ b/frigate/data_processing/post/license_plate.py @@ -2,6 +2,7 @@ import datetime import logging +from typing import Any import cv2 import numpy as np @@ -36,7 +37,7 @@ class LicensePlatePostProcessor(LicensePlateProcessingMixin, PostProcessorApi): sub_label_publisher: EventMetadataPublisher, metrics: DataProcessorMetrics, model_runner: LicensePlateModelRunner, - detected_license_plates: dict[str, dict[str, any]], + detected_license_plates: dict[str, dict[str, Any]], ): self.requestor = requestor self.detected_license_plates = detected_license_plates @@ -47,7 +48,7 @@ class LicensePlatePostProcessor(LicensePlateProcessingMixin, PostProcessorApi): super().__init__(config, metrics, model_runner) def process_data( - self, data: dict[str, any], data_type: PostProcessDataEnum + self, data: dict[str, Any], data_type: PostProcessDataEnum ) -> None: """Look for license plates in recording stream image Args: @@ -214,7 +215,7 @@ class LicensePlatePostProcessor(LicensePlateProcessingMixin, PostProcessorApi): logger.debug(f"Post processing plate: {event_id}, {frame_time}") self.lpr_process(keyframe_obj_data, frame) - def handle_request(self, topic, request_data) -> dict[str, any] | None: + def handle_request(self, topic, request_data) -> dict[str, Any] | None: if topic == EmbeddingsRequestEnum.reprocess_plate.value: event = request_data["event"] diff --git a/frigate/data_processing/real_time/api.py b/frigate/data_processing/real_time/api.py index 3d8e735c1..0fa0f9952 100644 --- a/frigate/data_processing/real_time/api.py +++ b/frigate/data_processing/real_time/api.py @@ -2,6 +2,7 @@ import logging from abc import ABC, abstractmethod +from typing import Any import numpy as np @@ -24,7 +25,7 @@ class RealTimeProcessorApi(ABC): pass @abstractmethod - def process_frame(self, obj_data: dict[str, any], frame: np.ndarray) -> None: + def process_frame(self, obj_data: dict[str, Any], frame: np.ndarray) -> None: """Processes the frame with object data. Args: obj_data (dict): containing data about focused object in frame. @@ -37,8 +38,8 @@ class RealTimeProcessorApi(ABC): @abstractmethod def handle_request( - self, topic: str, request_data: dict[str, any] - ) -> dict[str, any] | None: + self, topic: str, request_data: dict[str, Any] + ) -> dict[str, Any] | None: """Handle metadata requests. Args: topic (str): topic that dictates what work is requested. diff --git a/frigate/data_processing/real_time/bird.py b/frigate/data_processing/real_time/bird.py index 7fd2a5edb..ce4e50ef2 100644 --- a/frigate/data_processing/real_time/bird.py +++ b/frigate/data_processing/real_time/bird.py @@ -2,6 +2,7 @@ import logging import os +from typing import Any import cv2 import numpy as np @@ -35,8 +36,8 @@ class BirdRealTimeProcessor(RealTimeProcessorApi): super().__init__(config, metrics) self.interpreter: Interpreter = None self.sub_label_publisher = sub_label_publisher - self.tensor_input_details: dict[str, any] = None - self.tensor_output_details: dict[str, any] = None + self.tensor_input_details: dict[str, Any] = None + self.tensor_output_details: dict[str, Any] = None self.detected_birds: dict[str, float] = {} self.labelmap: dict[int, str] = {} diff --git a/frigate/data_processing/real_time/face.py b/frigate/data_processing/real_time/face.py index 2b6239742..bb427b846 100644 --- a/frigate/data_processing/real_time/face.py +++ b/frigate/data_processing/real_time/face.py @@ -6,7 +6,7 @@ import json import logging import os import shutil -from typing import Optional +from typing import Any, Optional import cv2 import numpy as np @@ -157,7 +157,7 @@ class FaceRealTimeProcessor(RealTimeProcessorApi): self.faces_per_second.update() self.inference_speed.update(duration) - def process_frame(self, obj_data: dict[str, any], frame: np.ndarray): + def process_frame(self, obj_data: dict[str, Any], frame: np.ndarray): """Look for faces in image.""" self.metrics.face_rec_fps.value = self.faces_per_second.eps() camera = obj_data["camera"] @@ -198,7 +198,7 @@ class FaceRealTimeProcessor(RealTimeProcessorApi): logger.debug("Not processing due to hitting max rec attempts.") return - face: Optional[dict[str, any]] = None + face: Optional[dict[str, Any]] = None if self.requires_face_detection: logger.debug("Running manual face detection.") @@ -238,7 +238,7 @@ class FaceRealTimeProcessor(RealTimeProcessorApi): logger.debug("No attributes to parse.") return - attributes: list[dict[str, any]] = obj_data.get("current_attributes", []) + attributes: list[dict[str, Any]] = obj_data.get("current_attributes", []) for attr in attributes: if attr.get("label") != "face": continue @@ -323,7 +323,7 @@ class FaceRealTimeProcessor(RealTimeProcessorApi): self.__update_metrics(datetime.datetime.now().timestamp() - start) - def handle_request(self, topic, request_data) -> dict[str, any] | None: + def handle_request(self, topic, request_data) -> dict[str, Any] | None: if topic == EmbeddingsRequestEnum.clear_face_classifier.value: self.recognizer.clear() elif topic == EmbeddingsRequestEnum.recognize_face.value: diff --git a/frigate/data_processing/real_time/license_plate.py b/frigate/data_processing/real_time/license_plate.py index 13596057d..13be45a77 100644 --- a/frigate/data_processing/real_time/license_plate.py +++ b/frigate/data_processing/real_time/license_plate.py @@ -2,6 +2,7 @@ import json import logging +from typing import Any import numpy as np @@ -30,7 +31,7 @@ class LicensePlateRealTimeProcessor(LicensePlateProcessingMixin, RealTimeProcess sub_label_publisher: EventMetadataPublisher, metrics: DataProcessorMetrics, model_runner: LicensePlateModelRunner, - detected_license_plates: dict[str, dict[str, any]], + detected_license_plates: dict[str, dict[str, Any]], ): self.requestor = requestor self.detected_license_plates = detected_license_plates @@ -43,14 +44,14 @@ class LicensePlateRealTimeProcessor(LicensePlateProcessingMixin, RealTimeProcess def process_frame( self, - obj_data: dict[str, any], + obj_data: dict[str, Any], frame: np.ndarray, dedicated_lpr: bool | None = False, ): """Look for license plates in image.""" self.lpr_process(obj_data, frame, dedicated_lpr) - def handle_request(self, topic, request_data) -> dict[str, any] | None: + def handle_request(self, topic, request_data) -> dict[str, Any] | None: return def expire_object(self, object_id: str, camera: str): diff --git a/frigate/detectors/detector_config.py b/frigate/detectors/detector_config.py index f14da57a8..3893908d2 100644 --- a/frigate/detectors/detector_config.py +++ b/frigate/detectors/detector_config.py @@ -3,7 +3,7 @@ import json import logging import os from enum import Enum -from typing import Dict, Optional, Tuple +from typing import Any, Dict, Optional, Tuple import requests from pydantic import BaseModel, ConfigDict, Field @@ -147,7 +147,7 @@ class ModelConfig(BaseModel): json.dump(model_info, f) else: with open(model_info_path, "r") as f: - model_info: dict[str, any] = json.load(f) + model_info: dict[str, Any] = json.load(f) if detector and detector not in model_info["supportedDetectors"]: raise ValueError(f"Model does not support detector type of {detector}") diff --git a/frigate/embeddings/__init__.py b/frigate/embeddings/__init__.py index 3687021b0..169548577 100644 --- a/frigate/embeddings/__init__.py +++ b/frigate/embeddings/__init__.py @@ -9,7 +9,7 @@ import re import signal import threading from types import FrameType -from typing import Optional, Union +from typing import Any, Optional, Union from pathvalidate import ValidationError, sanitize_filename from setproctitle import setproctitle @@ -190,7 +190,7 @@ class EmbeddingsContext: return results - def register_face(self, face_name: str, image_data: bytes) -> dict[str, any]: + def register_face(self, face_name: str, image_data: bytes) -> dict[str, Any]: return self.requestor.send_data( EmbeddingsRequestEnum.register_face.value, { @@ -199,7 +199,7 @@ class EmbeddingsContext: }, ) - def recognize_face(self, image_data: bytes) -> dict[str, any]: + def recognize_face(self, image_data: bytes) -> dict[str, Any]: return self.requestor.send_data( EmbeddingsRequestEnum.recognize_face.value, { @@ -217,7 +217,7 @@ class EmbeddingsContext: return self.db.execute_sql(sql_query).fetchall() - def reprocess_face(self, face_file: str) -> dict[str, any]: + def reprocess_face(self, face_file: str) -> dict[str, Any]: return self.requestor.send_data( EmbeddingsRequestEnum.reprocess_face.value, {"image_file": face_file} ) @@ -284,10 +284,10 @@ class EmbeddingsContext: {"id": event_id, "description": description}, ) - def reprocess_plate(self, event: dict[str, any]) -> dict[str, any]: + def reprocess_plate(self, event: dict[str, Any]) -> dict[str, Any]: return self.requestor.send_data( EmbeddingsRequestEnum.reprocess_plate.value, {"event": event} ) - def reindex_embeddings(self) -> dict[str, any]: + def reindex_embeddings(self) -> dict[str, Any]: return self.requestor.send_data(EmbeddingsRequestEnum.reindex.value, {}) diff --git a/frigate/embeddings/maintainer.py b/frigate/embeddings/maintainer.py index 487d5dbc4..86bc75737 100644 --- a/frigate/embeddings/maintainer.py +++ b/frigate/embeddings/maintainer.py @@ -7,7 +7,7 @@ import os import threading from multiprocessing.synchronize import Event as MpEvent from pathlib import Path -from typing import Optional +from typing import Any, Optional import cv2 import numpy as np @@ -104,7 +104,7 @@ class EmbeddingMaintainer(threading.Thread): self.embeddings_responder = EmbeddingsResponder() self.frame_manager = SharedMemoryFrameManager() - self.detected_license_plates: dict[str, dict[str, any]] = {} + self.detected_license_plates: dict[str, dict[str, Any]] = {} # model runners to share between realtime and post processors if self.config.lpr.enabled: @@ -159,7 +159,7 @@ class EmbeddingMaintainer(threading.Thread): ) self.stop_event = stop_event - self.tracked_events: dict[str, list[any]] = {} + self.tracked_events: dict[str, list[Any]] = {} self.early_request_sent: dict[str, bool] = {} self.genai_client = get_genai_client(config) @@ -190,7 +190,7 @@ class EmbeddingMaintainer(threading.Thread): def _process_requests(self) -> None: """Process embeddings requests""" - def _handle_request(topic: str, data: dict[str, any]) -> str: + def _handle_request(topic: str, data: dict[str, Any]) -> str: try: # First handle the embedding-specific topics when semantic search is enabled if self.config.semantic_search.enabled: diff --git a/frigate/embeddings/onnx/base_embedding.py b/frigate/embeddings/onnx/base_embedding.py index 7403f0ac1..fcadd2852 100644 --- a/frigate/embeddings/onnx/base_embedding.py +++ b/frigate/embeddings/onnx/base_embedding.py @@ -5,6 +5,7 @@ import os from abc import ABC, abstractmethod from enum import Enum from io import BytesIO +from typing import Any import numpy as np import requests @@ -59,7 +60,7 @@ class BaseEmbedding(ABC): pass @abstractmethod - def _preprocess_inputs(self, raw_inputs: any) -> any: + def _preprocess_inputs(self, raw_inputs: Any) -> Any: pass def _process_image(self, image, output: str = "RGB") -> Image.Image: @@ -74,7 +75,7 @@ class BaseEmbedding(ABC): return image - def _postprocess_outputs(self, outputs: any) -> any: + def _postprocess_outputs(self, outputs: Any) -> Any: return outputs def __call__( @@ -84,7 +85,7 @@ class BaseEmbedding(ABC): processed = self._preprocess_inputs(inputs) input_names = self.runner.get_input_names() onnx_inputs = {name: [] for name in input_names} - input: dict[str, any] + input: dict[str, Any] for input in processed: for key, value in input.items(): if key in input_names: diff --git a/frigate/events/audio.py b/frigate/events/audio.py index 024739366..f2a217fd3 100644 --- a/frigate/events/audio.py +++ b/frigate/events/audio.py @@ -6,7 +6,7 @@ import random import string import threading import time -from typing import Tuple +from typing import Any, Tuple import numpy as np @@ -126,7 +126,7 @@ class AudioEventMaintainer(threading.Thread): self.config = camera self.camera_metrics = camera_metrics - self.detections: dict[dict[str, any]] = {} + self.detections: dict[dict[str, Any]] = {} self.stop_event = stop_event self.detector = AudioTfl(stop_event, self.config.audio.num_threads) self.shape = (int(round(AUDIO_DURATION * AUDIO_SAMPLE_RATE)),) diff --git a/frigate/events/cleanup.py b/frigate/events/cleanup.py index fbc7b6c3d..fde27aa0f 100644 --- a/frigate/events/cleanup.py +++ b/frigate/events/cleanup.py @@ -6,6 +6,7 @@ import os import threading from multiprocessing.synchronize import Event as MpEvent from pathlib import Path +from typing import Any from frigate.config import FrigateConfig from frigate.const import CLIPS_DIR @@ -29,7 +30,7 @@ class EventCleanup(threading.Thread): self.db = db self.camera_keys = list(self.config.cameras.keys()) self.removed_camera_labels: list[str] = None - self.camera_labels: dict[str, dict[str, any]] = {} + self.camera_labels: dict[str, dict[str, Any]] = {} def get_removed_camera_labels(self) -> list[Event]: """Get a list of distinct labels for removed cameras.""" diff --git a/frigate/output/birdseye.py b/frigate/output/birdseye.py index 20cda50f7..b295af82e 100644 --- a/frigate/output/birdseye.py +++ b/frigate/output/birdseye.py @@ -10,7 +10,7 @@ import queue import subprocess as sp import threading import traceback -from typing import Optional +from typing import Any, Optional import cv2 import numpy as np @@ -542,10 +542,10 @@ class BirdsEyeFrameManager: self, cameras_to_add: list[str], coefficient: float, - ) -> tuple[any]: + ) -> tuple[Any]: """Calculate the optimal layout for 2+ cameras.""" - def map_layout(camera_layout: list[list[any]], row_height: int): + def map_layout(camera_layout: list[list[Any]], row_height: int): """Map the calculated layout.""" candidate_layout = [] starting_x = 0 @@ -588,7 +588,7 @@ class BirdsEyeFrameManager: return max_width, y, candidate_layout canvas_aspect_x, canvas_aspect_y = self.canvas.get_aspect(coefficient) - camera_layout: list[list[any]] = [] + camera_layout: list[list[Any]] = [] camera_layout.append([]) starting_x = 0 x = starting_x @@ -786,7 +786,7 @@ class Birdseye: def write_data( self, camera: str, - current_tracked_objects: list[dict[str, any]], + current_tracked_objects: list[dict[str, Any]], motion_boxes: list[list[int]], frame_time: float, frame: np.ndarray, diff --git a/frigate/output/preview.py b/frigate/output/preview.py index 3ffca2f04..be9da292b 100644 --- a/frigate/output/preview.py +++ b/frigate/output/preview.py @@ -8,6 +8,7 @@ import subprocess as sp import threading import time from pathlib import Path +from typing import Any import cv2 import numpy as np @@ -255,7 +256,7 @@ class PreviewRecorder: def should_write_frame( self, - current_tracked_objects: list[dict[str, any]], + current_tracked_objects: list[dict[str, Any]], motion_boxes: list[list[int]], frame_time: float, ) -> bool: @@ -315,7 +316,7 @@ class PreviewRecorder: def write_data( self, - current_tracked_objects: list[dict[str, any]], + current_tracked_objects: list[dict[str, Any]], motion_boxes: list[list[int]], frame_time: float, frame: np.ndarray, diff --git a/frigate/ptz/autotrack.py b/frigate/ptz/autotrack.py index 709165c24..92bfe0ec0 100644 --- a/frigate/ptz/autotrack.py +++ b/frigate/ptz/autotrack.py @@ -7,6 +7,7 @@ import threading import time from collections import deque from multiprocessing.synchronize import Event as MpEvent +from typing import Any import cv2 import numpy as np @@ -59,7 +60,7 @@ class PtzMotionEstimator: def motion_estimator( self, - detections: list[dict[str, any]], + detections: list[dict[str, Any]], frame_name: str, frame_time: float, camera: str, diff --git a/frigate/ptz/onvif.py b/frigate/ptz/onvif.py index b01d9ed96..0277fa083 100644 --- a/frigate/ptz/onvif.py +++ b/frigate/ptz/onvif.py @@ -7,6 +7,7 @@ import time from enum import Enum from importlib.util import find_spec from pathlib import Path +from typing import Any import numpy from onvif import ONVIFCamera, ONVIFError, ONVIFService @@ -646,7 +647,7 @@ class OnvifController: f"Error executing command {command} for camera {camera_name}: {e}" ) - async def get_camera_info(self, camera_name: str) -> dict[str, any]: + async def get_camera_info(self, camera_name: str) -> dict[str, Any]: """ Get ptz capabilities and presets, attempting to reconnect if ONVIF is configured but not initialized. diff --git a/frigate/record/maintainer.py b/frigate/record/maintainer.py index 25783965c..f1b9a600e 100644 --- a/frigate/record/maintainer.py +++ b/frigate/record/maintainer.py @@ -242,7 +242,7 @@ class RecordingMaintainer(threading.Thread): self.end_time_cache.pop(cache_path, None) async def validate_and_move_segment( - self, camera: str, reviews: list[ReviewSegment], recording: dict[str, any] + self, camera: str, reviews: list[ReviewSegment], recording: dict[str, Any] ) -> None: cache_path: str = recording["cache_path"] start_time: datetime.datetime = recording["start_time"] diff --git a/frigate/review/maintainer.py b/frigate/review/maintainer.py index 6b5c32956..b144b6e52 100644 --- a/frigate/review/maintainer.py +++ b/frigate/review/maintainer.py @@ -10,7 +10,7 @@ import sys import threading from multiprocessing.synchronize import Event as MpEvent from pathlib import Path -from typing import Optional +from typing import Any, Optional import cv2 import numpy as np @@ -156,7 +156,7 @@ class ReviewSegmentMaintainer(threading.Thread): self.detection_subscriber = DetectionSubscriber(DetectionTypeEnum.all) # manual events - self.indefinite_events: dict[str, dict[str, any]] = {} + self.indefinite_events: dict[str, dict[str, Any]] = {} # ensure dirs Path(os.path.join(CLIPS_DIR, "review")).mkdir(exist_ok=True) @@ -194,7 +194,7 @@ class ReviewSegmentMaintainer(threading.Thread): camera_config: CameraConfig, frame, objects: list[TrackedObject], - prev_data: dict[str, any], + prev_data: dict[str, Any], ) -> None: """Update segment.""" if frame is not None: @@ -219,7 +219,7 @@ class ReviewSegmentMaintainer(threading.Thread): def _publish_segment_end( self, segment: PendingReviewSegment, - prev_data: dict[str, any], + prev_data: dict[str, Any], ) -> None: """End segment.""" final_data = segment.get_data(ended=True) diff --git a/frigate/stats/emitter.py b/frigate/stats/emitter.py index 022e99213..42d4c16a8 100644 --- a/frigate/stats/emitter.py +++ b/frigate/stats/emitter.py @@ -6,7 +6,7 @@ import logging import threading import time from multiprocessing.synchronize import Event as MpEvent -from typing import Optional +from typing import Any, Optional from frigate.comms.inter_process import InterProcessRequestor from frigate.config import FrigateConfig @@ -33,12 +33,12 @@ class StatsEmitter(threading.Thread): self.stats_tracking = stats_tracking self.stop_event = stop_event self.hwaccel_errors: list[str] = [] - self.stats_history: list[dict[str, any]] = [] + self.stats_history: list[dict[str, Any]] = [] # create communication for stats self.requestor = InterProcessRequestor() - def get_latest_stats(self) -> dict[str, any]: + def get_latest_stats(self) -> dict[str, Any]: """Get latest stats.""" if len(self.stats_history) > 0: return self.stats_history[-1] @@ -51,12 +51,12 @@ class StatsEmitter(threading.Thread): def get_stats_history( self, keys: Optional[list[str]] = None - ) -> list[dict[str, any]]: + ) -> list[dict[str, Any]]: """Get stats history.""" if not keys: return self.stats_history - selected_stats: list[dict[str, any]] = [] + selected_stats: list[dict[str, Any]] = [] for s in self.stats_history: selected = {} diff --git a/frigate/timeline.py b/frigate/timeline.py index 8055ccddc..4e3c8e293 100644 --- a/frigate/timeline.py +++ b/frigate/timeline.py @@ -5,6 +5,7 @@ import queue import threading from multiprocessing import Queue from multiprocessing.synchronize import Event as MpEvent +from typing import Any from frigate.config import FrigateConfig from frigate.events.maintainer import EventStateEnum, EventTypeEnum @@ -27,7 +28,7 @@ class TimelineProcessor(threading.Thread): self.config = config self.queue = queue self.stop_event = stop_event - self.pre_event_cache: dict[str, list[dict[str, any]]] = {} + self.pre_event_cache: dict[str, list[dict[str, Any]]] = {} def run(self) -> None: while not self.stop_event.is_set(): @@ -55,9 +56,9 @@ class TimelineProcessor(threading.Thread): def insert_or_save( self, - entry: dict[str, any], - prev_event_data: dict[any, any], - event_data: dict[any, any], + entry: dict[str, Any], + prev_event_data: dict[Any, Any], + event_data: dict[Any, Any], ) -> None: """Insert into db or cache.""" id = entry[Timeline.source_id] @@ -81,8 +82,8 @@ class TimelineProcessor(threading.Thread): self, camera: str, event_type: str, - prev_event_data: dict[any, any], - event_data: dict[any, any], + prev_event_data: dict[Any, Any], + event_data: dict[Any, Any], ) -> bool: """Handle object detection.""" save = False @@ -153,7 +154,7 @@ class TimelineProcessor(threading.Thread): self, camera: str, event_type: str, - event_data: dict[any, any], + event_data: dict[Any, Any], ) -> bool: if event_type != "new": return False diff --git a/frigate/track/__init__.py b/frigate/track/__init__.py index 4fe51f476..dc72be4f0 100644 --- a/frigate/track/__init__.py +++ b/frigate/track/__init__.py @@ -1,4 +1,5 @@ from abc import ABC, abstractmethod +from typing import Any from frigate.config import DetectConfig @@ -10,6 +11,6 @@ class ObjectTracker(ABC): @abstractmethod def match_and_update( - self, frame_name: str, frame_time: float, detections: list[dict[str, any]] + self, frame_name: str, frame_time: float, detections: list[dict[str, Any]] ) -> None: pass diff --git a/frigate/track/norfair_tracker.py b/frigate/track/norfair_tracker.py index ec2bdfa7d..900971e0d 100644 --- a/frigate/track/norfair_tracker.py +++ b/frigate/track/norfair_tracker.py @@ -1,7 +1,7 @@ import logging import random import string -from typing import Sequence +from typing import Any, Sequence import cv2 import numpy as np @@ -460,7 +460,7 @@ class NorfairTracker(ObjectTracker): self.match_and_update(frame_name, frame_time, detections=detections) def match_and_update( - self, frame_name: str, frame_time: float, detections: list[dict[str, any]] + self, frame_name: str, frame_time: float, detections: list[dict[str, Any]] ): # Group detections by object type detections_by_type = {} diff --git a/frigate/track/object_processing.py b/frigate/track/object_processing.py index 702df32bd..beccc4cbf 100644 --- a/frigate/track/object_processing.py +++ b/frigate/track/object_processing.py @@ -7,6 +7,7 @@ import threading from collections import defaultdict from enum import Enum from multiprocessing.synchronize import Event as MpEvent +from typing import Any import cv2 import numpy as np @@ -70,7 +71,7 @@ class TrackedObjectProcessor(threading.Thread): self.event_end_subscriber = EventEndSubscriber() self.sub_label_subscriber = EventMetadataSubscriber(EventMetadataTypeEnum.all) - self.camera_activity: dict[str, dict[str, any]] = {} + self.camera_activity: dict[str, dict[str, Any]] = {} self.ongoing_manual_events: dict[str, str] = {} # { @@ -301,7 +302,7 @@ class TrackedObjectProcessor(threading.Thread): return {} def get_current_frame( - self, camera: str, draw_options: dict[str, any] = {} + self, camera: str, draw_options: dict[str, Any] = {} ) -> np.ndarray | None: if camera == "birdseye": return self.frame_manager.get( diff --git a/frigate/track/tracked_object.py b/frigate/track/tracked_object.py index 77ebc6646..da0291aad 100644 --- a/frigate/track/tracked_object.py +++ b/frigate/track/tracked_object.py @@ -5,7 +5,7 @@ import math import os from collections import defaultdict from statistics import median -from typing import Optional +from typing import Any, Optional import cv2 import numpy as np @@ -38,7 +38,7 @@ class TrackedObject: camera_config: CameraConfig, ui_config: UIConfig, frame_cache, - obj_data: dict[str, any], + obj_data: dict[str, Any], ): # set the score history then remove as it is not part of object state self.score_history = obj_data["score_history"] @@ -621,7 +621,7 @@ class TrackedObjectAttribute: self.ratio = raw_data[4] self.region = raw_data[5] - def get_tracking_data(self) -> dict[str, any]: + def get_tracking_data(self) -> dict[str, Any]: """Return data saved to the object.""" return { "label": self.label, @@ -629,7 +629,7 @@ class TrackedObjectAttribute: "box": self.box, } - def find_best_object(self, objects: list[dict[str, any]]) -> Optional[str]: + def find_best_object(self, objects: list[dict[str, Any]]) -> Optional[str]: """Find the best attribute for each object and return its ID.""" best_object_area = None best_object_id = None diff --git a/frigate/util/builtin.py b/frigate/util/builtin.py index e7a03052d..b35c7b942 100644 --- a/frigate/util/builtin.py +++ b/frigate/util/builtin.py @@ -156,7 +156,7 @@ def load_labels(path: Optional[str], encoding="utf-8", prefill=91): return labels -def get_tz_modifiers(tz_name: str) -> Tuple[str, str, int]: +def get_tz_modifiers(tz_name: str) -> Tuple[str, str, float]: seconds_offset = ( datetime.datetime.now(pytz.timezone(tz_name)).utcoffset().total_seconds() ) @@ -169,7 +169,7 @@ def get_tz_modifiers(tz_name: str) -> Tuple[str, str, int]: def to_relative_box( width: int, height: int, box: Tuple[int, int, int, int] -) -> Tuple[int, int, int, int]: +) -> Tuple[int | float, int | float, int | float, int | float]: return ( box[0] / width, # x box[1] / height, # y diff --git a/frigate/util/config.py b/frigate/util/config.py index 59d704b9e..70492adbc 100644 --- a/frigate/util/config.py +++ b/frigate/util/config.py @@ -4,7 +4,7 @@ import asyncio import logging import os import shutil -from typing import Optional, Union +from typing import Any, Optional, Union from ruamel.yaml import YAML @@ -37,7 +37,7 @@ def migrate_frigate_config(config_file: str): yaml = YAML() yaml.indent(mapping=2, sequence=4, offset=2) with open(config_file, "r") as f: - config: dict[str, dict[str, any]] = yaml.load(f) + config: dict[str, dict[str, Any]] = yaml.load(f) if config is None: logger.error(f"Failed to load config at {config_file}") @@ -94,7 +94,7 @@ def migrate_frigate_config(config_file: str): logger.info("Finished frigate config migration...") -def migrate_014(config: dict[str, dict[str, any]]) -> dict[str, dict[str, any]]: +def migrate_014(config: dict[str, dict[str, Any]]) -> dict[str, dict[str, Any]]: """Handle migrating frigate config to 0.14""" # migrate record.events.required_zones to review.alerts.required_zones new_config = config.copy() @@ -142,7 +142,7 @@ def migrate_014(config: dict[str, dict[str, any]]) -> dict[str, dict[str, any]]: del new_config["rtmp"] for name, camera in config.get("cameras", {}).items(): - camera_config: dict[str, dict[str, any]] = camera.copy() + camera_config: dict[str, dict[str, Any]] = camera.copy() required_zones = ( camera_config.get("record", {}).get("events", {}).get("required_zones", []) ) @@ -181,7 +181,7 @@ def migrate_014(config: dict[str, dict[str, any]]) -> dict[str, dict[str, any]]: return new_config -def migrate_015_0(config: dict[str, dict[str, any]]) -> dict[str, dict[str, any]]: +def migrate_015_0(config: dict[str, dict[str, Any]]) -> dict[str, dict[str, Any]]: """Handle migrating frigate config to 0.15-0""" new_config = config.copy() @@ -232,9 +232,9 @@ def migrate_015_0(config: dict[str, dict[str, any]]) -> dict[str, dict[str, any] del new_config["record"]["events"] for name, camera in config.get("cameras", {}).items(): - camera_config: dict[str, dict[str, any]] = camera.copy() + camera_config: dict[str, dict[str, Any]] = camera.copy() - record_events: dict[str, any] = camera_config.get("record", {}).get("events") + record_events: dict[str, Any] = camera_config.get("record", {}).get("events") if record_events: alerts_retention = {"retain": {}} @@ -281,7 +281,7 @@ def migrate_015_0(config: dict[str, dict[str, any]]) -> dict[str, dict[str, any] return new_config -def migrate_015_1(config: dict[str, dict[str, any]]) -> dict[str, dict[str, any]]: +def migrate_015_1(config: dict[str, dict[str, Any]]) -> dict[str, dict[str, Any]]: """Handle migrating frigate config to 0.15-1""" new_config = config.copy() @@ -296,7 +296,7 @@ def migrate_015_1(config: dict[str, dict[str, any]]) -> dict[str, dict[str, any] return new_config -def migrate_016_0(config: dict[str, dict[str, any]]) -> dict[str, dict[str, any]]: +def migrate_016_0(config: dict[str, dict[str, Any]]) -> dict[str, dict[str, Any]]: """Handle migrating frigate config to 0.16-0""" new_config = config.copy() @@ -307,7 +307,7 @@ def migrate_016_0(config: dict[str, dict[str, any]]) -> dict[str, dict[str, any] new_config["detect"] = detect_config for name, camera in config.get("cameras", {}).items(): - camera_config: dict[str, dict[str, any]] = camera.copy() + camera_config: dict[str, dict[str, Any]] = camera.copy() live_config = camera_config.get("live", {}) if "stream_name" in live_config: diff --git a/frigate/util/image.py b/frigate/util/image.py index 5abbfb943..58afe8b36 100644 --- a/frigate/util/image.py +++ b/frigate/util/image.py @@ -8,7 +8,7 @@ from abc import ABC, abstractmethod from multiprocessing import resource_tracker as _mprt from multiprocessing import shared_memory as _mpshm from string import printable -from typing import AnyStr, Optional +from typing import Any, AnyStr, Optional import cv2 import numpy as np @@ -766,7 +766,7 @@ class FrameManager(ABC): pass @abstractmethod - def write(self, name: str) -> memoryview: + def write(self, name: str) -> Optional[memoryview]: pass @abstractmethod @@ -847,7 +847,7 @@ class SharedMemoryFrameManager(FrameManager): self.shm_store[name] = shm return shm.buf - def write(self, name: str) -> memoryview: + def write(self, name: str) -> Optional[memoryview]: try: if name in self.shm_store: shm = self.shm_store[name] @@ -944,7 +944,7 @@ def get_image_from_recording( relative_frame_time: float, codec: str, height: Optional[int] = None, -) -> Optional[any]: +) -> Optional[Any]: """retrieve a frame from given time in recording file.""" ffmpeg_cmd = [ diff --git a/frigate/util/model.py b/frigate/util/model.py index 5528c15ca..7ac7d15b0 100644 --- a/frigate/util/model.py +++ b/frigate/util/model.py @@ -2,6 +2,7 @@ import logging import os +from typing import Any import cv2 import numpy as np @@ -284,7 +285,7 @@ def post_process_yolox( def get_ort_providers( force_cpu: bool = False, device: str = "AUTO", requires_fp16: bool = False -) -> tuple[list[str], list[dict[str, any]]]: +) -> tuple[list[str], list[dict[str, Any]]]: if force_cpu: return ( ["CPUExecutionProvider"], diff --git a/frigate/util/object.py b/frigate/util/object.py index da0a54279..d9a8c2f71 100644 --- a/frigate/util/object.py +++ b/frigate/util/object.py @@ -4,6 +4,7 @@ import datetime import logging import math from collections import defaultdict +from typing import Any import cv2 import numpy as np @@ -38,7 +39,7 @@ def get_camera_regions_grid( name: str, detect: DetectConfig, min_region_size: int, -) -> list[list[dict[str, any]]]: +) -> list[list[dict[str, Any]]]: """Build a grid of expected region sizes for a camera.""" # get grid from db if available try: @@ -163,10 +164,10 @@ def get_cluster_region_from_grid(frame_shape, min_region, cluster, boxes, region def get_region_from_grid( - frame_shape: tuple[int], + frame_shape: tuple[int, int], cluster: list[int], min_region: int, - region_grid: list[list[dict[str, any]]], + region_grid: list[list[dict[str, Any]]], ) -> list[int]: """Get a region for a box based on the region grid.""" box = calculate_region( @@ -446,9 +447,9 @@ def get_cluster_region(frame_shape, min_region, cluster, boxes): def get_startup_regions( - frame_shape: tuple[int], + frame_shape: tuple[int, int], region_min_size: int, - region_grid: list[list[dict[str, any]]], + region_grid: list[list[dict[str, Any]]], ) -> list[list[int]]: """Get a list of regions to run on startup.""" # return 8 most popular regions for the camera @@ -480,12 +481,12 @@ def get_startup_regions( def reduce_detections( - frame_shape: tuple[int], - all_detections: list[tuple[any]], -) -> list[tuple[any]]: + frame_shape: tuple[int, int], + all_detections: list[tuple[Any]], +) -> list[tuple[Any]]: """Take a list of detections and reduce overlaps to create a list of confident detections.""" - def reduce_overlapping_detections(detections: list[tuple[any]]) -> list[tuple[any]]: + def reduce_overlapping_detections(detections: list[tuple[Any]]) -> list[tuple[Any]]: """apply non-maxima suppression to suppress weak, overlapping bounding boxes.""" detected_object_groups = defaultdict(lambda: []) for detection in detections: @@ -524,7 +525,7 @@ def reduce_detections( # set the detections list to only include top objects return selected_objects - def get_consolidated_object_detections(detections: list[tuple[any]]): + def get_consolidated_object_detections(detections: list[tuple[Any]]): """Drop detections that overlap too much.""" detected_object_groups = defaultdict(lambda: []) for detection in detections: diff --git a/frigate/util/services.py b/frigate/util/services.py index 1c778bac4..c63b2cb34 100644 --- a/frigate/util/services.py +++ b/frigate/util/services.py @@ -9,7 +9,7 @@ import signal import subprocess as sp import traceback from datetime import datetime -from typing import List, Optional, Tuple +from typing import Any, List, Optional, Tuple import cv2 import psutil @@ -230,7 +230,7 @@ def is_vaapi_amd_driver() -> bool: return any("AMD Radeon Graphics" in line for line in output) -def get_amd_gpu_stats() -> dict[str, str]: +def get_amd_gpu_stats() -> Optional[dict[str, str]]: """Get stats using radeontop.""" radeontop_command = ["radeontop", "-d", "-", "-l", "1"] @@ -256,7 +256,7 @@ def get_amd_gpu_stats() -> dict[str, str]: return results -def get_intel_gpu_stats(sriov: bool) -> dict[str, str]: +def get_intel_gpu_stats(sriov: bool) -> Optional[dict[str, str]]: """Get stats using intel_gpu_top.""" def get_stats_manually(output: str) -> dict[str, str]: @@ -382,7 +382,7 @@ def get_intel_gpu_stats(sriov: bool) -> dict[str, str]: return results -def get_rockchip_gpu_stats() -> dict[str, str]: +def get_rockchip_gpu_stats() -> Optional[dict[str, str]]: """Get GPU stats using rk.""" try: with open("/sys/kernel/debug/rkrga/load", "r") as f: @@ -403,7 +403,7 @@ def get_rockchip_gpu_stats() -> dict[str, str]: return {"gpu": average_load, "mem": "-"} -def get_rockchip_npu_stats() -> dict[str, str]: +def get_rockchip_npu_stats() -> Optional[dict[str, float | str]]: """Get NPU stats using rk.""" try: with open("/sys/kernel/debug/rknpu/load", "r") as f: @@ -494,7 +494,7 @@ def get_nvidia_gpu_stats() -> dict[int, dict]: return results -def get_jetson_stats() -> dict[int, dict]: +def get_jetson_stats() -> Optional[dict[int, dict]]: results = {} try: @@ -537,7 +537,7 @@ def vainfo_hwaccel(device_name: Optional[str] = None) -> sp.CompletedProcess: return sp.run(ffprobe_cmd, capture_output=True) -def get_nvidia_driver_info() -> dict[str, any]: +def get_nvidia_driver_info() -> dict[str, Any]: """Get general hardware info for nvidia GPU.""" results = {} try: @@ -596,8 +596,8 @@ def auto_detect_hwaccel() -> str: async def get_video_properties( ffmpeg, url: str, get_duration: bool = False -) -> dict[str, any]: - async def calculate_duration(video: Optional[any]) -> float: +) -> dict[str, Any]: + async def calculate_duration(video: Optional[Any]) -> float: duration = None if video is not None: diff --git a/frigate/video.py b/frigate/video.py index e6243295c..754c9b764 100755 --- a/frigate/video.py +++ b/frigate/video.py @@ -184,7 +184,7 @@ class CameraWatchdog(threading.Thread): self.capture_thread = None self.ffmpeg_detect_process = None self.logpipe = LogPipe(f"ffmpeg.{self.camera_name}.detect") - self.ffmpeg_other_processes: list[dict[str, any]] = [] + self.ffmpeg_other_processes: list[dict[str, Any]] = [] self.camera_fps = camera_fps self.skipped_fps = skipped_fps self.ffmpeg_pid = ffmpeg_pid @@ -371,7 +371,9 @@ class CameraWatchdog(threading.Thread): p["logpipe"].close() self.ffmpeg_other_processes.clear() - def get_latest_segment_datetime(self, latest_segment: datetime.datetime) -> int: + def get_latest_segment_datetime( + self, latest_segment: datetime.datetime + ) -> datetime.datetime: """Checks if ffmpeg is still writing recording segments to cache.""" cache_files = sorted( [ @@ -859,7 +861,7 @@ def process_frames( detections[obj["id"]] = {**obj, "attributes": []} # find the best object for each attribute to be assigned to - all_objects: list[dict[str, any]] = object_tracker.tracked_objects.values() + all_objects: list[dict[str, Any]] = object_tracker.tracked_objects.values() for attributes in attribute_detections.values(): for attribute in attributes: filtered_objects = filter( diff --git a/pyproject.toml b/pyproject.toml index a1d5cf00d..d17a60e72 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,3 +1,3 @@ -[tool.ruff] +[tool.ruff.lint] ignore = ["E501","E711","E712"] -extend-select = ["I"] \ No newline at end of file +extend-select = ["I"]