LPR improvements (#20129)

* continue to use paddleocr v3 text detection model for large

v5 was not finding text on multi-line plates at all in testing

* implement clustering of plate variants per event

should reduce OCR inconsistencies and improve plate recognition stability by using string similarity to cluster similar variants (10 per event id) and choosing the highest-confidence representative as the final plate
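The clustering described above can be sketched as follows. This is a minimal stand-in, not Frigate's implementation: the real code uses Jaro-Winkler similarity with a 0.85 threshold, while this sketch substitutes `difflib.SequenceMatcher` from the standard library, and the variant dicts are simplified to just `plate` and `conf`.

```python
from difflib import SequenceMatcher

CLUSTER_THRESHOLD = 0.85  # same threshold value as this PR


def similarity(a: str, b: str) -> float:
    # Stand-in for Jaro-Winkler; ratio() is close enough for a sketch
    return SequenceMatcher(None, a, b).ratio()


def cluster_rep(variants: list[dict]) -> str:
    """Group similar plate strings, then pick the highest-confidence
    member of the largest cluster as the final plate."""
    clusters: list[list[dict]] = []
    for v in variants:
        for cluster in clusters:
            avg = sum(similarity(v["plate"], c["plate"]) for c in cluster) / len(cluster)
            if avg >= CLUSTER_THRESHOLD:
                cluster.append(v)
                break
        else:
            clusters.append([v])
    # Largest cluster wins; ties broken by the cluster's best confidence
    best = max(clusters, key=lambda c: (len(c), max(m["conf"] for m in c)))
    return max(best, key=lambda m: m["conf"])["plate"]


variants = [
    {"plate": "ABC1234", "conf": 0.91},
    {"plate": "ABC1Z34", "conf": 0.84},  # OCR misread, clusters with the above
    {"plate": "XYZ9999", "conf": 0.99},  # outlier cluster of one
]
```

Here the two similar variants form a two-member cluster that outranks the lone high-confidence outlier, so the representative is `ABC1234`.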

* pass camera

* prune number of variants based on detect fps
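The pruning above keeps roughly five seconds' worth of variants. A minimal sketch, assuming a hypothetical `detect_fps` value from the camera config; the PR slices the list to the last `fps * 5` entries, which a bounded `deque` models naturally:

```python
from collections import deque

detect_fps = 5  # hypothetical camera detect fps
max_variants = detect_fps * 5  # same fps * 5 cap as the code in this PR

plates = deque(maxlen=max_variants)  # deque drops oldest entries automatically
for i in range(100):
    plates.append(f"plate-{i}")
```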

* implement replacement rules for cleaning up and normalizing plates

* docs

* docs
Josh Hawkins 2025-09-18 16:12:17 -05:00 committed by GitHub
parent 68f806bb61
commit 251b029d6e
5 changed files with 213 additions and 150 deletions


@@ -67,12 +67,15 @@ Fine-tune the LPR feature using these optional parameters at the global level of
- **`min_area`**: Defines the minimum area (in pixels) a license plate must be before recognition runs.
- Default: `1000` pixels. Note: this is intentionally set very low as it is an _area_ measurement (length x width). For reference, 1000 pixels represents a ~32x32 pixel square in your camera image.
- Depending on the resolution of your camera's `detect` stream, you can increase this value to ignore small or distant plates.
- **`device`**: Device to use to run license plate detection *and* recognition models.
- **`device`**: Device to use to run license plate detection _and_ recognition models.
- Default: `CPU`
- This can be `CPU` or one of [onnxruntime's provider options](https://onnxruntime.ai/docs/execution-providers/). For users without a model that detects license plates natively, using a GPU may increase performance of the models, especially the YOLOv9 license plate detector model. See the [Hardware Accelerated Enrichments](/configuration/hardware_acceleration_enrichments.md) documentation.
- **`model_size`**: The size of the model used to detect text on plates.
- This can be `CPU`, `GPU`, or the GPU's device number. For users without a model that detects license plates natively, using a GPU may increase performance of the YOLOv9 license plate detector model. See the [Hardware Accelerated Enrichments](/configuration/hardware_acceleration_enrichments.md) documentation. However, for users who run a model that detects `license_plate` natively, there is little to no performance gain reported with running LPR on GPU compared to the CPU.
- **`model_size`**: The size of the model used to identify regions of text on plates.
- Default: `small`
- This can be `small` or `large`. The `large` model uses an enhanced text detector and is more accurate at finding text on plates but slower than the `small` model. For most users, the small model is recommended. For users in countries with multiple lines of text on plates, the large model is recommended. Note that using the large model does not improve _text recognition_, but it may improve _text detection_.
- This can be `small` or `large`.
- The `small` model is fast and identifies groups of Latin and Chinese characters.
- The `large` model identifies Latin characters only, but uses an enhanced text detector and is more capable at finding characters on multi-line plates. It is significantly slower than the `small` model. Note that using the `large` model does not improve _text recognition_, but it may improve _text detection_.
- For most users, the `small` model is recommended.
### Recognition
@@ -102,6 +105,32 @@ Fine-tune the LPR feature using these optional parameters at the global level of
- This setting is best adjusted at the camera level if running LPR on multiple cameras.
- If Frigate is already recognizing plates correctly, leave this setting at the default of `0`. However, if you're experiencing frequent character issues or incomplete plates and you can already easily read the plates yourself, try increasing the value gradually, starting at 5 and adjusting as needed. To see how different enhancement levels affect your plates, use the `debug_save_plates` configuration option (see below).
### Normalization Rules
- **`replace_rules`**: List of regex replacement rules to normalize detected plates. These rules are applied sequentially. Each rule must have a `pattern` (a plain string or a regex, prefixed with `r`) and a `replacement` (a string, which also supports [backrefs](https://docs.python.org/3/library/re.html#re.sub) like `\1`). These rules are useful for dealing with common OCR issues like noise characters, separators, or character confusions (e.g., 'O' → '0').
These rules must be defined at the global level of your `lpr` config.
```yaml
lpr:
replace_rules:
- pattern: r'[%#*?]' # Remove noise symbols
replacement: ""
- pattern: r'[= ]' # Normalize = or space to dash
replacement: "-"
- pattern: "O" # Swap 'O' to '0' (common OCR error)
replacement: "0"
- pattern: r'I' # Swap 'I' to '1'
replacement: "1"
- pattern: r'(\w{3})(\w{3})' # Split 6 chars into groups (e.g., ABC123 → ABC-123)
replacement: r'\1-\2'
```
- Rules fire in order: in the example above, noise characters are removed first, then separators are normalized, then characters are swapped, then groups are split.
- Backrefs (`\1`, `\2`) allow dynamic replacements using capture groups.
- Any changes made by the rules are printed to the LPR debug log.
- Tip: You can test patterns with tools like regex101.com.
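Applied in plain Python (with the patterns written as Python regex strings rather than the YAML `r'…'` form), the example rules above behave like this sketch:

```python
import re

# Mirrors the documented example rules as (pattern, replacement) pairs
rules = [
    (r"[%#*?]", ""),                   # remove noise symbols
    (r"[= ]", "-"),                    # normalize '=' or space to dash
    ("O", "0"),                        # swap 'O' to '0' (common OCR error)
    ("I", "1"),                        # swap 'I' to '1'
    (r"(\w{3})(\w{3})", r"\1-\2"),     # split 6 chars into groups
]


def normalize(plate: str) -> str:
    # Rules are applied sequentially, in order, just as Frigate does
    for pattern, replacement in rules:
        plate = re.sub(pattern, replacement, plate)
    return plate
```

For example, `"#ABO123"` has the `#` stripped, the `O` swapped to `0`, and the remaining six word characters split into `AB0-123`.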
### Debugging
- **`debug_save_plates`**: Set to `True` to save captured text on plates for debugging. These images are stored in `/media/frigate/clips/lpr`, organized into subdirectories by `<camera>/<event_id>`, and named based on the capture timestamp.
@@ -136,6 +165,9 @@ lpr:
recognition_threshold: 0.85
format: "^[A-Z]{2} [A-Z][0-9]{4}$" # Only recognize plates that are two letters, followed by a space, followed by a single letter and 4 numbers
match_distance: 1 # Allow one character variation in plate matching
replace_rules:
- pattern: "O"
replacement: "0" # Replace the letter O with the number 0 in every plate
known_plates:
Delivery Van:
- "RJ K5678"
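Matching against `known_plates` combines exact regex matching with an edit-distance allowance (`match_distance`). A minimal stdlib sketch: the code in this PR uses a library `distance` function for Levenshtein distance, replaced here with a hand-rolled version.

```python
import re


def levenshtein(a: str, b: str) -> int:
    # Classic dynamic-programming edit distance
    prev = list(range(len(b) + 1))
    for i, ca in enumerate(a, 1):
        curr = [i]
        for j, cb in enumerate(b, 1):
            curr.append(min(prev[j] + 1, curr[j - 1] + 1, prev[j - 1] + (ca != cb)))
        prev = curr
    return prev[-1]


def match_known_plate(detected: str, known_plates: dict, match_distance: int = 1):
    """Return the matching label, or None if no known plate matches."""
    for label, plates in known_plates.items():
        for plate in plates:
            # Each configured plate may be a literal string or a regex
            if re.fullmatch(plate, detected) or levenshtein(plate, detected) <= match_distance:
                return label
    return None


known = {"Delivery Van": ["RJ K5678"]}
```

With `match_distance: 1`, a plate read as `RJ K5679` still matches `Delivery Van`, while an unrelated plate does not.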


@@ -661,6 +661,8 @@ lpr:
enhancement: 0
# Optional: Save plate images to /media/frigate/clips/lpr for debugging purposes (default: shown below)
debug_save_plates: False
# Optional: List of regex replacement rules to normalize detected plates (default: shown below)
replace_rules: []
# Optional: Configuration for AI generated tracked object descriptions
# WARNING: Depending on the provider, this will send thumbnails over the internet


@@ -217,6 +217,13 @@ class CameraFaceRecognitionConfig(FrigateBaseModel):
model_config = ConfigDict(extra="forbid", protected_namespaces=())
class ReplaceRule(FrigateBaseModel):
pattern: str = Field(..., title="Regex pattern to match.")
replacement: str = Field(
..., title="Replacement string (supports backrefs like '\\1')."
)
class LicensePlateRecognitionConfig(FrigateBaseModel):
enabled: bool = Field(default=False, title="Enable license plate recognition.")
model_size: str = Field(
@@ -269,6 +276,10 @@ class LicensePlateRecognitionConfig(FrigateBaseModel):
title="The device key to use for LPR.",
description="This is an override, to target a specific device. See https://onnxruntime.ai/docs/execution-providers/ for more information",
)
replace_rules: List[ReplaceRule] = Field(
default_factory=list,
title="List of regex replacement rules for normalizing detected plates. Each rule has 'pattern' and 'replacement'.",
)
class CameraLicensePlateRecognitionConfig(FrigateBaseModel):


@@ -65,6 +65,7 @@ class LicensePlateProcessingMixin:
# matching
self.similarity_threshold = 0.8
self.cluster_threshold = 0.85
def _detect(self, image: np.ndarray) -> List[np.ndarray]:
"""
@@ -209,7 +210,7 @@ class LicensePlateProcessingMixin:
boxes = self._detect(image)
if len(boxes) == 0:
logger.debug("No boxes found by OCR detector model")
logger.debug(f"{camera}: No boxes found by OCR detector model")
return [], [], []
if len(boxes) > 0:
@@ -359,6 +360,27 @@ class LicensePlateProcessingMixin:
conf for conf_list in qualifying_confidences for conf in conf_list
]
# Apply replace rules to combined_plate if configured
original_combined = combined_plate
if self.lpr_config.replace_rules:
for rule in self.lpr_config.replace_rules:
try:
pattern = getattr(rule, "pattern", "")
replacement = getattr(rule, "replacement", "")
if pattern:
combined_plate = re.sub(
pattern, replacement, combined_plate
)
except re.error as e:
logger.warning(
f"{camera}: Invalid regex in replace_rules '{pattern}': {e}"
)
if combined_plate != original_combined:
logger.debug(
f"{camera}: Rules applied: '{original_combined}' -> '{combined_plate}'"
)
# Compute the combined area for qualifying boxes
qualifying_boxes = [boxes[i] for i in qualifying_indices]
qualifying_plate_images = [
@@ -381,7 +403,7 @@ class LicensePlateProcessingMixin:
):
if len(plate) < self.lpr_config.min_plate_length:
logger.debug(
f"Filtered out '{plate}' due to length ({len(plate)} < {self.lpr_config.min_plate_length})"
f"{camera}: Filtered out '{plate}' due to length ({len(plate)} < {self.lpr_config.min_plate_length})"
)
continue
@@ -389,7 +411,7 @@ class LicensePlateProcessingMixin:
try:
if not re.fullmatch(self.lpr_config.format, plate):
logger.debug(
f"Filtered out '{plate}' due to format mismatch"
f"{camera}: Filtered out '{plate}' due to format mismatch"
)
continue
except re.error:
@@ -996,7 +1018,9 @@ class LicensePlateProcessingMixin:
image = cv2.rotate(image, cv2.ROTATE_90_CLOCKWISE)
return image
def _detect_license_plate(self, input: np.ndarray) -> tuple[int, int, int, int]:
def _detect_license_plate(
self, camera: str, input: np.ndarray
) -> tuple[int, int, int, int]:
"""
Use a lightweight YOLOv9 model to detect license plates for users without Frigate+
@@ -1066,118 +1090,87 @@ class LicensePlateProcessingMixin:
).clip(0, [input.shape[1], input.shape[0]] * 2)
logger.debug(
f"Found license plate. Bounding box: {expanded_box.astype(int)}"
f"{camera}: Found license plate. Bounding box: {expanded_box.astype(int)}"
)
return tuple(expanded_box.astype(int))
else:
return None # No detection above the threshold
def _should_keep_previous_plate(
self, id, top_plate, top_char_confidences, top_area, avg_confidence
):
"""Determine if the previous plate should be kept over the current one."""
if id not in self.detected_license_plates:
return False
def _get_cluster_rep(
self, plates: List[dict]
) -> Tuple[str, float, List[float], int]:
"""
Cluster plate variants and select the representative from the best cluster.
"""
if len(plates) == 0:
return "", 0.0, [], 0
prev_data = self.detected_license_plates[id]
prev_plate = prev_data["plate"]
prev_char_confidences = prev_data["char_confidences"]
prev_area = prev_data["area"]
prev_avg_confidence = (
sum(prev_char_confidences) / len(prev_char_confidences)
if prev_char_confidences
else 0
if len(plates) == 1:
p = plates[0]
return p["plate"], p["conf"], p["char_confidences"], p["area"]
# Log initial variants
logger.debug(f"Clustering {len(plates)} plate variants:")
for i, p in enumerate(plates):
logger.debug(
f" Variant {i + 1}: '{p['plate']}' (conf: {p['conf']:.3f}, area: {p['area']})"
)
clusters = []
for i, plate in enumerate(plates):
merged = False
for j, cluster in enumerate(clusters):
sims = [jaro_winkler(plate["plate"], v["plate"]) for v in cluster]
if len(sims) > 0:
avg_sim = sum(sims) / len(sims)
if avg_sim >= self.cluster_threshold:
cluster.append(plate)
logger.debug(
f" Merged variant {i + 1} '{plate['plate']}' (conf: {plate['conf']:.3f}) into cluster {j + 1} (avg_sim: {avg_sim:.3f})"
)
merged = True
break
if not merged:
clusters.append([plate])
logger.debug(
f" Started new cluster {len(clusters)} with variant {i + 1} '{plate['plate']}' (conf: {plate['conf']:.3f})"
)
if not clusters:
return "", 0.0, [], 0
# Log cluster summaries
for j, cluster in enumerate(clusters):
cluster_size = len(cluster)
max_conf = max(v["conf"] for v in cluster)
sample_variants = [v["plate"] for v in cluster[:3]] # First 3 for brevity
logger.debug(
f" Cluster {j + 1}: size {cluster_size}, max conf {max_conf:.3f}, variants: {sample_variants}{'...' if cluster_size > 3 else ''}"
)
# Best cluster: largest size, tiebroken by max conf
def cluster_score(c):
return (len(c), max(v["conf"] for v in c))
best_cluster_idx = max(
range(len(clusters)), key=lambda j: cluster_score(clusters[j])
)
# 1. Normalize metrics
# Length score: Equal lengths = 0.5, penalize extra characters if low confidence
length_diff = len(top_plate) - len(prev_plate)
max_length_diff = 3
curr_length_score = 0.5 + (length_diff / (2 * max_length_diff))
curr_length_score = max(0, min(1, curr_length_score))
prev_length_score = 0.5 - (length_diff / (2 * max_length_diff))
prev_length_score = max(0, min(1, prev_length_score))
# Adjust length score based on confidence of extra characters
conf_threshold = 0.75 # Minimum confidence for a character to be "trusted"
top_plate_char_count = len(top_plate.replace(" ", ""))
prev_plate_char_count = len(prev_plate.replace(" ", ""))
if top_plate_char_count > prev_plate_char_count:
extra_confidences = top_char_confidences[prev_plate_char_count:]
if extra_confidences: # Ensure the slice is not empty
extra_conf = min(extra_confidences) # Lowest extra char confidence
if extra_conf < conf_threshold:
curr_length_score *= extra_conf / conf_threshold # Penalize if weak
elif prev_plate_char_count > top_plate_char_count:
extra_confidences = prev_char_confidences[top_plate_char_count:]
if extra_confidences: # Ensure the slice is not empty
extra_conf = min(extra_confidences)
if extra_conf < conf_threshold:
prev_length_score *= extra_conf / conf_threshold
# Area score: Normalize by max area
max_area = max(top_area, prev_area)
curr_area_score = top_area / max_area if max_area > 0 else 0
prev_area_score = prev_area / max_area if max_area > 0 else 0
# Confidence scores
curr_conf_score = avg_confidence
prev_conf_score = prev_avg_confidence
# Character confidence comparison (average over shared length)
min_length = min(len(top_plate), len(prev_plate))
if min_length > 0:
curr_char_conf = sum(top_char_confidences[:min_length]) / min_length
prev_char_conf = sum(prev_char_confidences[:min_length]) / min_length
else:
curr_char_conf = prev_char_conf = 0
# Penalize any character below threshold
curr_min_conf = min(top_char_confidences) if top_char_confidences else 0
prev_min_conf = min(prev_char_confidences) if prev_char_confidences else 0
curr_conf_penalty = (
1.0 if curr_min_conf >= conf_threshold else (curr_min_conf / conf_threshold)
)
prev_conf_penalty = (
1.0 if prev_min_conf >= conf_threshold else (prev_min_conf / conf_threshold)
)
# 2. Define weights (boost confidence importance)
weights = {
"length": 0.2,
"area": 0.2,
"avg_confidence": 0.35,
"char_confidence": 0.25,
}
# 3. Calculate weighted scores with penalty
curr_score = (
curr_length_score * weights["length"]
+ curr_area_score * weights["area"]
+ curr_conf_score * weights["avg_confidence"]
+ curr_char_conf * weights["char_confidence"]
) * curr_conf_penalty
prev_score = (
prev_length_score * weights["length"]
+ prev_area_score * weights["area"]
+ prev_conf_score * weights["avg_confidence"]
+ prev_char_conf * weights["char_confidence"]
) * prev_conf_penalty
# 4. Log the comparison
best_cluster = clusters[best_cluster_idx]
best_size, best_max_conf = cluster_score(best_cluster)
logger.debug(
f"Plate comparison - Current: {top_plate} (score: {curr_score:.3f}, min_conf: {curr_min_conf:.2f}) vs "
f"Previous: {prev_plate} (score: {prev_score:.3f}, min_conf: {prev_min_conf:.2f}) "
f"Metrics - Length: {len(top_plate)} vs {len(prev_plate)} (scores: {curr_length_score:.2f} vs {prev_length_score:.2f}), "
f"Area: {top_area} vs {prev_area}, "
f"Avg Conf: {avg_confidence:.2f} vs {prev_avg_confidence:.2f}, "
f"Char Conf: {curr_char_conf:.2f} vs {prev_char_conf:.2f}"
f" Selected best cluster {best_cluster_idx + 1}: size {best_size}, max conf {best_max_conf:.3f}"
)
# 5. Return True if previous plate scores higher
return prev_score > curr_score
# Rep: highest conf in best cluster
rep = max(best_cluster, key=lambda v: v["conf"])
logger.debug(
f" Selected rep from best cluster: '{rep['plate']}' (conf: {rep['conf']:.3f})"
)
logger.debug(
f" Final clustered plate: '{rep['plate']}' (conf: {rep['conf']:.3f})"
)
return rep["plate"], rep["conf"], rep["char_confidences"], rep["area"]
def _generate_plate_event(self, camera: str, plate: str, plate_score: float) -> str:
"""Generate a unique ID for a plate event based on camera and text."""
@@ -1228,7 +1221,7 @@ class LicensePlateProcessingMixin:
)
yolov9_start = datetime.datetime.now().timestamp()
license_plate = self._detect_license_plate(rgb)
license_plate = self._detect_license_plate(camera, rgb)
logger.debug(
f"{camera}: YOLOv9 LPD inference time: {(datetime.datetime.now().timestamp() - yolov9_start) * 1000:.2f} ms"
@@ -1320,7 +1313,7 @@ class LicensePlateProcessingMixin:
)
yolov9_start = datetime.datetime.now().timestamp()
license_plate = self._detect_license_plate(car)
license_plate = self._detect_license_plate(camera, car)
logger.debug(
f"{camera}: YOLOv9 LPD inference time: {(datetime.datetime.now().timestamp() - yolov9_start) * 1000:.2f} ms"
)
@@ -1451,7 +1444,7 @@ class LicensePlateProcessingMixin:
f"{camera}: Detected text: {plate} (average confidence: {avg_confidence:.2f}, area: {text_area} pixels)"
)
else:
logger.debug("No text detected")
logger.debug(f"{camera}: No text detected")
return
top_plate, top_char_confidences, top_area = (
@@ -1494,9 +1487,7 @@ class LicensePlateProcessingMixin:
)
break
if plate_id is None:
plate_id = self._generate_plate_event(
obj_data, top_plate, avg_confidence
)
plate_id = self._generate_plate_event(camera, top_plate, avg_confidence)
logger.debug(
f"{camera}: New plate event for dedicated LPR camera {plate_id}: {top_plate}"
)
@@ -1508,25 +1499,68 @@ class LicensePlateProcessingMixin:
id = plate_id
# Check if we have a previously detected plate for this ID
if id in self.detected_license_plates:
if self._should_keep_previous_plate(
id, top_plate, top_char_confidences, top_area, avg_confidence
):
logger.debug(f"{camera}: Keeping previous plate")
return
is_new = id not in self.detected_license_plates
# Collect variant
variant = {
"plate": top_plate,
"conf": avg_confidence,
"char_confidences": top_char_confidences,
"area": top_area,
"timestamp": current_time,
}
# Initialize or append to plates
self.detected_license_plates.setdefault(id, {"plates": [], "camera": camera})
self.detected_license_plates[id]["plates"].append(variant)
# Prune old variants - this is probably higher than it needs to be
# since we don't detect a plate every frame
num_variants = self.config.cameras[camera].detect.fps * 5
if len(self.detected_license_plates[id]["plates"]) > num_variants:
self.detected_license_plates[id]["plates"] = self.detected_license_plates[
id
]["plates"][-num_variants:]
# Cluster and select rep
plates = self.detected_license_plates[id]["plates"]
rep_plate, rep_conf, rep_char_confs, rep_area = self._get_cluster_rep(plates)
if rep_plate != top_plate:
logger.debug(
f"{camera}: Clustering changed top plate '{top_plate}' (conf: {avg_confidence:.3f}) to rep '{rep_plate}' (conf: {rep_conf:.3f})"
)
# Update stored rep
self.detected_license_plates[id].update(
{
"plate": rep_plate,
"char_confidences": rep_char_confs,
"area": rep_area,
"last_seen": current_time if dedicated_lpr else None,
}
)
if not dedicated_lpr:
self.detected_license_plates[id]["obj_data"] = obj_data
if is_new:
if camera not in self.camera_current_cars:
self.camera_current_cars[camera] = []
self.camera_current_cars[camera].append(id)
# Determine subLabel based on known plates, use regex matching
# Default to the detected plate, use label name if there's a match
sub_label = None
try:
sub_label = next(
(
label
for label, plates in self.lpr_config.known_plates.items()
for label, plates_list in self.lpr_config.known_plates.items()
if any(
re.match(f"^{plate}$", top_plate)
or distance(plate, top_plate) <= self.lpr_config.match_distance
for plate in plates
re.match(f"^{plate}$", rep_plate)
or distance(plate, rep_plate) <= self.lpr_config.match_distance
for plate in plates_list
)
),
None,
@@ -1535,12 +1569,11 @@ class LicensePlateProcessingMixin:
logger.error(
f"{camera}: Invalid regex in known plates configuration: {self.lpr_config.known_plates}"
)
sub_label = None
# If it's a known plate, publish to sub_label
if sub_label is not None:
self.sub_label_publisher.publish(
(id, sub_label, avg_confidence), EventMetadataTypeEnum.sub_label.value
(id, sub_label, rep_conf), EventMetadataTypeEnum.sub_label.value
)
# always publish to recognized_license_plate field
@@ -1550,8 +1583,8 @@ class LicensePlateProcessingMixin:
{
"type": TrackedObjectUpdateTypesEnum.lpr,
"name": sub_label,
"plate": top_plate,
"score": avg_confidence,
"plate": rep_plate,
"score": rep_conf,
"id": id,
"camera": camera,
"timestamp": start,
@@ -1559,7 +1592,7 @@ class LicensePlateProcessingMixin:
),
)
self.sub_label_publisher.publish(
(id, "recognized_license_plate", top_plate, avg_confidence),
(id, "recognized_license_plate", rep_plate, rep_conf),
EventMetadataTypeEnum.attribute.value,
)
@@ -1569,7 +1602,7 @@ class LicensePlateProcessingMixin:
and "license_plate" not in self.config.cameras[camera].objects.track
):
logger.debug(
f"{camera}: Writing snapshot for {id}, {top_plate}, {current_time}"
f"{camera}: Writing snapshot for {id}, {rep_plate}, {current_time}"
)
frame_bgr = cv2.cvtColor(frame, cv2.COLOR_YUV2BGR_I420)
_, encoded_img = cv2.imencode(".jpg", frame_bgr)
@@ -1578,21 +1611,6 @@ class LicensePlateProcessingMixin:
EventMetadataTypeEnum.save_lpr_snapshot.value,
)
if id not in self.detected_license_plates:
if camera not in self.camera_current_cars:
self.camera_current_cars[camera] = []
self.camera_current_cars[camera].append(id)
self.detected_license_plates[id] = {
"plate": top_plate,
"char_confidences": top_char_confidences,
"area": top_area,
"obj_data": obj_data,
"camera": camera,
"last_seen": current_time if dedicated_lpr else None,
}
def handle_request(self, topic, request_data) -> dict[str, Any] | None:
return


@@ -33,7 +33,7 @@ class PaddleOCRDetection(BaseEmbedding):
device: str = "AUTO",
):
model_file = (
"detection_v5-large.onnx"
"detection_v3-large.onnx"
if model_size == "large"
else "detection_v5-small.onnx"
)
@@ -41,7 +41,7 @@ class PaddleOCRDetection(BaseEmbedding):
model_name="paddleocr-onnx",
model_file=model_file,
download_urls={
model_file: f"https://github.com/hawkeye217/paddleocr-onnx/raw/refs/heads/master/models/v5/{model_file}"
model_file: f"https://github.com/hawkeye217/paddleocr-onnx/raw/refs/heads/master/models/{'v3' if model_size == 'large' else 'v5'}/{model_file}"
},
)
self.requestor = requestor