From ed1cffd4ae7a1600fb63606a49de6e5e29133a53 Mon Sep 17 00:00:00 2001 From: Josh Hawkins <32435876+hawkeye217@users.noreply.github.com> Date: Thu, 20 Mar 2025 09:18:32 -0500 Subject: [PATCH] detect and process dedicated lpr plates --- .../common/license_plate/mixin.py | 463 +++++++++++++----- .../real_time/license_plate.py | 6 +- 2 files changed, 332 insertions(+), 137 deletions(-) diff --git a/frigate/data_processing/common/license_plate/mixin.py b/frigate/data_processing/common/license_plate/mixin.py index 52f59e71a..a8ae7feb8 100644 --- a/frigate/data_processing/common/license_plate/mixin.py +++ b/frigate/data_processing/common/license_plate/mixin.py @@ -3,16 +3,22 @@ import datetime import logging import math +import random import re +import string from typing import List, Optional, Tuple import cv2 import numpy as np -from Levenshtein import distance +from Levenshtein import distance, jaro_winkler from pyclipper import ET_CLOSEDPOLYGON, JT_ROUND, PyclipperOffset from shapely.geometry import Polygon -from frigate.comms.event_metadata_updater import EventMetadataTypeEnum +from frigate.comms.event_metadata_updater import ( + EventMetadataPublisher, + EventMetadataTypeEnum, +) +from frigate.embeddings.onnx.lpr_embedding import LPR_EMBEDDING_SIZE from frigate.util.image import area logger = logging.getLogger(__name__) @@ -28,6 +34,8 @@ class LicensePlateProcessingMixin: "license_plate" not in self.config.objects.all_objects ) + self.event_metadata_publisher = EventMetadataPublisher() + self.ctc_decoder = CTCDecoder() self.batch_size = 6 @@ -710,18 +718,38 @@ class LicensePlateProcessingMixin: top_score = -1 top_box = None + img_h, img_w = input.shape[0], input.shape[1] + + # Calculate resized dimensions and padding based on _preprocess_inputs + if img_w > img_h: + resized_h = int(((img_h / img_w) * LPR_EMBEDDING_SIZE) // 4 * 4) + resized_w = LPR_EMBEDDING_SIZE + x_offset = (LPR_EMBEDDING_SIZE - resized_w) // 2 + y_offset = (LPR_EMBEDDING_SIZE - resized_h) // 2 + scale_x = img_w / resized_w + scale_y = img_h / resized_h + else: + resized_w = int(((img_w / img_h) * 512) // 4 * 4) + resized_h = LPR_EMBEDDING_SIZE + x_offset = (LPR_EMBEDDING_SIZE - resized_w) // 2 + y_offset = (LPR_EMBEDDING_SIZE - resized_h) // 2 + scale_x = img_w / resized_w + scale_y = img_h / resized_h + # Loop over predictions for prediction in predictions: score = prediction[6] if score >= confidence_threshold: bbox = prediction[1:5] - # Scale boxes back to original image size - scale_x = input.shape[1] / 256 - scale_y = input.shape[0] / 256 - bbox[0] *= scale_x - bbox[1] *= scale_y - bbox[2] *= scale_x - bbox[3] *= scale_y + # Adjust for padding and scale to original image + bbox[0] = (bbox[0] - x_offset) * scale_x + bbox[1] = (bbox[1] - y_offset) * scale_y + bbox[2] = (bbox[2] - x_offset) * scale_x + bbox[3] = (bbox[3] - y_offset) * scale_y + + if score > top_score: + top_score = score + top_box = bbox if score > top_score: top_score = score @@ -729,8 +757,8 @@ class LicensePlateProcessingMixin: # Return the top scoring bounding box if found if top_box is not None: - # expand box by 30% to help with OCR - expansion = (top_box[2:] - top_box[:2]) * 0.30 + # expand box by 5% to help with OCR + expansion = (top_box[2:] - top_box[:2]) * 0.05 # Expand box expanded_box = np.array( @@ -842,57 +870,54 @@ class LicensePlateProcessingMixin: """ self.metrics.alpr_pps.value = (self.metrics.alpr_pps.value * 9 + duration) / 10 - def lpr_process(self, obj_data: dict[str, any], frame: np.ndarray): + def _generate_plate_event(self, camera: str, plate_score: float) -> str: + """Generate a unique ID for a plate event based on camera and text.""" + now = datetime.datetime.now().timestamp() + rand_id = "".join(random.choices(string.ascii_lowercase + string.digits, k=6)) + event_id = f"{now}-{rand_id}" + + self.event_metadata_publisher.publish( + EventMetadataTypeEnum.manual_event_create, + ( + now, + camera, + "license_plate", + event_id, + True, + plate_score, + None, + None, + "api", + {}, + ), + ) + return event_id + + def lpr_process( + self, obj_data: dict[str, any], frame: np.ndarray, dedicated_lpr: bool = None + ): """Look for license plates in image.""" - if not self.config.cameras[obj_data["camera"]].lpr.enabled: + camera = obj_data if dedicated_lpr else obj_data["camera"] + current_time = int(datetime.datetime.now().timestamp()) + + if not self.config.cameras[camera].lpr.enabled: return - id = obj_data["id"] - - # don't run for non car objects - if obj_data.get("label") != "car": - logger.debug("Not a processing license plate for non car object.") - return - - # don't run for stationary car objects - if obj_data.get("stationary") == True: - logger.debug("Not a processing license plate for a stationary car object.") - return - - # don't overwrite sub label for objects that have a sub label - # that is not a license plate - if obj_data.get("sub_label") and id not in self.detected_license_plates: - logger.debug( - f"Not processing license plate due to existing sub label: {obj_data.get('sub_label')}." - ) - return - - license_plate: Optional[dict[str, any]] = None - - if self.requires_license_plate_detection: - logger.debug("Running manual license_plate detection.") - - car_box = obj_data.get("box") - - if not car_box: - return - + if dedicated_lpr: rgb = cv2.cvtColor(frame, cv2.COLOR_YUV2BGR_I420) - left, top, right, bottom = car_box - car = rgb[top:bottom, left:right] - # double the size of the car for better box detection - car = cv2.resize(car, (int(2 * car.shape[1]), int(2 * car.shape[0]))) + # apply motion mask + rgb[self.config.cameras[obj_data].motion.mask == 0] = [0, 0, 0] if WRITE_DEBUG_IMAGES: - current_time = int(datetime.datetime.now().timestamp()) cv2.imwrite( - f"debug/frames/car_frame_{current_time}.jpg", - car, + f"debug/frames/dedicated_lpr_masked_{current_time}.jpg", + rgb, ) yolov9_start = datetime.datetime.now().timestamp() - license_plate = self._detect_license_plate(car) + license_plate = self._detect_license_plate(rgb) + logger.debug( f"YOLOv9 LPD inference time: {(datetime.datetime.now().timestamp() - yolov9_start) * 1000:.2f} ms" ) @@ -901,94 +926,225 @@ class LicensePlateProcessingMixin: ) if not license_plate: - logger.debug("Detected no license plates for car object.") + logger.debug("Detected no license plates in full frame.") return - license_plate_area = max( - 0, - (license_plate[2] - license_plate[0]) - * (license_plate[3] - license_plate[1]), + license_plate_area = (license_plate[2] - license_plate[0]) * ( + license_plate[3] - license_plate[1] ) - - # check that license plate is valid - # double the value because we've doubled the size of the car - if ( - license_plate_area - < self.config.cameras[obj_data["camera"]].lpr.min_area * 2 - ): - logger.debug("License plate is less than min_area") + if license_plate_area < self.lpr_config.min_area: + logger.debug("License plate area below minimum threshold.") return - license_plate_frame = car[ - license_plate[1] : license_plate[3], license_plate[0] : license_plate[2] - ] - else: - # don't run for object without attributes - if not obj_data.get("current_attributes"): - logger.debug("No attributes to parse.") - return - - attributes: list[dict[str, any]] = obj_data.get("current_attributes", []) - for attr in attributes: - if attr.get("label") != "license_plate": - continue - - if license_plate is None or attr.get("score", 0.0) > license_plate.get( - "score", 0.0 - ): - license_plate = attr - - # no license plates detected in this frame - if not license_plate: - return - - license_plate_box = license_plate.get("box") - - # check that license plate is valid - if ( - not license_plate_box - or area(license_plate_box) - < self.config.cameras[obj_data["camera"]].lpr.min_area - ): - logger.debug(f"Invalid license plate box {license_plate}") - return - - license_plate_frame = cv2.cvtColor(frame, cv2.COLOR_YUV2BGR_I420) - - # Expand the license_plate_box by 30% - box_array = np.array(license_plate_box) - expansion = (box_array[2:] - box_array[:2]) * 0.30 - expanded_box = np.array( - [ - license_plate_box[0] - expansion[0], - license_plate_box[1] - expansion[1], - license_plate_box[2] + expansion[0], - license_plate_box[3] + expansion[1], - ] - ).clip(0, [license_plate_frame.shape[1], license_plate_frame.shape[0]] * 2) - - # Crop using the expanded box - license_plate_frame = license_plate_frame[ - int(expanded_box[1]) : int(expanded_box[3]), - int(expanded_box[0]) : int(expanded_box[2]), + license_plate_frame = rgb[ + license_plate[1] : license_plate[3], + license_plate[0] : license_plate[2], ] - # double the size of the license plate frame for better OCR - license_plate_frame = cv2.resize( - license_plate_frame, - ( - int(2 * license_plate_frame.shape[1]), - int(2 * license_plate_frame.shape[0]), - ), - ) + if WRITE_DEBUG_IMAGES: + if license_plate: + frame_with_rect = rgb.copy() + cv2.rectangle( + frame_with_rect, + ( + int(license_plate[0]), + int(license_plate[1]), + ), + ( + int(license_plate[2]), + int(license_plate[3]), + ), + (0, 255, 0), + 2, + ) + cv2.imwrite( + f"debug/frames/dedicated_lpr_with_rect_{current_time}.jpg", + frame_with_rect, + ) - if WRITE_DEBUG_IMAGES: - current_time = int(datetime.datetime.now().timestamp()) - cv2.imwrite( - f"debug/frames/license_plate_frame_{current_time}.jpg", + # Double the size for better OCR + license_plate_frame = cv2.resize( license_plate_frame, + ( + int(2 * license_plate_frame.shape[1]), + int(2 * license_plate_frame.shape[0]), + ), ) + if WRITE_DEBUG_IMAGES: + cv2.imwrite( + f"debug/frames/dedicated_lpr_doubled_{current_time}.jpg", + license_plate_frame, + ) + + else: + id = obj_data["id"] + + # don't run for non car objects + if obj_data.get("label") != "car": + logger.debug("Not a processing license plate for non car object.") + return + + # don't run for stationary car objects + if obj_data.get("stationary") == True: + logger.debug( + "Not a processing license plate for a stationary car object." + ) + return + + # don't overwrite sub label for objects that have a sub label + # that is not a license plate + if obj_data.get("sub_label") and id not in self.detected_license_plates: + logger.debug( + f"Not processing license plate due to existing sub label: {obj_data.get('sub_label')}." + ) + return + + license_plate: Optional[dict[str, any]] = None + + if self.requires_license_plate_detection: + logger.debug("Running manual license_plate detection.") + + car_box = obj_data.get("box") + + if not car_box: + return + + rgb = cv2.cvtColor(frame, cv2.COLOR_YUV2BGR_I420) + left, top, right, bottom = car_box + car = rgb[top:bottom, left:right] + + # double the size of the car for better box detection + car = cv2.resize(car, (int(2 * car.shape[1]), int(2 * car.shape[0]))) + + if WRITE_DEBUG_IMAGES: + cv2.imwrite( + f"debug/frames/car_frame_{current_time}.jpg", + car, + ) + + yolov9_start = datetime.datetime.now().timestamp() + license_plate = self._detect_license_plate(car) + logger.debug( + f"YOLOv9 LPD inference time: {(datetime.datetime.now().timestamp() - yolov9_start) * 1000:.2f} ms" + ) + self.__update_yolov9_metrics( + datetime.datetime.now().timestamp() - yolov9_start + ) + + if not license_plate: + logger.debug("Detected no license plates for car object.") + return + + if WRITE_DEBUG_IMAGES and license_plate: + frame_with_rect = car.copy() + cv2.rectangle( + frame_with_rect, + ( + int(license_plate[0]), + int(license_plate[1]), + ), + ( + int(license_plate[2]), + int(license_plate[3]), + ), + (0, 255, 0), + 2, + ) + cv2.imwrite( + f"debug/frames/car_frame_with_rect_{current_time}.jpg", + frame_with_rect, + ) + + license_plate_area = max( + 0, + (license_plate[2] - license_plate[0]) + * (license_plate[3] - license_plate[1]), + ) + + # check that license plate is valid + # double the value because we've doubled the size of the car + if ( + license_plate_area + < self.config.cameras[obj_data["camera"]].lpr.min_area * 2 + ): + logger.debug("License plate is less than min_area") + return + + license_plate_frame = car[ + license_plate[1] : license_plate[3], + license_plate[0] : license_plate[2], + ] + else: + # don't run for object without attributes + if not obj_data.get("current_attributes"): + logger.debug("No attributes to parse.") + return + + attributes: list[dict[str, any]] = obj_data.get( + "current_attributes", [] + ) + for attr in attributes: + if attr.get("label") != "license_plate": + continue + + if license_plate is None or attr.get( + "score", 0.0 + ) > license_plate.get("score", 0.0): + license_plate = attr + + # no license plates detected in this frame + if not license_plate: + return + + license_plate_box = license_plate.get("box") + + # check that license plate is valid + if ( + not license_plate_box + or area(license_plate_box) + < self.config.cameras[obj_data["camera"]].lpr.min_area + ): + logger.debug(f"Invalid license plate box {license_plate}") + return + + license_plate_frame = cv2.cvtColor(frame, cv2.COLOR_YUV2BGR_I420) + + # Expand the license_plate_box by 30% + box_array = np.array(license_plate_box) + expansion = (box_array[2:] - box_array[:2]) * 0.30 + expanded_box = np.array( + [ + license_plate_box[0] - expansion[0], + license_plate_box[1] - expansion[1], + license_plate_box[2] + expansion[0], + license_plate_box[3] + expansion[1], + ] + ).clip( + 0, [license_plate_frame.shape[1], license_plate_frame.shape[0]] * 2 + ) + + # Crop using the expanded box + license_plate_frame = license_plate_frame[ + int(expanded_box[1]) : int(expanded_box[3]), + int(expanded_box[0]) : int(expanded_box[2]), + ] + + # double the size of the license plate frame for better OCR + license_plate_frame = cv2.resize( + license_plate_frame, + ( + int(2 * license_plate_frame.shape[1]), + int(2 * license_plate_frame.shape[0]), + ), + ) + + if WRITE_DEBUG_IMAGES: + cv2.imwrite( + f"debug/frames/license_plate_frame_{current_time}.jpg", + license_plate_frame, + ) + start = datetime.datetime.now().timestamp() # run detection, returns results sorted by confidence, best first @@ -1027,6 +1183,47 @@ class LicensePlateProcessingMixin: else 0 ) + # Check against minimum confidence threshold + if avg_confidence < self.lpr_config.recognition_threshold: + logger.debug( + f"Average confidence {avg_confidence} is less than threshold ({self.lpr_config.recognition_threshold})" + ) + return + + # For LPR cameras, match or assign plate ID using Jaro-Winkler distance + if dedicated_lpr: + plate_id = None + # Similarity threshold for matching plates + jaro_winkler_threshold = 0.8 + + for existing_id, data in self.detected_license_plates.items(): + if ( + data["camera"] == camera + and data["last_seen"] is not None + and current_time - data["last_seen"] + <= self.config.cameras[camera].lpr.expire_time + ): + similarity = jaro_winkler(data["plate"], top_plate) + if similarity >= jaro_winkler_threshold: + plate_id = existing_id + logger.debug( + f"Matched plate {top_plate} to {data['plate']} (similarity: {similarity:.3f})" + ) + break + if plate_id is None: + # start new manual lpr event + plate_id = self._generate_plate_event(obj_data, avg_confidence) + logger.debug( + f"New plate event for dedicated LPR camera {plate_id}: {top_plate}" + ) + else: + logger.debug( + f"Matched existing plate event for dedicated LPR camera {plate_id}: {top_plate}" + ) + self.detected_license_plates[plate_id]["last_seen"] = current_time + + id = plate_id + # Check if we have a previously detected plate for this ID if id in self.detected_license_plates: if self._should_keep_previous_plate( @@ -1035,13 +1232,6 @@ class LicensePlateProcessingMixin: logger.debug("Keeping previous plate") return - # Check against minimum confidence threshold - if avg_confidence < self.lpr_config.recognition_threshold: - logger.debug( - f"Average confidence {avg_confidence} is less than threshold ({self.lpr_config.recognition_threshold})" - ) - return - # Determine subLabel based on known plates, use regex matching # Default to the detected plate, use label name if there's a match sub_label = next( @@ -1063,6 +1253,7 @@ class LicensePlateProcessingMixin: EventMetadataTypeEnum.sub_label, (id, sub_label, avg_confidence) ) + logger.debug("publishing plate for", id, top_plate) self.sub_label_publisher.publish( EventMetadataTypeEnum.recognized_license_plate, (id, top_plate, avg_confidence), @@ -1073,6 +1264,8 @@ class LicensePlateProcessingMixin: "char_confidences": top_char_confidences, "area": top_area, "obj_data": obj_data, + "camera": camera, + "last_seen": current_time if dedicated_lpr else None, } def handle_request(self, topic, request_data) -> dict[str, any] | None: diff --git a/frigate/data_processing/real_time/license_plate.py b/frigate/data_processing/real_time/license_plate.py index d2cb9f2a5..c7d487cf0 100644 --- a/frigate/data_processing/real_time/license_plate.py +++ b/frigate/data_processing/real_time/license_plate.py @@ -35,9 +35,11 @@ class LicensePlateRealTimeProcessor(LicensePlateProcessingMixin, RealTimeProcess self.sub_label_publisher = sub_label_publisher super().__init__(config, metrics) - def process_frame(self, obj_data: dict[str, any], frame: np.ndarray): + def process_frame( + self, obj_data: dict[str, any], frame: np.ndarray, dedicated_lpr: bool = None + ): """Look for license plates in image.""" - self.lpr_process(obj_data, frame) + self.lpr_process(obj_data, frame, dedicated_lpr) def handle_request(self, topic, request_data) -> dict[str, any] | None: return