From 2458f667c45f95a16e348ab8f0855486f10194a9 Mon Sep 17 00:00:00 2001 From: Josh Hawkins <32435876+hawkeye217@users.noreply.github.com> Date: Tue, 11 Feb 2025 14:45:13 -0600 Subject: [PATCH] Refactor lpr into real time data processor (#16497) --- .../real_time/license_plate_processor.py} | 342 +++++++++++++++--- frigate/embeddings/embeddings.py | 41 --- frigate/embeddings/maintainer.py | 237 +----------- 3 files changed, 305 insertions(+), 315 deletions(-) rename frigate/{embeddings/lpr/lpr.py => data_processing/real_time/license_plate_processor.py} (68%) diff --git a/frigate/embeddings/lpr/lpr.py b/frigate/data_processing/real_time/license_plate_processor.py similarity index 68% rename from frigate/embeddings/lpr/lpr.py rename to frigate/data_processing/real_time/license_plate_processor.py index d7e513c73..27303601b 100644 --- a/frigate/embeddings/lpr/lpr.py +++ b/frigate/data_processing/real_time/license_plate_processor.py @@ -1,34 +1,41 @@ +"""Handle processing images for face detection and recognition.""" + +import datetime import logging import math -from typing import List, Tuple +import re +from typing import List, Optional, Tuple import cv2 import numpy as np +import requests from pyclipper import ET_CLOSEDPOLYGON, JT_ROUND, PyclipperOffset from shapely.geometry import Polygon from frigate.comms.inter_process import InterProcessRequestor -from frigate.config.classification import LicensePlateRecognitionConfig -from frigate.embeddings.embeddings import Embeddings +from frigate.config import FrigateConfig +from frigate.const import FRIGATE_LOCALHOST +from frigate.embeddings.functions.onnx import GenericONNXEmbedding, ModelTypeEnum +from frigate.util.image import area + +from ..types import DataProcessorMetrics +from .api import RealTimeProcessorApi logger = logging.getLogger(__name__) MIN_PLATE_LENGTH = 3 -class LicensePlateRecognition: - def __init__( - self, - config: LicensePlateRecognitionConfig, - requestor: InterProcessRequestor, - embeddings: Embeddings, - ): - self.lpr_config = config - self.requestor = requestor - self.embeddings = embeddings - self.detection_model = self.embeddings.lpr_detection_model - self.classification_model = self.embeddings.lpr_classification_model - self.recognition_model = self.embeddings.lpr_recognition_model +class LicensePlateProcessor(RealTimeProcessorApi): + def __init__(self, config: FrigateConfig, metrics: DataProcessorMetrics): + super().__init__(config, metrics) + self.requestor = InterProcessRequestor() + self.lpr_config = config.lpr + self.requires_license_plate_detection = ( + "license_plate" not in self.config.objects.all_objects + ) + self.detected_license_plates: dict[str, dict[str, any]] = {} + self.ctc_decoder = CTCDecoder() self.batch_size = 6 @@ -39,13 +46,54 @@ class LicensePlateRecognition: self.box_thresh = 0.8 self.mask_thresh = 0.8 + self.lpr_detection_model = None + self.lpr_classification_model = None + self.lpr_recognition_model = None + + if self.config.lpr.enabled: + self.detection_model = GenericONNXEmbedding( + model_name="paddleocr-onnx", + model_file="detection.onnx", + download_urls={ + "detection.onnx": "https://github.com/hawkeye217/paddleocr-onnx/raw/refs/heads/master/models/detection.onnx" + }, + model_size="large", + model_type=ModelTypeEnum.lpr_detect, + requestor=self.requestor, + device="CPU", + ) + + self.classification_model = GenericONNXEmbedding( + model_name="paddleocr-onnx", + model_file="classification.onnx", + download_urls={ + "classification.onnx": "https://github.com/hawkeye217/paddleocr-onnx/raw/refs/heads/master/models/classification.onnx" + }, + model_size="large", + model_type=ModelTypeEnum.lpr_classify, + requestor=self.requestor, + device="CPU", + ) + + self.recognition_model = GenericONNXEmbedding( + model_name="paddleocr-onnx", + model_file="recognition.onnx", + download_urls={ + "recognition.onnx": "https://github.com/hawkeye217/paddleocr-onnx/raw/refs/heads/master/models/recognition.onnx" + }, + model_size="large", + model_type=ModelTypeEnum.lpr_recognize, + requestor=self.requestor, + device="CPU", + ) + if self.lpr_config.enabled: # all models need to be loaded to run LPR self.detection_model._load_model_and_utils() self.classification_model._load_model_and_utils() self.recognition_model._load_model_and_utils() - def detect(self, image: np.ndarray) -> List[np.ndarray]: + def _detect(self, image: np.ndarray) -> List[np.ndarray]: """ Detect possible license plates in the input image by first resizing and normalizing it, running a detection model, and filtering out low-probability regions. @@ -59,18 +107,18 @@ class LicensePlateRecognition: h, w = image.shape[:2] if sum([h, w]) < 64: - image = self.zero_pad(image) + image = self._zero_pad(image) - resized_image = self.resize_image(image) - normalized_image = self.normalize_image(resized_image) + resized_image = self._resize_image(image) + normalized_image = self._normalize_image(resized_image) outputs = self.detection_model([normalized_image])[0] outputs = outputs[0, :, :] - boxes, _ = self.boxes_from_bitmap(outputs, outputs > self.mask_thresh, w, h) - return self.filter_polygon(boxes, (h, w)) + boxes, _ = self._boxes_from_bitmap(outputs, outputs > self.mask_thresh, w, h) + return self._filter_polygon(boxes, (h, w)) - def classify( + def _classify( self, images: List[np.ndarray] ) -> Tuple[List[np.ndarray], List[Tuple[str, float]]]: """ @@ -97,7 +145,7 @@ class LicensePlateRecognition: return self._process_classification_output(images, outputs) - def recognize( + def _recognize( self, images: List[np.ndarray] ) -> Tuple[List[str], List[List[float]]]: """ @@ -136,7 +184,7 @@ class LicensePlateRecognition: outputs = self.recognition_model(norm_images) return self.ctc_decoder(outputs) - def process_license_plate( + def _process_license_plate( self, image: np.ndarray ) -> Tuple[List[str], List[float], List[int]]: """ @@ -157,13 +205,13 @@ class LicensePlateRecognition: logger.debug("Model runners not loaded") return [], [], [] - plate_points = self.detect(image) + plate_points = self._detect(image) if len(plate_points) == 0: return [], [], [] - plate_points = self.sort_polygon(list(plate_points)) + plate_points = self._sort_polygon(list(plate_points)) plate_images = [self._crop_license_plate(image, x) for x in plate_points] - rotated_images, _ = self.classify(plate_images) + rotated_images, _ = self._classify(plate_images) # keep track of the index of each image for correct area calc later sorted_indices = np.argsort([x.shape[1] / x.shape[0] for x in rotated_images]) @@ -171,7 +219,7 @@ class LicensePlateRecognition: idx: original_idx for original_idx, idx in enumerate(sorted_indices) } - results, confidences = self.recognize(rotated_images) + results, confidences = self._recognize(rotated_images) if results: license_plates = [""] * len(rotated_images) @@ -218,7 +266,7 @@ class LicensePlateRecognition: return [], [], [] - def resize_image(self, image: np.ndarray) -> np.ndarray: + def _resize_image(self, image: np.ndarray) -> np.ndarray: """ Resize the input image while maintaining the aspect ratio, ensuring dimensions are multiples of 32. @@ -234,7 +282,7 @@ class LicensePlateRecognition: resize_w = max(int(round(int(w * ratio) / 32) * 32), 32) return cv2.resize(image, (resize_w, resize_h)) - def normalize_image(self, image: np.ndarray) -> np.ndarray: + def _normalize_image(self, image: np.ndarray) -> np.ndarray: """ Normalize the input image by subtracting the mean and multiplying by the standard deviation. @@ -252,7 +300,7 @@ class LicensePlateRecognition: cv2.multiply(image, std, image) return image.transpose((2, 0, 1))[np.newaxis, ...] - def boxes_from_bitmap( + def _boxes_from_bitmap( self, output: np.ndarray, mask: np.ndarray, dest_width: int, dest_height: int ) -> Tuple[np.ndarray, List[float]]: """ @@ -282,14 +330,14 @@ class LicensePlateRecognition: contour = contours[index] # get minimum bounding box (rotated rectangle) around the contour and the smallest side length. - points, min_side = self.get_min_boxes(contour) + points, min_side = self._get_min_boxes(contour) if min_side < self.min_size: continue points = np.array(points) - score = self.box_score(output, contour) + score = self._box_score(output, contour) if self.box_thresh > score: continue @@ -302,7 +350,7 @@ class LicensePlateRecognition: points = np.array(offset.Execute(distance * 1.5)).reshape((-1, 1, 2)) # get the minimum bounding box around the shrunken polygon. - box, min_side = self.get_min_boxes(points) + box, min_side = self._get_min_boxes(points) if min_side < self.min_size + 2: continue @@ -321,7 +369,7 @@ class LicensePlateRecognition: return np.array(boxes, dtype="int32"), scores @staticmethod - def get_min_boxes(contour: np.ndarray) -> Tuple[List[Tuple[float, float]], float]: + def _get_min_boxes(contour: np.ndarray) -> Tuple[List[Tuple[float, float]], float]: """ Calculate the minimum bounding box (rotated rectangle) for a given contour. @@ -340,7 +388,7 @@ class LicensePlateRecognition: return box, min(bounding_box[1]) @staticmethod - def box_score(bitmap: np.ndarray, contour: np.ndarray) -> float: + def _box_score(bitmap: np.ndarray, contour: np.ndarray) -> float: """ Calculate the average score within the bounding box of a contour. @@ -360,7 +408,7 @@ class LicensePlateRecognition: return cv2.mean(bitmap[y1 : y2 + 1, x1 : x2 + 1], mask)[0] @staticmethod - def expand_box(points: List[Tuple[float, float]]) -> np.ndarray: + def _expand_box(points: List[Tuple[float, float]]) -> np.ndarray: """ Expand a polygonal shape slightly by a factor determined by the area-to-perimeter ratio. @@ -377,7 +425,7 @@ class LicensePlateRecognition: expanded = np.array(offset.Execute(distance * 1.5)).reshape((-1, 2)) return expanded - def filter_polygon( + def _filter_polygon( self, points: List[np.ndarray], shape: Tuple[int, int] ) -> np.ndarray: """ @@ -394,14 +442,14 @@ class LicensePlateRecognition: height, width = shape return np.array( [ - self.clockwise_order(point) + self._clockwise_order(point) for point in points - if self.is_valid_polygon(point, width, height) + if self._is_valid_polygon(point, width, height) ] ) @staticmethod - def is_valid_polygon(point: np.ndarray, width: int, height: int) -> bool: + def _is_valid_polygon(point: np.ndarray, width: int, height: int) -> bool: """ Check if a polygon is valid, meaning it fits within the image bounds and has sides of a minimum length. @@ -424,7 +472,7 @@ class LicensePlateRecognition: ) @staticmethod - def clockwise_order(point: np.ndarray) -> np.ndarray: + def _clockwise_order(point: np.ndarray) -> np.ndarray: """ Arrange the points of a polygon in clockwise order based on their angular positions around the polygon's center. @@ -441,7 +489,7 @@ class LicensePlateRecognition: ] @staticmethod - def sort_polygon(points): + def _sort_polygon(points): """ Sort polygons based on their position in the image. If polygons are close in vertical position (within 10 pixels), sort them by horizontal position. @@ -466,7 +514,7 @@ class LicensePlateRecognition: return points @staticmethod - def zero_pad(image: np.ndarray) -> np.ndarray: + def _zero_pad(image: np.ndarray) -> np.ndarray: """ Apply zero-padding to an image, ensuring its dimensions are at least 32x32. The padding is added only if needed. @@ -649,6 +697,210 @@ class LicensePlateRecognition: image = np.rot90(image, k=3) return image + def __update_metrics(self, duration: float) -> None: + self.metrics.alpr_pps.value = (self.metrics.alpr_pps.value * 9 + duration) / 10 + + def _detect_license_plate(self, input: np.ndarray) -> tuple[int, int, int, int]: + """Return the dimensions of the input image as [x, y, width, height].""" + # TODO: use a small model here to detect plates + height, width = input.shape[:2] + return (0, 0, width, height) + + def process_frame(self, obj_data: dict[str, any], frame: np.ndarray): + """Look for license plates in image.""" + start = datetime.datetime.now().timestamp() + + id = obj_data["id"] + + # don't run for non car objects + if obj_data.get("label") != "car": + logger.debug("Not a processing license plate for non car object.") + return + + # don't run for stationary car objects + if obj_data.get("stationary") == True: + logger.debug("Not a processing license plate for a stationary car object.") + return + + # don't overwrite sub label for objects that have a sub label + # that is not a license plate + if obj_data.get("sub_label") and id not in self.detected_license_plates: + logger.debug( + f"Not processing license plate due to existing sub label: {obj_data.get('sub_label')}." + ) + return + + license_plate: Optional[dict[str, any]] = None + + if self.requires_license_plate_detection: + logger.debug("Running manual license_plate detection.") + car_box = obj_data.get("box") + + if not car_box: + return + + rgb = cv2.cvtColor(frame, cv2.COLOR_YUV2RGB_I420) + left, top, right, bottom = car_box + car = rgb[top:bottom, left:right] + license_plate = self._detect_license_plate(car) + + if not license_plate: + logger.debug("Detected no license plates for car object.") + return + + license_plate_frame = car[ + license_plate[1] : license_plate[3], license_plate[0] : license_plate[2] + ] + license_plate_frame = cv2.cvtColor(license_plate_frame, cv2.COLOR_RGB2BGR) + else: + # don't run for object without attributes + if not obj_data.get("current_attributes"): + logger.debug("No attributes to parse.") + return + + attributes: list[dict[str, any]] = obj_data.get("current_attributes", []) + for attr in attributes: + if attr.get("label") != "license_plate": + continue + + if license_plate is None or attr.get("score", 0.0) > license_plate.get( + "score", 0.0 + ): + license_plate = attr + + # no license plates detected in this frame + if not license_plate: + return + + license_plate_box = license_plate.get("box") + + # check that license plate is valid + if ( + not license_plate_box + or area(license_plate_box) < self.config.lpr.min_area + ): + logger.debug(f"Invalid license plate box {license_plate}") + return + + license_plate_frame = cv2.cvtColor(frame, cv2.COLOR_YUV2BGR_I420) + license_plate_frame = license_plate_frame[ + license_plate_box[1] : license_plate_box[3], + license_plate_box[0] : license_plate_box[2], + ] + + # run detection, returns results sorted by confidence, best first + license_plates, confidences, areas = self._process_license_plate( + license_plate_frame + ) + + logger.debug(f"Text boxes: {license_plates}") + logger.debug(f"Confidences: {confidences}") + logger.debug(f"Areas: {areas}") + + if license_plates: + for plate, confidence, text_area in zip(license_plates, confidences, areas): + avg_confidence = ( + (sum(confidence) / len(confidence)) if confidence else 0 + ) + + logger.debug( + f"Detected text: {plate} (average confidence: {avg_confidence:.2f}, area: {text_area} pixels)" + ) + else: + # no plates found + logger.debug("No text detected") + return + + top_plate, top_char_confidences, top_area = ( + license_plates[0], + confidences[0], + areas[0], + ) + avg_confidence = ( + (sum(top_char_confidences) / len(top_char_confidences)) + if top_char_confidences + else 0 + ) + + # Check if we have a previously detected plate for this ID + if id in self.detected_license_plates: + prev_plate = self.detected_license_plates[id]["plate"] + prev_char_confidences = self.detected_license_plates[id]["char_confidences"] + prev_area = self.detected_license_plates[id]["area"] + prev_avg_confidence = ( + (sum(prev_char_confidences) / len(prev_char_confidences)) + if prev_char_confidences + else 0 + ) + + # Define conditions for keeping the previous plate + shorter_than_previous = len(top_plate) < len(prev_plate) + lower_avg_confidence = avg_confidence <= prev_avg_confidence + smaller_area = top_area < prev_area + + # Compare character-by-character confidence where possible + min_length = min(len(top_plate), len(prev_plate)) + char_confidence_comparison = sum( + 1 + for i in range(min_length) + if top_char_confidences[i] <= prev_char_confidences[i] + ) + worse_char_confidences = char_confidence_comparison >= min_length / 2 + + if (shorter_than_previous or smaller_area) and ( + lower_avg_confidence and worse_char_confidences + ): + logger.debug( + f"Keeping previous plate. New plate stats: " + f"length={len(top_plate)}, avg_conf={avg_confidence:.2f}, area={top_area} " + f"vs Previous: length={len(prev_plate)}, avg_conf={prev_avg_confidence:.2f}, area={prev_area}" + ) + return True + + # Check against minimum confidence threshold + if avg_confidence < self.lpr_config.threshold: + logger.debug( + f"Average confidence {avg_confidence} is less than threshold ({self.lpr_config.threshold})" + ) + return + + # Determine subLabel based on known plates, use regex matching + # Default to the detected plate, use label name if there's a match + sub_label = next( + ( + label + for label, plates in self.lpr_config.known_plates.items() + if any(re.match(f"^{plate}$", top_plate) for plate in plates) + ), + top_plate, + ) + + # Send the result to the API + resp = requests.post( + f"{FRIGATE_LOCALHOST}/api/events/{id}/sub_label", + json={ + "camera": obj_data.get("camera"), + "subLabel": sub_label, + "subLabelScore": avg_confidence, + }, + ) + + if resp.status_code == 200: + self.detected_license_plates[id] = { + "plate": top_plate, + "char_confidences": top_char_confidences, + "area": top_area, + } + + self.__update_metrics(datetime.datetime.now().timestamp() - start) + + def handle_request(self, topic, request_data) -> dict[str, any] | None: + return + + def expire_object(self, object_id: str): + if object_id in self.detected_license_plates: + self.detected_license_plates.pop(object_id) + class CTCDecoder: """ diff --git a/frigate/embeddings/embeddings.py b/frigate/embeddings/embeddings.py index 852806a8d..d8a4a2f4d 100644 --- a/frigate/embeddings/embeddings.py +++ b/frigate/embeddings/embeddings.py @@ -131,47 +131,6 @@ class Embeddings: device="GPU" if config.semantic_search.model_size == "large" else "CPU", ) - self.lpr_detection_model = None - self.lpr_classification_model = None - self.lpr_recognition_model = None - - if self.config.lpr.enabled: - self.lpr_detection_model = GenericONNXEmbedding( - model_name="paddleocr-onnx", - model_file="detection.onnx", - download_urls={ - "detection.onnx": "https://github.com/hawkeye217/paddleocr-onnx/raw/refs/heads/master/models/detection.onnx" - }, - model_size="large", - model_type=ModelTypeEnum.lpr_detect, - requestor=self.requestor, - device="CPU", - ) - - self.lpr_classification_model = GenericONNXEmbedding( - model_name="paddleocr-onnx", - model_file="classification.onnx", - download_urls={ - "classification.onnx": "https://github.com/hawkeye217/paddleocr-onnx/raw/refs/heads/master/models/classification.onnx" - }, - model_size="large", - model_type=ModelTypeEnum.lpr_classify, - requestor=self.requestor, - device="CPU", - ) - - self.lpr_recognition_model = GenericONNXEmbedding( - model_name="paddleocr-onnx", - model_file="recognition.onnx", - download_urls={ - "recognition.onnx": "https://github.com/hawkeye217/paddleocr-onnx/raw/refs/heads/master/models/recognition.onnx" - }, - model_size="large", - model_type=ModelTypeEnum.lpr_recognize, - requestor=self.requestor, - device="CPU", - ) - def embed_thumbnail( self, event_id: str, thumbnail: bytes, upsert: bool = True ) -> ndarray: diff --git a/frigate/embeddings/maintainer.py b/frigate/embeddings/maintainer.py index e221bd146..b7623722d 100644 --- a/frigate/embeddings/maintainer.py +++ b/frigate/embeddings/maintainer.py @@ -1,10 +1,8 @@ """Maintain embeddings in SQLite-vec.""" import base64 -import datetime import logging import os -import re import threading from multiprocessing.synchronize import Event as MpEvent from pathlib import Path @@ -12,7 +10,6 @@ from typing import Optional import cv2 import numpy as np -import requests from peewee import DoesNotExist from playhouse.sqliteq import SqliteQueueDatabase @@ -26,20 +23,21 @@ from frigate.comms.inter_process import InterProcessRequestor from frigate.config import FrigateConfig from frigate.const import ( CLIPS_DIR, - FRIGATE_LOCALHOST, UPDATE_EVENT_DESCRIPTION, ) from frigate.data_processing.real_time.api import RealTimeProcessorApi from frigate.data_processing.real_time.bird_processor import BirdProcessor from frigate.data_processing.real_time.face_processor import FaceProcessor +from frigate.data_processing.real_time.license_plate_processor import ( + LicensePlateProcessor, +) from frigate.data_processing.types import DataProcessorMetrics -from frigate.embeddings.lpr.lpr import LicensePlateRecognition from frigate.events.types import EventTypeEnum from frigate.genai import get_genai_client from frigate.models import Event from frigate.types import TrackedObjectUpdateTypesEnum from frigate.util.builtin import serialize -from frigate.util.image import SharedMemoryFrameManager, area, calculate_region +from frigate.util.image import SharedMemoryFrameManager, calculate_region from .embeddings import Embeddings @@ -82,24 +80,15 @@ class EmbeddingMaintainer(threading.Thread): if self.config.classification.bird.enabled: self.processors.append(BirdProcessor(self.config, metrics)) + if self.config.lpr.enabled: + self.processors.append(LicensePlateProcessor(self.config, metrics)) + # create communication for updating event descriptions self.requestor = InterProcessRequestor() self.stop_event = stop_event self.tracked_events: dict[str, list[any]] = {} self.genai_client = get_genai_client(config) - # set license plate recognition conditions - self.lpr_config = self.config.lpr - self.requires_license_plate_detection = ( - "license_plate" not in self.config.objects.all_objects - ) - self.detected_license_plates: dict[str, dict[str, any]] = {} - - if self.lpr_config.enabled: - self.license_plate_recognition = LicensePlateRecognition( - self.lpr_config, self.requestor, self.embeddings - ) - def run(self) -> None: """Maintain a SQLite-vec database for semantic search.""" while not self.stop_event.is_set(): @@ -164,11 +153,7 @@ class EmbeddingMaintainer(threading.Thread): camera_config = self.config.cameras[camera] # no need to process updated objects if face recognition, lpr, genai are disabled - if ( - not camera_config.genai.enabled - and not self.lpr_config.enabled - and len(self.processors) == 0 - ): + if not camera_config.genai.enabled and len(self.processors) == 0: return # Create our own thumbnail based on the bounding box and the frame time @@ -188,16 +173,6 @@ class EmbeddingMaintainer(threading.Thread): for processor in self.processors: processor.process_frame(data, yuv_frame) - if self.lpr_config.enabled: - start = datetime.datetime.now().timestamp() - processed = self._process_license_plate(data, yuv_frame) - - if processed: - duration = datetime.datetime.now().timestamp() - start - self.metrics.alpr_pps.value = ( - self.metrics.alpr_pps.value * 9 + duration - ) / 10 - # no need to save our own thumbnails if genai is not enabled # or if the object has become stationary if self.genai_client is not None and not data["stationary"]: @@ -229,9 +204,6 @@ class EmbeddingMaintainer(threading.Thread): for processor in self.processors: processor.expire_object(event_id) - if event_id in self.detected_license_plates: - self.detected_license_plates.pop(event_id) - if updated_db: try: event: Event = Event.get(Event.id == event_id) @@ -354,199 +326,6 @@ class EmbeddingMaintainer(threading.Thread): if event_id: self.handle_regenerate_description(event_id, source) - def _detect_license_plate(self, input: np.ndarray) -> tuple[int, int, int, int]: - """Return the dimensions of the input image as [x, y, width, height].""" - height, width = input.shape[:2] - return (0, 0, width, height) - - def _process_license_plate( - self, obj_data: dict[str, any], frame: np.ndarray - ) -> bool: - """Look for license plates in image.""" - id = obj_data["id"] - - # don't run for non car objects - if obj_data.get("label") != "car": - logger.debug("Not a processing license plate for non car object.") - return False - - # don't run for stationary car objects - if obj_data.get("stationary") == True: - logger.debug("Not a processing license plate for a stationary car object.") - return False - - # don't overwrite sub label for objects that have a sub label - # that is not a license plate - if obj_data.get("sub_label") and id not in self.detected_license_plates: - logger.debug( - f"Not processing license plate due to existing sub label: {obj_data.get('sub_label')}." - ) - return False - - license_plate: Optional[dict[str, any]] = None - - if self.requires_license_plate_detection: - logger.debug("Running manual license_plate detection.") - car_box = obj_data.get("box") - - if not car_box: - return False - - rgb = cv2.cvtColor(frame, cv2.COLOR_YUV2RGB_I420) - left, top, right, bottom = car_box - car = rgb[top:bottom, left:right] - license_plate = self._detect_license_plate(car) - - if not license_plate: - logger.debug("Detected no license plates for car object.") - return False - - license_plate_frame = car[ - license_plate[1] : license_plate[3], license_plate[0] : license_plate[2] - ] - license_plate_frame = cv2.cvtColor(license_plate_frame, cv2.COLOR_RGB2BGR) - else: - # don't run for object without attributes - if not obj_data.get("current_attributes"): - logger.debug("No attributes to parse.") - return False - - attributes: list[dict[str, any]] = obj_data.get("current_attributes", []) - for attr in attributes: - if attr.get("label") != "license_plate": - continue - - if license_plate is None or attr.get("score", 0.0) > license_plate.get( - "score", 0.0 - ): - license_plate = attr - - # no license plates detected in this frame - if not license_plate: - return False - - license_plate_box = license_plate.get("box") - - # check that license plate is valid - if ( - not license_plate_box - or area(license_plate_box) < self.config.lpr.min_area - ): - logger.debug(f"Invalid license plate box {license_plate}") - return False - - license_plate_frame = cv2.cvtColor(frame, cv2.COLOR_YUV2BGR_I420) - license_plate_frame = license_plate_frame[ - license_plate_box[1] : license_plate_box[3], - license_plate_box[0] : license_plate_box[2], - ] - - # run detection, returns results sorted by confidence, best first - license_plates, confidences, areas = ( - self.license_plate_recognition.process_license_plate(license_plate_frame) - ) - - logger.debug(f"Text boxes: {license_plates}") - logger.debug(f"Confidences: {confidences}") - logger.debug(f"Areas: {areas}") - - if license_plates: - for plate, confidence, text_area in zip(license_plates, confidences, areas): - avg_confidence = ( - (sum(confidence) / len(confidence)) if confidence else 0 - ) - - logger.debug( - f"Detected text: {plate} (average confidence: {avg_confidence:.2f}, area: {text_area} pixels)" - ) - else: - # no plates found - logger.debug("No text detected") - return True - - top_plate, top_char_confidences, top_area = ( - license_plates[0], - confidences[0], - areas[0], - ) - avg_confidence = ( - (sum(top_char_confidences) / len(top_char_confidences)) - if top_char_confidences - else 0 - ) - - # Check if we have a previously detected plate for this ID - if id in self.detected_license_plates: - prev_plate = self.detected_license_plates[id]["plate"] - prev_char_confidences = self.detected_license_plates[id]["char_confidences"] - prev_area = self.detected_license_plates[id]["area"] - prev_avg_confidence = ( - (sum(prev_char_confidences) / len(prev_char_confidences)) - if prev_char_confidences - else 0 - ) - - # Define conditions for keeping the previous plate - shorter_than_previous = len(top_plate) < len(prev_plate) - lower_avg_confidence = avg_confidence <= prev_avg_confidence - smaller_area = top_area < prev_area - - # Compare character-by-character confidence where possible - min_length = min(len(top_plate), len(prev_plate)) - char_confidence_comparison = sum( - 1 - for i in range(min_length) - if top_char_confidences[i] <= prev_char_confidences[i] - ) - worse_char_confidences = char_confidence_comparison >= min_length / 2 - - if (shorter_than_previous or smaller_area) and ( - lower_avg_confidence and worse_char_confidences - ): - logger.debug( - f"Keeping previous plate. New plate stats: " - f"length={len(top_plate)}, avg_conf={avg_confidence:.2f}, area={top_area} " - f"vs Previous: length={len(prev_plate)}, avg_conf={prev_avg_confidence:.2f}, area={prev_area}" - ) - return True - - # Check against minimum confidence threshold - if avg_confidence < self.lpr_config.threshold: - logger.debug( - f"Average confidence {avg_confidence} is less than threshold ({self.lpr_config.threshold})" - ) - return True - - # Determine subLabel based on known plates, use regex matching - # Default to the detected plate, use label name if there's a match - sub_label = next( - ( - label - for label, plates in self.lpr_config.known_plates.items() - if any(re.match(f"^{plate}$", top_plate) for plate in plates) - ), - top_plate, - ) - - # Send the result to the API - resp = requests.post( - f"{FRIGATE_LOCALHOST}/api/events/{id}/sub_label", - json={ - "camera": obj_data.get("camera"), - "subLabel": sub_label, - "subLabelScore": avg_confidence, - }, - ) - - if resp.status_code == 200: - self.detected_license_plates[id] = { - "plate": top_plate, - "char_confidences": top_char_confidences, - "area": top_area, - } - - return True - def _create_thumbnail(self, yuv_frame, box, height=500) -> Optional[bytes]: """Return jpg thumbnail of a region of the frame.""" frame = cv2.cvtColor(yuv_frame, cv2.COLOR_YUV2BGR_I420)