Fix various typing issues (#18187)

* Fix the `Any` typing hint treewide

There has been confusion between the Any type[1] and the any function[2]
in typing hints.

[1] https://docs.python.org/3/library/typing.html#typing.Any
[2] https://docs.python.org/3/library/functions.html#any

* Fix typing for various frame_shape members

Frame shapes are most likely defined by height and width, so a single int
cannot express that.

* Wrap gpu stats functions in Optional[]

These can return `None`, so they need to be `Type | None`, which is what
`Optional` expresses very nicely.

* Fix return type in get_latest_segment_datetime

Returns a datetime object, not an integer.

* Make the return type of FrameManager.write optional

This is necessary since the SharedMemoryFrameManager.write function can
return None.

* Fix total_seconds() return type in get_tz_modifiers

The function returns a float, not an int.

https://docs.python.org/3/library/datetime.html#datetime.timedelta.total_seconds

* Account for floating point results in to_relative_box

Because the function uses division the return types may either be int or
float.

* Resolve ruff deprecation warning

The config has been split into formatter and linter, and the global
options are deprecated.
This commit is contained in:
Martin Weinelt 2025-05-13 16:27:20 +02:00 committed by GitHub
parent 2c9bfaa49c
commit 4d4d54d030
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
50 changed files with 191 additions and 164 deletions

View File

@ -1,5 +1,6 @@
import json
import sys
from typing import Any
from ruamel.yaml import YAML
@ -21,11 +22,11 @@ try:
raw_config = f.read()
if config_file.endswith((".yaml", ".yml")):
config: dict[str, any] = yaml.load(raw_config)
config: dict[str, Any] = yaml.load(raw_config)
elif config_file.endswith(".json"):
config: dict[str, any] = json.loads(raw_config)
config: dict[str, Any] = json.loads(raw_config)
except FileNotFoundError:
config: dict[str, any] = {}
config: dict[str, Any] = {}
path = config.get("ffmpeg", {}).get("path", "default")
if path == "default":

View File

@ -4,6 +4,7 @@ import json
import os
import sys
from pathlib import Path
from typing import Any
from ruamel.yaml import YAML
@ -37,13 +38,13 @@ try:
raw_config = f.read()
if config_file.endswith((".yaml", ".yml")):
config: dict[str, any] = yaml.load(raw_config)
config: dict[str, Any] = yaml.load(raw_config)
elif config_file.endswith(".json"):
config: dict[str, any] = json.loads(raw_config)
config: dict[str, Any] = json.loads(raw_config)
except FileNotFoundError:
config: dict[str, any] = {}
config: dict[str, Any] = {}
go2rtc_config: dict[str, any] = config.get("go2rtc", {})
go2rtc_config: dict[str, Any] = config.get("go2rtc", {})
# Need to enable CORS for go2rtc so the frigate integration / card work automatically
if go2rtc_config.get("api") is None:
@ -134,7 +135,7 @@ for name in go2rtc_config.get("streams", {}):
# add birdseye restream stream if enabled
if config.get("birdseye", {}).get("restream", False):
birdseye: dict[str, any] = config.get("birdseye")
birdseye: dict[str, Any] = config.get("birdseye")
input = f"-f rawvideo -pix_fmt yuv420p -video_size {birdseye.get('width', 1280)}x{birdseye.get('height', 720)} -r 10 -i {BIRDSEYE_PIPE}"
ffmpeg_cmd = f"exec:{parse_preset_hardware_acceleration_encode(ffmpeg_path, config.get('ffmpeg', {}).get('hwaccel_args', ''), input, '-rtsp_transport tcp -f rtsp {output}')}"

View File

@ -2,9 +2,10 @@
import json
import os
from typing import Any
base_path = os.environ.get("FRIGATE_BASE_PATH", "")
result: dict[str, any] = {"base_path": base_path}
result: dict[str, Any] = {"base_path": base_path}
print(json.dumps(result))

View File

@ -2,6 +2,7 @@
import json
import sys
from typing import Any
from ruamel.yaml import YAML
@ -19,12 +20,12 @@ try:
raw_config = f.read()
if config_file.endswith((".yaml", ".yml")):
config: dict[str, any] = yaml.load(raw_config)
config: dict[str, Any] = yaml.load(raw_config)
elif config_file.endswith(".json"):
config: dict[str, any] = json.loads(raw_config)
config: dict[str, Any] = json.loads(raw_config)
except FileNotFoundError:
config: dict[str, any] = {}
config: dict[str, Any] = {}
tls_config: dict[str, any] = config.get("tls", {"enabled": True})
tls_config: dict[str, Any] = config.get("tls", {"enabled": True})
print(json.dumps(tls_config))

View File

@ -131,7 +131,7 @@ def metrics(request: Request):
@router.get("/config")
def config(request: Request):
config_obj: FrigateConfig = request.app.frigate_config
config: dict[str, dict[str, any]] = config_obj.model_dump(
config: dict[str, dict[str, Any]] = config_obj.model_dump(
mode="json", warnings="none", exclude_none=True
)
@ -158,7 +158,7 @@ def config(request: Request):
camera_dict["zones"][zone_name]["color"] = zone.color
# remove go2rtc stream passwords
go2rtc: dict[str, any] = config_obj.go2rtc.model_dump(
go2rtc: dict[str, Any] = config_obj.go2rtc.model_dump(
mode="json", warnings="none", exclude_none=True
)
for stream_name, stream in go2rtc.get("streams", {}).items():
@ -648,7 +648,7 @@ def plusModels(request: Request, filterByCurrentModelDetector: bool = False):
status_code=400,
)
models: dict[any, any] = request.app.frigate_config.plus_api.get_models()
models: dict[Any, Any] = request.app.frigate_config.plus_api.get_models()
if not models["list"]:
return JSONResponse(
@ -801,7 +801,7 @@ def hourly_timeline(params: AppTimelineHourlyQueryParameters = Depends()):
count = 0
start = 0
end = 0
hours: dict[str, list[dict[str, any]]] = {}
hours: dict[str, list[dict[str, Any]]] = {}
for t in timeline:
if count == 0:

View File

@ -4,6 +4,7 @@ import datetime
import logging
import os
import shutil
from typing import Any
import cv2
from fastapi import APIRouter, Depends, Request, UploadFile
@ -58,7 +59,7 @@ def reclassify_face(request: Request, body: dict = None):
content={"message": "Face recognition is not enabled.", "success": False},
)
json: dict[str, any] = body or {}
json: dict[str, Any] = body or {}
training_file = os.path.join(
FACE_DIR, f"train/{sanitize_filename(json.get('training_file', ''))}"
)
@ -91,7 +92,7 @@ def train_face(request: Request, name: str, body: dict = None):
content={"message": "Face recognition is not enabled.", "success": False},
)
json: dict[str, any] = body or {}
json: dict[str, Any] = body or {}
training_file_name = sanitize_filename(json.get("training_file", ""))
training_file = os.path.join(FACE_DIR, f"train/{training_file_name}")
event_id = json.get("event_id")
@ -246,7 +247,7 @@ def deregister_faces(request: Request, name: str, body: dict = None):
content={"message": "Face recognition is not enabled.", "success": False},
)
json: dict[str, any] = body or {}
json: dict[str, Any] = body or {}
list_of_ids = json.get("ids", "")
context: EmbeddingsContext = request.app.embeddings

View File

@ -9,6 +9,7 @@ import subprocess as sp
import time
from datetime import datetime, timedelta, timezone
from pathlib import Path as FilePath
from typing import Any
from urllib.parse import unquote
import cv2
@ -89,7 +90,7 @@ def imagestream(
camera_name: str,
fps: int,
height: int,
draw_options: dict[str, any],
draw_options: dict[str, Any],
):
while True:
# max out at specified FPS

View File

@ -2,6 +2,7 @@
import logging
import os
from typing import Any
from cryptography.hazmat.primitives import serialization
from fastapi import APIRouter, Request
@ -41,7 +42,7 @@ def register_notifications(request: Request, body: dict = None):
else:
username = "admin"
json: dict[str, any] = body or {}
json: dict[str, Any] = body or {}
sub = json.get("sub")
if not sub:

View File

@ -1,18 +1,18 @@
"""Manage camera activity and updating listeners."""
from collections import Counter
from typing import Callable
from typing import Any, Callable
from frigate.config.config import FrigateConfig
class CameraActivityManager:
def __init__(
self, config: FrigateConfig, publish: Callable[[str, any], None]
self, config: FrigateConfig, publish: Callable[[str, Any], None]
) -> None:
self.config = config
self.publish = publish
self.last_camera_activity: dict[str, dict[str, any]] = {}
self.last_camera_activity: dict[str, dict[str, Any]] = {}
self.camera_all_object_counts: dict[str, Counter] = {}
self.camera_active_object_counts: dict[str, Counter] = {}
self.zone_all_object_counts: dict[str, Counter] = {}
@ -39,8 +39,8 @@ class CameraActivityManager:
else camera_config.objects.track
)
def update_activity(self, new_activity: dict[str, dict[str, any]]) -> None:
all_objects: list[dict[str, any]] = []
def update_activity(self, new_activity: dict[str, dict[str, Any]]) -> None:
all_objects: list[dict[str, Any]] = []
for camera in new_activity.keys():
new_objects = new_activity[camera].get("objects", [])
@ -93,7 +93,7 @@ class CameraActivityManager:
self.last_camera_activity = new_activity
def compare_camera_activity(
self, camera: str, new_activity: dict[str, any]
self, camera: str, new_activity: dict[str, Any]
) -> None:
all_objects = Counter(
obj["label"].replace("-verified", "") for obj in new_activity

View File

@ -239,7 +239,7 @@ class CameraState:
self,
frame_name: str,
frame_time: float,
current_detections: dict[str, dict[str, any]],
current_detections: dict[str, dict[str, Any]],
motion_boxes: list[tuple[int, int, int, int]],
regions: list[tuple[int, int, int, int]],
):
@ -337,7 +337,7 @@ class CameraState:
# TODO: can i switch to looking this up and only changing when an event ends?
# maintain best objects
camera_activity: dict[str, list[any]] = {
camera_activity: dict[str, list[Any]] = {
"motion": len(motion_boxes) > 0,
"objects": [],
}

View File

@ -2,7 +2,7 @@
import multiprocessing as mp
from multiprocessing.synchronize import Event as MpEvent
from typing import Optional
from typing import Any, Optional
import zmq
@ -18,7 +18,7 @@ class ConfigPublisher:
self.socket.bind(SOCKET_PUB_SUB)
self.stop_event: MpEvent = mp.Event()
def publish(self, topic: str, payload: any) -> None:
def publish(self, topic: str, payload: Any) -> None:
"""There is no communication back to the processes."""
self.socket.send_string(topic, flags=zmq.SNDMORE)
self.socket.send_pyobj(payload)
@ -40,7 +40,7 @@ class ConfigSubscriber:
self.socket.setsockopt_string(zmq.SUBSCRIBE, topic)
self.socket.connect(SOCKET_PUB_SUB)
def check_for_update(self) -> Optional[tuple[str, any]]:
def check_for_update(self) -> Optional[tuple[str, Any]]:
"""Returns updated config or None if no update."""
try:
topic = self.socket.recv_string(flags=zmq.NOBLOCK)

View File

@ -1,7 +1,7 @@
"""Facilitates communication between processes."""
from enum import Enum
from typing import Optional
from typing import Any, Optional
from .zmq_proxy import Publisher, Subscriber
@ -35,10 +35,10 @@ class DetectionSubscriber(Subscriber):
def check_for_update(
self, timeout: float = None
) -> Optional[tuple[DetectionTypeEnum, any]]:
) -> Optional[tuple[DetectionTypeEnum, Any]]:
return super().check_for_update(timeout)
def _return_object(self, topic: str, payload: any) -> any:
def _return_object(self, topic: str, payload: Any) -> Any:
if payload is None:
return (None, None)
return (DetectionTypeEnum[topic[len(self.topic_base) :]], payload)

View File

@ -1,7 +1,7 @@
"""Facilitates communication between processes."""
from enum import Enum
from typing import Callable
from typing import Any, Callable
import zmq
@ -58,7 +58,7 @@ class EmbeddingsRequestor:
self.socket = self.context.socket(zmq.REQ)
self.socket.connect(SOCKET_REP_REQ)
def send_data(self, topic: str, data: any) -> str:
def send_data(self, topic: str, data: Any) -> str:
"""Sends data and then waits for reply."""
try:
self.socket.send_json((topic, data))

View File

@ -2,6 +2,7 @@
import logging
from enum import Enum
from typing import Any
from .zmq_proxy import Publisher, Subscriber
@ -27,7 +28,7 @@ class EventMetadataPublisher(Publisher):
def __init__(self) -> None:
super().__init__()
def publish(self, topic: EventMetadataTypeEnum, payload: any) -> None:
def publish(self, topic: EventMetadataTypeEnum, payload: Any) -> None:
super().publish(payload, topic.value)

View File

@ -1,5 +1,7 @@
"""Facilitates communication between processes."""
from typing import Any
from frigate.events.types import EventStateEnum, EventTypeEnum
from .zmq_proxy import Publisher, Subscriber
@ -14,7 +16,7 @@ class EventUpdatePublisher(Publisher):
super().__init__("update")
def publish(
self, payload: tuple[EventTypeEnum, EventStateEnum, str, str, dict[str, any]]
self, payload: tuple[EventTypeEnum, EventStateEnum, str, str, dict[str, Any]]
) -> None:
super().publish(payload)
@ -37,7 +39,7 @@ class EventEndPublisher(Publisher):
super().__init__("finalized")
def publish(
self, payload: tuple[EventTypeEnum, EventStateEnum, str, dict[str, any]]
self, payload: tuple[EventTypeEnum, EventStateEnum, str, dict[str, Any]]
) -> None:
super().publish(payload)

View File

@ -3,7 +3,7 @@
import multiprocessing as mp
import threading
from multiprocessing.synchronize import Event as MpEvent
from typing import Callable
from typing import Any, Callable
import zmq
@ -63,7 +63,7 @@ class InterProcessRequestor:
self.socket = self.context.socket(zmq.REQ)
self.socket.connect(SOCKET_REP_REQ)
def send_data(self, topic: str, data: any) -> any:
def send_data(self, topic: str, data: Any) -> Any:
"""Sends data and then waits for reply."""
try:
self.socket.send_json((topic, data))

View File

@ -2,7 +2,7 @@
import json
import threading
from typing import Optional
from typing import Any, Optional
import zmq
@ -58,7 +58,7 @@ class Publisher:
self.socket = self.context.socket(zmq.PUB)
self.socket.connect(SOCKET_PUB)
def publish(self, payload: any, sub_topic: str = "") -> None:
def publish(self, payload: Any, sub_topic: str = "") -> None:
"""Publish message."""
self.socket.send_string(f"{self.topic}{sub_topic} {json.dumps(payload)}")
@ -81,7 +81,7 @@ class Subscriber:
def check_for_update(
self, timeout: float = FAST_QUEUE_TIMEOUT
) -> Optional[tuple[str, any]]:
) -> Optional[tuple[str, Any]]:
"""Returns message or None if no update."""
try:
has_update, _, _ = zmq.select([self.socket], [], [], timeout)
@ -98,5 +98,5 @@ class Subscriber:
self.socket.close()
self.context.destroy()
def _return_object(self, topic: str, payload: any) -> any:
def _return_object(self, topic: str, payload: Any) -> Any:
return payload

View File

@ -10,7 +10,7 @@ import random
import re
import string
from pathlib import Path
from typing import List, Optional, Tuple
from typing import Any, List, Optional, Tuple
import cv2
import numpy as np
@ -1181,7 +1181,7 @@ class LicensePlateProcessingMixin:
return event_id
def lpr_process(
self, obj_data: dict[str, any], frame: np.ndarray, dedicated_lpr: bool = False
self, obj_data: dict[str, Any], frame: np.ndarray, dedicated_lpr: bool = False
):
"""Look for license plates in image."""
self.metrics.alpr_pps.value = self.plates_rec_second.eps()
@ -1272,7 +1272,7 @@ class LicensePlateProcessingMixin:
)
return
license_plate: Optional[dict[str, any]] = None
license_plate: Optional[dict[str, Any]] = None
if "license_plate" not in self.config.cameras[camera].objects.track:
logger.debug(f"{camera}: Running manual license_plate detection.")
@ -1341,7 +1341,7 @@ class LicensePlateProcessingMixin:
return
if obj_data.get("label") in ["car", "motorcycle"]:
attributes: list[dict[str, any]] = obj_data.get(
attributes: list[dict[str, Any]] = obj_data.get(
"current_attributes", []
)
for attr in attributes:
@ -1567,7 +1567,7 @@ class LicensePlateProcessingMixin:
"last_seen": current_time if dedicated_lpr else None,
}
def handle_request(self, topic, request_data) -> dict[str, any] | None:
def handle_request(self, topic, request_data) -> dict[str, Any] | None:
return
def expire_object(self, object_id: str, camera: str):

View File

@ -2,6 +2,7 @@
import logging
from abc import ABC, abstractmethod
from typing import Any
from frigate.config import FrigateConfig
@ -25,7 +26,7 @@ class PostProcessorApi(ABC):
@abstractmethod
def process_data(
self, data: dict[str, any], data_type: PostProcessDataEnum
self, data: dict[str, Any], data_type: PostProcessDataEnum
) -> None:
"""Processes the data of data type.
Args:
@ -38,7 +39,7 @@ class PostProcessorApi(ABC):
pass
@abstractmethod
def handle_request(self, request_data: dict[str, any]) -> dict[str, any] | None:
def handle_request(self, request_data: dict[str, Any]) -> dict[str, Any] | None:
"""Handle metadata requests.
Args:
request_data (dict): containing data about requested change to process.

View File

@ -2,6 +2,7 @@
import datetime
import logging
from typing import Any
import cv2
import numpy as np
@ -36,7 +37,7 @@ class LicensePlatePostProcessor(LicensePlateProcessingMixin, PostProcessorApi):
sub_label_publisher: EventMetadataPublisher,
metrics: DataProcessorMetrics,
model_runner: LicensePlateModelRunner,
detected_license_plates: dict[str, dict[str, any]],
detected_license_plates: dict[str, dict[str, Any]],
):
self.requestor = requestor
self.detected_license_plates = detected_license_plates
@ -47,7 +48,7 @@ class LicensePlatePostProcessor(LicensePlateProcessingMixin, PostProcessorApi):
super().__init__(config, metrics, model_runner)
def process_data(
self, data: dict[str, any], data_type: PostProcessDataEnum
self, data: dict[str, Any], data_type: PostProcessDataEnum
) -> None:
"""Look for license plates in recording stream image
Args:
@ -214,7 +215,7 @@ class LicensePlatePostProcessor(LicensePlateProcessingMixin, PostProcessorApi):
logger.debug(f"Post processing plate: {event_id}, {frame_time}")
self.lpr_process(keyframe_obj_data, frame)
def handle_request(self, topic, request_data) -> dict[str, any] | None:
def handle_request(self, topic, request_data) -> dict[str, Any] | None:
if topic == EmbeddingsRequestEnum.reprocess_plate.value:
event = request_data["event"]

View File

@ -2,6 +2,7 @@
import logging
from abc import ABC, abstractmethod
from typing import Any
import numpy as np
@ -24,7 +25,7 @@ class RealTimeProcessorApi(ABC):
pass
@abstractmethod
def process_frame(self, obj_data: dict[str, any], frame: np.ndarray) -> None:
def process_frame(self, obj_data: dict[str, Any], frame: np.ndarray) -> None:
"""Processes the frame with object data.
Args:
obj_data (dict): containing data about focused object in frame.
@ -37,8 +38,8 @@ class RealTimeProcessorApi(ABC):
@abstractmethod
def handle_request(
self, topic: str, request_data: dict[str, any]
) -> dict[str, any] | None:
self, topic: str, request_data: dict[str, Any]
) -> dict[str, Any] | None:
"""Handle metadata requests.
Args:
topic (str): topic that dictates what work is requested.

View File

@ -2,6 +2,7 @@
import logging
import os
from typing import Any
import cv2
import numpy as np
@ -35,8 +36,8 @@ class BirdRealTimeProcessor(RealTimeProcessorApi):
super().__init__(config, metrics)
self.interpreter: Interpreter = None
self.sub_label_publisher = sub_label_publisher
self.tensor_input_details: dict[str, any] = None
self.tensor_output_details: dict[str, any] = None
self.tensor_input_details: dict[str, Any] = None
self.tensor_output_details: dict[str, Any] = None
self.detected_birds: dict[str, float] = {}
self.labelmap: dict[int, str] = {}

View File

@ -6,7 +6,7 @@ import json
import logging
import os
import shutil
from typing import Optional
from typing import Any, Optional
import cv2
import numpy as np
@ -157,7 +157,7 @@ class FaceRealTimeProcessor(RealTimeProcessorApi):
self.faces_per_second.update()
self.inference_speed.update(duration)
def process_frame(self, obj_data: dict[str, any], frame: np.ndarray):
def process_frame(self, obj_data: dict[str, Any], frame: np.ndarray):
"""Look for faces in image."""
self.metrics.face_rec_fps.value = self.faces_per_second.eps()
camera = obj_data["camera"]
@ -198,7 +198,7 @@ class FaceRealTimeProcessor(RealTimeProcessorApi):
logger.debug("Not processing due to hitting max rec attempts.")
return
face: Optional[dict[str, any]] = None
face: Optional[dict[str, Any]] = None
if self.requires_face_detection:
logger.debug("Running manual face detection.")
@ -238,7 +238,7 @@ class FaceRealTimeProcessor(RealTimeProcessorApi):
logger.debug("No attributes to parse.")
return
attributes: list[dict[str, any]] = obj_data.get("current_attributes", [])
attributes: list[dict[str, Any]] = obj_data.get("current_attributes", [])
for attr in attributes:
if attr.get("label") != "face":
continue
@ -323,7 +323,7 @@ class FaceRealTimeProcessor(RealTimeProcessorApi):
self.__update_metrics(datetime.datetime.now().timestamp() - start)
def handle_request(self, topic, request_data) -> dict[str, any] | None:
def handle_request(self, topic, request_data) -> dict[str, Any] | None:
if topic == EmbeddingsRequestEnum.clear_face_classifier.value:
self.recognizer.clear()
elif topic == EmbeddingsRequestEnum.recognize_face.value:

View File

@ -2,6 +2,7 @@
import json
import logging
from typing import Any
import numpy as np
@ -30,7 +31,7 @@ class LicensePlateRealTimeProcessor(LicensePlateProcessingMixin, RealTimeProcess
sub_label_publisher: EventMetadataPublisher,
metrics: DataProcessorMetrics,
model_runner: LicensePlateModelRunner,
detected_license_plates: dict[str, dict[str, any]],
detected_license_plates: dict[str, dict[str, Any]],
):
self.requestor = requestor
self.detected_license_plates = detected_license_plates
@ -43,14 +44,14 @@ class LicensePlateRealTimeProcessor(LicensePlateProcessingMixin, RealTimeProcess
def process_frame(
self,
obj_data: dict[str, any],
obj_data: dict[str, Any],
frame: np.ndarray,
dedicated_lpr: bool | None = False,
):
"""Look for license plates in image."""
self.lpr_process(obj_data, frame, dedicated_lpr)
def handle_request(self, topic, request_data) -> dict[str, any] | None:
def handle_request(self, topic, request_data) -> dict[str, Any] | None:
return
def expire_object(self, object_id: str, camera: str):

View File

@ -3,7 +3,7 @@ import json
import logging
import os
from enum import Enum
from typing import Dict, Optional, Tuple
from typing import Any, Dict, Optional, Tuple
import requests
from pydantic import BaseModel, ConfigDict, Field
@ -147,7 +147,7 @@ class ModelConfig(BaseModel):
json.dump(model_info, f)
else:
with open(model_info_path, "r") as f:
model_info: dict[str, any] = json.load(f)
model_info: dict[str, Any] = json.load(f)
if detector and detector not in model_info["supportedDetectors"]:
raise ValueError(f"Model does not support detector type of {detector}")

View File

@ -9,7 +9,7 @@ import re
import signal
import threading
from types import FrameType
from typing import Optional, Union
from typing import Any, Optional, Union
from pathvalidate import ValidationError, sanitize_filename
from setproctitle import setproctitle
@ -190,7 +190,7 @@ class EmbeddingsContext:
return results
def register_face(self, face_name: str, image_data: bytes) -> dict[str, any]:
def register_face(self, face_name: str, image_data: bytes) -> dict[str, Any]:
return self.requestor.send_data(
EmbeddingsRequestEnum.register_face.value,
{
@ -199,7 +199,7 @@ class EmbeddingsContext:
},
)
def recognize_face(self, image_data: bytes) -> dict[str, any]:
def recognize_face(self, image_data: bytes) -> dict[str, Any]:
return self.requestor.send_data(
EmbeddingsRequestEnum.recognize_face.value,
{
@ -217,7 +217,7 @@ class EmbeddingsContext:
return self.db.execute_sql(sql_query).fetchall()
def reprocess_face(self, face_file: str) -> dict[str, any]:
def reprocess_face(self, face_file: str) -> dict[str, Any]:
return self.requestor.send_data(
EmbeddingsRequestEnum.reprocess_face.value, {"image_file": face_file}
)
@ -284,10 +284,10 @@ class EmbeddingsContext:
{"id": event_id, "description": description},
)
def reprocess_plate(self, event: dict[str, any]) -> dict[str, any]:
def reprocess_plate(self, event: dict[str, Any]) -> dict[str, Any]:
return self.requestor.send_data(
EmbeddingsRequestEnum.reprocess_plate.value, {"event": event}
)
def reindex_embeddings(self) -> dict[str, any]:
def reindex_embeddings(self) -> dict[str, Any]:
return self.requestor.send_data(EmbeddingsRequestEnum.reindex.value, {})

View File

@ -7,7 +7,7 @@ import os
import threading
from multiprocessing.synchronize import Event as MpEvent
from pathlib import Path
from typing import Optional
from typing import Any, Optional
import cv2
import numpy as np
@ -104,7 +104,7 @@ class EmbeddingMaintainer(threading.Thread):
self.embeddings_responder = EmbeddingsResponder()
self.frame_manager = SharedMemoryFrameManager()
self.detected_license_plates: dict[str, dict[str, any]] = {}
self.detected_license_plates: dict[str, dict[str, Any]] = {}
# model runners to share between realtime and post processors
if self.config.lpr.enabled:
@ -159,7 +159,7 @@ class EmbeddingMaintainer(threading.Thread):
)
self.stop_event = stop_event
self.tracked_events: dict[str, list[any]] = {}
self.tracked_events: dict[str, list[Any]] = {}
self.early_request_sent: dict[str, bool] = {}
self.genai_client = get_genai_client(config)
@ -190,7 +190,7 @@ class EmbeddingMaintainer(threading.Thread):
def _process_requests(self) -> None:
"""Process embeddings requests"""
def _handle_request(topic: str, data: dict[str, any]) -> str:
def _handle_request(topic: str, data: dict[str, Any]) -> str:
try:
# First handle the embedding-specific topics when semantic search is enabled
if self.config.semantic_search.enabled:

View File

@ -5,6 +5,7 @@ import os
from abc import ABC, abstractmethod
from enum import Enum
from io import BytesIO
from typing import Any
import numpy as np
import requests
@ -59,7 +60,7 @@ class BaseEmbedding(ABC):
pass
@abstractmethod
def _preprocess_inputs(self, raw_inputs: any) -> any:
def _preprocess_inputs(self, raw_inputs: Any) -> Any:
pass
def _process_image(self, image, output: str = "RGB") -> Image.Image:
@ -74,7 +75,7 @@ class BaseEmbedding(ABC):
return image
def _postprocess_outputs(self, outputs: any) -> any:
def _postprocess_outputs(self, outputs: Any) -> Any:
return outputs
def __call__(
@ -84,7 +85,7 @@ class BaseEmbedding(ABC):
processed = self._preprocess_inputs(inputs)
input_names = self.runner.get_input_names()
onnx_inputs = {name: [] for name in input_names}
input: dict[str, any]
input: dict[str, Any]
for input in processed:
for key, value in input.items():
if key in input_names:

View File

@ -6,7 +6,7 @@ import random
import string
import threading
import time
from typing import Tuple
from typing import Any, Tuple
import numpy as np
@ -126,7 +126,7 @@ class AudioEventMaintainer(threading.Thread):
self.config = camera
self.camera_metrics = camera_metrics
self.detections: dict[dict[str, any]] = {}
self.detections: dict[dict[str, Any]] = {}
self.stop_event = stop_event
self.detector = AudioTfl(stop_event, self.config.audio.num_threads)
self.shape = (int(round(AUDIO_DURATION * AUDIO_SAMPLE_RATE)),)

View File

@ -6,6 +6,7 @@ import os
import threading
from multiprocessing.synchronize import Event as MpEvent
from pathlib import Path
from typing import Any
from frigate.config import FrigateConfig
from frigate.const import CLIPS_DIR
@ -29,7 +30,7 @@ class EventCleanup(threading.Thread):
self.db = db
self.camera_keys = list(self.config.cameras.keys())
self.removed_camera_labels: list[str] = None
self.camera_labels: dict[str, dict[str, any]] = {}
self.camera_labels: dict[str, dict[str, Any]] = {}
def get_removed_camera_labels(self) -> list[Event]:
"""Get a list of distinct labels for removed cameras."""

View File

@ -10,7 +10,7 @@ import queue
import subprocess as sp
import threading
import traceback
from typing import Optional
from typing import Any, Optional
import cv2
import numpy as np
@ -542,10 +542,10 @@ class BirdsEyeFrameManager:
self,
cameras_to_add: list[str],
coefficient: float,
) -> tuple[any]:
) -> tuple[Any]:
"""Calculate the optimal layout for 2+ cameras."""
def map_layout(camera_layout: list[list[any]], row_height: int):
def map_layout(camera_layout: list[list[Any]], row_height: int):
"""Map the calculated layout."""
candidate_layout = []
starting_x = 0
@ -588,7 +588,7 @@ class BirdsEyeFrameManager:
return max_width, y, candidate_layout
canvas_aspect_x, canvas_aspect_y = self.canvas.get_aspect(coefficient)
camera_layout: list[list[any]] = []
camera_layout: list[list[Any]] = []
camera_layout.append([])
starting_x = 0
x = starting_x
@ -786,7 +786,7 @@ class Birdseye:
def write_data(
self,
camera: str,
current_tracked_objects: list[dict[str, any]],
current_tracked_objects: list[dict[str, Any]],
motion_boxes: list[list[int]],
frame_time: float,
frame: np.ndarray,

View File

@ -8,6 +8,7 @@ import subprocess as sp
import threading
import time
from pathlib import Path
from typing import Any
import cv2
import numpy as np
@ -255,7 +256,7 @@ class PreviewRecorder:
def should_write_frame(
self,
current_tracked_objects: list[dict[str, any]],
current_tracked_objects: list[dict[str, Any]],
motion_boxes: list[list[int]],
frame_time: float,
) -> bool:
@ -315,7 +316,7 @@ class PreviewRecorder:
def write_data(
self,
current_tracked_objects: list[dict[str, any]],
current_tracked_objects: list[dict[str, Any]],
motion_boxes: list[list[int]],
frame_time: float,
frame: np.ndarray,

View File

@ -7,6 +7,7 @@ import threading
import time
from collections import deque
from multiprocessing.synchronize import Event as MpEvent
from typing import Any
import cv2
import numpy as np
@ -59,7 +60,7 @@ class PtzMotionEstimator:
def motion_estimator(
self,
detections: list[dict[str, any]],
detections: list[dict[str, Any]],
frame_name: str,
frame_time: float,
camera: str,

View File

@ -7,6 +7,7 @@ import time
from enum import Enum
from importlib.util import find_spec
from pathlib import Path
from typing import Any
import numpy
from onvif import ONVIFCamera, ONVIFError, ONVIFService
@ -646,7 +647,7 @@ class OnvifController:
f"Error executing command {command} for camera {camera_name}: {e}"
)
async def get_camera_info(self, camera_name: str) -> dict[str, any]:
async def get_camera_info(self, camera_name: str) -> dict[str, Any]:
"""
Get ptz capabilities and presets, attempting to reconnect if ONVIF is configured
but not initialized.

View File

@ -242,7 +242,7 @@ class RecordingMaintainer(threading.Thread):
self.end_time_cache.pop(cache_path, None)
async def validate_and_move_segment(
self, camera: str, reviews: list[ReviewSegment], recording: dict[str, any]
self, camera: str, reviews: list[ReviewSegment], recording: dict[str, Any]
) -> None:
cache_path: str = recording["cache_path"]
start_time: datetime.datetime = recording["start_time"]

View File

@ -10,7 +10,7 @@ import sys
import threading
from multiprocessing.synchronize import Event as MpEvent
from pathlib import Path
from typing import Optional
from typing import Any, Optional
import cv2
import numpy as np
@ -156,7 +156,7 @@ class ReviewSegmentMaintainer(threading.Thread):
self.detection_subscriber = DetectionSubscriber(DetectionTypeEnum.all)
# manual events
self.indefinite_events: dict[str, dict[str, any]] = {}
self.indefinite_events: dict[str, dict[str, Any]] = {}
# ensure dirs
Path(os.path.join(CLIPS_DIR, "review")).mkdir(exist_ok=True)
@ -194,7 +194,7 @@ class ReviewSegmentMaintainer(threading.Thread):
camera_config: CameraConfig,
frame,
objects: list[TrackedObject],
prev_data: dict[str, any],
prev_data: dict[str, Any],
) -> None:
"""Update segment."""
if frame is not None:
@ -219,7 +219,7 @@ class ReviewSegmentMaintainer(threading.Thread):
def _publish_segment_end(
self,
segment: PendingReviewSegment,
prev_data: dict[str, any],
prev_data: dict[str, Any],
) -> None:
"""End segment."""
final_data = segment.get_data(ended=True)

View File

@ -6,7 +6,7 @@ import logging
import threading
import time
from multiprocessing.synchronize import Event as MpEvent
from typing import Optional
from typing import Any, Optional
from frigate.comms.inter_process import InterProcessRequestor
from frigate.config import FrigateConfig
@ -33,12 +33,12 @@ class StatsEmitter(threading.Thread):
self.stats_tracking = stats_tracking
self.stop_event = stop_event
self.hwaccel_errors: list[str] = []
self.stats_history: list[dict[str, any]] = []
self.stats_history: list[dict[str, Any]] = []
# create communication for stats
self.requestor = InterProcessRequestor()
def get_latest_stats(self) -> dict[str, any]:
def get_latest_stats(self) -> dict[str, Any]:
"""Get latest stats."""
if len(self.stats_history) > 0:
return self.stats_history[-1]
@ -51,12 +51,12 @@ class StatsEmitter(threading.Thread):
def get_stats_history(
self, keys: Optional[list[str]] = None
) -> list[dict[str, any]]:
) -> list[dict[str, Any]]:
"""Get stats history."""
if not keys:
return self.stats_history
selected_stats: list[dict[str, any]] = []
selected_stats: list[dict[str, Any]] = []
for s in self.stats_history:
selected = {}

View File

@ -5,6 +5,7 @@ import queue
import threading
from multiprocessing import Queue
from multiprocessing.synchronize import Event as MpEvent
from typing import Any
from frigate.config import FrigateConfig
from frigate.events.maintainer import EventStateEnum, EventTypeEnum
@ -27,7 +28,7 @@ class TimelineProcessor(threading.Thread):
self.config = config
self.queue = queue
self.stop_event = stop_event
self.pre_event_cache: dict[str, list[dict[str, any]]] = {}
self.pre_event_cache: dict[str, list[dict[str, Any]]] = {}
def run(self) -> None:
while not self.stop_event.is_set():
@ -55,9 +56,9 @@ class TimelineProcessor(threading.Thread):
def insert_or_save(
self,
entry: dict[str, any],
prev_event_data: dict[any, any],
event_data: dict[any, any],
entry: dict[str, Any],
prev_event_data: dict[Any, Any],
event_data: dict[Any, Any],
) -> None:
"""Insert into db or cache."""
id = entry[Timeline.source_id]
@ -81,8 +82,8 @@ class TimelineProcessor(threading.Thread):
self,
camera: str,
event_type: str,
prev_event_data: dict[any, any],
event_data: dict[any, any],
prev_event_data: dict[Any, Any],
event_data: dict[Any, Any],
) -> bool:
"""Handle object detection."""
save = False
@ -153,7 +154,7 @@ class TimelineProcessor(threading.Thread):
self,
camera: str,
event_type: str,
event_data: dict[any, any],
event_data: dict[Any, Any],
) -> bool:
if event_type != "new":
return False

View File

@ -1,4 +1,5 @@
from abc import ABC, abstractmethod
from typing import Any
from frigate.config import DetectConfig
@ -10,6 +11,6 @@ class ObjectTracker(ABC):
@abstractmethod
def match_and_update(
self, frame_name: str, frame_time: float, detections: list[dict[str, any]]
self, frame_name: str, frame_time: float, detections: list[dict[str, Any]]
) -> None:
pass

View File

@ -1,7 +1,7 @@
import logging
import random
import string
from typing import Sequence
from typing import Any, Sequence
import cv2
import numpy as np
@ -460,7 +460,7 @@ class NorfairTracker(ObjectTracker):
self.match_and_update(frame_name, frame_time, detections=detections)
def match_and_update(
self, frame_name: str, frame_time: float, detections: list[dict[str, any]]
self, frame_name: str, frame_time: float, detections: list[dict[str, Any]]
):
# Group detections by object type
detections_by_type = {}

View File

@ -7,6 +7,7 @@ import threading
from collections import defaultdict
from enum import Enum
from multiprocessing.synchronize import Event as MpEvent
from typing import Any
import cv2
import numpy as np
@ -70,7 +71,7 @@ class TrackedObjectProcessor(threading.Thread):
self.event_end_subscriber = EventEndSubscriber()
self.sub_label_subscriber = EventMetadataSubscriber(EventMetadataTypeEnum.all)
self.camera_activity: dict[str, dict[str, any]] = {}
self.camera_activity: dict[str, dict[str, Any]] = {}
self.ongoing_manual_events: dict[str, str] = {}
# {
@ -301,7 +302,7 @@ class TrackedObjectProcessor(threading.Thread):
return {}
def get_current_frame(
self, camera: str, draw_options: dict[str, any] = {}
self, camera: str, draw_options: dict[str, Any] = {}
) -> np.ndarray | None:
if camera == "birdseye":
return self.frame_manager.get(

View File

@ -5,7 +5,7 @@ import math
import os
from collections import defaultdict
from statistics import median
from typing import Optional
from typing import Any, Optional
import cv2
import numpy as np
@ -38,7 +38,7 @@ class TrackedObject:
camera_config: CameraConfig,
ui_config: UIConfig,
frame_cache,
obj_data: dict[str, any],
obj_data: dict[str, Any],
):
# set the score history then remove as it is not part of object state
self.score_history = obj_data["score_history"]
@ -621,7 +621,7 @@ class TrackedObjectAttribute:
self.ratio = raw_data[4]
self.region = raw_data[5]
def get_tracking_data(self) -> dict[str, any]:
def get_tracking_data(self) -> dict[str, Any]:
"""Return data saved to the object."""
return {
"label": self.label,
@ -629,7 +629,7 @@ class TrackedObjectAttribute:
"box": self.box,
}
def find_best_object(self, objects: list[dict[str, any]]) -> Optional[str]:
def find_best_object(self, objects: list[dict[str, Any]]) -> Optional[str]:
"""Find the best attribute for each object and return its ID."""
best_object_area = None
best_object_id = None

View File

@ -156,7 +156,7 @@ def load_labels(path: Optional[str], encoding="utf-8", prefill=91):
return labels
def get_tz_modifiers(tz_name: str) -> Tuple[str, str, int]:
def get_tz_modifiers(tz_name: str) -> Tuple[str, str, float]:
seconds_offset = (
datetime.datetime.now(pytz.timezone(tz_name)).utcoffset().total_seconds()
)
@ -169,7 +169,7 @@ def get_tz_modifiers(tz_name: str) -> Tuple[str, str, int]:
def to_relative_box(
width: int, height: int, box: Tuple[int, int, int, int]
) -> Tuple[int, int, int, int]:
) -> Tuple[int | float, int | float, int | float, int | float]:
return (
box[0] / width, # x
box[1] / height, # y

View File

@ -4,7 +4,7 @@ import asyncio
import logging
import os
import shutil
from typing import Optional, Union
from typing import Any, Optional, Union
from ruamel.yaml import YAML
@ -37,7 +37,7 @@ def migrate_frigate_config(config_file: str):
yaml = YAML()
yaml.indent(mapping=2, sequence=4, offset=2)
with open(config_file, "r") as f:
config: dict[str, dict[str, any]] = yaml.load(f)
config: dict[str, dict[str, Any]] = yaml.load(f)
if config is None:
logger.error(f"Failed to load config at {config_file}")
@ -94,7 +94,7 @@ def migrate_frigate_config(config_file: str):
logger.info("Finished frigate config migration...")
def migrate_014(config: dict[str, dict[str, any]]) -> dict[str, dict[str, any]]:
def migrate_014(config: dict[str, dict[str, Any]]) -> dict[str, dict[str, Any]]:
"""Handle migrating frigate config to 0.14"""
# migrate record.events.required_zones to review.alerts.required_zones
new_config = config.copy()
@ -142,7 +142,7 @@ def migrate_014(config: dict[str, dict[str, any]]) -> dict[str, dict[str, any]]:
del new_config["rtmp"]
for name, camera in config.get("cameras", {}).items():
camera_config: dict[str, dict[str, any]] = camera.copy()
camera_config: dict[str, dict[str, Any]] = camera.copy()
required_zones = (
camera_config.get("record", {}).get("events", {}).get("required_zones", [])
)
@ -181,7 +181,7 @@ def migrate_014(config: dict[str, dict[str, any]]) -> dict[str, dict[str, any]]:
return new_config
def migrate_015_0(config: dict[str, dict[str, any]]) -> dict[str, dict[str, any]]:
def migrate_015_0(config: dict[str, dict[str, Any]]) -> dict[str, dict[str, Any]]:
"""Handle migrating frigate config to 0.15-0"""
new_config = config.copy()
@ -232,9 +232,9 @@ def migrate_015_0(config: dict[str, dict[str, any]]) -> dict[str, dict[str, any]
del new_config["record"]["events"]
for name, camera in config.get("cameras", {}).items():
camera_config: dict[str, dict[str, any]] = camera.copy()
camera_config: dict[str, dict[str, Any]] = camera.copy()
record_events: dict[str, any] = camera_config.get("record", {}).get("events")
record_events: dict[str, Any] = camera_config.get("record", {}).get("events")
if record_events:
alerts_retention = {"retain": {}}
@ -281,7 +281,7 @@ def migrate_015_0(config: dict[str, dict[str, any]]) -> dict[str, dict[str, any]
return new_config
def migrate_015_1(config: dict[str, dict[str, any]]) -> dict[str, dict[str, any]]:
def migrate_015_1(config: dict[str, dict[str, Any]]) -> dict[str, dict[str, Any]]:
"""Handle migrating frigate config to 0.15-1"""
new_config = config.copy()
@ -296,7 +296,7 @@ def migrate_015_1(config: dict[str, dict[str, any]]) -> dict[str, dict[str, any]
return new_config
def migrate_016_0(config: dict[str, dict[str, any]]) -> dict[str, dict[str, any]]:
def migrate_016_0(config: dict[str, dict[str, Any]]) -> dict[str, dict[str, Any]]:
"""Handle migrating frigate config to 0.16-0"""
new_config = config.copy()
@ -307,7 +307,7 @@ def migrate_016_0(config: dict[str, dict[str, any]]) -> dict[str, dict[str, any]
new_config["detect"] = detect_config
for name, camera in config.get("cameras", {}).items():
camera_config: dict[str, dict[str, any]] = camera.copy()
camera_config: dict[str, dict[str, Any]] = camera.copy()
live_config = camera_config.get("live", {})
if "stream_name" in live_config:

View File

@ -8,7 +8,7 @@ from abc import ABC, abstractmethod
from multiprocessing import resource_tracker as _mprt
from multiprocessing import shared_memory as _mpshm
from string import printable
from typing import AnyStr, Optional
from typing import Any, AnyStr, Optional
import cv2
import numpy as np
@ -766,7 +766,7 @@ class FrameManager(ABC):
pass
@abstractmethod
def write(self, name: str) -> memoryview:
def write(self, name: str) -> Optional[memoryview]:
pass
@abstractmethod
@ -847,7 +847,7 @@ class SharedMemoryFrameManager(FrameManager):
self.shm_store[name] = shm
return shm.buf
def write(self, name: str) -> memoryview:
def write(self, name: str) -> Optional[memoryview]:
try:
if name in self.shm_store:
shm = self.shm_store[name]
@ -944,7 +944,7 @@ def get_image_from_recording(
relative_frame_time: float,
codec: str,
height: Optional[int] = None,
) -> Optional[any]:
) -> Optional[Any]:
"""retrieve a frame from given time in recording file."""
ffmpeg_cmd = [

View File

@ -2,6 +2,7 @@
import logging
import os
from typing import Any
import cv2
import numpy as np
@ -284,7 +285,7 @@ def post_process_yolox(
def get_ort_providers(
force_cpu: bool = False, device: str = "AUTO", requires_fp16: bool = False
) -> tuple[list[str], list[dict[str, any]]]:
) -> tuple[list[str], list[dict[str, Any]]]:
if force_cpu:
return (
["CPUExecutionProvider"],

View File

@ -4,6 +4,7 @@ import datetime
import logging
import math
from collections import defaultdict
from typing import Any
import cv2
import numpy as np
@ -38,7 +39,7 @@ def get_camera_regions_grid(
name: str,
detect: DetectConfig,
min_region_size: int,
) -> list[list[dict[str, any]]]:
) -> list[list[dict[str, Any]]]:
"""Build a grid of expected region sizes for a camera."""
# get grid from db if available
try:
@ -163,10 +164,10 @@ def get_cluster_region_from_grid(frame_shape, min_region, cluster, boxes, region
def get_region_from_grid(
frame_shape: tuple[int],
frame_shape: tuple[int, int],
cluster: list[int],
min_region: int,
region_grid: list[list[dict[str, any]]],
region_grid: list[list[dict[str, Any]]],
) -> list[int]:
"""Get a region for a box based on the region grid."""
box = calculate_region(
@ -446,9 +447,9 @@ def get_cluster_region(frame_shape, min_region, cluster, boxes):
def get_startup_regions(
frame_shape: tuple[int],
frame_shape: tuple[int, int],
region_min_size: int,
region_grid: list[list[dict[str, any]]],
region_grid: list[list[dict[str, Any]]],
) -> list[list[int]]:
"""Get a list of regions to run on startup."""
# return 8 most popular regions for the camera
@ -480,12 +481,12 @@ def get_startup_regions(
def reduce_detections(
frame_shape: tuple[int],
all_detections: list[tuple[any]],
) -> list[tuple[any]]:
frame_shape: tuple[int, int],
all_detections: list[tuple[Any]],
) -> list[tuple[Any]]:
"""Take a list of detections and reduce overlaps to create a list of confident detections."""
def reduce_overlapping_detections(detections: list[tuple[any]]) -> list[tuple[any]]:
def reduce_overlapping_detections(detections: list[tuple[Any]]) -> list[tuple[Any]]:
"""apply non-maxima suppression to suppress weak, overlapping bounding boxes."""
detected_object_groups = defaultdict(lambda: [])
for detection in detections:
@ -524,7 +525,7 @@ def reduce_detections(
# set the detections list to only include top objects
return selected_objects
def get_consolidated_object_detections(detections: list[tuple[any]]):
def get_consolidated_object_detections(detections: list[tuple[Any]]):
"""Drop detections that overlap too much."""
detected_object_groups = defaultdict(lambda: [])
for detection in detections:

View File

@ -9,7 +9,7 @@ import signal
import subprocess as sp
import traceback
from datetime import datetime
from typing import List, Optional, Tuple
from typing import Any, List, Optional, Tuple
import cv2
import psutil
@ -230,7 +230,7 @@ def is_vaapi_amd_driver() -> bool:
return any("AMD Radeon Graphics" in line for line in output)
def get_amd_gpu_stats() -> dict[str, str]:
def get_amd_gpu_stats() -> Optional[dict[str, str]]:
"""Get stats using radeontop."""
radeontop_command = ["radeontop", "-d", "-", "-l", "1"]
@ -256,7 +256,7 @@ def get_amd_gpu_stats() -> dict[str, str]:
return results
def get_intel_gpu_stats(sriov: bool) -> dict[str, str]:
def get_intel_gpu_stats(sriov: bool) -> Optional[dict[str, str]]:
"""Get stats using intel_gpu_top."""
def get_stats_manually(output: str) -> dict[str, str]:
@ -382,7 +382,7 @@ def get_intel_gpu_stats(sriov: bool) -> dict[str, str]:
return results
def get_rockchip_gpu_stats() -> dict[str, str]:
def get_rockchip_gpu_stats() -> Optional[dict[str, str]]:
"""Get GPU stats using rk."""
try:
with open("/sys/kernel/debug/rkrga/load", "r") as f:
@ -403,7 +403,7 @@ def get_rockchip_gpu_stats() -> dict[str, str]:
return {"gpu": average_load, "mem": "-"}
def get_rockchip_npu_stats() -> dict[str, str]:
def get_rockchip_npu_stats() -> Optional[dict[str, float | str]]:
"""Get NPU stats using rk."""
try:
with open("/sys/kernel/debug/rknpu/load", "r") as f:
@ -494,7 +494,7 @@ def get_nvidia_gpu_stats() -> dict[int, dict]:
return results
def get_jetson_stats() -> dict[int, dict]:
def get_jetson_stats() -> Optional[dict[int, dict]]:
results = {}
try:
@ -537,7 +537,7 @@ def vainfo_hwaccel(device_name: Optional[str] = None) -> sp.CompletedProcess:
return sp.run(ffprobe_cmd, capture_output=True)
def get_nvidia_driver_info() -> dict[str, any]:
def get_nvidia_driver_info() -> dict[str, Any]:
"""Get general hardware info for nvidia GPU."""
results = {}
try:
@ -596,8 +596,8 @@ def auto_detect_hwaccel() -> str:
async def get_video_properties(
ffmpeg, url: str, get_duration: bool = False
) -> dict[str, any]:
async def calculate_duration(video: Optional[any]) -> float:
) -> dict[str, Any]:
async def calculate_duration(video: Optional[Any]) -> float:
duration = None
if video is not None:

View File

@ -184,7 +184,7 @@ class CameraWatchdog(threading.Thread):
self.capture_thread = None
self.ffmpeg_detect_process = None
self.logpipe = LogPipe(f"ffmpeg.{self.camera_name}.detect")
self.ffmpeg_other_processes: list[dict[str, any]] = []
self.ffmpeg_other_processes: list[dict[str, Any]] = []
self.camera_fps = camera_fps
self.skipped_fps = skipped_fps
self.ffmpeg_pid = ffmpeg_pid
@ -371,7 +371,9 @@ class CameraWatchdog(threading.Thread):
p["logpipe"].close()
self.ffmpeg_other_processes.clear()
def get_latest_segment_datetime(self, latest_segment: datetime.datetime) -> int:
def get_latest_segment_datetime(
self, latest_segment: datetime.datetime
) -> datetime.datetime:
"""Checks if ffmpeg is still writing recording segments to cache."""
cache_files = sorted(
[
@ -859,7 +861,7 @@ def process_frames(
detections[obj["id"]] = {**obj, "attributes": []}
# find the best object for each attribute to be assigned to
all_objects: list[dict[str, any]] = object_tracker.tracked_objects.values()
all_objects: list[dict[str, Any]] = object_tracker.tracked_objects.values()
for attributes in attribute_detections.values():
for attribute in attributes:
filtered_objects = filter(

View File

@ -1,3 +1,3 @@
[tool.ruff]
[tool.ruff.lint]
ignore = ["E501","E711","E712"]
extend-select = ["I"]
extend-select = ["I"]