diff --git a/docker/main/requirements-wheels.txt b/docker/main/requirements-wheels.txt
index ab91c94a7..bb4ac622b 100644
--- a/docker/main/requirements-wheels.txt
+++ b/docker/main/requirements-wheels.txt
@@ -53,6 +53,7 @@ pywebpush == 2.0.*
 # alpr
 pyclipper == 1.3.*
 shapely == 2.0.*
+Levenshtein==0.26.*
 prometheus-client == 0.21.*
 # HailoRT Wheels
 appdirs==1.4.*
diff --git a/docs/docs/configuration/license_plate_recognition.md b/docs/docs/configuration/license_plate_recognition.md
index a2b976726..6c90f099e 100644
--- a/docs/docs/configuration/license_plate_recognition.md
+++ b/docs/docs/configuration/license_plate_recognition.md
@@ -24,14 +24,20 @@ lpr:
 
 ## Advanced Configuration
 
-Several options are available to fine-tune the LPR feature. For example, you can adjust the `min_area` setting, which defines the minimum size in pixels a license plate must be before LPR runs. The default is 500 pixels.
+Several options are available to fine-tune the LPR feature. For example, you can adjust the `min_area` setting, which defines the minimum area in pixels a license plate must have before LPR runs. The default is 1000 pixels.
+
+The `min_plate_length` field specifies the minimum number of characters a license plate must have to be added to the object as a sub label.
+
+If you want to allow a certain number of missing or incorrect characters in a detected plate to still match a known plate, set the `match_distance` field. For example, setting `match_distance` to 1 would cause a detected plate of ABCDE to match ABCBE or ABCD.
 
 Additionally, you can define `known_plates` as strings or regular expressions, allowing Frigate to label tracked vehicles with custom sub_labels when a recognized plate is detected. This information is then accessible in the UI, filters, and notifications.
 
 ```yaml
 lpr:
   enabled: true
-  min_area: 500
+  min_area: 1500
+  min_plate_length: 4
+  match_distance: 1
   known_plates:
     Wife's Car:
       - "ABC-1234"
diff --git a/frigate/config/classification.py b/frigate/config/classification.py
index 4e806f9d9..9dc790b4b 100644
--- a/frigate/config/classification.py
+++ b/frigate/config/classification.py
@@ -66,8 +66,16 @@ class LicensePlateRecognitionConfig(FrigateBaseModel):
         title="License plate confidence score required to be added to the object as a sub label.",
     )
     min_area: int = Field(
-        default=500,
-        title="Min area of license plate to consider running license plate recognition.",
+        default=1000,
+        title="Minimum area of license plate to consider running license plate recognition.",
+    )
+    min_plate_length: int = Field(
+        default=4,
+        title="Minimum number of characters a license plate must have to be added to the object as a sub label.",
+    )
+    match_distance: int = Field(
+        default=1,
+        title="Allow this number of missing/incorrect characters to still cause a detected plate to match a known plate.",
     )
     known_plates: Optional[Dict[str, List[str]]] = Field(
         default={}, title="Known plates to track."
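Reviewer note (not part of the patch): the `match_distance` behavior documented above combines exact regex matching with Levenshtein edit distance. Below is a minimal standalone sketch of that lookup, assuming the `Levenshtein` package pinned in requirements-wheels.txt; the `match_plate` helper and the sample `known_plates` dict are hypothetical stand-ins that mirror the logic added to `license_plate_processor.py` further down in this diff.

```python
import re

from Levenshtein import distance

# Hypothetical standalone illustration of known-plate matching.
known_plates = {"Wife's Car": ["ABC-1234", "ABCDE"]}
match_distance = 1


def match_plate(top_plate: str) -> str:
    # A known plate matches if its regex matches the detected plate exactly,
    # or if the detected plate is within `match_distance` edits
    # (insertions/deletions/substitutions) of it.
    return next(
        (
            label
            for label, plates in known_plates.items()
            if any(
                re.match(f"^{plate}$", top_plate)
                or distance(plate, top_plate) <= match_distance
                for plate in plates
            )
        ),
        top_plate,  # fall back to the raw plate when nothing matches
    )


print(match_plate("ABCBE"))  # "Wife's Car" (one substitution away from ABCDE)
print(match_plate("XYZ-999"))  # "XYZ-999" (no match; raw plate returned)
```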
diff --git a/frigate/data_processing/real_time/license_plate_processor.py b/frigate/data_processing/real_time/license_plate_processor.py
index 27303601b..09d2eedc8 100644
--- a/frigate/data_processing/real_time/license_plate_processor.py
+++ b/frigate/data_processing/real_time/license_plate_processor.py
@@ -9,6 +9,7 @@ from typing import List, Optional, Tuple
 import cv2
 import numpy as np
 import requests
+from Levenshtein import distance
 from pyclipper import ET_CLOSEDPOLYGON, JT_ROUND, PyclipperOffset
 from shapely.geometry import Polygon
 
@@ -23,7 +24,7 @@ from .api import RealTimeProcessorApi
 
 logger = logging.getLogger(__name__)
 
-MIN_PLATE_LENGTH = 3
+WRITE_DEBUG_IMAGES = False
 
 
 class LicensePlateProcessor(RealTimeProcessorApi):
@@ -86,12 +87,24 @@ class LicensePlateProcessor(RealTimeProcessorApi):
             requestor=self.requestor,
             device="CPU",
         )
+        self.yolov9_detection_model = GenericONNXEmbedding(
+            model_name="yolov9_license_plate",
+            model_file="yolov9-256-license-plates.onnx",
+            download_urls={
+                "yolov9-256-license-plates.onnx": "https://github.com/hawkeye217/yolov9-license-plates/raw/refs/heads/master/models/yolov9-256-license-plates.onnx"
+            },
+            model_size="large",
+            model_type=ModelTypeEnum.yolov9_lpr_detect,
+            requestor=self.requestor,
+            device="CPU",
+        )
 
         if self.lpr_config.enabled:
             # all models need to be loaded to run LPR
             self.detection_model._load_model_and_utils()
             self.classification_model._load_model_and_utils()
             self.recognition_model._load_model_and_utils()
+            self.yolov9_detection_model._load_model_and_utils()
 
     def _detect(self, image: np.ndarray) -> List[np.ndarray]:
         """
@@ -112,6 +125,13 @@ class LicensePlateProcessor(RealTimeProcessorApi):
         resized_image = self._resize_image(image)
         normalized_image = self._normalize_image(resized_image)
 
+        if WRITE_DEBUG_IMAGES:
+            current_time = int(datetime.datetime.now().timestamp())
+            cv2.imwrite(
+                f"debug/frames/license_plate_resized_{current_time}.jpg",
+                resized_image,
+            )
+
         outputs = self.detection_model([normalized_image])[0]
         outputs = outputs[0, :, :]
 
@@ -207,12 +227,27 @@ class LicensePlateProcessor(RealTimeProcessorApi):
         plate_points = self._detect(image)
         if len(plate_points) == 0:
+            logger.debug("No points found by OCR detector model")
             return [], [], []
 
         plate_points = self._sort_polygon(list(plate_points))
         plate_images = [self._crop_license_plate(image, x) for x in plate_points]
         rotated_images, _ = self._classify(plate_images)
 
+        # debug rotated and classification result
+        if WRITE_DEBUG_IMAGES:
+            current_time = int(datetime.datetime.now().timestamp())
+            for i, img in enumerate(plate_images):
+                cv2.imwrite(
+                    f"debug/frames/license_plate_rotated_{current_time}_{i + 1}.jpg",
+                    img,
+                )
+            for i, img in enumerate(rotated_images):
+                cv2.imwrite(
+                    f"debug/frames/license_plate_classified_{current_time}_{i + 1}.jpg",
+                    img,
+                )
+
         # keep track of the index of each image for correct area calc later
         sorted_indices = np.argsort([x.shape[1] / x.shape[0] for x in rotated_images])
         reverse_mapping = {
@@ -240,7 +275,7 @@ class LicensePlateProcessor(RealTimeProcessorApi):
                     save_image = cv2.cvtColor(
                         rotated_images[original_idx], cv2.COLOR_RGB2BGR
                     )
-                    filename = f"/config/plate_{original_idx}_{plate}_{area}.jpg"
+                    filename = f"debug/frames/plate_{original_idx}_{plate}_{area}.jpg"
                     cv2.imwrite(filename, save_image)
 
                 license_plates[original_idx] = plate
@@ -255,7 +290,7 @@ class LicensePlateProcessor(RealTimeProcessorApi):
                 for plate, conf, area in zip(
                     license_plates, average_confidences, areas
                 )
-                if len(plate) >= MIN_PLATE_LENGTH
+                if len(plate) >= self.lpr_config.min_plate_length
             ],
             key=lambda x: (x[2], len(x[0]), x[1]),
             reverse=True,
@@ -331,6 +366,7 @@ class LicensePlateProcessor(RealTimeProcessorApi):
             # get minimum bounding box (rotated rectangle) around the contour and the smallest side length.
             points, min_side = self._get_min_boxes(contour)
+            logger.debug(f"min side {index}, {min_side}")
 
             if min_side < self.min_size:
                 continue
@@ -338,6 +374,7 @@ class LicensePlateProcessor(RealTimeProcessorApi):
             points = np.array(points)
 
             score = self._box_score(output, contour)
+            logger.debug(f"box score {index}, {score}")
             if self.box_thresh > score:
                 continue
@@ -492,7 +529,7 @@ class LicensePlateProcessor(RealTimeProcessorApi):
     def _sort_polygon(points):
         """
         Sort polygons based on their position in the image. If polygons are close in vertical
-        position (within 10 pixels), sort them by horizontal position.
+        position (within 5 pixels), sort them by horizontal position.
 
         Args:
             points: List of polygons to sort.
@@ -503,7 +540,7 @@ class LicensePlateProcessor(RealTimeProcessorApi):
         points.sort(key=lambda x: (x[0][1], x[0][0]))
         for i in range(len(points) - 1):
             for j in range(i, -1, -1):
-                if abs(points[j + 1][0][1] - points[j][0][1]) < 10 and (
+                if abs(points[j + 1][0][1] - points[j][0][1]) < 5 and (
                     points[j + 1][0][0] < points[j][0][0]
                 ):
                     temp = points[j]
@@ -602,7 +639,8 @@ class LicensePlateProcessor(RealTimeProcessorApi):
             for j in range(len(outputs)):
                 label, score = outputs[j]
                 results[indices[i + j]] = [label, score]
-                if "180" in label and score >= self.lpr_config.threshold:
+                # require high confidence before flipping a box; this should be rare in LPR
+                if "180" in label and score >= 0.9:
                     images[indices[i + j]] = cv2.rotate(images[indices[i + j]], 1)
 
         return images, results
@@ -646,7 +684,11 @@ class LicensePlateProcessor(RealTimeProcessorApi):
         resized_image = resized_image.transpose((2, 0, 1))
         resized_image = (resized_image.astype("float32") / 255.0 - 0.5) / 0.5
 
-        padded_image = np.zeros((input_shape[0], input_h, input_w), dtype=np.float32)
+        # Compute mean pixel value of the resized image (per channel) and pad with it
+        mean_pixel = np.mean(resized_image, axis=(1, 2), keepdims=True)
+        padded_image = np.full(
+            (input_shape[0], input_h, input_w), mean_pixel, dtype=np.float32
+        )
         padded_image[:, :, :resized_w] = resized_image
         return padded_image
@@ -698,13 +740,141 @@ class LicensePlateProcessor(RealTimeProcessorApi):
         return image
 
     def __update_metrics(self, duration: float) -> None:
+        """
+        Update inference metrics.
+        """
         self.metrics.alpr_pps.value = (self.metrics.alpr_pps.value * 9 + duration) / 10
 
     def _detect_license_plate(self, input: np.ndarray) -> tuple[int, int, int, int]:
-        """Return the dimensions of the input image as [x, y, width, height]."""
-        # TODO: use a small model here to detect plates
-        height, width = input.shape[:2]
-        return (0, 0, width, height)
+        """
+        Use a lightweight YOLOv9 model to detect license plates for users without Frigate+.
+
+        Return the dimensions of the detected plate as [x1, y1, x2, y2].
+ """ + predictions = self.yolov9_detection_model(input) + + confidence_threshold = self.lpr_config.threshold + + top_score = -1 + top_box = None + + # Loop over predictions + for prediction in predictions: + score = prediction[6] + if score >= confidence_threshold: + bbox = prediction[1:5] + # Scale boxes back to original image size + scale_x = input.shape[1] / 256 + scale_y = input.shape[0] / 256 + bbox[0] *= scale_x + bbox[1] *= scale_y + bbox[2] *= scale_x + bbox[3] *= scale_y + + if score > top_score: + top_score = score + top_box = bbox + + # Return the top scoring bounding box if found + if top_box is not None: + # expand box by 15% to help with OCR + expansion = (top_box[2:] - top_box[:2]) * 0.1 + + # Expand box + expanded_box = np.array( + [ + top_box[0] - expansion[0], # x1 + top_box[1] - expansion[1], # y1 + top_box[2] + expansion[0], # x2 + top_box[3] + expansion[1], # y2 + ] + ).clip(0, [input.shape[1], input.shape[0]] * 2) + + logger.debug(f"Found license plate: {expanded_box.astype(int)}") + return tuple(expanded_box.astype(int)) + else: + return None # No detection above the threshold + + def _should_keep_previous_plate( + self, id, top_plate, top_char_confidences, top_area, avg_confidence + ): + if id not in self.detected_license_plates: + return False + + prev_data = self.detected_license_plates[id] + prev_plate = prev_data["plate"] + prev_char_confidences = prev_data["char_confidences"] + prev_area = prev_data["area"] + prev_avg_confidence = ( + sum(prev_char_confidences) / len(prev_char_confidences) + if prev_char_confidences + else 0 + ) + + # 1. Normalize metrics + # Length score - use relative comparison + # If lengths are equal, score is 0.5 for both + # If one is longer, it gets a higher score up to 1.0 + max_length_diff = 4 # Maximum expected difference in plate lengths + length_diff = len(top_plate) - len(prev_plate) + curr_length_score = 0.5 + ( + length_diff / (2 * max_length_diff) + ) # Normalize to 0-1 + curr_length_score = max(0, min(1, curr_length_score)) # Clamp to 0-1 + prev_length_score = 1 - curr_length_score # Inverse relationship + + # Area score (normalize based on max of current and previous) + max_area = max(top_area, prev_area) + curr_area_score = top_area / max_area + prev_area_score = prev_area / max_area + + # Average confidence score (already normalized 0-1) + curr_conf_score = avg_confidence + prev_conf_score = prev_avg_confidence + + # Character confidence comparison score + min_length = min(len(top_plate), len(prev_plate)) + if min_length > 0: + curr_char_conf = sum(top_char_confidences[:min_length]) / min_length + prev_char_conf = sum(prev_char_confidences[:min_length]) / min_length + else: + curr_char_conf = 0 + prev_char_conf = 0 + + # 2. Define weights + weights = { + "length": 0.4, + "area": 0.3, + "avg_confidence": 0.2, + "char_confidence": 0.1, + } + + # 3. Calculate weighted scores + curr_score = ( + curr_length_score * weights["length"] + + curr_area_score * weights["area"] + + curr_conf_score * weights["avg_confidence"] + + curr_char_conf * weights["char_confidence"] + ) + + prev_score = ( + prev_length_score * weights["length"] + + prev_area_score * weights["area"] + + prev_conf_score * weights["avg_confidence"] + + prev_char_conf * weights["char_confidence"] + ) + + # 4. 
Log the comparison for debugging + logger.debug( + f"Plate comparison - Current plate: {top_plate} (score: {curr_score:.3f}) vs " + f"Previous plate: {prev_plate} (score: {prev_score:.3f})\n" + f"Metrics - Length: {len(top_plate)} vs {len(prev_plate)} (scores: {curr_length_score:.2f} vs {prev_length_score:.2f}), " + f"Area: {top_area} vs {prev_area}, " + f"Avg Conf: {avg_confidence:.2f} vs {prev_avg_confidence:.2f}" + ) + + # 5. Return True if we should keep the previous plate (i.e., if it scores higher) + return prev_score > curr_score def process_frame(self, obj_data: dict[str, any], frame: np.ndarray): """Look for license plates in image.""" @@ -739,19 +909,45 @@ class LicensePlateProcessor(RealTimeProcessorApi): if not car_box: return - rgb = cv2.cvtColor(frame, cv2.COLOR_YUV2RGB_I420) + rgb = cv2.cvtColor(frame, cv2.COLOR_YUV2BGR_I420) left, top, right, bottom = car_box car = rgb[top:bottom, left:right] + + # double the size of the car for better box detection + car = cv2.resize(car, (int(2 * car.shape[1]), int(2 * car.shape[0]))) + + if WRITE_DEBUG_IMAGES: + current_time = int(datetime.datetime.now().timestamp()) + cv2.imwrite( + f"debug/frames/car_frame_{current_time}.jpg", + car, + ) + + yolov9_start = datetime.datetime.now().timestamp() license_plate = self._detect_license_plate(car) + logger.debug( + f"YOLOv9 LPD inference time: {(datetime.datetime.now().timestamp() - yolov9_start) * 1000:.2f} ms" + ) if not license_plate: logger.debug("Detected no license plates for car object.") return + license_plate_area = max( + 0, + (license_plate[2] - license_plate[0]) + * (license_plate[3] - license_plate[1]), + ) + + # check that license plate is valid + # double the value because we've doubled the size of the car + if license_plate_area < self.config.lpr.min_area * 2: + logger.debug("License plate is less than min_area") + return + license_plate_frame = car[ license_plate[1] : license_plate[3], license_plate[0] : license_plate[2] ] - license_plate_frame = cv2.cvtColor(license_plate_frame, cv2.COLOR_RGB2BGR) else: # don't run for object without attributes if not obj_data.get("current_attributes"): @@ -788,6 +984,22 @@ class LicensePlateProcessor(RealTimeProcessorApi): license_plate_box[0] : license_plate_box[2], ] + # double the size of the license plate frame for better OCR + license_plate_frame = cv2.resize( + license_plate_frame, + ( + int(2 * license_plate_frame.shape[1]), + int(2 * license_plate_frame.shape[0]), + ), + ) + + if WRITE_DEBUG_IMAGES: + current_time = int(datetime.datetime.now().timestamp()) + cv2.imwrite( + f"debug/frames/license_plate_frame_{current_time}.jpg", + license_plate_frame, + ) + # run detection, returns results sorted by confidence, best first license_plates, confidences, areas = self._process_license_plate( license_plate_frame @@ -824,38 +1036,11 @@ class LicensePlateProcessor(RealTimeProcessorApi): # Check if we have a previously detected plate for this ID if id in self.detected_license_plates: - prev_plate = self.detected_license_plates[id]["plate"] - prev_char_confidences = self.detected_license_plates[id]["char_confidences"] - prev_area = self.detected_license_plates[id]["area"] - prev_avg_confidence = ( - (sum(prev_char_confidences) / len(prev_char_confidences)) - if prev_char_confidences - else 0 - ) - - # Define conditions for keeping the previous plate - shorter_than_previous = len(top_plate) < len(prev_plate) - lower_avg_confidence = avg_confidence <= prev_avg_confidence - smaller_area = top_area < prev_area - - # Compare character-by-character 
confidence where possible - min_length = min(len(top_plate), len(prev_plate)) - char_confidence_comparison = sum( - 1 - for i in range(min_length) - if top_char_confidences[i] <= prev_char_confidences[i] - ) - worse_char_confidences = char_confidence_comparison >= min_length / 2 - - if (shorter_than_previous or smaller_area) and ( - lower_avg_confidence and worse_char_confidences + if self._should_keep_previous_plate( + id, top_plate, top_char_confidences, top_area, avg_confidence ): - logger.debug( - f"Keeping previous plate. New plate stats: " - f"length={len(top_plate)}, avg_conf={avg_confidence:.2f}, area={top_area} " - f"vs Previous: length={len(prev_plate)}, avg_conf={prev_avg_confidence:.2f}, area={prev_area}" - ) - return True + logger.debug("Keeping previous plate") + return # Check against minimum confidence threshold if avg_confidence < self.lpr_config.threshold: @@ -870,7 +1055,11 @@ class LicensePlateProcessor(RealTimeProcessorApi): ( label for label, plates in self.lpr_config.known_plates.items() - if any(re.match(f"^{plate}$", top_plate) for plate in plates) + if any( + re.match(f"^{plate}$", top_plate) + or distance(plate, top_plate) <= self.lpr_config.match_distance + for plate in plates + ) ), top_plate, ) diff --git a/frigate/embeddings/functions/onnx.py b/frigate/embeddings/functions/onnx.py index 67b2c44a2..a8d52922b 100644 --- a/frigate/embeddings/functions/onnx.py +++ b/frigate/embeddings/functions/onnx.py @@ -5,6 +5,7 @@ from enum import Enum from io import BytesIO from typing import Dict, List, Optional, Union +import cv2 import numpy as np import requests from PIL import Image @@ -32,6 +33,7 @@ disable_progress_bar() logger = logging.getLogger(__name__) FACE_EMBEDDING_SIZE = 160 +LPR_EMBEDDING_SIZE = 256 class ModelTypeEnum(str, Enum): @@ -41,6 +43,7 @@ class ModelTypeEnum(str, Enum): lpr_detect = "lpr_detect" lpr_classify = "lpr_classify" lpr_recognize = "lpr_recognize" + yolov9_lpr_detect = "yolov9_lpr_detect" class GenericONNXEmbedding: @@ -148,6 +151,8 @@ class GenericONNXEmbedding: self.feature_extractor = [] elif self.model_type == ModelTypeEnum.lpr_recognize: self.feature_extractor = [] + elif self.model_type == ModelTypeEnum.yolov9_lpr_detect: + self.feature_extractor = [] self.runner = ONNXModelRunner( os.path.join(self.download_path, self.model_file), @@ -237,6 +242,45 @@ class GenericONNXEmbedding: for img in raw_inputs: processed.append({"x": img}) return processed + elif self.model_type == ModelTypeEnum.yolov9_lpr_detect: + if isinstance(raw_inputs, list): + raise ValueError( + "License plate embedding does not support batch inputs." 
+                )
+            # Get image as numpy array
+            img = self._process_image(raw_inputs)
+            height, width, channels = img.shape
+
+            # Resize maintaining aspect ratio
+            if width > height:
+                new_height = int(((height / width) * LPR_EMBEDDING_SIZE) // 4 * 4)
+                img = cv2.resize(img, (LPR_EMBEDDING_SIZE, new_height))
+            else:
+                new_width = int(((width / height) * LPR_EMBEDDING_SIZE) // 4 * 4)
+                img = cv2.resize(img, (new_width, LPR_EMBEDDING_SIZE))
+
+            # Get new dimensions after resize
+            og_h, og_w, channels = img.shape
+
+            # Create black square frame
+            frame = np.full(
+                (LPR_EMBEDDING_SIZE, LPR_EMBEDDING_SIZE, channels),
+                (0, 0, 0),
+                dtype=np.float32,
+            )
+
+            # Center the resized image in the square frame
+            x_center = (LPR_EMBEDDING_SIZE - og_w) // 2
+            y_center = (LPR_EMBEDDING_SIZE - og_h) // 2
+            frame[y_center : y_center + og_h, x_center : x_center + og_w] = img
+
+            # Normalize to 0-1
+            frame = frame / 255.0
+
+            # Convert from HWC to CHW format and add batch dimension
+            frame = np.transpose(frame, (2, 0, 1))
+            frame = np.expand_dims(frame, axis=0)
+            return [{"images": frame}]
         else:
             raise ValueError(f"Unable to preprocess inputs for {self.model_type}")
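Reviewer note (not part of the patch): as a quick sanity check of the preprocessing above, this standalone sketch reproduces the letterbox logic on a dummy 720x1280 crop (an assumed input size) and confirms the tensor handed to the ONNX runner is `(1, 3, 256, 256)` with values in 0-1.

```python
import cv2
import numpy as np

LPR_EMBEDDING_SIZE = 256

# Dummy BGR image standing in for a cropped car region.
img = np.random.randint(0, 255, (720, 1280, 3), dtype=np.uint8).astype(np.float32)
height, width, channels = img.shape

# Resize maintaining aspect ratio (short side rounded down to a multiple of 4).
if width > height:
    new_height = int(((height / width) * LPR_EMBEDDING_SIZE) // 4 * 4)
    img = cv2.resize(img, (LPR_EMBEDDING_SIZE, new_height))
else:
    new_width = int(((width / height) * LPR_EMBEDDING_SIZE) // 4 * 4)
    img = cv2.resize(img, (new_width, LPR_EMBEDDING_SIZE))

og_h, og_w, channels = img.shape

# Black square canvas with the resized image centered in it.
frame = np.zeros((LPR_EMBEDDING_SIZE, LPR_EMBEDDING_SIZE, channels), dtype=np.float32)
x_center = (LPR_EMBEDDING_SIZE - og_w) // 2
y_center = (LPR_EMBEDDING_SIZE - og_h) // 2
frame[y_center : y_center + og_h, x_center : x_center + og_w] = img

# Normalize, HWC -> CHW, add batch dimension.
frame = np.expand_dims(np.transpose(frame / 255.0, (2, 0, 1)), axis=0)
assert frame.shape == (1, 3, LPR_EMBEDDING_SIZE, LPR_EMBEDDING_SIZE)
assert 0.0 <= frame.min() and frame.max() <= 1.0
```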