mirror of
https://github.com/blakeblackshear/frigate.git
synced 2026-04-19 23:08:08 +02:00
More mypy cleanup (#22658)
* Halfway point for fixing data processing * Fix mixin types missing * Cleanup LPR mypy * Cleanup audio mypy * Cleanup bird mypy * Cleanup mypy for custom classification * remove whisper * Fix DB typing * Cleanup events mypy * Clenaup * fix type evaluation * Cleanup * Fix broken imports
This commit is contained in:
@@ -53,7 +53,7 @@ class AudioTranscriptionModelRunner:
|
||||
self.downloader = ModelDownloader(
|
||||
model_name="sherpa-onnx",
|
||||
download_path=download_path,
|
||||
file_names=self.model_files.keys(),
|
||||
file_names=list(self.model_files.keys()),
|
||||
download_func=self.__download_models,
|
||||
)
|
||||
self.downloader.ensure_model_files()
|
||||
|
||||
@@ -21,7 +21,7 @@ class FaceRecognizer(ABC):
|
||||
|
||||
def __init__(self, config: FrigateConfig) -> None:
|
||||
self.config = config
|
||||
self.landmark_detector: cv2.face.FacemarkLBF = None
|
||||
self.landmark_detector: cv2.face.Facemark | None = None
|
||||
self.init_landmark_detector()
|
||||
|
||||
@abstractmethod
|
||||
@@ -38,13 +38,14 @@ class FaceRecognizer(ABC):
|
||||
def classify(self, face_image: np.ndarray) -> tuple[str, float] | None:
|
||||
pass
|
||||
|
||||
@redirect_output_to_logger(logger, logging.DEBUG)
|
||||
@redirect_output_to_logger(logger, logging.DEBUG) # type: ignore[misc]
|
||||
def init_landmark_detector(self) -> None:
|
||||
landmark_model = os.path.join(MODEL_CACHE_DIR, "facedet/landmarkdet.yaml")
|
||||
|
||||
if os.path.exists(landmark_model):
|
||||
self.landmark_detector = cv2.face.createFacemarkLBF()
|
||||
self.landmark_detector.loadModel(landmark_model)
|
||||
landmark_detector = cv2.face.createFacemarkLBF()
|
||||
landmark_detector.loadModel(landmark_model)
|
||||
self.landmark_detector = landmark_detector
|
||||
|
||||
def align_face(
|
||||
self,
|
||||
@@ -52,8 +53,10 @@ class FaceRecognizer(ABC):
|
||||
output_width: int,
|
||||
output_height: int,
|
||||
) -> np.ndarray:
|
||||
# landmark is run on grayscale images
|
||||
if not self.landmark_detector:
|
||||
raise ValueError("Landmark detector not initialized")
|
||||
|
||||
# landmark is run on grayscale images
|
||||
if image.ndim == 3:
|
||||
land_image = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
|
||||
else:
|
||||
@@ -131,8 +134,11 @@ class FaceRecognizer(ABC):
|
||||
|
||||
|
||||
def similarity_to_confidence(
|
||||
cosine_similarity: float, median=0.3, range_width=0.6, slope_factor=12
|
||||
):
|
||||
cosine_similarity: float,
|
||||
median: float = 0.3,
|
||||
range_width: float = 0.6,
|
||||
slope_factor: float = 12,
|
||||
) -> float:
|
||||
"""
|
||||
Default sigmoid function to map cosine similarity to confidence.
|
||||
|
||||
@@ -151,14 +157,14 @@ def similarity_to_confidence(
|
||||
bias = median
|
||||
|
||||
# Calculate confidence
|
||||
confidence = 1 / (1 + np.exp(-slope * (cosine_similarity - bias)))
|
||||
confidence: float = 1 / (1 + np.exp(-slope * (cosine_similarity - bias)))
|
||||
return confidence
|
||||
|
||||
|
||||
class FaceNetRecognizer(FaceRecognizer):
|
||||
def __init__(self, config: FrigateConfig):
|
||||
super().__init__(config)
|
||||
self.mean_embs: dict[int, np.ndarray] = {}
|
||||
self.mean_embs: dict[str, np.ndarray] = {}
|
||||
self.face_embedder: FaceNetEmbedding = FaceNetEmbedding()
|
||||
self.model_builder_queue: queue.Queue | None = None
|
||||
|
||||
@@ -168,7 +174,7 @@ class FaceNetRecognizer(FaceRecognizer):
|
||||
def run_build_task(self) -> None:
|
||||
self.model_builder_queue = queue.Queue()
|
||||
|
||||
def build_model():
|
||||
def build_model() -> None:
|
||||
face_embeddings_map: dict[str, list[np.ndarray]] = {}
|
||||
idx = 0
|
||||
|
||||
@@ -187,7 +193,7 @@ class FaceNetRecognizer(FaceRecognizer):
|
||||
img = cv2.imread(os.path.join(face_folder, image))
|
||||
|
||||
if img is None:
|
||||
continue
|
||||
continue # type: ignore[unreachable]
|
||||
|
||||
img = self.align_face(img, img.shape[1], img.shape[0])
|
||||
emb = self.face_embedder([img])[0].squeeze()
|
||||
@@ -195,12 +201,13 @@ class FaceNetRecognizer(FaceRecognizer):
|
||||
|
||||
idx += 1
|
||||
|
||||
assert self.model_builder_queue is not None
|
||||
self.model_builder_queue.put(face_embeddings_map)
|
||||
|
||||
thread = threading.Thread(target=build_model, daemon=True)
|
||||
thread.start()
|
||||
|
||||
def build(self):
|
||||
def build(self) -> None:
|
||||
if not self.landmark_detector:
|
||||
self.init_landmark_detector()
|
||||
return None
|
||||
@@ -226,7 +233,7 @@ class FaceNetRecognizer(FaceRecognizer):
|
||||
|
||||
logger.debug("Finished building ArcFace model")
|
||||
|
||||
def classify(self, face_image):
|
||||
def classify(self, face_image: np.ndarray) -> tuple[str, float] | None:
|
||||
if not self.landmark_detector:
|
||||
return None
|
||||
|
||||
@@ -245,7 +252,7 @@ class FaceNetRecognizer(FaceRecognizer):
|
||||
img = self.align_face(face_image, face_image.shape[1], face_image.shape[0])
|
||||
embedding = self.face_embedder([img])[0].squeeze()
|
||||
|
||||
score = 0
|
||||
score: float = 0
|
||||
label = ""
|
||||
|
||||
for name, mean_emb in self.mean_embs.items():
|
||||
@@ -268,7 +275,7 @@ class FaceNetRecognizer(FaceRecognizer):
|
||||
class ArcFaceRecognizer(FaceRecognizer):
|
||||
def __init__(self, config: FrigateConfig):
|
||||
super().__init__(config)
|
||||
self.mean_embs: dict[int, np.ndarray] = {}
|
||||
self.mean_embs: dict[str, np.ndarray] = {}
|
||||
self.face_embedder: ArcfaceEmbedding = ArcfaceEmbedding(config.face_recognition)
|
||||
self.model_builder_queue: queue.Queue | None = None
|
||||
|
||||
@@ -278,7 +285,7 @@ class ArcFaceRecognizer(FaceRecognizer):
|
||||
def run_build_task(self) -> None:
|
||||
self.model_builder_queue = queue.Queue()
|
||||
|
||||
def build_model():
|
||||
def build_model() -> None:
|
||||
face_embeddings_map: dict[str, list[np.ndarray]] = {}
|
||||
idx = 0
|
||||
|
||||
@@ -297,20 +304,21 @@ class ArcFaceRecognizer(FaceRecognizer):
|
||||
img = cv2.imread(os.path.join(face_folder, image))
|
||||
|
||||
if img is None:
|
||||
continue
|
||||
continue # type: ignore[unreachable]
|
||||
|
||||
img = self.align_face(img, img.shape[1], img.shape[0])
|
||||
emb = self.face_embedder([img])[0].squeeze()
|
||||
emb = self.face_embedder([img])[0].squeeze() # type: ignore[arg-type]
|
||||
face_embeddings_map[name].append(emb)
|
||||
|
||||
idx += 1
|
||||
|
||||
assert self.model_builder_queue is not None
|
||||
self.model_builder_queue.put(face_embeddings_map)
|
||||
|
||||
thread = threading.Thread(target=build_model, daemon=True)
|
||||
thread.start()
|
||||
|
||||
def build(self):
|
||||
def build(self) -> None:
|
||||
if not self.landmark_detector:
|
||||
self.init_landmark_detector()
|
||||
return None
|
||||
@@ -336,7 +344,7 @@ class ArcFaceRecognizer(FaceRecognizer):
|
||||
|
||||
logger.debug("Finished building ArcFace model")
|
||||
|
||||
def classify(self, face_image):
|
||||
def classify(self, face_image: np.ndarray) -> tuple[str, float] | None:
|
||||
if not self.landmark_detector:
|
||||
return None
|
||||
|
||||
@@ -353,9 +361,9 @@ class ArcFaceRecognizer(FaceRecognizer):
|
||||
|
||||
# align face and run recognition
|
||||
img = self.align_face(face_image, face_image.shape[1], face_image.shape[0])
|
||||
embedding = self.face_embedder([img])[0].squeeze()
|
||||
embedding = self.face_embedder([img])[0].squeeze() # type: ignore[arg-type]
|
||||
|
||||
score = 0
|
||||
score: float = 0
|
||||
label = ""
|
||||
|
||||
for name, mean_emb in self.mean_embs.items():
|
||||
|
||||
@@ -10,7 +10,7 @@ import random
|
||||
import re
|
||||
import string
|
||||
from pathlib import Path
|
||||
from typing import Any, List, Optional, Tuple
|
||||
from typing import Any, List, Tuple
|
||||
|
||||
import cv2
|
||||
import numpy as np
|
||||
@@ -22,19 +22,35 @@ from frigate.comms.event_metadata_updater import (
|
||||
EventMetadataPublisher,
|
||||
EventMetadataTypeEnum,
|
||||
)
|
||||
from frigate.comms.inter_process import InterProcessRequestor
|
||||
from frigate.config import FrigateConfig
|
||||
from frigate.config.classification import LicensePlateRecognitionConfig
|
||||
from frigate.const import CLIPS_DIR, MODEL_CACHE_DIR
|
||||
from frigate.data_processing.common.license_plate.model import LicensePlateModelRunner
|
||||
from frigate.embeddings.onnx.lpr_embedding import LPR_EMBEDDING_SIZE
|
||||
from frigate.types import TrackedObjectUpdateTypesEnum
|
||||
from frigate.util.builtin import EventsPerSecond, InferenceSpeed
|
||||
from frigate.util.image import area
|
||||
|
||||
from ...types import DataProcessorMetrics
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
WRITE_DEBUG_IMAGES = False
|
||||
|
||||
|
||||
class LicensePlateProcessingMixin:
|
||||
def __init__(self, *args, **kwargs):
|
||||
# Attributes expected from consuming classes (set before super().__init__)
|
||||
config: FrigateConfig
|
||||
metrics: DataProcessorMetrics
|
||||
model_runner: LicensePlateModelRunner
|
||||
lpr_config: LicensePlateRecognitionConfig
|
||||
requestor: InterProcessRequestor
|
||||
detected_license_plates: dict[str, dict[str, Any]]
|
||||
camera_current_cars: dict[str, list[str]]
|
||||
sub_label_publisher: EventMetadataPublisher
|
||||
|
||||
def __init__(self, *args: Any, **kwargs: Any) -> None:
|
||||
super().__init__(*args, **kwargs)
|
||||
self.plate_rec_speed = InferenceSpeed(self.metrics.alpr_speed)
|
||||
self.plates_rec_second = EventsPerSecond()
|
||||
@@ -97,7 +113,7 @@ class LicensePlateProcessingMixin:
|
||||
)
|
||||
|
||||
try:
|
||||
outputs = self.model_runner.detection_model([normalized_image])[0]
|
||||
outputs = self.model_runner.detection_model([normalized_image])[0] # type: ignore[arg-type]
|
||||
except Exception as e:
|
||||
logger.warning(f"Error running LPR box detection model: {e}")
|
||||
return []
|
||||
@@ -105,18 +121,18 @@ class LicensePlateProcessingMixin:
|
||||
outputs = outputs[0, :, :]
|
||||
|
||||
if False:
|
||||
current_time = int(datetime.datetime.now().timestamp())
|
||||
current_time = int(datetime.datetime.now().timestamp()) # type: ignore[unreachable]
|
||||
cv2.imwrite(
|
||||
f"debug/frames/probability_map_{current_time}.jpg",
|
||||
(outputs * 255).astype(np.uint8),
|
||||
)
|
||||
|
||||
boxes, _ = self._boxes_from_bitmap(outputs, outputs > self.mask_thresh, w, h)
|
||||
return self._filter_polygon(boxes, (h, w))
|
||||
return self._filter_polygon(boxes, (h, w)) # type: ignore[return-value,arg-type]
|
||||
|
||||
def _classify(
|
||||
self, images: List[np.ndarray]
|
||||
) -> Tuple[List[np.ndarray], List[Tuple[str, float]]]:
|
||||
) -> Tuple[List[np.ndarray], List[Tuple[str, float]]] | None:
|
||||
"""
|
||||
Classify the orientation or category of each detected license plate.
|
||||
|
||||
@@ -138,15 +154,15 @@ class LicensePlateProcessingMixin:
|
||||
norm_images.append(norm_img)
|
||||
|
||||
try:
|
||||
outputs = self.model_runner.classification_model(norm_images)
|
||||
outputs = self.model_runner.classification_model(norm_images) # type: ignore[arg-type]
|
||||
except Exception as e:
|
||||
logger.warning(f"Error running LPR classification model: {e}")
|
||||
return
|
||||
return None
|
||||
|
||||
return self._process_classification_output(images, outputs)
|
||||
|
||||
def _recognize(
|
||||
self, camera: string, images: List[np.ndarray]
|
||||
self, camera: str, images: List[np.ndarray]
|
||||
) -> Tuple[List[str], List[List[float]]]:
|
||||
"""
|
||||
Recognize the characters on the detected license plates using the recognition model.
|
||||
@@ -179,7 +195,7 @@ class LicensePlateProcessingMixin:
|
||||
norm_images.append(norm_image)
|
||||
|
||||
try:
|
||||
outputs = self.model_runner.recognition_model(norm_images)
|
||||
outputs = self.model_runner.recognition_model(norm_images) # type: ignore[arg-type]
|
||||
except Exception as e:
|
||||
logger.warning(f"Error running LPR recognition model: {e}")
|
||||
return [], []
|
||||
@@ -410,7 +426,8 @@ class LicensePlateProcessingMixin:
|
||||
)
|
||||
|
||||
if sorted_data:
|
||||
return map(list, zip(*sorted_data))
|
||||
plates, confs, areas_list = zip(*sorted_data)
|
||||
return list(plates), list(confs), list(areas_list)
|
||||
|
||||
return [], [], []
|
||||
|
||||
@@ -532,7 +549,7 @@ class LicensePlateProcessingMixin:
|
||||
# Add the last box
|
||||
merged_boxes.append(current_box)
|
||||
|
||||
return np.array(merged_boxes, dtype=np.int32)
|
||||
return np.array(merged_boxes, dtype=np.int32) # type: ignore[return-value]
|
||||
|
||||
def _boxes_from_bitmap(
|
||||
self, output: np.ndarray, mask: np.ndarray, dest_width: int, dest_height: int
|
||||
@@ -560,38 +577,42 @@ class LicensePlateProcessingMixin:
|
||||
boxes = []
|
||||
scores = []
|
||||
|
||||
for index in range(len(contours)):
|
||||
contour = contours[index]
|
||||
for index in range(len(contours)): # type: ignore[arg-type]
|
||||
contour = contours[index] # type: ignore[index]
|
||||
|
||||
# get minimum bounding box (rotated rectangle) around the contour and the smallest side length.
|
||||
points, sside = self._get_min_boxes(contour)
|
||||
if sside < self.min_size:
|
||||
continue
|
||||
|
||||
points = np.array(points, dtype=np.float32)
|
||||
points = np.array(points, dtype=np.float32) # type: ignore[assignment]
|
||||
|
||||
score = self._box_score(output, contour)
|
||||
if self.box_thresh > score:
|
||||
continue
|
||||
|
||||
points = self._expand_box(points)
|
||||
points = self._expand_box(points) # type: ignore[assignment]
|
||||
|
||||
# Get the minimum area rectangle again after expansion
|
||||
points, sside = self._get_min_boxes(points.reshape(-1, 1, 2))
|
||||
points, sside = self._get_min_boxes(points.reshape(-1, 1, 2)) # type: ignore[attr-defined]
|
||||
if sside < self.min_size + 2:
|
||||
continue
|
||||
|
||||
points = np.array(points, dtype=np.float32)
|
||||
points = np.array(points, dtype=np.float32) # type: ignore[assignment]
|
||||
|
||||
# normalize and clip box coordinates to fit within the destination image size.
|
||||
points[:, 0] = np.clip(
|
||||
np.round(points[:, 0] / width * dest_width), 0, dest_width
|
||||
points[:, 0] = np.clip( # type: ignore[call-overload]
|
||||
np.round(points[:, 0] / width * dest_width), # type: ignore[call-overload]
|
||||
0,
|
||||
dest_width,
|
||||
)
|
||||
points[:, 1] = np.clip(
|
||||
np.round(points[:, 1] / height * dest_height), 0, dest_height
|
||||
points[:, 1] = np.clip( # type: ignore[call-overload]
|
||||
np.round(points[:, 1] / height * dest_height), # type: ignore[call-overload]
|
||||
0,
|
||||
dest_height,
|
||||
)
|
||||
|
||||
boxes.append(points.astype("int32"))
|
||||
boxes.append(points.astype("int32")) # type: ignore[attr-defined]
|
||||
scores.append(score)
|
||||
|
||||
return np.array(boxes, dtype="int32"), scores
|
||||
@@ -632,7 +653,7 @@ class LicensePlateProcessingMixin:
|
||||
x1, y1 = np.clip(contour.min(axis=0), 0, [w - 1, h - 1])
|
||||
x2, y2 = np.clip(contour.max(axis=0), 0, [w - 1, h - 1])
|
||||
mask = np.zeros((y2 - y1 + 1, x2 - x1 + 1), dtype=np.uint8)
|
||||
cv2.fillPoly(mask, [contour - [x1, y1]], 1)
|
||||
cv2.fillPoly(mask, [contour - [x1, y1]], 1) # type: ignore[call-overload]
|
||||
return cv2.mean(bitmap[y1 : y2 + 1, x1 : x2 + 1], mask)[0]
|
||||
|
||||
@staticmethod
|
||||
@@ -690,7 +711,7 @@ class LicensePlateProcessingMixin:
|
||||
Returns:
|
||||
bool: Whether the polygon is valid or not.
|
||||
"""
|
||||
return (
|
||||
return bool(
|
||||
point[:, 0].min() >= 0
|
||||
and point[:, 0].max() < width
|
||||
and point[:, 1].min() >= 0
|
||||
@@ -735,7 +756,7 @@ class LicensePlateProcessingMixin:
|
||||
return np.array([tl, tr, br, bl])
|
||||
|
||||
@staticmethod
|
||||
def _sort_boxes(boxes):
|
||||
def _sort_boxes(boxes: list[np.ndarray]) -> list[np.ndarray]:
|
||||
"""
|
||||
Sort polygons based on their position in the image. If boxes are close in vertical
|
||||
position (within 5 pixels), sort them by horizontal position.
|
||||
@@ -837,16 +858,16 @@ class LicensePlateProcessingMixin:
|
||||
results = [["", 0.0]] * len(images)
|
||||
indices = np.argsort(np.array([x.shape[1] / x.shape[0] for x in images]))
|
||||
|
||||
outputs = np.stack(outputs)
|
||||
stacked_outputs = np.stack(outputs)
|
||||
|
||||
outputs = [
|
||||
(labels[idx], outputs[i, idx])
|
||||
for i, idx in enumerate(outputs.argmax(axis=1))
|
||||
stacked_outputs = [
|
||||
(labels[idx], stacked_outputs[i, idx])
|
||||
for i, idx in enumerate(stacked_outputs.argmax(axis=1))
|
||||
]
|
||||
|
||||
for i in range(0, len(images), self.batch_size):
|
||||
for j in range(len(outputs)):
|
||||
label, score = outputs[j]
|
||||
for j in range(len(stacked_outputs)):
|
||||
label, score = stacked_outputs[j]
|
||||
results[indices[i + j]] = [label, score]
|
||||
# make sure we have high confidence if we need to flip a box
|
||||
if "180" in label and score >= 0.7:
|
||||
@@ -854,10 +875,10 @@ class LicensePlateProcessingMixin:
|
||||
images[indices[i + j]], cv2.ROTATE_180
|
||||
)
|
||||
|
||||
return images, results
|
||||
return images, results # type: ignore[return-value]
|
||||
|
||||
def _preprocess_recognition_image(
|
||||
self, camera: string, image: np.ndarray, max_wh_ratio: float
|
||||
self, camera: str, image: np.ndarray, max_wh_ratio: float
|
||||
) -> np.ndarray:
|
||||
"""
|
||||
Preprocess an image for recognition by dynamically adjusting its width.
|
||||
@@ -925,7 +946,7 @@ class LicensePlateProcessingMixin:
|
||||
input_w = int(input_h * max_wh_ratio)
|
||||
|
||||
# check for model-specific input width
|
||||
model_input_w = self.model_runner.recognition_model.runner.get_input_width()
|
||||
model_input_w = self.model_runner.recognition_model.runner.get_input_width() # type: ignore[union-attr]
|
||||
if isinstance(model_input_w, int) and model_input_w > 0:
|
||||
input_w = model_input_w
|
||||
|
||||
@@ -945,7 +966,7 @@ class LicensePlateProcessingMixin:
|
||||
padded_image[:, :, :resized_w] = resized_image
|
||||
|
||||
if False:
|
||||
current_time = int(datetime.datetime.now().timestamp() * 1000)
|
||||
current_time = int(datetime.datetime.now().timestamp() * 1000) # type: ignore[unreachable]
|
||||
cv2.imwrite(
|
||||
f"debug/frames/preprocessed_recognition_{current_time}.jpg",
|
||||
image,
|
||||
@@ -983,8 +1004,9 @@ class LicensePlateProcessingMixin:
|
||||
np.linalg.norm(points[1] - points[2]),
|
||||
)
|
||||
)
|
||||
pts_std = np.float32(
|
||||
[[0, 0], [crop_width, 0], [crop_width, crop_height], [0, crop_height]]
|
||||
pts_std = np.array(
|
||||
[[0, 0], [crop_width, 0], [crop_width, crop_height], [0, crop_height]],
|
||||
dtype=np.float32,
|
||||
)
|
||||
matrix = cv2.getPerspectiveTransform(points, pts_std)
|
||||
image = cv2.warpPerspective(
|
||||
@@ -1000,15 +1022,15 @@ class LicensePlateProcessingMixin:
|
||||
return image
|
||||
|
||||
def _detect_license_plate(
|
||||
self, camera: string, input: np.ndarray
|
||||
) -> tuple[int, int, int, int]:
|
||||
self, camera: str, input: np.ndarray
|
||||
) -> tuple[int, int, int, int] | None:
|
||||
"""
|
||||
Use a lightweight YOLOv9 model to detect license plates for users without Frigate+
|
||||
|
||||
Return the dimensions of the detected plate as [x1, y1, x2, y2].
|
||||
"""
|
||||
try:
|
||||
predictions = self.model_runner.yolov9_detection_model(input)
|
||||
predictions = self.model_runner.yolov9_detection_model(input) # type: ignore[arg-type]
|
||||
except Exception as e:
|
||||
logger.warning(f"Error running YOLOv9 license plate detection model: {e}")
|
||||
return None
|
||||
@@ -1073,7 +1095,7 @@ class LicensePlateProcessingMixin:
|
||||
logger.debug(
|
||||
f"{camera}: Found license plate. Bounding box: {expanded_box.astype(int)}"
|
||||
)
|
||||
return tuple(expanded_box.astype(int))
|
||||
return tuple(expanded_box.astype(int)) # type: ignore[return-value]
|
||||
else:
|
||||
return None # No detection above the threshold
|
||||
|
||||
@@ -1097,7 +1119,7 @@ class LicensePlateProcessingMixin:
|
||||
f" Variant {i + 1}: '{p['plate']}' (conf: {p['conf']:.3f}, area: {p['area']})"
|
||||
)
|
||||
|
||||
clusters = []
|
||||
clusters: list[list[dict[str, Any]]] = []
|
||||
for i, plate in enumerate(plates):
|
||||
merged = False
|
||||
for j, cluster in enumerate(clusters):
|
||||
@@ -1132,7 +1154,7 @@ class LicensePlateProcessingMixin:
|
||||
)
|
||||
|
||||
# Best cluster: largest size, tiebroken by max conf
|
||||
def cluster_score(c):
|
||||
def cluster_score(c: list[dict[str, Any]]) -> tuple[int, float]:
|
||||
return (len(c), max(v["conf"] for v in c))
|
||||
|
||||
best_cluster_idx = max(
|
||||
@@ -1178,7 +1200,7 @@ class LicensePlateProcessingMixin:
|
||||
|
||||
def lpr_process(
|
||||
self, obj_data: dict[str, Any], frame: np.ndarray, dedicated_lpr: bool = False
|
||||
):
|
||||
) -> None:
|
||||
"""Look for license plates in image."""
|
||||
self.metrics.alpr_pps.value = self.plates_rec_second.eps()
|
||||
self.metrics.yolov9_lpr_pps.value = self.plates_det_second.eps()
|
||||
@@ -1195,7 +1217,7 @@ class LicensePlateProcessingMixin:
|
||||
rgb = cv2.cvtColor(frame, cv2.COLOR_YUV2BGR_I420)
|
||||
|
||||
# apply motion mask
|
||||
rgb[self.config.cameras[obj_data].motion.rasterized_mask == 0] = [0, 0, 0]
|
||||
rgb[self.config.cameras[camera].motion.rasterized_mask == 0] = [0, 0, 0] # type: ignore[attr-defined]
|
||||
|
||||
if WRITE_DEBUG_IMAGES:
|
||||
cv2.imwrite(
|
||||
@@ -1261,7 +1283,7 @@ class LicensePlateProcessingMixin:
|
||||
"stationary", False
|
||||
):
|
||||
logger.debug(
|
||||
f"{camera}: Skipping LPR for non-stationary {obj_data['label']} object {id} with no position changes. (Detected in {self.config.cameras[camera].detect.min_initialized + 1} concurrent frames, threshold to run is {self.config.cameras[camera].detect.min_initialized + 2} frames)"
|
||||
f"{camera}: Skipping LPR for non-stationary {obj_data['label']} object {id} with no position changes. (Detected in {self.config.cameras[camera].detect.min_initialized + 1} concurrent frames, threshold to run is {self.config.cameras[camera].detect.min_initialized + 2} frames)" # type: ignore[operator]
|
||||
)
|
||||
return
|
||||
|
||||
@@ -1288,7 +1310,7 @@ class LicensePlateProcessingMixin:
|
||||
if time_since_stationary > self.stationary_scan_duration:
|
||||
return
|
||||
|
||||
license_plate: Optional[dict[str, Any]] = None
|
||||
license_plate = None
|
||||
|
||||
if "license_plate" not in self.config.cameras[camera].objects.track:
|
||||
logger.debug(f"{camera}: Running manual license_plate detection.")
|
||||
@@ -1301,7 +1323,7 @@ class LicensePlateProcessingMixin:
|
||||
rgb = cv2.cvtColor(frame, cv2.COLOR_YUV2BGR_I420)
|
||||
|
||||
# apply motion mask
|
||||
rgb[self.config.cameras[camera].motion.rasterized_mask == 0] = [0, 0, 0]
|
||||
rgb[self.config.cameras[camera].motion.rasterized_mask == 0] = [0, 0, 0] # type: ignore[attr-defined]
|
||||
|
||||
left, top, right, bottom = car_box
|
||||
car = rgb[top:bottom, left:right]
|
||||
@@ -1378,10 +1400,10 @@ class LicensePlateProcessingMixin:
|
||||
if attr.get("label") != "license_plate":
|
||||
continue
|
||||
|
||||
if license_plate is None or attr.get(
|
||||
if license_plate is None or attr.get( # type: ignore[unreachable]
|
||||
"score", 0.0
|
||||
) > license_plate.get("score", 0.0):
|
||||
license_plate = attr
|
||||
license_plate = attr # type: ignore[assignment]
|
||||
|
||||
# no license plates detected in this frame
|
||||
if not license_plate:
|
||||
@@ -1389,9 +1411,9 @@ class LicensePlateProcessingMixin:
|
||||
|
||||
# we are using dedicated lpr with frigate+
|
||||
if obj_data.get("label") == "license_plate":
|
||||
license_plate = obj_data
|
||||
license_plate = obj_data # type: ignore[assignment]
|
||||
|
||||
license_plate_box = license_plate.get("box")
|
||||
license_plate_box = license_plate.get("box") # type: ignore[attr-defined]
|
||||
|
||||
# check that license plate is valid
|
||||
if (
|
||||
@@ -1420,7 +1442,7 @@ class LicensePlateProcessingMixin:
|
||||
0, [license_plate_frame.shape[1], license_plate_frame.shape[0]] * 2
|
||||
)
|
||||
|
||||
plate_box = tuple(int(x) for x in expanded_box)
|
||||
plate_box = tuple(int(x) for x in expanded_box) # type: ignore[assignment]
|
||||
|
||||
# Crop using the expanded box
|
||||
license_plate_frame = license_plate_frame[
|
||||
@@ -1596,7 +1618,7 @@ class LicensePlateProcessingMixin:
|
||||
sub_label = next(
|
||||
(
|
||||
label
|
||||
for label, plates_list in self.lpr_config.known_plates.items()
|
||||
for label, plates_list in self.lpr_config.known_plates.items() # type: ignore[union-attr]
|
||||
if any(
|
||||
re.match(f"^{plate}$", rep_plate)
|
||||
or Levenshtein.distance(plate, rep_plate)
|
||||
@@ -1649,14 +1671,16 @@ class LicensePlateProcessingMixin:
|
||||
frame_bgr = cv2.cvtColor(frame, cv2.COLOR_YUV2BGR_I420)
|
||||
_, encoded_img = cv2.imencode(".jpg", frame_bgr)
|
||||
self.sub_label_publisher.publish(
|
||||
(base64.b64encode(encoded_img).decode("ASCII"), id, camera),
|
||||
(base64.b64encode(encoded_img.tobytes()).decode("ASCII"), id, camera),
|
||||
EventMetadataTypeEnum.save_lpr_snapshot.value,
|
||||
)
|
||||
|
||||
def handle_request(self, topic, request_data) -> dict[str, Any] | None:
|
||||
return
|
||||
def handle_request(
|
||||
self, topic: str, request_data: dict[str, Any]
|
||||
) -> dict[str, Any] | None:
|
||||
return None
|
||||
|
||||
def lpr_expire(self, object_id: str, camera: str):
|
||||
def lpr_expire(self, object_id: str, camera: str) -> None:
|
||||
if object_id in self.detected_license_plates:
|
||||
self.detected_license_plates.pop(object_id)
|
||||
|
||||
@@ -1673,7 +1697,7 @@ class CTCDecoder:
|
||||
for each decoded character sequence.
|
||||
"""
|
||||
|
||||
def __init__(self, character_dict_path=None):
|
||||
def __init__(self, character_dict_path: str | None = None) -> None:
|
||||
"""
|
||||
Initializes the CTCDecoder.
|
||||
:param character_dict_path: Path to the character dictionary file.
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
from frigate.comms.inter_process import InterProcessRequestor
|
||||
from frigate.embeddings.onnx.lpr_embedding import (
|
||||
LicensePlateDetector,
|
||||
PaddleOCRClassification,
|
||||
@@ -9,7 +10,12 @@ from ...types import DataProcessorModelRunner
|
||||
|
||||
|
||||
class LicensePlateModelRunner(DataProcessorModelRunner):
|
||||
def __init__(self, requestor, device: str = "CPU", model_size: str = "small"):
|
||||
def __init__(
|
||||
self,
|
||||
requestor: InterProcessRequestor,
|
||||
device: str = "CPU",
|
||||
model_size: str = "small",
|
||||
):
|
||||
super().__init__(requestor, device, model_size)
|
||||
self.detection_model = PaddleOCRDetection(
|
||||
model_size=model_size, requestor=requestor, device=device
|
||||
|
||||
Reference in New Issue
Block a user