blakeblackshear.frigate/frigate/embeddings/onnx/lpr_embedding.py
Josh Hawkins 645868e099
Upgrade PaddleOCR models to v4 (rec) and v5 (det) (#18505)
The PP_OCRv5 text detection models have greatly improved over v3. The v5 recognition model makes improvements to challenging handwriting and uncommon characters, which are not necessary for LPR, so using v4 seemed like a better choice to continue to keep inference time as low as possible. Also included is the full dictionary for Chinese character support.
2025-06-01 14:21:12 -06:00

304 lines
10 KiB
Python

import logging
import os
import warnings
import cv2
import numpy as np
from frigate.comms.inter_process import InterProcessRequestor
from frigate.const import MODEL_CACHE_DIR
from frigate.types import ModelStatusTypesEnum
from frigate.util.downloader import ModelDownloader
from .base_embedding import BaseEmbedding
from .runner import ONNXModelRunner
# Ignore FutureWarnings whose message starts with the CLIPFeatureExtractor
# deprecation notice.
# NOTE(review): nothing visible in this module references
# CLIPFeatureExtractor — presumably the warning comes from a dependency or
# this filter was carried over from another embedding module; confirm.
warnings.filterwarnings(
"ignore",
category=FutureWarning,
message="The class CLIPFeatureExtractor is deprecated",
)
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Side length (in pixels) of the square frame that
# LicensePlateDetector._preprocess_inputs builds for the detector model.
LPR_EMBEDDING_SIZE = 256
class PaddleOCRDetection(BaseEmbedding):
    """ONNX wrapper for the PaddleOCR v5 text detection model.

    Chooses the "large" or "small" detection model file from ``model_size``,
    makes sure that file is available under ``MODEL_CACHE_DIR`` and builds an
    ``ONNXModelRunner`` for it.
    """

    def __init__(
        self,
        model_size: str,
        requestor: InterProcessRequestor,
        device: str = "AUTO",
    ):
        """Pick the model file, then either queue its download or load it.

        Args:
            model_size: ``"large"`` selects ``detection_v5-large.onnx``; any
                other value selects ``detection_v5-small.onnx``. Also
                forwarded to ``ONNXModelRunner``.
            requestor: Handed to ``ModelDownloader.mark_files_state`` when the
                model files are already on disk.
            device: Forwarded to ``ONNXModelRunner``.
        """
        if model_size == "large":
            model_file = "detection_v5-large.onnx"
        else:
            model_file = "detection_v5-small.onnx"

        model_url = f"https://github.com/hawkeye217/paddleocr-onnx/raw/refs/heads/master/models/v5/{model_file}"
        super().__init__(
            model_name="paddleocr-onnx",
            model_file=model_file,
            download_urls={model_file: model_url},
        )

        self.requestor = requestor
        self.model_size = model_size
        self.device = device
        self.download_path = os.path.join(MODEL_CACHE_DIR, self.model_name)
        self.runner: ONNXModelRunner | None = None

        required_files = list(self.download_urls.keys())
        already_cached = all(
            os.path.exists(os.path.join(self.download_path, name))
            for name in required_files
        )

        if already_cached:
            # Nothing to fetch: report the files as downloaded and load the
            # model right away.
            self.downloader = None
            ModelDownloader.mark_files_state(
                self.requestor,
                self.model_name,
                required_files,
                ModelStatusTypesEnum.downloaded,
            )
            self._load_model_and_utils()
            logger.debug(f"models are already downloaded for {self.model_name}")
            return

        # At least one file is missing: hand the work to a ModelDownloader.
        # The runner is created later, by _load_model_and_utils.
        logger.debug(f"starting model download for {self.model_name}")
        self.downloader = ModelDownloader(
            model_name=self.model_name,
            download_path=self.download_path,
            file_names=required_files,
            download_func=self._download_model,
        )
        self.downloader.ensure_model_files()

    def _load_model_and_utils(self):
        """Create the ONNX runner on first use; later calls do nothing."""
        if self.runner is not None:
            return

        if self.downloader:
            # A download was queued in __init__; wait on it before loading
            # (presumably blocks until the files exist — see ModelDownloader).
            self.downloader.wait_for_download()

        model_path = os.path.join(self.download_path, self.model_file)
        self.runner = ONNXModelRunner(model_path, self.device, self.model_size)

    def _preprocess_inputs(self, raw_inputs):
        """Wrap the first item of ``raw_inputs`` as the model's ``"x"`` input.

        Only the first element is used; any further elements are ignored.
        Raises ``IndexError`` when ``raw_inputs`` is empty.
        """
        items = list(raw_inputs)
        return [{"x": items[0]}]
class PaddleOCRClassification(BaseEmbedding):
    """ONNX wrapper for the PaddleOCR classification model.

    Makes sure ``classification.onnx`` is available under ``MODEL_CACHE_DIR``
    and builds an ``ONNXModelRunner`` for it.
    """

    def __init__(
        self,
        model_size: str,
        requestor: InterProcessRequestor,
        device: str = "AUTO",
    ):
        """Register the model file, then either queue its download or load it.

        Args:
            model_size: Stored on the instance and forwarded to
                ``ONNXModelRunner``; it does not change which file is used.
            requestor: Handed to ``ModelDownloader.mark_files_state`` when the
                model files are already on disk.
            device: Forwarded to ``ONNXModelRunner``.
        """
        super().__init__(
            model_name="paddleocr-onnx",
            model_file="classification.onnx",
            download_urls={
                "classification.onnx": "https://github.com/hawkeye217/paddleocr-onnx/raw/refs/heads/master/models/classification.onnx"
            },
        )

        self.requestor = requestor
        self.model_size = model_size
        self.device = device
        self.download_path = os.path.join(MODEL_CACHE_DIR, self.model_name)
        self.runner: ONNXModelRunner | None = None

        required_files = list(self.download_urls.keys())
        already_cached = all(
            os.path.exists(os.path.join(self.download_path, name))
            for name in required_files
        )

        if already_cached:
            # Nothing to fetch: report the files as downloaded and load the
            # model right away.
            self.downloader = None
            ModelDownloader.mark_files_state(
                self.requestor,
                self.model_name,
                required_files,
                ModelStatusTypesEnum.downloaded,
            )
            self._load_model_and_utils()
            logger.debug(f"models are already downloaded for {self.model_name}")
            return

        # At least one file is missing: hand the work to a ModelDownloader.
        # The runner is created later, by _load_model_and_utils.
        logger.debug(f"starting model download for {self.model_name}")
        self.downloader = ModelDownloader(
            model_name=self.model_name,
            download_path=self.download_path,
            file_names=required_files,
            download_func=self._download_model,
        )
        self.downloader.ensure_model_files()

    def _load_model_and_utils(self):
        """Create the ONNX runner on first use; later calls do nothing."""
        if self.runner is not None:
            return

        if self.downloader:
            # A download was queued in __init__; wait on it before loading
            # (presumably blocks until the files exist — see ModelDownloader).
            self.downloader.wait_for_download()

        model_path = os.path.join(self.download_path, self.model_file)
        self.runner = ONNXModelRunner(model_path, self.device, self.model_size)

    def _preprocess_inputs(self, raw_inputs):
        """Return one ``{"x": image}`` dict per element of ``raw_inputs``."""
        return [{"x": image} for image in raw_inputs]
class PaddleOCRRecognition(BaseEmbedding):
    """ONNX wrapper for the PaddleOCR v4 text recognition model.

    Makes sure ``recognition_v4.onnx`` and the ``ppocr_keys_v1.txt`` file are
    available under ``MODEL_CACHE_DIR`` and builds an ``ONNXModelRunner`` for
    the model.
    """

    def __init__(
        self,
        model_size: str,
        requestor: InterProcessRequestor,
        device: str = "AUTO",
    ):
        """Register the model files, then either queue a download or load.

        Args:
            model_size: Stored on the instance and forwarded to
                ``ONNXModelRunner``; it does not change which file is used.
            requestor: Handed to ``ModelDownloader.mark_files_state`` when the
                model files are already on disk.
            device: Forwarded to ``ONNXModelRunner``.
        """
        super().__init__(
            model_name="paddleocr-onnx",
            model_file="recognition_v4.onnx",
            download_urls={
                "recognition_v4.onnx": "https://github.com/hawkeye217/paddleocr-onnx/raw/refs/heads/master/models/v4/recognition_v4.onnx",
                "ppocr_keys_v1.txt": "https://github.com/hawkeye217/paddleocr-onnx/raw/refs/heads/master/models/v4/ppocr_keys_v1.txt",
            },
        )

        self.requestor = requestor
        self.model_size = model_size
        self.device = device
        self.download_path = os.path.join(MODEL_CACHE_DIR, self.model_name)
        self.runner: ONNXModelRunner | None = None

        required_files = list(self.download_urls.keys())
        already_cached = all(
            os.path.exists(os.path.join(self.download_path, name))
            for name in required_files
        )

        if already_cached:
            # Both files are present: report them as downloaded and load the
            # model right away.
            self.downloader = None
            ModelDownloader.mark_files_state(
                self.requestor,
                self.model_name,
                required_files,
                ModelStatusTypesEnum.downloaded,
            )
            self._load_model_and_utils()
            logger.debug(f"models are already downloaded for {self.model_name}")
            return

        # At least one file is missing: hand the work to a ModelDownloader.
        # The runner is created later, by _load_model_and_utils.
        logger.debug(f"starting model download for {self.model_name}")
        self.downloader = ModelDownloader(
            model_name=self.model_name,
            download_path=self.download_path,
            file_names=required_files,
            download_func=self._download_model,
        )
        self.downloader.ensure_model_files()

    def _load_model_and_utils(self):
        """Create the ONNX runner on first use; later calls do nothing."""
        if self.runner is not None:
            return

        if self.downloader:
            # A download was queued in __init__; wait on it before loading
            # (presumably blocks until the files exist — see ModelDownloader).
            self.downloader.wait_for_download()

        model_path = os.path.join(self.download_path, self.model_file)
        self.runner = ONNXModelRunner(model_path, self.device, self.model_size)

    def _preprocess_inputs(self, raw_inputs):
        """Return one ``{"x": image}`` dict per element of ``raw_inputs``."""
        return [{"x": image} for image in raw_inputs]
class LicensePlateDetector(BaseEmbedding):
    """ONNX wrapper for the YOLOv9 license plate detection model.

    Makes sure ``yolov9-256-license-plates.onnx`` is available under
    ``MODEL_CACHE_DIR``, builds an ``ONNXModelRunner`` for it and letterboxes
    a single image into the square input the model is fed.
    """

    def __init__(
        self,
        model_size: str,
        requestor: InterProcessRequestor,
        device: str = "AUTO",
    ):
        """Register the model file, then either queue its download or load it.

        Args:
            model_size: Stored on the instance and forwarded to
                ``ONNXModelRunner``; it does not change which file is used.
            requestor: Handed to ``ModelDownloader.mark_files_state`` when the
                model files are already on disk.
            device: Forwarded to ``ONNXModelRunner``.
        """
        super().__init__(
            model_name="yolov9_license_plate",
            model_file="yolov9-256-license-plates.onnx",
            download_urls={
                "yolov9-256-license-plates.onnx": "https://github.com/hawkeye217/yolov9-license-plates/raw/refs/heads/master/models/yolov9-256-license-plates.onnx"
            },
        )

        self.requestor = requestor
        self.model_size = model_size
        self.device = device
        self.download_path = os.path.join(MODEL_CACHE_DIR, self.model_name)
        self.runner: ONNXModelRunner | None = None

        files_names = list(self.download_urls.keys())
        if not all(
            os.path.exists(os.path.join(self.download_path, n)) for n in files_names
        ):
            # At least one file is missing: hand the work to a ModelDownloader.
            # The runner is created later, by _load_model_and_utils.
            logger.debug(f"starting model download for {self.model_name}")
            self.downloader = ModelDownloader(
                model_name=self.model_name,
                download_path=self.download_path,
                file_names=files_names,
                download_func=self._download_model,
            )
            self.downloader.ensure_model_files()
        else:
            # Nothing to fetch: report the files as downloaded and load the
            # model right away.
            self.downloader = None
            ModelDownloader.mark_files_state(
                self.requestor,
                self.model_name,
                files_names,
                ModelStatusTypesEnum.downloaded,
            )
            self._load_model_and_utils()
            logger.debug(f"models are already downloaded for {self.model_name}")

    def _load_model_and_utils(self):
        """Create the ONNX runner on first use; later calls do nothing."""
        if self.runner is None:
            if self.downloader:
                # A download was queued in __init__; wait on it before loading
                # (presumably blocks until the files exist).
                self.downloader.wait_for_download()

            self.runner = ONNXModelRunner(
                os.path.join(self.download_path, self.model_file),
                self.device,
                self.model_size,
            )

    def _preprocess_inputs(self, raw_inputs):
        """Letterbox one image into a square, normalized, batched CHW array.

        The image is resized so its longer side equals ``LPR_EMBEDDING_SIZE``
        while keeping the aspect ratio (the shorter side is rounded down to a
        multiple of 4, with a floor of 4 px), centered on a black square
        frame, scaled by 1/255, transposed from HWC to CHW and given a leading
        batch axis.

        Args:
            raw_inputs: A single image array whose ``shape`` unpacks to
                ``(height, width, channels)``. Lists are rejected.

        Returns:
            A one-element list holding ``{"images": frame}``, where ``frame``
            is a float32 array of shape
            ``(1, channels, LPR_EMBEDDING_SIZE, LPR_EMBEDDING_SIZE)``.

        Raises:
            ValueError: If ``raw_inputs`` is a list (batches are unsupported).
        """
        if isinstance(raw_inputs, list):
            raise ValueError("License plate embedding does not support batch inputs.")

        img = raw_inputs
        height, width, channels = img.shape

        # Smallest allowed length for the shorter side. Without this floor,
        # rounding down to a multiple of 4 yields 0 for aspect ratios beyond
        # 64:1, and cv2.resize rejects a zero-sized target.
        min_side = 4

        # Resize maintaining aspect ratio
        if width > height:
            new_height = max(
                min_side, int(((height / width) * LPR_EMBEDDING_SIZE) // 4 * 4)
            )
            img = cv2.resize(img, (LPR_EMBEDDING_SIZE, new_height))
        else:
            new_width = max(
                min_side, int(((width / height) * LPR_EMBEDDING_SIZE) // 4 * 4)
            )
            img = cv2.resize(img, (new_width, LPR_EMBEDDING_SIZE))

        # Get new dimensions after resize
        og_h, og_w, channels = img.shape

        # Create black square frame
        frame = np.full(
            (LPR_EMBEDDING_SIZE, LPR_EMBEDDING_SIZE, channels),
            (0, 0, 0),
            dtype=np.float32,
        )

        # Center the resized image in the square frame
        x_center = (LPR_EMBEDDING_SIZE - og_w) // 2
        y_center = (LPR_EMBEDDING_SIZE - og_h) // 2
        frame[y_center : y_center + og_h, x_center : x_center + og_w] = img

        # Normalize to 0-1
        frame = frame / 255.0

        # Convert from HWC to CHW format and add batch dimension
        frame = np.transpose(frame, (2, 0, 1))
        frame = np.expand_dims(frame, axis=0)
        return [{"images": frame}]