Refactor lpr into real time data processor (#16497)

This commit is contained in:
Josh Hawkins 2025-02-11 14:45:13 -06:00 committed by GitHub
parent f3e2cf0a58
commit 2458f667c4
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 305 additions and 315 deletions

View File

@ -1,34 +1,41 @@
"""Handle processing images for face detection and recognition."""
import datetime
import logging
import math
from typing import List, Tuple
import re
from typing import List, Optional, Tuple
import cv2
import numpy as np
import requests
from pyclipper import ET_CLOSEDPOLYGON, JT_ROUND, PyclipperOffset
from shapely.geometry import Polygon
from frigate.comms.inter_process import InterProcessRequestor
from frigate.config.classification import LicensePlateRecognitionConfig
from frigate.embeddings.embeddings import Embeddings
from frigate.config import FrigateConfig
from frigate.const import FRIGATE_LOCALHOST
from frigate.embeddings.functions.onnx import GenericONNXEmbedding, ModelTypeEnum
from frigate.util.image import area
from ..types import DataProcessorMetrics
from .api import RealTimeProcessorApi
logger = logging.getLogger(__name__)
MIN_PLATE_LENGTH = 3
class LicensePlateRecognition:
def __init__(
self,
config: LicensePlateRecognitionConfig,
requestor: InterProcessRequestor,
embeddings: Embeddings,
):
self.lpr_config = config
self.requestor = requestor
self.embeddings = embeddings
self.detection_model = self.embeddings.lpr_detection_model
self.classification_model = self.embeddings.lpr_classification_model
self.recognition_model = self.embeddings.lpr_recognition_model
class LicensePlateProcessor(RealTimeProcessorApi):
def __init__(self, config: FrigateConfig, metrics: DataProcessorMetrics):
super().__init__(config, metrics)
self.requestor = InterProcessRequestor()
self.lpr_config = config.lpr
self.requires_license_plate_detection = (
"license_plate" not in self.config.objects.all_objects
)
self.detected_license_plates: dict[str, dict[str, any]] = {}
self.ctc_decoder = CTCDecoder()
self.batch_size = 6
@ -39,13 +46,54 @@ class LicensePlateRecognition:
self.box_thresh = 0.8
self.mask_thresh = 0.8
self.lpr_detection_model = None
self.lpr_classification_model = None
self.lpr_recognition_model = None
if self.config.lpr.enabled:
self.detection_model = GenericONNXEmbedding(
model_name="paddleocr-onnx",
model_file="detection.onnx",
download_urls={
"detection.onnx": "https://github.com/hawkeye217/paddleocr-onnx/raw/refs/heads/master/models/detection.onnx"
},
model_size="large",
model_type=ModelTypeEnum.lpr_detect,
requestor=self.requestor,
device="CPU",
)
self.classification_model = GenericONNXEmbedding(
model_name="paddleocr-onnx",
model_file="classification.onnx",
download_urls={
"classification.onnx": "https://github.com/hawkeye217/paddleocr-onnx/raw/refs/heads/master/models/classification.onnx"
},
model_size="large",
model_type=ModelTypeEnum.lpr_classify,
requestor=self.requestor,
device="CPU",
)
self.recognition_model = GenericONNXEmbedding(
model_name="paddleocr-onnx",
model_file="recognition.onnx",
download_urls={
"recognition.onnx": "https://github.com/hawkeye217/paddleocr-onnx/raw/refs/heads/master/models/recognition.onnx"
},
model_size="large",
model_type=ModelTypeEnum.lpr_recognize,
requestor=self.requestor,
device="CPU",
)
if self.lpr_config.enabled:
# all models need to be loaded to run LPR
self.detection_model._load_model_and_utils()
self.classification_model._load_model_and_utils()
self.recognition_model._load_model_and_utils()
def detect(self, image: np.ndarray) -> List[np.ndarray]:
def _detect(self, image: np.ndarray) -> List[np.ndarray]:
"""
Detect possible license plates in the input image by first resizing and normalizing it,
running a detection model, and filtering out low-probability regions.
@ -59,18 +107,18 @@ class LicensePlateRecognition:
h, w = image.shape[:2]
if sum([h, w]) < 64:
image = self.zero_pad(image)
image = self._zero_pad(image)
resized_image = self.resize_image(image)
normalized_image = self.normalize_image(resized_image)
resized_image = self._resize_image(image)
normalized_image = self._normalize_image(resized_image)
outputs = self.detection_model([normalized_image])[0]
outputs = outputs[0, :, :]
boxes, _ = self.boxes_from_bitmap(outputs, outputs > self.mask_thresh, w, h)
return self.filter_polygon(boxes, (h, w))
boxes, _ = self._boxes_from_bitmap(outputs, outputs > self.mask_thresh, w, h)
return self._filter_polygon(boxes, (h, w))
def classify(
def _classify(
self, images: List[np.ndarray]
) -> Tuple[List[np.ndarray], List[Tuple[str, float]]]:
"""
@ -97,7 +145,7 @@ class LicensePlateRecognition:
return self._process_classification_output(images, outputs)
def recognize(
def _recognize(
self, images: List[np.ndarray]
) -> Tuple[List[str], List[List[float]]]:
"""
@ -136,7 +184,7 @@ class LicensePlateRecognition:
outputs = self.recognition_model(norm_images)
return self.ctc_decoder(outputs)
def process_license_plate(
def _process_license_plate(
self, image: np.ndarray
) -> Tuple[List[str], List[float], List[int]]:
"""
@ -157,13 +205,13 @@ class LicensePlateRecognition:
logger.debug("Model runners not loaded")
return [], [], []
plate_points = self.detect(image)
plate_points = self._detect(image)
if len(plate_points) == 0:
return [], [], []
plate_points = self.sort_polygon(list(plate_points))
plate_points = self._sort_polygon(list(plate_points))
plate_images = [self._crop_license_plate(image, x) for x in plate_points]
rotated_images, _ = self.classify(plate_images)
rotated_images, _ = self._classify(plate_images)
# keep track of the index of each image for correct area calc later
sorted_indices = np.argsort([x.shape[1] / x.shape[0] for x in rotated_images])
@ -171,7 +219,7 @@ class LicensePlateRecognition:
idx: original_idx for original_idx, idx in enumerate(sorted_indices)
}
results, confidences = self.recognize(rotated_images)
results, confidences = self._recognize(rotated_images)
if results:
license_plates = [""] * len(rotated_images)
@ -218,7 +266,7 @@ class LicensePlateRecognition:
return [], [], []
def resize_image(self, image: np.ndarray) -> np.ndarray:
def _resize_image(self, image: np.ndarray) -> np.ndarray:
"""
Resize the input image while maintaining the aspect ratio, ensuring dimensions are multiples of 32.
@ -234,7 +282,7 @@ class LicensePlateRecognition:
resize_w = max(int(round(int(w * ratio) / 32) * 32), 32)
return cv2.resize(image, (resize_w, resize_h))
def normalize_image(self, image: np.ndarray) -> np.ndarray:
def _normalize_image(self, image: np.ndarray) -> np.ndarray:
"""
Normalize the input image by subtracting the mean and multiplying by the standard deviation.
@ -252,7 +300,7 @@ class LicensePlateRecognition:
cv2.multiply(image, std, image)
return image.transpose((2, 0, 1))[np.newaxis, ...]
def boxes_from_bitmap(
def _boxes_from_bitmap(
self, output: np.ndarray, mask: np.ndarray, dest_width: int, dest_height: int
) -> Tuple[np.ndarray, List[float]]:
"""
@ -282,14 +330,14 @@ class LicensePlateRecognition:
contour = contours[index]
# get minimum bounding box (rotated rectangle) around the contour and the smallest side length.
points, min_side = self.get_min_boxes(contour)
points, min_side = self._get_min_boxes(contour)
if min_side < self.min_size:
continue
points = np.array(points)
score = self.box_score(output, contour)
score = self._box_score(output, contour)
if self.box_thresh > score:
continue
@ -302,7 +350,7 @@ class LicensePlateRecognition:
points = np.array(offset.Execute(distance * 1.5)).reshape((-1, 1, 2))
# get the minimum bounding box around the shrunken polygon.
box, min_side = self.get_min_boxes(points)
box, min_side = self._get_min_boxes(points)
if min_side < self.min_size + 2:
continue
@ -321,7 +369,7 @@ class LicensePlateRecognition:
return np.array(boxes, dtype="int32"), scores
@staticmethod
def get_min_boxes(contour: np.ndarray) -> Tuple[List[Tuple[float, float]], float]:
def _get_min_boxes(contour: np.ndarray) -> Tuple[List[Tuple[float, float]], float]:
"""
Calculate the minimum bounding box (rotated rectangle) for a given contour.
@ -340,7 +388,7 @@ class LicensePlateRecognition:
return box, min(bounding_box[1])
@staticmethod
def box_score(bitmap: np.ndarray, contour: np.ndarray) -> float:
def _box_score(bitmap: np.ndarray, contour: np.ndarray) -> float:
"""
Calculate the average score within the bounding box of a contour.
@ -360,7 +408,7 @@ class LicensePlateRecognition:
return cv2.mean(bitmap[y1 : y2 + 1, x1 : x2 + 1], mask)[0]
@staticmethod
def expand_box(points: List[Tuple[float, float]]) -> np.ndarray:
def _expand_box(points: List[Tuple[float, float]]) -> np.ndarray:
"""
Expand a polygonal shape slightly by a factor determined by the area-to-perimeter ratio.
@ -377,7 +425,7 @@ class LicensePlateRecognition:
expanded = np.array(offset.Execute(distance * 1.5)).reshape((-1, 2))
return expanded
def filter_polygon(
def _filter_polygon(
self, points: List[np.ndarray], shape: Tuple[int, int]
) -> np.ndarray:
"""
@ -394,14 +442,14 @@ class LicensePlateRecognition:
height, width = shape
return np.array(
[
self.clockwise_order(point)
self._clockwise_order(point)
for point in points
if self.is_valid_polygon(point, width, height)
if self._is_valid_polygon(point, width, height)
]
)
@staticmethod
def is_valid_polygon(point: np.ndarray, width: int, height: int) -> bool:
def _is_valid_polygon(point: np.ndarray, width: int, height: int) -> bool:
"""
Check if a polygon is valid, meaning it fits within the image bounds
and has sides of a minimum length.
@ -424,7 +472,7 @@ class LicensePlateRecognition:
)
@staticmethod
def clockwise_order(point: np.ndarray) -> np.ndarray:
def _clockwise_order(point: np.ndarray) -> np.ndarray:
"""
Arrange the points of a polygon in clockwise order based on their angular positions
around the polygon's center.
@ -441,7 +489,7 @@ class LicensePlateRecognition:
]
@staticmethod
def sort_polygon(points):
def _sort_polygon(points):
"""
Sort polygons based on their position in the image. If polygons are close in vertical
position (within 10 pixels), sort them by horizontal position.
@ -466,7 +514,7 @@ class LicensePlateRecognition:
return points
@staticmethod
def zero_pad(image: np.ndarray) -> np.ndarray:
def _zero_pad(image: np.ndarray) -> np.ndarray:
"""
Apply zero-padding to an image, ensuring its dimensions are at least 32x32.
The padding is added only if needed.
@ -649,6 +697,210 @@ class LicensePlateRecognition:
image = np.rot90(image, k=3)
return image
def __update_metrics(self, duration: float) -> None:
self.metrics.alpr_pps.value = (self.metrics.alpr_pps.value * 9 + duration) / 10
def _detect_license_plate(self, input: np.ndarray) -> tuple[int, int, int, int]:
"""Return the dimensions of the input image as [x, y, width, height]."""
# TODO: use a small model here to detect plates
height, width = input.shape[:2]
return (0, 0, width, height)
def process_frame(self, obj_data: dict[str, any], frame: np.ndarray):
"""Look for license plates in image."""
start = datetime.datetime.now().timestamp()
id = obj_data["id"]
# don't run for non car objects
if obj_data.get("label") != "car":
logger.debug("Not a processing license plate for non car object.")
return
# don't run for stationary car objects
if obj_data.get("stationary") == True:
logger.debug("Not a processing license plate for a stationary car object.")
return
# don't overwrite sub label for objects that have a sub label
# that is not a license plate
if obj_data.get("sub_label") and id not in self.detected_license_plates:
logger.debug(
f"Not processing license plate due to existing sub label: {obj_data.get('sub_label')}."
)
return
license_plate: Optional[dict[str, any]] = None
if self.requires_license_plate_detection:
logger.debug("Running manual license_plate detection.")
car_box = obj_data.get("box")
if not car_box:
return
rgb = cv2.cvtColor(frame, cv2.COLOR_YUV2RGB_I420)
left, top, right, bottom = car_box
car = rgb[top:bottom, left:right]
license_plate = self._detect_license_plate(car)
if not license_plate:
logger.debug("Detected no license plates for car object.")
return
license_plate_frame = car[
license_plate[1] : license_plate[3], license_plate[0] : license_plate[2]
]
license_plate_frame = cv2.cvtColor(license_plate_frame, cv2.COLOR_RGB2BGR)
else:
# don't run for object without attributes
if not obj_data.get("current_attributes"):
logger.debug("No attributes to parse.")
return
attributes: list[dict[str, any]] = obj_data.get("current_attributes", [])
for attr in attributes:
if attr.get("label") != "license_plate":
continue
if license_plate is None or attr.get("score", 0.0) > license_plate.get(
"score", 0.0
):
license_plate = attr
# no license plates detected in this frame
if not license_plate:
return
license_plate_box = license_plate.get("box")
# check that license plate is valid
if (
not license_plate_box
or area(license_plate_box) < self.config.lpr.min_area
):
logger.debug(f"Invalid license plate box {license_plate}")
return
license_plate_frame = cv2.cvtColor(frame, cv2.COLOR_YUV2BGR_I420)
license_plate_frame = license_plate_frame[
license_plate_box[1] : license_plate_box[3],
license_plate_box[0] : license_plate_box[2],
]
# run detection, returns results sorted by confidence, best first
license_plates, confidences, areas = self._process_license_plate(
license_plate_frame
)
logger.debug(f"Text boxes: {license_plates}")
logger.debug(f"Confidences: {confidences}")
logger.debug(f"Areas: {areas}")
if license_plates:
for plate, confidence, text_area in zip(license_plates, confidences, areas):
avg_confidence = (
(sum(confidence) / len(confidence)) if confidence else 0
)
logger.debug(
f"Detected text: {plate} (average confidence: {avg_confidence:.2f}, area: {text_area} pixels)"
)
else:
# no plates found
logger.debug("No text detected")
return
top_plate, top_char_confidences, top_area = (
license_plates[0],
confidences[0],
areas[0],
)
avg_confidence = (
(sum(top_char_confidences) / len(top_char_confidences))
if top_char_confidences
else 0
)
# Check if we have a previously detected plate for this ID
if id in self.detected_license_plates:
prev_plate = self.detected_license_plates[id]["plate"]
prev_char_confidences = self.detected_license_plates[id]["char_confidences"]
prev_area = self.detected_license_plates[id]["area"]
prev_avg_confidence = (
(sum(prev_char_confidences) / len(prev_char_confidences))
if prev_char_confidences
else 0
)
# Define conditions for keeping the previous plate
shorter_than_previous = len(top_plate) < len(prev_plate)
lower_avg_confidence = avg_confidence <= prev_avg_confidence
smaller_area = top_area < prev_area
# Compare character-by-character confidence where possible
min_length = min(len(top_plate), len(prev_plate))
char_confidence_comparison = sum(
1
for i in range(min_length)
if top_char_confidences[i] <= prev_char_confidences[i]
)
worse_char_confidences = char_confidence_comparison >= min_length / 2
if (shorter_than_previous or smaller_area) and (
lower_avg_confidence and worse_char_confidences
):
logger.debug(
f"Keeping previous plate. New plate stats: "
f"length={len(top_plate)}, avg_conf={avg_confidence:.2f}, area={top_area} "
f"vs Previous: length={len(prev_plate)}, avg_conf={prev_avg_confidence:.2f}, area={prev_area}"
)
return True
# Check against minimum confidence threshold
if avg_confidence < self.lpr_config.threshold:
logger.debug(
f"Average confidence {avg_confidence} is less than threshold ({self.lpr_config.threshold})"
)
return
# Determine subLabel based on known plates, use regex matching
# Default to the detected plate, use label name if there's a match
sub_label = next(
(
label
for label, plates in self.lpr_config.known_plates.items()
if any(re.match(f"^{plate}$", top_plate) for plate in plates)
),
top_plate,
)
# Send the result to the API
resp = requests.post(
f"{FRIGATE_LOCALHOST}/api/events/{id}/sub_label",
json={
"camera": obj_data.get("camera"),
"subLabel": sub_label,
"subLabelScore": avg_confidence,
},
)
if resp.status_code == 200:
self.detected_license_plates[id] = {
"plate": top_plate,
"char_confidences": top_char_confidences,
"area": top_area,
}
self.__update_metrics(datetime.datetime.now().timestamp() - start)
def handle_request(self, topic, request_data) -> dict[str, any] | None:
return
def expire_object(self, object_id: str):
if object_id in self.detected_license_plates:
self.detected_license_plates.pop(object_id)
class CTCDecoder:
"""

View File

@ -131,47 +131,6 @@ class Embeddings:
device="GPU" if config.semantic_search.model_size == "large" else "CPU",
)
self.lpr_detection_model = None
self.lpr_classification_model = None
self.lpr_recognition_model = None
if self.config.lpr.enabled:
self.lpr_detection_model = GenericONNXEmbedding(
model_name="paddleocr-onnx",
model_file="detection.onnx",
download_urls={
"detection.onnx": "https://github.com/hawkeye217/paddleocr-onnx/raw/refs/heads/master/models/detection.onnx"
},
model_size="large",
model_type=ModelTypeEnum.lpr_detect,
requestor=self.requestor,
device="CPU",
)
self.lpr_classification_model = GenericONNXEmbedding(
model_name="paddleocr-onnx",
model_file="classification.onnx",
download_urls={
"classification.onnx": "https://github.com/hawkeye217/paddleocr-onnx/raw/refs/heads/master/models/classification.onnx"
},
model_size="large",
model_type=ModelTypeEnum.lpr_classify,
requestor=self.requestor,
device="CPU",
)
self.lpr_recognition_model = GenericONNXEmbedding(
model_name="paddleocr-onnx",
model_file="recognition.onnx",
download_urls={
"recognition.onnx": "https://github.com/hawkeye217/paddleocr-onnx/raw/refs/heads/master/models/recognition.onnx"
},
model_size="large",
model_type=ModelTypeEnum.lpr_recognize,
requestor=self.requestor,
device="CPU",
)
def embed_thumbnail(
self, event_id: str, thumbnail: bytes, upsert: bool = True
) -> ndarray:

View File

@ -1,10 +1,8 @@
"""Maintain embeddings in SQLite-vec."""
import base64
import datetime
import logging
import os
import re
import threading
from multiprocessing.synchronize import Event as MpEvent
from pathlib import Path
@ -12,7 +10,6 @@ from typing import Optional
import cv2
import numpy as np
import requests
from peewee import DoesNotExist
from playhouse.sqliteq import SqliteQueueDatabase
@ -26,20 +23,21 @@ from frigate.comms.inter_process import InterProcessRequestor
from frigate.config import FrigateConfig
from frigate.const import (
CLIPS_DIR,
FRIGATE_LOCALHOST,
UPDATE_EVENT_DESCRIPTION,
)
from frigate.data_processing.real_time.api import RealTimeProcessorApi
from frigate.data_processing.real_time.bird_processor import BirdProcessor
from frigate.data_processing.real_time.face_processor import FaceProcessor
from frigate.data_processing.real_time.license_plate_processor import (
LicensePlateProcessor,
)
from frigate.data_processing.types import DataProcessorMetrics
from frigate.embeddings.lpr.lpr import LicensePlateRecognition
from frigate.events.types import EventTypeEnum
from frigate.genai import get_genai_client
from frigate.models import Event
from frigate.types import TrackedObjectUpdateTypesEnum
from frigate.util.builtin import serialize
from frigate.util.image import SharedMemoryFrameManager, area, calculate_region
from frigate.util.image import SharedMemoryFrameManager, calculate_region
from .embeddings import Embeddings
@ -82,24 +80,15 @@ class EmbeddingMaintainer(threading.Thread):
if self.config.classification.bird.enabled:
self.processors.append(BirdProcessor(self.config, metrics))
if self.config.lpr.enabled:
self.processors.append(LicensePlateProcessor(self.config, metrics))
# create communication for updating event descriptions
self.requestor = InterProcessRequestor()
self.stop_event = stop_event
self.tracked_events: dict[str, list[any]] = {}
self.genai_client = get_genai_client(config)
# set license plate recognition conditions
self.lpr_config = self.config.lpr
self.requires_license_plate_detection = (
"license_plate" not in self.config.objects.all_objects
)
self.detected_license_plates: dict[str, dict[str, any]] = {}
if self.lpr_config.enabled:
self.license_plate_recognition = LicensePlateRecognition(
self.lpr_config, self.requestor, self.embeddings
)
def run(self) -> None:
"""Maintain a SQLite-vec database for semantic search."""
while not self.stop_event.is_set():
@ -164,11 +153,7 @@ class EmbeddingMaintainer(threading.Thread):
camera_config = self.config.cameras[camera]
# no need to process updated objects if face recognition, lpr, genai are disabled
if (
not camera_config.genai.enabled
and not self.lpr_config.enabled
and len(self.processors) == 0
):
if not camera_config.genai.enabled and len(self.processors) == 0:
return
# Create our own thumbnail based on the bounding box and the frame time
@ -188,16 +173,6 @@ class EmbeddingMaintainer(threading.Thread):
for processor in self.processors:
processor.process_frame(data, yuv_frame)
if self.lpr_config.enabled:
start = datetime.datetime.now().timestamp()
processed = self._process_license_plate(data, yuv_frame)
if processed:
duration = datetime.datetime.now().timestamp() - start
self.metrics.alpr_pps.value = (
self.metrics.alpr_pps.value * 9 + duration
) / 10
# no need to save our own thumbnails if genai is not enabled
# or if the object has become stationary
if self.genai_client is not None and not data["stationary"]:
@ -229,9 +204,6 @@ class EmbeddingMaintainer(threading.Thread):
for processor in self.processors:
processor.expire_object(event_id)
if event_id in self.detected_license_plates:
self.detected_license_plates.pop(event_id)
if updated_db:
try:
event: Event = Event.get(Event.id == event_id)
@ -354,199 +326,6 @@ class EmbeddingMaintainer(threading.Thread):
if event_id:
self.handle_regenerate_description(event_id, source)
def _detect_license_plate(self, input: np.ndarray) -> tuple[int, int, int, int]:
"""Return the dimensions of the input image as [x, y, width, height]."""
height, width = input.shape[:2]
return (0, 0, width, height)
def _process_license_plate(
self, obj_data: dict[str, any], frame: np.ndarray
) -> bool:
"""Look for license plates in image."""
id = obj_data["id"]
# don't run for non car objects
if obj_data.get("label") != "car":
logger.debug("Not a processing license plate for non car object.")
return False
# don't run for stationary car objects
if obj_data.get("stationary") == True:
logger.debug("Not a processing license plate for a stationary car object.")
return False
# don't overwrite sub label for objects that have a sub label
# that is not a license plate
if obj_data.get("sub_label") and id not in self.detected_license_plates:
logger.debug(
f"Not processing license plate due to existing sub label: {obj_data.get('sub_label')}."
)
return False
license_plate: Optional[dict[str, any]] = None
if self.requires_license_plate_detection:
logger.debug("Running manual license_plate detection.")
car_box = obj_data.get("box")
if not car_box:
return False
rgb = cv2.cvtColor(frame, cv2.COLOR_YUV2RGB_I420)
left, top, right, bottom = car_box
car = rgb[top:bottom, left:right]
license_plate = self._detect_license_plate(car)
if not license_plate:
logger.debug("Detected no license plates for car object.")
return False
license_plate_frame = car[
license_plate[1] : license_plate[3], license_plate[0] : license_plate[2]
]
license_plate_frame = cv2.cvtColor(license_plate_frame, cv2.COLOR_RGB2BGR)
else:
# don't run for object without attributes
if not obj_data.get("current_attributes"):
logger.debug("No attributes to parse.")
return False
attributes: list[dict[str, any]] = obj_data.get("current_attributes", [])
for attr in attributes:
if attr.get("label") != "license_plate":
continue
if license_plate is None or attr.get("score", 0.0) > license_plate.get(
"score", 0.0
):
license_plate = attr
# no license plates detected in this frame
if not license_plate:
return False
license_plate_box = license_plate.get("box")
# check that license plate is valid
if (
not license_plate_box
or area(license_plate_box) < self.config.lpr.min_area
):
logger.debug(f"Invalid license plate box {license_plate}")
return False
license_plate_frame = cv2.cvtColor(frame, cv2.COLOR_YUV2BGR_I420)
license_plate_frame = license_plate_frame[
license_plate_box[1] : license_plate_box[3],
license_plate_box[0] : license_plate_box[2],
]
# run detection, returns results sorted by confidence, best first
license_plates, confidences, areas = (
self.license_plate_recognition.process_license_plate(license_plate_frame)
)
logger.debug(f"Text boxes: {license_plates}")
logger.debug(f"Confidences: {confidences}")
logger.debug(f"Areas: {areas}")
if license_plates:
for plate, confidence, text_area in zip(license_plates, confidences, areas):
avg_confidence = (
(sum(confidence) / len(confidence)) if confidence else 0
)
logger.debug(
f"Detected text: {plate} (average confidence: {avg_confidence:.2f}, area: {text_area} pixels)"
)
else:
# no plates found
logger.debug("No text detected")
return True
top_plate, top_char_confidences, top_area = (
license_plates[0],
confidences[0],
areas[0],
)
avg_confidence = (
(sum(top_char_confidences) / len(top_char_confidences))
if top_char_confidences
else 0
)
# Check if we have a previously detected plate for this ID
if id in self.detected_license_plates:
prev_plate = self.detected_license_plates[id]["plate"]
prev_char_confidences = self.detected_license_plates[id]["char_confidences"]
prev_area = self.detected_license_plates[id]["area"]
prev_avg_confidence = (
(sum(prev_char_confidences) / len(prev_char_confidences))
if prev_char_confidences
else 0
)
# Define conditions for keeping the previous plate
shorter_than_previous = len(top_plate) < len(prev_plate)
lower_avg_confidence = avg_confidence <= prev_avg_confidence
smaller_area = top_area < prev_area
# Compare character-by-character confidence where possible
min_length = min(len(top_plate), len(prev_plate))
char_confidence_comparison = sum(
1
for i in range(min_length)
if top_char_confidences[i] <= prev_char_confidences[i]
)
worse_char_confidences = char_confidence_comparison >= min_length / 2
if (shorter_than_previous or smaller_area) and (
lower_avg_confidence and worse_char_confidences
):
logger.debug(
f"Keeping previous plate. New plate stats: "
f"length={len(top_plate)}, avg_conf={avg_confidence:.2f}, area={top_area} "
f"vs Previous: length={len(prev_plate)}, avg_conf={prev_avg_confidence:.2f}, area={prev_area}"
)
return True
# Check against minimum confidence threshold
if avg_confidence < self.lpr_config.threshold:
logger.debug(
f"Average confidence {avg_confidence} is less than threshold ({self.lpr_config.threshold})"
)
return True
# Determine subLabel based on known plates, use regex matching
# Default to the detected plate, use label name if there's a match
sub_label = next(
(
label
for label, plates in self.lpr_config.known_plates.items()
if any(re.match(f"^{plate}$", top_plate) for plate in plates)
),
top_plate,
)
# Send the result to the API
resp = requests.post(
f"{FRIGATE_LOCALHOST}/api/events/{id}/sub_label",
json={
"camera": obj_data.get("camera"),
"subLabel": sub_label,
"subLabelScore": avg_confidence,
},
)
if resp.status_code == 200:
self.detected_license_plates[id] = {
"plate": top_plate,
"char_confidences": top_char_confidences,
"area": top_area,
}
return True
def _create_thumbnail(self, yuv_frame, box, height=500) -> Optional[bytes]:
"""Return jpg thumbnail of a region of the frame."""
frame = cv2.cvtColor(yuv_frame, cv2.COLOR_YUV2BGR_I420)