LPR improvements (#17289)

* config options

* processing in maintainer

* detect and process dedicated lpr plates

* create camera type, add manual event and save snapshot

* use const

* ensure lpr events are always detections, typing fixes

* docs

* docs tweaks

* add preprocessing and penalization for low confidence chars
This commit is contained in:
Josh Hawkins
2025-03-23 14:30:48 -05:00
committed by GitHub
parent b7fcd41737
commit fa4643fddf
14 changed files with 706 additions and 194 deletions

View File

@@ -1,18 +1,26 @@
"""Handle processing images for face detection and recognition."""
import base64
import datetime
import logging
import math
import random
import re
import string
from typing import List, Optional, Tuple
import cv2
import numpy as np
from Levenshtein import distance
from Levenshtein import distance, jaro_winkler
from pyclipper import ET_CLOSEDPOLYGON, JT_ROUND, PyclipperOffset
from shapely.geometry import Polygon
from frigate.comms.event_metadata_updater import EventMetadataTypeEnum
from frigate.comms.event_metadata_updater import (
EventMetadataPublisher,
EventMetadataTypeEnum,
)
from frigate.config.camera.camera import CameraTypeEnum
from frigate.embeddings.onnx.lpr_embedding import LPR_EMBEDDING_SIZE
from frigate.util.image import area
logger = logging.getLogger(__name__)
@@ -28,6 +36,8 @@ class LicensePlateProcessingMixin:
"license_plate" not in self.config.objects.all_objects
)
self.event_metadata_publisher = EventMetadataPublisher()
self.ctc_decoder = CTCDecoder()
self.batch_size = 6
@@ -38,6 +48,9 @@ class LicensePlateProcessingMixin:
self.box_thresh = 0.6
self.mask_thresh = 0.6
# matching
self.similarity_threshold = 0.8
def _detect(self, image: np.ndarray) -> List[np.ndarray]:
"""
Detect possible license plates in the input image by first resizing and normalizing it,
@@ -197,11 +210,8 @@ class LicensePlateProcessingMixin:
# set to True to write each cropped image for debugging
if False:
save_image = cv2.cvtColor(
plate_images[original_idx], cv2.COLOR_RGB2BGR
)
filename = f"debug/frames/plate_{original_idx}_{plate}_{area}.jpg"
cv2.imwrite(filename, save_image)
cv2.imwrite(filename, plate_images[original_idx])
license_plates[original_idx] = plate
average_confidences[original_idx] = average_confidence
@@ -320,7 +330,7 @@ class LicensePlateProcessingMixin:
# Use pyclipper to shrink the polygon slightly based on the computed distance.
offset = PyclipperOffset()
offset.AddPath(points, JT_ROUND, ET_CLOSEDPOLYGON)
points = np.array(offset.Execute(distance * 1.75)).reshape((-1, 1, 2))
points = np.array(offset.Execute(distance * 1.5)).reshape((-1, 1, 2))
# get the minimum bounding box around the shrunken polygon.
box, min_side = self._get_min_boxes(points)
@@ -624,6 +634,47 @@ class LicensePlateProcessingMixin:
assert image.shape[2] == input_shape[0], "Unexpected number of image channels."
# convert to grayscale
if image.shape[2] == 3:
gray = cv2.cvtColor(image, cv2.COLOR_RGB2GRAY)
else:
gray = image
# detect noise with Laplacian variance
laplacian = cv2.Laplacian(gray, cv2.CV_64F)
noise_variance = np.var(laplacian)
brightness = cv2.mean(gray)[0]
noise_threshold = 70
brightness_threshold = 150
is_noisy = (
noise_variance > noise_threshold and brightness < brightness_threshold
)
# apply bilateral filter and sharpening only if noisy
if is_noisy:
logger.debug(
f"Noise detected (variance: {noise_variance:.1f}, brightness: {brightness:.1f}) - denoising"
)
smoothed = cv2.bilateralFilter(gray, d=15, sigmaColor=100, sigmaSpace=100)
sharpening_kernel = np.array([[-1, -1, -1], [-1, 9, -1], [-1, -1, -1]])
processed = cv2.filter2D(smoothed, -1, sharpening_kernel)
else:
logger.debug(
f"No noise detected (variance: {noise_variance:.1f}, brightness: {brightness:.1f}) - skipping denoising and sharpening"
)
processed = gray
# apply CLAHE for contrast enhancement
grid_size = (
max(4, input_w // 40),
max(4, input_h // 40),
)
clahe = cv2.createCLAHE(clipLimit=1.5, tileGridSize=grid_size)
enhanced = clahe.apply(processed)
# Convert back to 3-channel for model compatibility
image = cv2.cvtColor(enhanced, cv2.COLOR_GRAY2RGB)
# dynamically adjust input width based on max_wh_ratio
input_w = int(input_h * max_wh_ratio)
@@ -649,6 +700,13 @@ class LicensePlateProcessingMixin:
)
padded_image[:, :, :resized_w] = resized_image
if False:
current_time = int(datetime.datetime.now().timestamp() * 1000)
cv2.imwrite(
f"debug/frames/preprocessed_recognition_{current_time}.jpg",
image,
)
return padded_image
@staticmethod
@@ -710,18 +768,38 @@ class LicensePlateProcessingMixin:
top_score = -1
top_box = None
img_h, img_w = input.shape[0], input.shape[1]
# Calculate resized dimensions and padding based on _preprocess_inputs
if img_w > img_h:
resized_h = int(((img_h / img_w) * LPR_EMBEDDING_SIZE) // 4 * 4)
resized_w = LPR_EMBEDDING_SIZE
x_offset = (LPR_EMBEDDING_SIZE - resized_w) // 2
y_offset = (LPR_EMBEDDING_SIZE - resized_h) // 2
scale_x = img_w / resized_w
scale_y = img_h / resized_h
else:
resized_w = int(((img_w / img_h) * LPR_EMBEDDING_SIZE) // 4 * 4)
resized_h = LPR_EMBEDDING_SIZE
x_offset = (LPR_EMBEDDING_SIZE - resized_w) // 2
y_offset = (LPR_EMBEDDING_SIZE - resized_h) // 2
scale_x = img_w / resized_w
scale_y = img_h / resized_h
# Loop over predictions
for prediction in predictions:
score = prediction[6]
if score >= confidence_threshold:
bbox = prediction[1:5]
# Scale boxes back to original image size
scale_x = input.shape[1] / 256
scale_y = input.shape[0] / 256
bbox[0] *= scale_x
bbox[1] *= scale_y
bbox[2] *= scale_x
bbox[3] *= scale_y
# Adjust for padding and scale to original image
bbox[0] = (bbox[0] - x_offset) * scale_x
bbox[1] = (bbox[1] - y_offset) * scale_y
bbox[2] = (bbox[2] - x_offset) * scale_x
bbox[3] = (bbox[3] - y_offset) * scale_y
if score > top_score:
top_score = score
top_box = bbox
if score > top_score:
top_score = score
@@ -729,8 +807,8 @@ class LicensePlateProcessingMixin:
# Return the top scoring bounding box if found
if top_box is not None:
# expand box by 30% to help with OCR
expansion = (top_box[2:] - top_box[:2]) * 0.30
# expand box by 5% to help with OCR
expansion = (top_box[2:] - top_box[:2]) * 0.05
# Expand box
expanded_box = np.array(
@@ -750,6 +828,7 @@ class LicensePlateProcessingMixin:
def _should_keep_previous_plate(
self, id, top_plate, top_char_confidences, top_area, avg_confidence
):
"""Determine if the previous plate should be kept over the current one."""
if id not in self.detected_license_plates:
return False
@@ -764,68 +843,88 @@ class LicensePlateProcessingMixin:
)
# 1. Normalize metrics
# Length score - use relative comparison
# If lengths are equal, score is 0.5 for both
# If one is longer, it gets a higher score up to 1.0
max_length_diff = 4 # Maximum expected difference in plate lengths
# Length score: Equal lengths = 0.5, penalize extra characters if low confidence
length_diff = len(top_plate) - len(prev_plate)
curr_length_score = 0.5 + (
length_diff / (2 * max_length_diff)
) # Normalize to 0-1
curr_length_score = max(0, min(1, curr_length_score)) # Clamp to 0-1
prev_length_score = 1 - curr_length_score # Inverse relationship
max_length_diff = 3
curr_length_score = 0.5 + (length_diff / (2 * max_length_diff))
curr_length_score = max(0, min(1, curr_length_score))
prev_length_score = 0.5 - (length_diff / (2 * max_length_diff))
prev_length_score = max(0, min(1, prev_length_score))
# Area score (normalize based on max of current and previous)
# Adjust length score based on confidence of extra characters
conf_threshold = 0.75 # Minimum confidence for a character to be "trusted"
if len(top_plate) > len(prev_plate):
extra_conf = min(
top_char_confidences[len(prev_plate) :]
) # Lowest extra char confidence
if extra_conf < conf_threshold:
curr_length_score *= extra_conf / conf_threshold # Penalize if weak
elif len(prev_plate) > len(top_plate):
extra_conf = min(prev_char_confidences[len(top_plate) :])
if extra_conf < conf_threshold:
prev_length_score *= extra_conf / conf_threshold
# Area score: Normalize by max area
max_area = max(top_area, prev_area)
curr_area_score = top_area / max_area
prev_area_score = prev_area / max_area
curr_area_score = top_area / max_area if max_area > 0 else 0
prev_area_score = prev_area / max_area if max_area > 0 else 0
# Average confidence score (already normalized 0-1)
# Confidence scores
curr_conf_score = avg_confidence
prev_conf_score = prev_avg_confidence
# Character confidence comparison score
# Character confidence comparison (average over shared length)
min_length = min(len(top_plate), len(prev_plate))
if min_length > 0:
curr_char_conf = sum(top_char_confidences[:min_length]) / min_length
prev_char_conf = sum(prev_char_confidences[:min_length]) / min_length
else:
curr_char_conf = 0
prev_char_conf = 0
curr_char_conf = prev_char_conf = 0
# 2. Define weights
# Penalize any character below threshold
curr_min_conf = min(top_char_confidences) if top_char_confidences else 0
prev_min_conf = min(prev_char_confidences) if prev_char_confidences else 0
curr_conf_penalty = (
1.0 if curr_min_conf >= conf_threshold else (curr_min_conf / conf_threshold)
)
prev_conf_penalty = (
1.0 if prev_min_conf >= conf_threshold else (prev_min_conf / conf_threshold)
)
# 2. Define weights (boost confidence importance)
weights = {
"length": 0.4,
"area": 0.3,
"avg_confidence": 0.2,
"char_confidence": 0.1,
"length": 0.2,
"area": 0.2,
"avg_confidence": 0.35,
"char_confidence": 0.25,
}
# 3. Calculate weighted scores
# 3. Calculate weighted scores with penalty
curr_score = (
curr_length_score * weights["length"]
+ curr_area_score * weights["area"]
+ curr_conf_score * weights["avg_confidence"]
+ curr_char_conf * weights["char_confidence"]
)
) * curr_conf_penalty
prev_score = (
prev_length_score * weights["length"]
+ prev_area_score * weights["area"]
+ prev_conf_score * weights["avg_confidence"]
+ prev_char_conf * weights["char_confidence"]
)
) * prev_conf_penalty
# 4. Log the comparison for debugging
# 4. Log the comparison
logger.debug(
f"Plate comparison - Current plate: {top_plate} (score: {curr_score:.3f}) vs "
f"Previous plate: {prev_plate} (score: {prev_score:.3f})\n"
f"Plate comparison - Current: {top_plate} (score: {curr_score:.3f}, min_conf: {curr_min_conf:.2f}) vs "
f"Previous: {prev_plate} (score: {prev_score:.3f}, min_conf: {prev_min_conf:.2f})\n"
f"Metrics - Length: {len(top_plate)} vs {len(prev_plate)} (scores: {curr_length_score:.2f} vs {prev_length_score:.2f}), "
f"Area: {top_area} vs {prev_area}, "
f"Avg Conf: {avg_confidence:.2f} vs {prev_avg_confidence:.2f}"
f"Avg Conf: {avg_confidence:.2f} vs {prev_avg_confidence:.2f}, "
f"Char Conf: {curr_char_conf:.2f} vs {prev_char_conf:.2f}"
)
# 5. Return True if we should keep the previous plate (i.e., if it scores higher)
# 5. Return True if previous plate scores higher
return prev_score > curr_score
def __update_yolov9_metrics(self, duration: float) -> None:
@@ -842,57 +941,55 @@ class LicensePlateProcessingMixin:
"""
self.metrics.alpr_pps.value = (self.metrics.alpr_pps.value * 9 + duration) / 10
def lpr_process(self, obj_data: dict[str, any], frame: np.ndarray):
def _generate_plate_event(self, camera: str, plate: str, plate_score: float) -> str:
"""Generate a unique ID for a plate event based on camera and text."""
now = datetime.datetime.now().timestamp()
rand_id = "".join(random.choices(string.ascii_lowercase + string.digits, k=6))
event_id = f"{now}-{rand_id}"
self.event_metadata_publisher.publish(
EventMetadataTypeEnum.lpr_event_create,
(
now,
camera,
"car",
event_id,
True,
plate_score,
None,
plate,
),
)
return event_id
def lpr_process(
self, obj_data: dict[str, any], frame: np.ndarray, dedicated_lpr: bool = False
):
"""Look for license plates in image."""
if not self.config.cameras[obj_data["camera"]].lpr.enabled:
camera = obj_data if dedicated_lpr else obj_data["camera"]
current_time = int(datetime.datetime.now().timestamp())
if not self.config.cameras[camera].lpr.enabled:
return
id = obj_data["id"]
# don't run for non car objects
if obj_data.get("label") != "car":
logger.debug("Not a processing license plate for non car object.")
if not dedicated_lpr and self.config.cameras[camera].type == CameraTypeEnum.lpr:
return
# don't run for stationary car objects
if obj_data.get("stationary") == True:
logger.debug("Not a processing license plate for a stationary car object.")
return
# don't overwrite sub label for objects that have a sub label
# that is not a license plate
if obj_data.get("sub_label") and id not in self.detected_license_plates:
logger.debug(
f"Not processing license plate due to existing sub label: {obj_data.get('sub_label')}."
)
return
license_plate: Optional[dict[str, any]] = None
if self.requires_license_plate_detection:
logger.debug("Running manual license_plate detection.")
car_box = obj_data.get("box")
if not car_box:
return
if dedicated_lpr:
rgb = cv2.cvtColor(frame, cv2.COLOR_YUV2BGR_I420)
left, top, right, bottom = car_box
car = rgb[top:bottom, left:right]
# double the size of the car for better box detection
car = cv2.resize(car, (int(2 * car.shape[1]), int(2 * car.shape[0])))
# apply motion mask
rgb[self.config.cameras[obj_data].motion.mask == 0] = [0, 0, 0]
if WRITE_DEBUG_IMAGES:
current_time = int(datetime.datetime.now().timestamp())
cv2.imwrite(
f"debug/frames/car_frame_{current_time}.jpg",
car,
f"debug/frames/dedicated_lpr_masked_{current_time}.jpg",
rgb,
)
yolov9_start = datetime.datetime.now().timestamp()
license_plate = self._detect_license_plate(car)
license_plate = self._detect_license_plate(rgb)
logger.debug(
f"YOLOv9 LPD inference time: {(datetime.datetime.now().timestamp() - yolov9_start) * 1000:.2f} ms"
)
@@ -901,107 +998,185 @@ class LicensePlateProcessingMixin:
)
if not license_plate:
logger.debug("Detected no license plates for car object.")
logger.debug("Detected no license plates in full frame.")
return
license_plate_area = max(
0,
(license_plate[2] - license_plate[0])
* (license_plate[3] - license_plate[1]),
license_plate_area = (license_plate[2] - license_plate[0]) * (
license_plate[3] - license_plate[1]
)
# check that license plate is valid
# double the value because we've doubled the size of the car
if (
license_plate_area
< self.config.cameras[obj_data["camera"]].lpr.min_area * 2
):
logger.debug("License plate is less than min_area")
if license_plate_area < self.lpr_config.min_area:
logger.debug("License plate area below minimum threshold.")
return
license_plate_frame = car[
license_plate[1] : license_plate[3], license_plate[0] : license_plate[2]
]
else:
# don't run for object without attributes
if not obj_data.get("current_attributes"):
logger.debug("No attributes to parse.")
return
attributes: list[dict[str, any]] = obj_data.get("current_attributes", [])
for attr in attributes:
if attr.get("label") != "license_plate":
continue
if license_plate is None or attr.get("score", 0.0) > license_plate.get(
"score", 0.0
):
license_plate = attr
# no license plates detected in this frame
if not license_plate:
return
license_plate_box = license_plate.get("box")
# check that license plate is valid
if (
not license_plate_box
or area(license_plate_box)
< self.config.cameras[obj_data["camera"]].lpr.min_area
):
logger.debug(f"Invalid license plate box {license_plate}")
return
license_plate_frame = cv2.cvtColor(frame, cv2.COLOR_YUV2BGR_I420)
# Expand the license_plate_box by 30%
box_array = np.array(license_plate_box)
expansion = (box_array[2:] - box_array[:2]) * 0.30
expanded_box = np.array(
[
license_plate_box[0] - expansion[0],
license_plate_box[1] - expansion[1],
license_plate_box[2] + expansion[0],
license_plate_box[3] + expansion[1],
]
).clip(0, [license_plate_frame.shape[1], license_plate_frame.shape[0]] * 2)
# Crop using the expanded box
license_plate_frame = license_plate_frame[
int(expanded_box[1]) : int(expanded_box[3]),
int(expanded_box[0]) : int(expanded_box[2]),
license_plate_frame = rgb[
license_plate[1] : license_plate[3],
license_plate[0] : license_plate[2],
]
# double the size of the license plate frame for better OCR
license_plate_frame = cv2.resize(
license_plate_frame,
(
int(2 * license_plate_frame.shape[1]),
int(2 * license_plate_frame.shape[0]),
),
)
if WRITE_DEBUG_IMAGES:
current_time = int(datetime.datetime.now().timestamp())
cv2.imwrite(
f"debug/frames/license_plate_frame_{current_time}.jpg",
# Double the size for better OCR
license_plate_frame = cv2.resize(
license_plate_frame,
(
int(2 * license_plate_frame.shape[1]),
int(2 * license_plate_frame.shape[0]),
),
)
start = datetime.datetime.now().timestamp()
else:
id = obj_data["id"]
# don't run for non car objects
if obj_data.get("label") != "car":
logger.debug("Not a processing license plate for non car object.")
return
# don't run for stationary car objects
if obj_data.get("stationary") == True:
logger.debug(
"Not a processing license plate for a stationary car object."
)
return
# don't overwrite sub label for objects that have a sub label
# that is not a license plate
if obj_data.get("sub_label") and id not in self.detected_license_plates:
logger.debug(
f"Not processing license plate due to existing sub label: {obj_data.get('sub_label')}."
)
return
license_plate: Optional[dict[str, any]] = None
if self.requires_license_plate_detection:
logger.debug("Running manual license_plate detection.")
car_box = obj_data.get("box")
if not car_box:
return
rgb = cv2.cvtColor(frame, cv2.COLOR_YUV2BGR_I420)
left, top, right, bottom = car_box
car = rgb[top:bottom, left:right]
# double the size of the car for better box detection
car = cv2.resize(car, (int(2 * car.shape[1]), int(2 * car.shape[0])))
if WRITE_DEBUG_IMAGES:
cv2.imwrite(
f"debug/frames/car_frame_{current_time}.jpg",
car,
)
yolov9_start = datetime.datetime.now().timestamp()
license_plate = self._detect_license_plate(car)
logger.debug(
f"YOLOv9 LPD inference time: {(datetime.datetime.now().timestamp() - yolov9_start) * 1000:.2f} ms"
)
self.__update_yolov9_metrics(
datetime.datetime.now().timestamp() - yolov9_start
)
if not license_plate:
logger.debug("Detected no license plates for car object.")
return
license_plate_area = max(
0,
(license_plate[2] - license_plate[0])
* (license_plate[3] - license_plate[1]),
)
# check that license plate is valid
# double the value because we've doubled the size of the car
if (
license_plate_area
< self.config.cameras[obj_data["camera"]].lpr.min_area * 2
):
logger.debug("License plate is less than min_area")
return
license_plate_frame = car[
license_plate[1] : license_plate[3],
license_plate[0] : license_plate[2],
]
else:
# don't run for object without attributes
if not obj_data.get("current_attributes"):
logger.debug("No attributes to parse.")
return
attributes: list[dict[str, any]] = obj_data.get(
"current_attributes", []
)
for attr in attributes:
if attr.get("label") != "license_plate":
continue
if license_plate is None or attr.get(
"score", 0.0
) > license_plate.get("score", 0.0):
license_plate = attr
# no license plates detected in this frame
if not license_plate:
return
license_plate_box = license_plate.get("box")
# check that license plate is valid
if (
not license_plate_box
or area(license_plate_box)
< self.config.cameras[obj_data["camera"]].lpr.min_area
):
logger.debug(f"Invalid license plate box {license_plate}")
return
license_plate_frame = cv2.cvtColor(frame, cv2.COLOR_YUV2BGR_I420)
# Expand the license_plate_box by 30%
box_array = np.array(license_plate_box)
expansion = (box_array[2:] - box_array[:2]) * 0.30
expanded_box = np.array(
[
license_plate_box[0] - expansion[0],
license_plate_box[1] - expansion[1],
license_plate_box[2] + expansion[0],
license_plate_box[3] + expansion[1],
]
).clip(
0, [license_plate_frame.shape[1], license_plate_frame.shape[0]] * 2
)
# Crop using the expanded box
license_plate_frame = license_plate_frame[
int(expanded_box[1]) : int(expanded_box[3]),
int(expanded_box[0]) : int(expanded_box[2]),
]
# double the size of the license plate frame for better OCR
license_plate_frame = cv2.resize(
license_plate_frame,
(
int(2 * license_plate_frame.shape[1]),
int(2 * license_plate_frame.shape[0]),
),
)
if WRITE_DEBUG_IMAGES:
cv2.imwrite(
f"debug/frames/license_plate_frame_{current_time}.jpg",
license_plate_frame,
)
# run detection, returns results sorted by confidence, best first
start = datetime.datetime.now().timestamp()
license_plates, confidences, areas = self._process_license_plate(
license_plate_frame
)
self.__update_lpr_metrics(datetime.datetime.now().timestamp() - start)
logger.debug(f"Text boxes: {license_plates}")
logger.debug(f"Confidences: {confidences}")
logger.debug(f"Areas: {areas}")
if license_plates:
for plate, confidence, text_area in zip(license_plates, confidences, areas):
avg_confidence = (
@@ -1012,7 +1187,6 @@ class LicensePlateProcessingMixin:
f"Detected text: {plate} (average confidence: {avg_confidence:.2f}, area: {text_area} pixels)"
)
else:
# no plates found
logger.debug("No text detected")
return
@@ -1027,6 +1201,46 @@ class LicensePlateProcessingMixin:
else 0
)
# Check against minimum confidence threshold
if avg_confidence < self.lpr_config.recognition_threshold:
logger.debug(
f"Average confidence {avg_confidence} is less than threshold ({self.lpr_config.recognition_threshold})"
)
return
# For LPR cameras, match or assign plate ID using Jaro-Winkler distance
if dedicated_lpr:
plate_id = None
for existing_id, data in self.detected_license_plates.items():
if (
data["camera"] == camera
and data["last_seen"] is not None
and current_time - data["last_seen"]
<= self.config.cameras[camera].lpr.expire_time
):
similarity = jaro_winkler(data["plate"], top_plate)
if similarity >= self.similarity_threshold:
plate_id = existing_id
logger.debug(
f"Matched plate {top_plate} to {data['plate']} (similarity: {similarity:.3f})"
)
break
if plate_id is None:
plate_id = self._generate_plate_event(
obj_data, top_plate, avg_confidence
)
logger.debug(
f"New plate event for dedicated LPR camera {plate_id}: {top_plate}"
)
else:
logger.debug(
f"Matched existing plate event for dedicated LPR camera {plate_id}: {top_plate}"
)
self.detected_license_plates[plate_id]["last_seen"] = current_time
id = plate_id
# Check if we have a previously detected plate for this ID
if id in self.detected_license_plates:
if self._should_keep_previous_plate(
@@ -1035,13 +1249,6 @@ class LicensePlateProcessingMixin:
logger.debug("Keeping previous plate")
return
# Check against minimum confidence threshold
if avg_confidence < self.lpr_config.recognition_threshold:
logger.debug(
f"Average confidence {avg_confidence} is less than threshold ({self.lpr_config.recognition_threshold})"
)
return
# Determine subLabel based on known plates, use regex matching
# Default to the detected plate, use label name if there's a match
sub_label = next(
@@ -1068,11 +1275,23 @@ class LicensePlateProcessingMixin:
(id, top_plate, avg_confidence),
)
if dedicated_lpr:
# save the best snapshot
logger.debug(f"Writing snapshot for {id}, {top_plate}, {current_time}")
frame_bgr = cv2.cvtColor(frame, cv2.COLOR_YUV2BGR_I420)
_, buffer = cv2.imencode(".jpg", frame_bgr)
self.sub_label_publisher.publish(
EventMetadataTypeEnum.save_lpr_snapshot,
(base64.b64encode(buffer).decode("ASCII"), id, camera),
)
self.detected_license_plates[id] = {
"plate": top_plate,
"char_confidences": top_char_confidences,
"area": top_area,
"obj_data": obj_data,
"camera": camera,
"last_seen": current_time if dedicated_lpr else None,
}
def handle_request(self, topic, request_data) -> dict[str, any] | None: