detect and process dedicated lpr plates

This commit is contained in:
Josh Hawkins 2025-03-20 09:18:32 -05:00
parent 8bf1c216bb
commit ed1cffd4ae
2 changed files with 332 additions and 137 deletions

View File

@ -3,16 +3,22 @@
import datetime
import logging
import math
import random
import re
import string
from typing import List, Optional, Tuple
import cv2
import numpy as np
from Levenshtein import distance
from Levenshtein import distance, jaro_winkler
from pyclipper import ET_CLOSEDPOLYGON, JT_ROUND, PyclipperOffset
from shapely.geometry import Polygon
from frigate.comms.event_metadata_updater import EventMetadataTypeEnum
from frigate.comms.event_metadata_updater import (
EventMetadataPublisher,
EventMetadataTypeEnum,
)
from frigate.embeddings.onnx.lpr_embedding import LPR_EMBEDDING_SIZE
from frigate.util.image import area
logger = logging.getLogger(__name__)
@ -28,6 +34,8 @@ class LicensePlateProcessingMixin:
"license_plate" not in self.config.objects.all_objects
)
self.event_metadata_publisher = EventMetadataPublisher()
self.ctc_decoder = CTCDecoder()
self.batch_size = 6
@ -710,18 +718,38 @@ class LicensePlateProcessingMixin:
top_score = -1
top_box = None
img_h, img_w = input.shape[0], input.shape[1]
# Calculate resized dimensions and padding based on _preprocess_inputs
if img_w > img_h:
resized_h = int(((img_h / img_w) * LPR_EMBEDDING_SIZE) // 4 * 4)
resized_w = LPR_EMBEDDING_SIZE
x_offset = (LPR_EMBEDDING_SIZE - resized_w) // 2
y_offset = (LPR_EMBEDDING_SIZE - resized_h) // 2
scale_x = img_w / resized_w
scale_y = img_h / resized_h
else:
resized_w = int(((img_w / img_h) * 512) // 4 * 4)
resized_h = LPR_EMBEDDING_SIZE
x_offset = (LPR_EMBEDDING_SIZE - resized_w) // 2
y_offset = (LPR_EMBEDDING_SIZE - resized_h) // 2
scale_x = img_w / resized_w
scale_y = img_h / resized_h
# Loop over predictions
for prediction in predictions:
score = prediction[6]
if score >= confidence_threshold:
bbox = prediction[1:5]
# Scale boxes back to original image size
scale_x = input.shape[1] / 256
scale_y = input.shape[0] / 256
bbox[0] *= scale_x
bbox[1] *= scale_y
bbox[2] *= scale_x
bbox[3] *= scale_y
# Adjust for padding and scale to original image
bbox[0] = (bbox[0] - x_offset) * scale_x
bbox[1] = (bbox[1] - y_offset) * scale_y
bbox[2] = (bbox[2] - x_offset) * scale_x
bbox[3] = (bbox[3] - y_offset) * scale_y
if score > top_score:
top_score = score
top_box = bbox
if score > top_score:
top_score = score
@ -729,8 +757,8 @@ class LicensePlateProcessingMixin:
# Return the top scoring bounding box if found
if top_box is not None:
# expand box by 30% to help with OCR
expansion = (top_box[2:] - top_box[:2]) * 0.30
# expand box by 5% to help with OCR
expansion = (top_box[2:] - top_box[:2]) * 0.05
# Expand box
expanded_box = np.array(
@ -842,57 +870,54 @@ class LicensePlateProcessingMixin:
"""
self.metrics.alpr_pps.value = (self.metrics.alpr_pps.value * 9 + duration) / 10
def lpr_process(self, obj_data: dict[str, any], frame: np.ndarray):
def _generate_plate_event(self, camera: str, plate_score: float) -> str:
"""Generate a unique ID for a plate event based on camera and text."""
now = datetime.datetime.now().timestamp()
rand_id = "".join(random.choices(string.ascii_lowercase + string.digits, k=6))
event_id = f"{now}-{rand_id}"
self.event_metadata_publisher.publish(
EventMetadataTypeEnum.manual_event_create,
(
now,
camera,
"license_plate",
event_id,
True,
plate_score,
None,
None,
"api",
{},
),
)
return event_id
def lpr_process(
self, obj_data: dict[str, any], frame: np.ndarray, dedicated_lpr: bool = None
):
"""Look for license plates in image."""
if not self.config.cameras[obj_data["camera"]].lpr.enabled:
camera = obj_data if dedicated_lpr else obj_data["camera"]
current_time = int(datetime.datetime.now().timestamp())
if not self.config.cameras[camera].lpr.enabled:
return
id = obj_data["id"]
# don't run for non car objects
if obj_data.get("label") != "car":
logger.debug("Not a processing license plate for non car object.")
return
# don't run for stationary car objects
if obj_data.get("stationary") == True:
logger.debug("Not a processing license plate for a stationary car object.")
return
# don't overwrite sub label for objects that have a sub label
# that is not a license plate
if obj_data.get("sub_label") and id not in self.detected_license_plates:
logger.debug(
f"Not processing license plate due to existing sub label: {obj_data.get('sub_label')}."
)
return
license_plate: Optional[dict[str, any]] = None
if self.requires_license_plate_detection:
logger.debug("Running manual license_plate detection.")
car_box = obj_data.get("box")
if not car_box:
return
if dedicated_lpr:
rgb = cv2.cvtColor(frame, cv2.COLOR_YUV2BGR_I420)
left, top, right, bottom = car_box
car = rgb[top:bottom, left:right]
# double the size of the car for better box detection
car = cv2.resize(car, (int(2 * car.shape[1]), int(2 * car.shape[0])))
# apply motion mask
rgb[self.config.cameras[obj_data].motion.mask == 0] = [0, 0, 0]
if WRITE_DEBUG_IMAGES:
current_time = int(datetime.datetime.now().timestamp())
cv2.imwrite(
f"debug/frames/car_frame_{current_time}.jpg",
car,
f"debug/frames/dedicated_lpr_masked_{current_time}.jpg",
rgb,
)
yolov9_start = datetime.datetime.now().timestamp()
license_plate = self._detect_license_plate(car)
license_plate = self._detect_license_plate(rgb)
logger.debug(
f"YOLOv9 LPD inference time: {(datetime.datetime.now().timestamp() - yolov9_start) * 1000:.2f} ms"
)
@ -901,94 +926,225 @@ class LicensePlateProcessingMixin:
)
if not license_plate:
logger.debug("Detected no license plates for car object.")
logger.debug("Detected no license plates in full frame.")
return
license_plate_area = max(
0,
(license_plate[2] - license_plate[0])
* (license_plate[3] - license_plate[1]),
license_plate_area = (license_plate[2] - license_plate[0]) * (
license_plate[3] - license_plate[1]
)
# check that license plate is valid
# double the value because we've doubled the size of the car
if (
license_plate_area
< self.config.cameras[obj_data["camera"]].lpr.min_area * 2
):
logger.debug("License plate is less than min_area")
if license_plate_area < self.lpr_config.min_area:
logger.debug("License plate area below minimum threshold.")
return
license_plate_frame = car[
license_plate[1] : license_plate[3], license_plate[0] : license_plate[2]
]
else:
# don't run for object without attributes
if not obj_data.get("current_attributes"):
logger.debug("No attributes to parse.")
return
attributes: list[dict[str, any]] = obj_data.get("current_attributes", [])
for attr in attributes:
if attr.get("label") != "license_plate":
continue
if license_plate is None or attr.get("score", 0.0) > license_plate.get(
"score", 0.0
):
license_plate = attr
# no license plates detected in this frame
if not license_plate:
return
license_plate_box = license_plate.get("box")
# check that license plate is valid
if (
not license_plate_box
or area(license_plate_box)
< self.config.cameras[obj_data["camera"]].lpr.min_area
):
logger.debug(f"Invalid license plate box {license_plate}")
return
license_plate_frame = cv2.cvtColor(frame, cv2.COLOR_YUV2BGR_I420)
# Expand the license_plate_box by 30%
box_array = np.array(license_plate_box)
expansion = (box_array[2:] - box_array[:2]) * 0.30
expanded_box = np.array(
[
license_plate_box[0] - expansion[0],
license_plate_box[1] - expansion[1],
license_plate_box[2] + expansion[0],
license_plate_box[3] + expansion[1],
]
).clip(0, [license_plate_frame.shape[1], license_plate_frame.shape[0]] * 2)
# Crop using the expanded box
license_plate_frame = license_plate_frame[
int(expanded_box[1]) : int(expanded_box[3]),
int(expanded_box[0]) : int(expanded_box[2]),
license_plate_frame = rgb[
license_plate[1] : license_plate[3],
license_plate[0] : license_plate[2],
]
# double the size of the license plate frame for better OCR
license_plate_frame = cv2.resize(
license_plate_frame,
(
int(2 * license_plate_frame.shape[1]),
int(2 * license_plate_frame.shape[0]),
),
)
if WRITE_DEBUG_IMAGES:
if license_plate:
frame_with_rect = rgb.copy()
cv2.rectangle(
frame_with_rect,
(
int(license_plate[0]),
int(license_plate[1]),
),
(
int(license_plate[2]),
int(license_plate[3]),
),
(0, 255, 0),
2,
)
cv2.imwrite(
f"debug/frames/dedicated_lpr_with_rect_{current_time}.jpg",
frame_with_rect,
)
if WRITE_DEBUG_IMAGES:
current_time = int(datetime.datetime.now().timestamp())
cv2.imwrite(
f"debug/frames/license_plate_frame_{current_time}.jpg",
# Double the size for better OCR
license_plate_frame = cv2.resize(
license_plate_frame,
(
int(2 * license_plate_frame.shape[1]),
int(2 * license_plate_frame.shape[0]),
),
)
if WRITE_DEBUG_IMAGES:
cv2.imwrite(
f"debug/frames/dedicated_lpr_doubled_{current_time}.jpg",
license_plate_frame,
)
else:
id = obj_data["id"]
# don't run for non car objects
if obj_data.get("label") != "car":
logger.debug("Not a processing license plate for non car object.")
return
# don't run for stationary car objects
if obj_data.get("stationary") == True:
logger.debug(
"Not a processing license plate for a stationary car object."
)
return
# don't overwrite sub label for objects that have a sub label
# that is not a license plate
if obj_data.get("sub_label") and id not in self.detected_license_plates:
logger.debug(
f"Not processing license plate due to existing sub label: {obj_data.get('sub_label')}."
)
return
license_plate: Optional[dict[str, any]] = None
if self.requires_license_plate_detection:
logger.debug("Running manual license_plate detection.")
car_box = obj_data.get("box")
if not car_box:
return
rgb = cv2.cvtColor(frame, cv2.COLOR_YUV2BGR_I420)
left, top, right, bottom = car_box
car = rgb[top:bottom, left:right]
# double the size of the car for better box detection
car = cv2.resize(car, (int(2 * car.shape[1]), int(2 * car.shape[0])))
if WRITE_DEBUG_IMAGES:
cv2.imwrite(
f"debug/frames/car_frame_{current_time}.jpg",
car,
)
yolov9_start = datetime.datetime.now().timestamp()
license_plate = self._detect_license_plate(car)
logger.debug(
f"YOLOv9 LPD inference time: {(datetime.datetime.now().timestamp() - yolov9_start) * 1000:.2f} ms"
)
self.__update_yolov9_metrics(
datetime.datetime.now().timestamp() - yolov9_start
)
if not license_plate:
logger.debug("Detected no license plates for car object.")
return
if WRITE_DEBUG_IMAGES and license_plate:
frame_with_rect = car.copy()
cv2.rectangle(
frame_with_rect,
(
int(license_plate[0]),
int(license_plate[1]),
),
(
int(license_plate[2]),
int(license_plate[3]),
),
(0, 255, 0),
2,
)
cv2.imwrite(
f"debug/frames/car_frame_with_rect_{current_time}.jpg",
frame_with_rect,
)
license_plate_area = max(
0,
(license_plate[2] - license_plate[0])
* (license_plate[3] - license_plate[1]),
)
# check that license plate is valid
# double the value because we've doubled the size of the car
if (
license_plate_area
< self.config.cameras[obj_data["camera"]].lpr.min_area * 2
):
logger.debug("License plate is less than min_area")
return
license_plate_frame = car[
license_plate[1] : license_plate[3],
license_plate[0] : license_plate[2],
]
else:
# don't run for object without attributes
if not obj_data.get("current_attributes"):
logger.debug("No attributes to parse.")
return
attributes: list[dict[str, any]] = obj_data.get(
"current_attributes", []
)
for attr in attributes:
if attr.get("label") != "license_plate":
continue
if license_plate is None or attr.get(
"score", 0.0
) > license_plate.get("score", 0.0):
license_plate = attr
# no license plates detected in this frame
if not license_plate:
return
license_plate_box = license_plate.get("box")
# check that license plate is valid
if (
not license_plate_box
or area(license_plate_box)
< self.config.cameras[obj_data["camera"]].lpr.min_area
):
logger.debug(f"Invalid license plate box {license_plate}")
return
license_plate_frame = cv2.cvtColor(frame, cv2.COLOR_YUV2BGR_I420)
# Expand the license_plate_box by 30%
box_array = np.array(license_plate_box)
expansion = (box_array[2:] - box_array[:2]) * 0.30
expanded_box = np.array(
[
license_plate_box[0] - expansion[0],
license_plate_box[1] - expansion[1],
license_plate_box[2] + expansion[0],
license_plate_box[3] + expansion[1],
]
).clip(
0, [license_plate_frame.shape[1], license_plate_frame.shape[0]] * 2
)
# Crop using the expanded box
license_plate_frame = license_plate_frame[
int(expanded_box[1]) : int(expanded_box[3]),
int(expanded_box[0]) : int(expanded_box[2]),
]
# double the size of the license plate frame for better OCR
license_plate_frame = cv2.resize(
license_plate_frame,
(
int(2 * license_plate_frame.shape[1]),
int(2 * license_plate_frame.shape[0]),
),
)
if WRITE_DEBUG_IMAGES:
cv2.imwrite(
f"debug/frames/license_plate_frame_{current_time}.jpg",
license_plate_frame,
)
start = datetime.datetime.now().timestamp()
# run detection, returns results sorted by confidence, best first
@ -1027,6 +1183,47 @@ class LicensePlateProcessingMixin:
else 0
)
# Check against minimum confidence threshold
if avg_confidence < self.lpr_config.recognition_threshold:
logger.debug(
f"Average confidence {avg_confidence} is less than threshold ({self.lpr_config.recognition_threshold})"
)
return
# For LPR cameras, match or assign plate ID using Jaro-Winkler distance
if dedicated_lpr:
plate_id = None
# Similarity threshold for matching plates
jaro_winkler_threshold = 0.8
for existing_id, data in self.detected_license_plates.items():
if (
data["camera"] == camera
and data["last_seen"] is not None
and current_time - data["last_seen"]
<= self.config.cameras[camera].lpr.expire_time
):
similarity = jaro_winkler(data["plate"], top_plate)
if similarity >= jaro_winkler_threshold:
plate_id = existing_id
logger.debug(
f"Matched plate {top_plate} to {data['plate']} (similarity: {similarity:.3f})"
)
break
if plate_id is None:
# start new manual lpr event
plate_id = self._generate_plate_event(obj_data, avg_confidence)
logger.debug(
f"New plate event for dedicated LPR camera {plate_id}: {top_plate}"
)
else:
logger.debug(
f"Matched existing plate event for dedicated LPR camera {plate_id}: {top_plate}"
)
self.detected_license_plates[plate_id]["last_seen"] = current_time
id = plate_id
# Check if we have a previously detected plate for this ID
if id in self.detected_license_plates:
if self._should_keep_previous_plate(
@ -1035,13 +1232,6 @@ class LicensePlateProcessingMixin:
logger.debug("Keeping previous plate")
return
# Check against minimum confidence threshold
if avg_confidence < self.lpr_config.recognition_threshold:
logger.debug(
f"Average confidence {avg_confidence} is less than threshold ({self.lpr_config.recognition_threshold})"
)
return
# Determine subLabel based on known plates, use regex matching
# Default to the detected plate, use label name if there's a match
sub_label = next(
@ -1063,6 +1253,7 @@ class LicensePlateProcessingMixin:
EventMetadataTypeEnum.sub_label, (id, sub_label, avg_confidence)
)
logger.debug("publishing plate for", id, top_plate)
self.sub_label_publisher.publish(
EventMetadataTypeEnum.recognized_license_plate,
(id, top_plate, avg_confidence),
@ -1073,6 +1264,8 @@ class LicensePlateProcessingMixin:
"char_confidences": top_char_confidences,
"area": top_area,
"obj_data": obj_data,
"camera": camera,
"last_seen": current_time if dedicated_lpr else None,
}
def handle_request(self, topic, request_data) -> dict[str, any] | None:

View File

@ -35,9 +35,11 @@ class LicensePlateRealTimeProcessor(LicensePlateProcessingMixin, RealTimeProcess
self.sub_label_publisher = sub_label_publisher
super().__init__(config, metrics)
def process_frame(self, obj_data: dict[str, any], frame: np.ndarray):
def process_frame(
self, obj_data: dict[str, any], frame: np.ndarray, dedicated_lpr: bool = None
):
"""Look for license plates in image."""
self.lpr_process(obj_data, frame)
self.lpr_process(obj_data, frame, dedicated_lpr)
def handle_request(self, topic, request_data) -> dict[str, any] | None:
return