LPR improvements (#17716)

* add support for multi-line plates

* config for model size

* default to small model

* add license plate as attribute to motorcycle

* use model size

* docs

* attribute map

* i18n key fix
This commit is contained in:
Josh Hawkins
2025-04-15 10:40:12 -05:00
committed by GitHub
parent 8803fd7fff
commit 760ed25f0c
11 changed files with 217 additions and 101 deletions

View File

@@ -53,7 +53,7 @@ class LicensePlateProcessingMixin:
def _detect(self, image: np.ndarray) -> List[np.ndarray]:
"""
Detect possible license plates in the input image by first resizing and normalizing it,
Detect possible areas of text in the input image by first resizing and normalizing it,
running a detection model, and filtering out low-probability regions.
Args:
@@ -80,6 +80,13 @@ class LicensePlateProcessingMixin:
outputs = self.model_runner.detection_model([normalized_image])[0]
outputs = outputs[0, :, :]
if False:
current_time = int(datetime.datetime.now().timestamp())
cv2.imwrite(
f"debug/frames/probability_map_{current_time}.jpg",
(outputs * 255).astype(np.uint8),
)
boxes, _ = self._boxes_from_bitmap(outputs, outputs > self.mask_thresh, w, h)
return self._filter_polygon(boxes, (h, w))
@@ -125,9 +132,6 @@ class LicensePlateProcessingMixin:
input_shape = [3, 48, 320]
num_images = len(images)
# sort images by aspect ratio for processing
indices = np.argsort(np.array([x.shape[1] / x.shape[0] for x in images]))
for index in range(0, num_images, self.batch_size):
input_h, input_w = input_shape[1], input_shape[2]
max_wh_ratio = input_w / input_h
@@ -135,13 +139,13 @@ class LicensePlateProcessingMixin:
# calculate the maximum aspect ratio in the current batch
for i in range(index, min(num_images, index + self.batch_size)):
h, w = images[indices[i]].shape[0:2]
h, w = images[i].shape[0:2]
max_wh_ratio = max(max_wh_ratio, w * 1.0 / h)
# preprocess the images based on the max aspect ratio
for i in range(index, min(num_images, index + self.batch_size)):
norm_image = self._preprocess_recognition_image(
camera, images[indices[i]], max_wh_ratio
camera, images[i], max_wh_ratio
)
norm_image = norm_image[np.newaxis, :]
norm_images.append(norm_image)
@@ -150,16 +154,20 @@ class LicensePlateProcessingMixin:
return self.ctc_decoder(outputs)
def _process_license_plate(
self, camera: string, id: string, image: np.ndarray
) -> Tuple[List[str], List[float], List[int]]:
self, camera: str, id: str, image: np.ndarray
) -> Tuple[List[str], List[List[float]], List[int]]:
"""
Complete pipeline for detecting, classifying, and recognizing license plates in the input image.
Combines multi-line plates into a single plate string, grouping boxes by vertical alignment and ordering top to bottom,
but only combines boxes if their average confidence scores meet the threshold and their heights are similar.
Args:
camera (str): Camera identifier.
id (str): Event identifier.
image (np.ndarray): The input image in which to detect, classify, and recognize license plates.
Returns:
Tuple[List[str], List[float], List[int]]: Detected license plate texts, confidence scores, and areas of the plates.
Tuple[List[str], List[List[float]], List[int]]: Detected license plate texts, character-level confidence scores for each plate (flattened into a single list per plate), and areas of the plates.
"""
if (
self.model_runner.detection_model.runner is None
@@ -186,69 +194,162 @@ class LicensePlateProcessingMixin:
boxes, plate_width=plate_width, gap_fraction=0.1
)
boxes = self._sort_boxes(list(boxes))
plate_images = [self._crop_license_plate(image, x) for x in boxes]
current_time = int(datetime.datetime.now().timestamp())
if WRITE_DEBUG_IMAGES:
for i, img in enumerate(plate_images):
cv2.imwrite(
f"debug/frames/license_plate_cropped_{current_time}_{i + 1}.jpg",
img,
debug_image = image.copy()
for box in boxes:
box = box.astype(int)
x_min, y_min = np.min(box[:, 0]), np.min(box[:, 1])
x_max, y_max = np.max(box[:, 0]), np.max(box[:, 1])
cv2.rectangle(
debug_image,
(x_min, y_min),
(x_max, y_max),
color=(0, 255, 0),
thickness=2,
)
if self.config.lpr.debug_save_plates:
logger.debug(f"{camera}: Saving plates for event {id}")
Path(os.path.join(CLIPS_DIR, f"lpr/{camera}/{id}")).mkdir(
parents=True, exist_ok=True
cv2.imwrite(
f"debug/frames/license_plate_boxes_{current_time}.jpg", debug_image
)
for i, img in enumerate(plate_images):
cv2.imwrite(
os.path.join(
CLIPS_DIR, f"lpr/{camera}/{id}/{current_time}_{i + 1}.jpg"
),
img,
boxes = self._sort_boxes(list(boxes))
# Step 1: Compute box heights and group boxes by vertical alignment and height similarity
box_info = []
for i, box in enumerate(boxes):
y_coords = box[:, 1]
y_min, y_max = np.min(y_coords), np.max(y_coords)
height = y_max - y_min
box_info.append((y_min, y_max, height, i))
# Initial grouping based on y-coordinate overlap and height similarity
initial_groups = []
current_group = [box_info[0]]
height_tolerance = 0.25 # Allow 25% difference in height for grouping
for i in range(1, len(box_info)):
prev_y_min, prev_y_max, prev_height, _ = current_group[-1]
curr_y_min, _, curr_height, _ = box_info[i]
# Check y-coordinate overlap
overlap_threshold = 0.1 * (prev_y_max - prev_y_min)
overlaps = curr_y_min <= prev_y_max + overlap_threshold
# Check height similarity
height_ratio = min(prev_height, curr_height) / max(prev_height, curr_height)
height_similar = height_ratio >= (1 - height_tolerance)
if overlaps and height_similar:
current_group.append(box_info[i])
else:
initial_groups.append(current_group)
current_group = [box_info[i]]
initial_groups.append(current_group)
# Step 2: Process each initial group, filter by confidence
all_license_plates = []
all_confidences = []
all_areas = []
processed_indices = set()
recognition_threshold = self.lpr_config.recognition_threshold
for group in initial_groups:
# Sort group by y-coordinate (top to bottom)
group.sort(key=lambda x: x[0])
group_indices = [item[3] for item in group]
# Skip if all indices in this group have already been processed
if all(idx in processed_indices for idx in group_indices):
continue
# Crop images for the group
group_boxes = [boxes[i] for i in group_indices]
group_plate_images = [
self._crop_license_plate(image, box) for box in group_boxes
]
if WRITE_DEBUG_IMAGES:
for i, img in enumerate(group_plate_images):
cv2.imwrite(
f"debug/frames/license_plate_cropped_{current_time}_{group_indices[i] + 1}.jpg",
img,
)
if self.config.lpr.debug_save_plates:
logger.debug(f"{camera}: Saving plates for event {id}")
Path(os.path.join(CLIPS_DIR, f"lpr/{camera}/{id}")).mkdir(
parents=True, exist_ok=True
)
for i, img in enumerate(group_plate_images):
cv2.imwrite(
os.path.join(
CLIPS_DIR,
f"lpr/{camera}/{id}/{current_time}_{group_indices[i] + 1}.jpg",
),
img,
)
# keep track of the index of each image for correct area calc later
sorted_indices = np.argsort([x.shape[1] / x.shape[0] for x in plate_images])
reverse_mapping = {
idx: original_idx for original_idx, idx in enumerate(sorted_indices)
}
# Recognize text in each cropped image
results, confidences = self._recognize(camera, group_plate_images)
results, confidences = self._recognize(camera, plate_images)
if not results:
continue
if results:
license_plates = [""] * len(plate_images)
average_confidences = [[0.0]] * len(plate_images)
areas = [0] * len(plate_images)
if not confidences:
confidences = [[0.0] for _ in results]
# map results back to original image order
for i, (plate, conf) in enumerate(zip(results, confidences)):
original_idx = reverse_mapping[i]
# Compute average confidence for each box's recognized text
avg_confidences = []
for conf_list in confidences:
avg_conf = sum(conf_list) / len(conf_list) if conf_list else 0.0
avg_confidences.append(avg_conf)
height, width = plate_images[original_idx].shape[:2]
area = height * width
# Filter boxes based on the recognition threshold
qualifying_indices = []
qualifying_results = []
qualifying_confidences = []
for i, (avg_conf, result, conf_list) in enumerate(
zip(avg_confidences, results, confidences)
):
if avg_conf >= recognition_threshold:
qualifying_indices.append(group_indices[i])
qualifying_results.append(result)
qualifying_confidences.append(conf_list)
average_confidence = conf
if not qualifying_results:
continue
# set to True to write each cropped image for debugging
if False:
filename = f"debug/frames/plate_{original_idx}_{plate}_{area}.jpg"
cv2.imwrite(filename, plate_images[original_idx])
processed_indices.update(qualifying_indices)
license_plates[original_idx] = plate
average_confidences[original_idx] = average_confidence
areas[original_idx] = area
# Combine the qualifying results into a single plate string
combined_plate = " ".join(qualifying_results)
# Filter out plates that have a length of less than min_plate_length characters
# or that don't match the expected format (if defined)
# Sort by area, then by plate length, then by confidence all desc
flat_confidences = [
conf for conf_list in qualifying_confidences for conf in conf_list
]
# Compute the combined area for qualifying boxes
qualifying_boxes = [boxes[i] for i in qualifying_indices]
qualifying_plate_images = [
self._crop_license_plate(image, box) for box in qualifying_boxes
]
group_areas = [
img.shape[0] * img.shape[1] for img in qualifying_plate_images
]
combined_area = sum(group_areas)
all_license_plates.append(combined_plate)
all_confidences.append(flat_confidences)
all_areas.append(combined_area)
# Step 3: Filter and sort the combined plates
if all_license_plates:
filtered_data = []
for plate, conf, area in zip(license_plates, average_confidences, areas):
for plate, conf_list, area in zip(
all_license_plates, all_confidences, all_areas
):
if len(plate) < self.lpr_config.min_plate_length:
logger.debug(
f"Filtered out '{plate}' due to length ({len(plate)} < {self.lpr_config.min_plate_length})"
@@ -261,11 +362,11 @@ class LicensePlateProcessingMixin:
logger.debug(f"Filtered out '{plate}' due to format mismatch")
continue
filtered_data.append((plate, conf, area))
filtered_data.append((plate, conf_list, area))
sorted_data = sorted(
filtered_data,
key=lambda x: (x[2], len(x[0]), x[1]),
key=lambda x: (x[2], len(x[0]), sum(x[1]) / len(x[1]) if x[1] else 0),
reverse=True,
)
@@ -428,40 +529,34 @@ class LicensePlateProcessingMixin:
contour = contours[index]
# get minimum bounding box (rotated rectangle) around the contour and the smallest side length.
points, min_side = self._get_min_boxes(contour)
if min_side < self.min_size:
points, sside = self._get_min_boxes(contour)
if sside < self.min_size:
continue
points = np.array(points)
points = np.array(points, dtype=np.float32)
score = self._box_score(output, contour)
if self.box_thresh > score:
continue
polygon = Polygon(points)
distance = polygon.area / polygon.length
points = self._expand_box(points)
# Use pyclipper to shrink the polygon slightly based on the computed distance.
offset = PyclipperOffset()
offset.AddPath(points, JT_ROUND, ET_CLOSEDPOLYGON)
points = np.array(offset.Execute(distance * 1.5)).reshape((-1, 1, 2))
# get the minimum bounding box around the shrunken polygon.
box, min_side = self._get_min_boxes(points)
if min_side < self.min_size + 2:
# Get the minimum area rectangle again after expansion
points, sside = self._get_min_boxes(points.reshape(-1, 1, 2))
if sside < self.min_size + 2:
continue
box = np.array(box)
points = np.array(points, dtype=np.float32)
# normalize and clip box coordinates to fit within the destination image size.
box[:, 0] = np.clip(np.round(box[:, 0] / width * dest_width), 0, dest_width)
box[:, 1] = np.clip(
np.round(box[:, 1] / height * dest_height), 0, dest_height
points[:, 0] = np.clip(
np.round(points[:, 0] / width * dest_width), 0, dest_width
)
points[:, 1] = np.clip(
np.round(points[:, 1] / height * dest_height), 0, dest_height
)
boxes.append(box.astype("int32"))
boxes.append(points.astype("int32"))
scores.append(score)
return np.array(boxes, dtype="int32"), scores
@@ -969,16 +1064,21 @@ class LicensePlateProcessingMixin:
# Adjust length score based on confidence of extra characters
conf_threshold = 0.75 # Minimum confidence for a character to be "trusted"
if len(top_plate) > len(prev_plate):
extra_conf = min(
top_char_confidences[len(prev_plate) :]
) # Lowest extra char confidence
if extra_conf < conf_threshold:
curr_length_score *= extra_conf / conf_threshold # Penalize if weak
elif len(prev_plate) > len(top_plate):
extra_conf = min(prev_char_confidences[len(top_plate) :])
if extra_conf < conf_threshold:
prev_length_score *= extra_conf / conf_threshold
top_plate_char_count = len(top_plate.replace(" ", ""))
prev_plate_char_count = len(prev_plate.replace(" ", ""))
if top_plate_char_count > prev_plate_char_count:
extra_confidences = top_char_confidences[prev_plate_char_count:]
if extra_confidences: # Ensure the slice is not empty
extra_conf = min(extra_confidences) # Lowest extra char confidence
if extra_conf < conf_threshold:
curr_length_score *= extra_conf / conf_threshold # Penalize if weak
elif prev_plate_char_count > top_plate_char_count:
extra_confidences = prev_char_confidences[top_plate_char_count:]
if extra_confidences: # Ensure the slice is not empty
extra_conf = min(extra_confidences)
if extra_conf < conf_threshold:
prev_length_score *= extra_conf / conf_threshold
# Area score: Normalize by max area
max_area = max(top_area, prev_area)
@@ -1146,20 +1246,20 @@ class LicensePlateProcessingMixin:
else:
id = obj_data["id"]
# don't run for non car or non license plate (dedicated lpr with frigate+) objects
# don't run for non car/motorcycle or non license plate (dedicated lpr with frigate+) objects
if (
obj_data.get("label") != "car"
obj_data.get("label") not in ["car", "motorcycle"]
and obj_data.get("label") != "license_plate"
):
logger.debug(
f"{camera}: Not a processing license plate for non car object."
f"{camera}: Not a processing license plate for non car/motorcycle object."
)
return
# don't run for stationary car objects
if obj_data.get("stationary") == True:
logger.debug(
f"{camera}: Not a processing license plate for a stationary car object."
f"{camera}: Not a processing license plate for a stationary car/motorcycle object."
)
return
@@ -1198,7 +1298,7 @@ class LicensePlateProcessingMixin:
if not license_plate:
logger.debug(
f"{camera}: Detected no license plates for car object."
f"{camera}: Detected no license plates for car/motorcycle object."
)
return
@@ -1230,7 +1330,7 @@ class LicensePlateProcessingMixin:
logger.debug(f"{camera}: No attributes to parse.")
return
if obj_data.get("label") == "car":
if obj_data.get("label") in ["car", "motorcycle"]:
attributes: list[dict[str, any]] = obj_data.get(
"current_attributes", []
)

View File

@@ -9,7 +9,7 @@ from ...types import DataProcessorModelRunner
class LicensePlateModelRunner(DataProcessorModelRunner):
def __init__(self, requestor, device: str = "CPU", model_size: str = "large"):
def __init__(self, requestor, device: str = "CPU", model_size: str = "small"):
super().__init__(requestor, device, model_size)
self.detection_model = PaddleOCRDetection(
model_size=model_size, requestor=requestor, device=device