# blakeblackshear.frigate/frigate/data_processing/common/license_plate/mixin.py

"""Handle processing images for face detection and recognition."""
import datetime
import logging
import math
import re
from typing import Any, List, Optional, Tuple
import cv2
import numpy as np
import requests
from Levenshtein import distance
from pyclipper import ET_CLOSEDPOLYGON, JT_ROUND, PyclipperOffset
from shapely.geometry import Polygon
from frigate.const import FRIGATE_LOCALHOST
from frigate.util.image import area
logger = logging.getLogger(__name__)
WRITE_DEBUG_IMAGES = False
class LicensePlateProcessingMixin:
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.requires_license_plate_detection = (
"license_plate" not in self.config.objects.all_objects
)
self.ctc_decoder = CTCDecoder()
self.batch_size = 6
# Detection specific parameters
self.min_size = 3
self.max_size = 960
self.box_thresh = 0.8
self.mask_thresh = 0.8
def _detect(self, image: np.ndarray) -> List[np.ndarray]:
"""
Detect possible license plates in the input image by first resizing and normalizing it,
running a detection model, and filtering out low-probability regions.
Args:
image (np.ndarray): The input image in which license plates will be detected.
Returns:
List[np.ndarray]: A list of bounding box coordinates representing detected license plates.
"""
h, w = image.shape[:2]
if sum([h, w]) < 64:
image = self._zero_pad(image)
resized_image = self._resize_image(image)
normalized_image = self._normalize_image(resized_image)
if WRITE_DEBUG_IMAGES:
current_time = int(datetime.datetime.now().timestamp())
cv2.imwrite(
f"debug/frames/license_plate_resized_{current_time}.jpg",
resized_image,
)
outputs = self.model_runner.detection_model([normalized_image])[0]
outputs = outputs[0, :, :]
boxes, _ = self._boxes_from_bitmap(outputs, outputs > self.mask_thresh, w, h)
return self._filter_polygon(boxes, (h, w))
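# A note on the detection stage above: the model output is treated as a
# probability bitmap (thresholded at mask_thresh) from which box candidates
# are extracted and then filtered by box_thresh. That post-processing matches
# a DB-style (Differentiable Binarization) text detector such as those shipped
# with PaddleOCR; this is an inference from the code here, not a confirmed
# statement about the exact model.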
def _classify(
self, images: List[np.ndarray]
) -> Tuple[List[np.ndarray], List[Tuple[str, float]]]:
"""
Classify the orientation or category of each detected license plate.
Args:
images (List[np.ndarray]): A list of images of detected license plates.
Returns:
Tuple[List[np.ndarray], List[Tuple[str, float]]]: A tuple of rotated/normalized plate images
and classification results with confidence scores.
"""
num_images = len(images)
indices = np.argsort([x.shape[1] / x.shape[0] for x in images])
for i in range(0, num_images, self.batch_size):
norm_images = []
for j in range(i, min(num_images, i + self.batch_size)):
norm_img = self._preprocess_classification_image(images[indices[j]])
norm_img = norm_img[np.newaxis, :]
norm_images.append(norm_img)
outputs = self.model_runner.classification_model(norm_images)
return self._process_classification_output(images, outputs)
def _recognize(
self, images: List[np.ndarray]
) -> Tuple[List[str], List[List[float]]]:
"""
Recognize the characters on the detected license plates using the recognition model.
Args:
images (List[np.ndarray]): A list of images of license plates to recognize.
Returns:
Tuple[List[str], List[List[float]]]: A tuple of recognized license plate texts and confidence scores.
"""
input_shape = [3, 48, 320]
num_images = len(images)
# sort images by aspect ratio for processing
indices = np.argsort(np.array([x.shape[1] / x.shape[0] for x in images]))
for index in range(0, num_images, self.batch_size):
input_h, input_w = input_shape[1], input_shape[2]
max_wh_ratio = input_w / input_h
norm_images = []
# calculate the maximum aspect ratio in the current batch
for i in range(index, min(num_images, index + self.batch_size)):
h, w = images[indices[i]].shape[0:2]
max_wh_ratio = max(max_wh_ratio, w * 1.0 / h)
# preprocess the images based on the max aspect ratio
for i in range(index, min(num_images, index + self.batch_size)):
norm_image = self._preprocess_recognition_image(
images[indices[i]], max_wh_ratio
)
norm_image = norm_image[np.newaxis, :]
norm_images.append(norm_image)
outputs = self.model_runner.recognition_model(norm_images)
return self.ctc_decoder(outputs)
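# The raw recognition outputs are rows of per-time-step character
# probabilities; see the CTCDecoder sketch at the bottom of this file for how
# best-path decoding collapses them into plate text and confidences.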
def _process_license_plate(
self, image: np.ndarray
) -> Tuple[List[str], List[float], List[int]]:
"""
Complete pipeline for detecting, classifying, and recognizing license plates in the input image.
Args:
image (np.ndarray): The input image in which to detect, classify, and recognize license plates.
Returns:
Tuple[List[str], List[float], List[int]]: Detected license plate texts, confidence scores, and areas of the plates.
"""
if (
self.model_runner.detection_model.runner is None
or self.model_runner.classification_model.runner is None
or self.model_runner.recognition_model.runner is None
):
# we might still be downloading the models
logger.debug("Model runners not loaded")
return [], [], []
plate_points = self._detect(image)
if len(plate_points) == 0:
logger.debug("No points found by OCR detector model")
return [], [], []
plate_points = self._sort_polygon(list(plate_points))
plate_images = [self._crop_license_plate(image, x) for x in plate_points]
rotated_images, _ = self._classify(plate_images)
# debug rotated and classification result
if WRITE_DEBUG_IMAGES:
current_time = int(datetime.datetime.now().timestamp())
for i, img in enumerate(plate_images):
cv2.imwrite(
f"debug/frames/license_plate_rotated_{current_time}_{i + 1}.jpg",
img,
)
for i, img in enumerate(rotated_images):
cv2.imwrite(
f"debug/frames/license_plate_classified_{current_time}_{i + 1}.jpg",
img,
)
# keep track of the index of each image for correct area calc later
sorted_indices = np.argsort([x.shape[1] / x.shape[0] for x in rotated_images])
reverse_mapping = {
idx: original_idx for original_idx, idx in enumerate(sorted_indices)
}
results, confidences = self._recognize(rotated_images)
if results:
license_plates = [""] * len(rotated_images)
average_confidences = [[0.0]] * len(rotated_images)
areas = [0] * len(rotated_images)
# map results back to original image order
for i, (plate, conf) in enumerate(zip(results, confidences)):
original_idx = reverse_mapping[i]
height, width = rotated_images[original_idx].shape[:2]
plate_area = height * width  # local name avoids shadowing frigate.util.image.area
average_confidence = conf
# set to True to write each cropped image for debugging
if False:
save_image = cv2.cvtColor(
rotated_images[original_idx], cv2.COLOR_RGB2BGR
)
filename = f"debug/frames/plate_{original_idx}_{plate}_{plate_area}.jpg"
cv2.imwrite(filename, save_image)
license_plates[original_idx] = plate
average_confidences[original_idx] = average_confidence
areas[original_idx] = plate_area
# Filter out plates shorter than min_plate_length characters,
# or that don't match the expected format (if defined),
# then sort by area, plate length, and confidence, all descending
filtered_data = []
for plate, conf, area in zip(license_plates, average_confidences, areas):
if len(plate) < self.lpr_config.min_plate_length:
logger.debug(
f"Filtered out '{plate}' due to length ({len(plate)} < {self.lpr_config.min_plate_length})"
)
continue
if self.lpr_config.format and not re.fullmatch(
self.lpr_config.format, plate
):
logger.debug(f"Filtered out '{plate}' due to format mismatch")
continue
filtered_data.append((plate, conf, area))
sorted_data = sorted(
filtered_data,
key=lambda x: (x[2], len(x[0]), x[1]),
reverse=True,
)
if sorted_data:
return tuple(map(list, zip(*sorted_data)))
return [], [], []
def _resize_image(self, image: np.ndarray) -> np.ndarray:
"""
Resize the input image while maintaining the aspect ratio, ensuring dimensions are multiples of 32.
Args:
image (np.ndarray): The input image to resize.
Returns:
np.ndarray: The resized image.
"""
h, w = image.shape[:2]
ratio = min(self.max_size / max(h, w), 1.0)
resize_h = max(int(round(int(h * ratio) / 32) * 32), 32)
resize_w = max(int(round(int(w * ratio) / 32) * 32), 32)
return cv2.resize(image, (resize_w, resize_h))
def _normalize_image(self, image: np.ndarray) -> np.ndarray:
"""
Normalize the input image by subtracting the mean and multiplying by the standard deviation.
Args:
image (np.ndarray): The input image to normalize.
Returns:
np.ndarray: The normalized image, transposed to match the model's expected input format.
"""
mean = np.array([123.675, 116.28, 103.53]).reshape(1, -1).astype("float64")
std = 1 / np.array([58.395, 57.12, 57.375]).reshape(1, -1).astype("float64")
image = image.astype("float32")
cv2.subtract(image, mean, image)
cv2.multiply(image, std, image)
return image.transpose((2, 0, 1))[np.newaxis, ...]
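# A vectorized sketch of the same normalization (the in-place cv2.subtract /
# cv2.multiply calls above are equivalent, since std holds the reciprocals):
#
#   mean = np.array([123.675, 116.28, 103.53])  # ImageNet mean, 0-255 scale
#   std = np.array([58.395, 57.12, 57.375])     # ImageNet std, 0-255 scale
#   normalized = (image.astype("float32") - mean) / std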
def _boxes_from_bitmap(
self, output: np.ndarray, mask: np.ndarray, dest_width: int, dest_height: int
) -> Tuple[np.ndarray, List[float]]:
"""
Process the binary mask to extract bounding boxes and associated confidence scores.
Args:
output (np.ndarray): Output confidence map from the model.
mask (np.ndarray): Binary mask of detected regions.
dest_width (int): Target width for scaling the box coordinates.
dest_height (int): Target height for scaling the box coordinates.
Returns:
Tuple[np.ndarray, List[float]]: Array of bounding boxes and list of corresponding scores.
"""
mask = (mask * 255).astype(np.uint8)
height, width = mask.shape
outs = cv2.findContours(mask, cv2.RETR_LIST, cv2.CHAIN_APPROX_SIMPLE)
# handle different return values of findContours between OpenCV versions
contours = outs[0] if len(outs) == 2 else outs[1]
boxes = []
scores = []
for index in range(len(contours)):
contour = contours[index]
# get minimum bounding box (rotated rectangle) around the contour and the smallest side length.
points, min_side = self._get_min_boxes(contour)
logger.debug(f"min side {index}, {min_side}")
if min_side < self.min_size:
continue
points = np.array(points)
score = self._box_score(output, contour)
logger.debug(f"box score {index}, {score}")
if self.box_thresh > score:
continue
polygon = Polygon(points)
unclip_distance = polygon.area / polygon.length
# Use pyclipper to expand (unclip) the polygon based on the computed distance.
offset = PyclipperOffset()
offset.AddPath(points, JT_ROUND, ET_CLOSEDPOLYGON)
points = np.array(offset.Execute(unclip_distance * 1.5)).reshape((-1, 1, 2))
# get the minimum bounding box around the expanded polygon.
box, min_side = self._get_min_boxes(points)
if min_side < self.min_size + 2:
continue
box = np.array(box)
# normalize and clip box coordinates to fit within the destination image size.
box[:, 0] = np.clip(np.round(box[:, 0] / width * dest_width), 0, dest_width)
box[:, 1] = np.clip(
np.round(box[:, 1] / height * dest_height), 0, dest_height
)
boxes.append(box.astype("int32"))
scores.append(score)
return np.array(boxes, dtype="int32"), scores
@staticmethod
def _get_min_boxes(contour: np.ndarray) -> Tuple[List[Tuple[float, float]], float]:
"""
Calculate the minimum bounding box (rotated rectangle) for a given contour.
Args:
contour (np.ndarray): The contour points of the detected shape.
Returns:
Tuple[List[Tuple[float, float]], float]: A list of four points representing the
corners of the bounding box, and the length of the shortest side.
"""
bounding_box = cv2.minAreaRect(contour)
points = sorted(cv2.boxPoints(bounding_box), key=lambda x: x[0])
index_1, index_4 = (0, 1) if points[1][1] > points[0][1] else (1, 0)
index_2, index_3 = (2, 3) if points[3][1] > points[2][1] else (3, 2)
box = [points[index_1], points[index_2], points[index_3], points[index_4]]
return box, min(bounding_box[1])
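# The corner ordering produced above is top-left, top-right, bottom-right,
# bottom-left: points are first sorted by x, then the upper/lower point on
# each side is chosen by comparing y values.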
@staticmethod
def _box_score(bitmap: np.ndarray, contour: np.ndarray) -> float:
"""
Calculate the average score within the bounding box of a contour.
Args:
bitmap (np.ndarray): The output confidence map from the model.
contour (np.ndarray): The contour of the detected shape.
Returns:
float: The average score of the pixels inside the contour region.
"""
h, w = bitmap.shape[:2]
contour = contour.reshape(-1, 2)
x1, y1 = np.clip(contour.min(axis=0), 0, [w - 1, h - 1])
x2, y2 = np.clip(contour.max(axis=0), 0, [w - 1, h - 1])
mask = np.zeros((y2 - y1 + 1, x2 - x1 + 1), dtype=np.uint8)
cv2.fillPoly(mask, [contour - [x1, y1]], 1)
return cv2.mean(bitmap[y1 : y2 + 1, x1 : x2 + 1], mask)[0]
@staticmethod
def _expand_box(points: List[Tuple[float, float]]) -> np.ndarray:
"""
Expand a polygonal shape slightly by a factor determined by the area-to-perimeter ratio.
Args:
points (List[Tuple[float, float]]): Points of the polygon to expand.
Returns:
np.ndarray: Expanded polygon points.
"""
polygon = Polygon(points)
unclip_distance = polygon.area / polygon.length
offset = PyclipperOffset()
offset.AddPath(points, JT_ROUND, ET_CLOSEDPOLYGON)
expanded = np.array(offset.Execute(unclip_distance * 1.5)).reshape((-1, 2))
return expanded
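# The offset distance of (area / perimeter) * 1.5 mirrors the "unclip" step
# of DB text detectors (distance = A * r / L with unclip ratio r = 1.5),
# which grows boxes back out after the model is trained on shrunken text
# regions. Treat the DB reference as an inference from the formula, not a
# confirmed citation.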
def _filter_polygon(
self, points: List[np.ndarray], shape: Tuple[int, int]
) -> np.ndarray:
"""
Filter a set of polygons to include only valid ones that fit within an image shape
and meet size constraints.
Args:
points (List[np.ndarray]): List of polygons to filter.
shape (Tuple[int, int]): Shape of the image (height, width).
Returns:
np.ndarray: List of filtered polygons.
"""
height, width = shape
return np.array(
[
self._clockwise_order(point)
for point in points
if self._is_valid_polygon(point, width, height)
]
)
@staticmethod
def _is_valid_polygon(point: np.ndarray, width: int, height: int) -> bool:
"""
Check if a polygon is valid, meaning it fits within the image bounds
and has sides of a minimum length.
Args:
point (np.ndarray): The polygon to validate.
width (int): Image width.
height (int): Image height.
Returns:
bool: Whether the polygon is valid or not.
"""
return (
point[:, 0].min() >= 0
and point[:, 0].max() < width
and point[:, 1].min() >= 0
and point[:, 1].max() < height
and np.linalg.norm(point[0] - point[1]) > 3
and np.linalg.norm(point[0] - point[3]) > 3
)
@staticmethod
def _clockwise_order(point: np.ndarray) -> np.ndarray:
"""
Arrange the points of a polygon in clockwise order based on their angular positions
around the polygon's center.
Args:
point (np.ndarray): Array of points of the polygon.
Returns:
np.ndarray: Points ordered in clockwise direction.
"""
center = point.mean(axis=0)
return point[
np.argsort(np.arctan2(point[:, 1] - center[1], point[:, 0] - center[0]))
]
@staticmethod
def _sort_polygon(points):
"""
Sort polygons based on their position in the image. If polygons are close in vertical
position (within 5 pixels), sort them by horizontal position.
Args:
points: List of polygons to sort.
Returns:
List: Sorted list of polygons.
"""
points.sort(key=lambda x: (x[0][1], x[0][0]))
for i in range(len(points) - 1):
for j in range(i, -1, -1):
if abs(points[j + 1][0][1] - points[j][0][1]) < 5 and (
points[j + 1][0][0] < points[j][0][0]
):
points[j], points[j + 1] = points[j + 1], points[j]
else:
break
return points
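# Worked example: with top-left corners (40, 10), (10, 12), and (5, 100),
# the initial (y, x) sort yields [(40, 10), (10, 12), (5, 100)]; the bubble
# pass then swaps the first two because |12 - 10| < 5 and 10 < 40, giving
# reading order [(10, 12), (40, 10), (5, 100)].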
@staticmethod
def _zero_pad(image: np.ndarray) -> np.ndarray:
"""
Apply zero-padding to an image, ensuring its dimensions are at least 32x32.
The padding is added only if needed.
Args:
image (np.ndarray): Input image.
Returns:
np.ndarray: Zero-padded image.
"""
h, w, c = image.shape
pad = np.zeros((max(32, h), max(32, w), c), np.uint8)
pad[:h, :w, :] = image
return pad
@staticmethod
def _preprocess_classification_image(image: np.ndarray) -> np.ndarray:
"""
Preprocess a single image for classification by resizing, normalizing, and padding.
This method resizes the input image to a fixed height of 48 pixels while adjusting
the width dynamically up to a maximum of 192 pixels. The image is then normalized and
padded to fit the required input dimensions for classification.
Args:
image (np.ndarray): Input image to preprocess.
Returns:
np.ndarray: Preprocessed and padded image.
"""
# fixed height of 48, dynamic width up to 192
input_shape = (3, 48, 192)
input_c, input_h, input_w = input_shape
h, w = image.shape[:2]
ratio = w / h
resized_w = min(input_w, math.ceil(input_h * ratio))
resized_image = cv2.resize(image, (resized_w, input_h))
# handle single-channel images (grayscale) if needed
if input_c == 1 and resized_image.ndim == 2:
resized_image = resized_image[np.newaxis, :, :]
else:
resized_image = resized_image.transpose((2, 0, 1))
# normalize
resized_image = (resized_image.astype("float32") / 255.0 - 0.5) / 0.5
padded_image = np.zeros((input_c, input_h, input_w), dtype=np.float32)
padded_image[:, :, :resized_w] = resized_image
return padded_image
def _process_classification_output(
self, images: List[np.ndarray], outputs: List[np.ndarray]
) -> Tuple[List[np.ndarray], List[Tuple[str, float]]]:
"""
Process the classification model output by matching labels with confidence scores.
This method processes the outputs from the classification model and rotates images
with high confidence of being labeled "180". It ensures that results are mapped to
the original image order.
Args:
images (List[np.ndarray]): List of input images.
outputs (List[np.ndarray]): Corresponding model outputs.
Returns:
Tuple[List[np.ndarray], List[Tuple[str, float]]]: A tuple of processed images and
classification results (label and confidence score).
"""
labels = ["0", "180"]
results = [["", 0.0]] * len(images)
indices = np.argsort(np.array([x.shape[1] / x.shape[0] for x in images]))
outputs = np.stack(outputs)
outputs = [
(labels[idx], outputs[i, idx])
for i, idx in enumerate(outputs.argmax(axis=1))
]
for i in range(0, len(images), self.batch_size):
for j in range(len(outputs)):
label, score = outputs[j]
results[indices[i + j]] = [label, score]
# make sure we have high confidence if we need to flip a box, this will be rare in lpr
if "180" in label and score >= 0.9:
images[indices[i + j]] = cv2.rotate(images[indices[i + j]], cv2.ROTATE_180)
return images, results
def _preprocess_recognition_image(
self, image: np.ndarray, max_wh_ratio: float
) -> np.ndarray:
"""
Preprocess an image for recognition by dynamically adjusting its width.
This method adjusts the width of the image based on the maximum width-to-height ratio
while keeping the height fixed at 48 pixels. The image is then normalized and padded
to fit the required input dimensions for recognition.
Args:
image (np.ndarray): Input image to preprocess.
max_wh_ratio (float): Maximum width-to-height ratio for resizing.
Returns:
np.ndarray: Preprocessed and padded image.
"""
# fixed height of 48, dynamic width based on ratio
input_shape = [3, 48, 320]
input_h, input_w = input_shape[1], input_shape[2]
assert image.shape[2] == input_shape[0], "Unexpected number of image channels."
# dynamically adjust input width based on max_wh_ratio
input_w = int(input_h * max_wh_ratio)
# check for model-specific input width
model_input_w = self.model_runner.recognition_model.runner.ort.get_inputs()[
0
].shape[3]
if isinstance(model_input_w, int) and model_input_w > 0:
input_w = model_input_w
h, w = image.shape[:2]
aspect_ratio = w / h
resized_w = min(input_w, math.ceil(input_h * aspect_ratio))
resized_image = cv2.resize(image, (resized_w, input_h))
resized_image = resized_image.transpose((2, 0, 1))
resized_image = (resized_image.astype("float32") / 255.0 - 0.5) / 0.5
# Compute mean pixel value of the resized image (per channel)
mean_pixel = np.mean(resized_image, axis=(1, 2), keepdims=True)
padded_image = np.full(
(input_shape[0], input_h, input_w), mean_pixel, dtype=np.float32
)
padded_image[:, :, :resized_w] = resized_image
return padded_image
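# Shape walk-through (hypothetical numbers): a 20x90 plate crop has aspect
# ratio 4.5, so it is resized to 48x216 and then padded on the right with
# the mean pixel value up to the model's input width (e.g. 48x320).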
@staticmethod
def _crop_license_plate(image: np.ndarray, points: np.ndarray) -> np.ndarray:
"""
Crop the license plate from the image using four corner points.
This method crops the region containing the license plate by using the perspective
transformation based on four corner points. If the resulting image is significantly
taller than wide, the image is rotated to the correct orientation.
Args:
image (np.ndarray): Input image containing the license plate.
points (np.ndarray): Four corner points defining the plate's position.
Returns:
np.ndarray: Cropped and potentially rotated license plate image.
"""
assert len(points) == 4, "shape of points must be 4*2"
points = points.astype(np.float32)
crop_width = int(
max(
np.linalg.norm(points[0] - points[1]),
np.linalg.norm(points[2] - points[3]),
)
)
crop_height = int(
max(
np.linalg.norm(points[0] - points[3]),
np.linalg.norm(points[1] - points[2]),
)
)
pts_std = np.float32(
[[0, 0], [crop_width, 0], [crop_width, crop_height], [0, crop_height]]
)
matrix = cv2.getPerspectiveTransform(points, pts_std)
image = cv2.warpPerspective(
image,
matrix,
(crop_width, crop_height),
borderMode=cv2.BORDER_REPLICATE,
flags=cv2.INTER_CUBIC,
)
height, width = image.shape[0:2]
if height * 1.0 / width >= 1.5:
image = np.rot90(image, k=3)
return image
def _detect_license_plate(self, input: np.ndarray) -> Optional[tuple[int, int, int, int]]:
"""
Use a lightweight YOLOv9 model to detect license plates for users without Frigate+.
Return the coordinates of the detected plate as (x1, y1, x2, y2), or None if
no plate scores above the detection threshold.
"""
predictions = self.model_runner.yolov9_detection_model(input)
confidence_threshold = self.lpr_config.detection_threshold
top_score = -1
top_box = None
# Loop over predictions
for prediction in predictions:
score = prediction[6]
if score >= confidence_threshold:
bbox = prediction[1:5]
# Scale boxes back to original image size
scale_x = input.shape[1] / 256
scale_y = input.shape[0] / 256
bbox[0] *= scale_x
bbox[1] *= scale_y
bbox[2] *= scale_x
bbox[3] *= scale_y
if score > top_score:
top_score = score
top_box = bbox
# Return the top scoring bounding box if found
if top_box is not None:
# expand box by 30% to help with OCR
expansion = (top_box[2:] - top_box[:2]) * 0.30
# Expand box
expanded_box = np.array(
[
top_box[0] - expansion[0], # x1
top_box[1] - expansion[1], # y1
top_box[2] + expansion[0], # x2
top_box[3] + expansion[1], # y2
]
).clip(0, [input.shape[1], input.shape[0]] * 2)
logger.debug(f"Found license plate: {expanded_box.astype(int)}")
return tuple(expanded_box.astype(int))
else:
return None # No detection above the threshold
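# The 256 divisors above assume the YOLOv9 plate detector emits boxes in a
# 256x256 input space, with the box at prediction indices 1-4 and the score
# at index 6; both are inferred from this scaling code rather than from the
# model's documentation.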
def _should_keep_previous_plate(
self, id, top_plate, top_char_confidences, top_area, avg_confidence
):
if id not in self.detected_license_plates:
return False
prev_data = self.detected_license_plates[id]
prev_plate = prev_data["plate"]
prev_char_confidences = prev_data["char_confidences"]
prev_area = prev_data["area"]
prev_avg_confidence = (
sum(prev_char_confidences) / len(prev_char_confidences)
if prev_char_confidences
else 0
)
# 1. Normalize metrics
# Length score - use relative comparison
# If lengths are equal, score is 0.5 for both
# If one is longer, it gets a higher score up to 1.0
max_length_diff = 4 # Maximum expected difference in plate lengths
length_diff = len(top_plate) - len(prev_plate)
curr_length_score = 0.5 + (
length_diff / (2 * max_length_diff)
) # Normalize to 0-1
curr_length_score = max(0, min(1, curr_length_score)) # Clamp to 0-1
prev_length_score = 1 - curr_length_score # Inverse relationship
# Area score (normalize based on max of current and previous)
max_area = max(top_area, prev_area)
curr_area_score = top_area / max_area
prev_area_score = prev_area / max_area
# Average confidence score (already normalized 0-1)
curr_conf_score = avg_confidence
prev_conf_score = prev_avg_confidence
# Character confidence comparison score
min_length = min(len(top_plate), len(prev_plate))
if min_length > 0:
curr_char_conf = sum(top_char_confidences[:min_length]) / min_length
prev_char_conf = sum(prev_char_confidences[:min_length]) / min_length
else:
curr_char_conf = 0
prev_char_conf = 0
# 2. Define weights
weights = {
"length": 0.4,
"area": 0.3,
"avg_confidence": 0.2,
"char_confidence": 0.1,
}
# 3. Calculate weighted scores
curr_score = (
curr_length_score * weights["length"]
+ curr_area_score * weights["area"]
+ curr_conf_score * weights["avg_confidence"]
+ curr_char_conf * weights["char_confidence"]
)
prev_score = (
prev_length_score * weights["length"]
+ prev_area_score * weights["area"]
+ prev_conf_score * weights["avg_confidence"]
+ prev_char_conf * weights["char_confidence"]
)
# 4. Log the comparison for debugging
logger.debug(
f"Plate comparison - Current plate: {top_plate} (score: {curr_score:.3f}) vs "
f"Previous plate: {prev_plate} (score: {prev_score:.3f})\n"
f"Metrics - Length: {len(top_plate)} vs {len(prev_plate)} (scores: {curr_length_score:.2f} vs {prev_length_score:.2f}), "
f"Area: {top_area} vs {prev_area}, "
f"Avg Conf: {avg_confidence:.2f} vs {prev_avg_confidence:.2f}"
)
# 5. Return True if we should keep the previous plate (i.e., if it scores higher)
return prev_score > curr_score
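# Worked example (hypothetical values): current "ABC123" (len 6, area 1200,
# avg conf 0.85) vs previous "ABC12" (len 5, area 1500, avg conf 0.90), with
# per-character confidences matching the averages over the first 5 chars:
#   curr = 0.625*0.4 + 0.80*0.3 + 0.85*0.2 + 0.85*0.1 = 0.745
#   prev = 0.375*0.4 + 1.00*0.3 + 0.90*0.2 + 0.90*0.1 = 0.720
# The longer current read outweighs the previous plate's larger area, so the
# method returns False and the new plate replaces the old one.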
def lpr_process(self, obj_data: dict[str, Any], frame: np.ndarray):
"""Look for license plates in image."""
id = obj_data["id"]
# don't run for non-car objects
if obj_data.get("label") != "car":
logger.debug("Not processing license plate for non-car object.")
return
# don't run for stationary car objects
if obj_data.get("stationary") == True:
logger.debug("Not a processing license plate for a stationary car object.")
return
# don't overwrite sub label for objects that have a sub label
# that is not a license plate
if obj_data.get("sub_label") and id not in self.detected_license_plates:
logger.debug(
f"Not processing license plate due to existing sub label: {obj_data.get('sub_label')}."
)
return
license_plate: Optional[dict[str, Any]] = None
if self.requires_license_plate_detection:
logger.debug("Running manual license_plate detection.")
car_box = obj_data.get("box")
if not car_box:
return
rgb = cv2.cvtColor(frame, cv2.COLOR_YUV2BGR_I420)
left, top, right, bottom = car_box
car = rgb[top:bottom, left:right]
# double the size of the car for better box detection
car = cv2.resize(car, (int(2 * car.shape[1]), int(2 * car.shape[0])))
if WRITE_DEBUG_IMAGES:
current_time = int(datetime.datetime.now().timestamp())
cv2.imwrite(
f"debug/frames/car_frame_{current_time}.jpg",
car,
)
yolov9_start = datetime.datetime.now().timestamp()
license_plate = self._detect_license_plate(car)
logger.debug(
f"YOLOv9 LPD inference time: {(datetime.datetime.now().timestamp() - yolov9_start) * 1000:.2f} ms"
)
if not license_plate:
logger.debug("Detected no license plates for car object.")
return
license_plate_area = max(
0,
(license_plate[2] - license_plate[0])
* (license_plate[3] - license_plate[1]),
)
# check that license plate is valid
# the min_area threshold is doubled because the car crop was upscaled 2x
# (strictly, the plate's pixel area grows ~4x from that resize, so this
# check is lenient)
if license_plate_area < self.lpr_config.min_area * 2:
logger.debug("License plate is less than min_area")
return
license_plate_frame = car[
license_plate[1] : license_plate[3], license_plate[0] : license_plate[2]
]
else:
# don't run for object without attributes
if not obj_data.get("current_attributes"):
logger.debug("No attributes to parse.")
return
attributes: list[dict[str, Any]] = obj_data.get("current_attributes", [])
for attr in attributes:
if attr.get("label") != "license_plate":
continue
if license_plate is None or attr.get("score", 0.0) > license_plate.get(
"score", 0.0
):
license_plate = attr
# no license plates detected in this frame
if not license_plate:
return
if license_plate.get("score") < self.lpr_config.detection_threshold:
logger.debug(
f"Plate detection score is less than the threshold ({license_plate['score']:0.2f} < {self.lpr_config.detection_threshold})"
)
return
license_plate_box = license_plate.get("box")
# check that license plate is valid
if (
not license_plate_box
or area(license_plate_box) < self.lpr_config.min_area
):
logger.debug(f"Invalid license plate box {license_plate}")
return
license_plate_frame = cv2.cvtColor(frame, cv2.COLOR_YUV2BGR_I420)
license_plate_frame = license_plate_frame[
license_plate_box[1] : license_plate_box[3],
license_plate_box[0] : license_plate_box[2],
]
# double the size of the license plate frame for better OCR
license_plate_frame = cv2.resize(
license_plate_frame,
(
int(2 * license_plate_frame.shape[1]),
int(2 * license_plate_frame.shape[0]),
),
)
if WRITE_DEBUG_IMAGES:
current_time = int(datetime.datetime.now().timestamp())
cv2.imwrite(
f"debug/frames/license_plate_frame_{current_time}.jpg",
license_plate_frame,
)
# run detection, returns results sorted by confidence, best first
license_plates, confidences, areas = self._process_license_plate(
license_plate_frame
)
logger.debug(f"Text boxes: {license_plates}")
logger.debug(f"Confidences: {confidences}")
logger.debug(f"Areas: {areas}")
if license_plates:
for plate, confidence, text_area in zip(license_plates, confidences, areas):
avg_confidence = (
(sum(confidence) / len(confidence)) if confidence else 0
)
logger.debug(
f"Detected text: {plate} (average confidence: {avg_confidence:.2f}, area: {text_area} pixels)"
)
else:
# no plates found
logger.debug("No text detected")
return
top_plate, top_char_confidences, top_area = (
license_plates[0],
confidences[0],
areas[0],
)
avg_confidence = (
(sum(top_char_confidences) / len(top_char_confidences))
if top_char_confidences
else 0
)
# Check if we have a previously detected plate for this ID
if id in self.detected_license_plates:
if self._should_keep_previous_plate(
id, top_plate, top_char_confidences, top_area, avg_confidence
):
logger.debug("Keeping previous plate")
return
# Check against minimum confidence threshold
if avg_confidence < self.lpr_config.recognition_threshold:
logger.debug(
f"Average confidence {avg_confidence} is less than threshold ({self.lpr_config.recognition_threshold})"
)
return
# Determine subLabel based on known plates, use regex matching
# Default to the detected plate, use label name if there's a match
sub_label = next(
(
label
for label, plates in self.lpr_config.known_plates.items()
if any(
re.match(f"^{plate}$", top_plate)
or distance(plate, top_plate) <= self.lpr_config.match_distance
for plate in plates
)
),
top_plate,
)
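# Example (hypothetical config): with known_plates = {"delivery": ["AB1234"]}
# and match_distance = 1, a read of "AB1284" is one substitution away from
# "AB1234", so sub_label becomes "delivery"; an unmatched read falls back to
# the detected plate text itself.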
# Send the result to the API
resp = requests.post(
f"{FRIGATE_LOCALHOST}/api/events/{id}/sub_label",
json={
"camera": obj_data.get("camera"),
"subLabel": sub_label,
"subLabelScore": avg_confidence,
},
)
if resp.status_code == 200:
self.detected_license_plates[id] = {
"plate": top_plate,
"char_confidences": top_char_confidences,
"area": top_area,
"obj_data": obj_data,
}
def handle_request(self, topic, request_data) -> dict[str, Any] | None:
return None
def expire_object(self, object_id: str):
if object_id in self.detected_license_plates:
self.detected_license_plates.pop(object_id)
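# Host-class contract (inferred from usage in this mixin, not an explicit
# interface): classes mixing this in are expected to provide self.config,
# self.lpr_config, self.model_runner, and self.detected_license_plates.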
class CTCDecoder:
"""
A decoder for interpreting the output of a CTC (Connectionist Temporal Classification) model.
This decoder converts the model's output probabilities into readable sequences of characters
while removing duplicates and handling blank tokens. It also calculates the confidence scores
for each decoded character sequence.
"""
def __init__(self):
"""
Initialize the CTCDecoder with a list of characters and a character map.
The character set includes digits, letters, special characters, and a "blank" token
(used by the CTC model for decoding purposes). A character map is created to map
indices to characters.
"""
self.characters = [
"blank",
"0",
"1",
"2",
"3",
"4",
"5",
"6",
"7",
"8",
"9",
":",
";",
"<",
"=",
">",
"?",
"@",
"A",
"B",
"C",
"D",
"E",
"F",
"G",
"H",
"I",
"J",
"K",
"L",
"M",
"N",
"O",
"P",
"Q",
"R",
"S",
"T",
"U",
"V",
"W",
"X",
"Y",
"Z",
"[",
"\\",
"]",
"^",
"_",
"`",
"a",
"b",
"c",
"d",
"e",
"f",
"g",
"h",
"i",
"j",
"k",
"l",
"m",
"n",
"o",
"p",
"q",
"r",
"s",
"t",
"u",
"v",
"w",
"x",
"y",
"z",
"{",
"|",
"}",
"~",
"!",
'"',
"#",
"$",
"%",
"&",
"'",
"(",
")",
"*",
"+",
",",
"-",
".",
"/",
" ",
" ",
]
self.char_map = {i: char for i, char in enumerate(self.characters)}
def __call__(
self, outputs: List[np.ndarray]
) -> Tuple[List[str], List[List[float]]]:
"""
Decode a batch of model outputs into character sequences and their confidence scores.
The method takes the output probability distributions for each time step and uses
the best path decoding strategy. It then merges repeating characters and ignores
blank tokens. Confidence scores for each decoded character are also calculated.
Args:
outputs (List[np.ndarray]): A list of model outputs, where each element is
a probability distribution for each time step.
Returns:
Tuple[List[str], List[List[float]]]: A tuple of decoded character sequences
and confidence scores for each sequence.
"""
results = []
confidences = []
for output in outputs:
seq_log_probs = np.log(output + 1e-8)
best_path = np.argmax(seq_log_probs, axis=1)
merged_path = []
merged_probs = []
for t, char_index in enumerate(best_path):
if char_index != 0 and (t == 0 or char_index != best_path[t - 1]):
merged_path.append(char_index)
merged_probs.append(seq_log_probs[t, char_index])
result = "".join(self.char_map[idx] for idx in merged_path)
results.append(result)
confidence = np.exp(merged_probs).tolist()
confidences.append(confidence)
return results, confidences
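# Minimal, self-contained sketch of the best-path CTC decoding implemented
# above; the toy array and its argmax indices are illustrative, not real
# model output.
if __name__ == "__main__":
    decoder = CTCDecoder()
    # 5 time steps over the full character set; index 0 is the CTC blank,
    # index 18 is "A", index 2 is "1".
    toy = np.full((5, len(decoder.characters)), 1e-6, dtype=np.float32)
    toy[0, 18] = 0.9  # "A"
    toy[1, 18] = 0.9  # repeated "A" merges with the previous step
    toy[2, 0] = 0.9   # blank separates any repeats
    toy[3, 2] = 0.9   # "1"
    toy[4, 2] = 0.9   # repeated "1" merges
    texts, confs = decoder([toy])
    print(texts, confs)  # ['A1'] with ~0.9 per-character confidences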