Mirror of https://github.com/blakeblackshear/frigate.git (synced 2025-09-23 17:52:05 +02:00)
* continue to use the PaddleOCR v3 text detection model for large; v5 was not finding text on multi-line plates at all in testing
* implement clustering of plate variants per event: should reduce OCR inconsistencies and improve plate recognition stability by using string similarity to cluster similar variants (10 per event id) and choosing the highest-confidence representative as the final plate (see the sketch below)
* pass camera
* prune number of variants based on detect fps
* implement replacement rules for cleaning up and normalizing plates
* docs
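A minimal sketch of the variant-clustering idea described in the commit notes, not the actual Frigate implementation: the function name, the use of difflib.SequenceMatcher for string similarity, and the 0.8 threshold are illustrative assumptions.

# Sketch only: cluster noisy OCR plate reads and keep the highest-confidence
# representative of the largest cluster. Names and threshold are assumptions.
from difflib import SequenceMatcher


def cluster_plate_variants(
    variants: list[tuple[str, float]], similarity_threshold: float = 0.8
) -> str:
    """Group similar (plate_text, confidence) variants and return the
    highest-confidence representative of the largest cluster."""
    if not variants:
        return ""

    clusters: list[list[tuple[str, float]]] = []
    for text, conf in variants:
        for cluster in clusters:
            # Compare against the cluster's first member.
            if (
                SequenceMatcher(None, text, cluster[0][0]).ratio()
                >= similarity_threshold
            ):
                cluster.append((text, conf))
                break
        else:
            clusters.append([(text, conf)])

    best_cluster = max(clusters, key=len)
    return max(best_cluster, key=lambda v: v[1])[0]


# Example: three noisy OCR reads of the same plate
# cluster_plate_variants([("ABC123", 0.91), ("A8C123", 0.76), ("ABC1Z3", 0.70)])
# -> "ABC123"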
305 lines · 11 KiB · Python
import logging
import os
import warnings

import cv2
import numpy as np

from frigate.comms.inter_process import InterProcessRequestor
from frigate.const import MODEL_CACHE_DIR
from frigate.detectors.detection_runners import BaseModelRunner, get_optimized_runner
from frigate.embeddings.types import EnrichmentModelTypeEnum
from frigate.types import ModelStatusTypesEnum
from frigate.util.downloader import ModelDownloader

from .base_embedding import BaseEmbedding

warnings.filterwarnings(
    "ignore",
    category=FutureWarning,
    message="The class CLIPFeatureExtractor is deprecated",
)

logger = logging.getLogger(__name__)

LPR_EMBEDDING_SIZE = 256

class PaddleOCRDetection(BaseEmbedding):
    def __init__(
        self,
        model_size: str,
        requestor: InterProcessRequestor,
        device: str = "AUTO",
    ):
        model_file = (
            "detection_v3-large.onnx"
            if model_size == "large"
            else "detection_v5-small.onnx"
        )
        super().__init__(
            model_name="paddleocr-onnx",
            model_file=model_file,
            download_urls={
                model_file: f"https://github.com/hawkeye217/paddleocr-onnx/raw/refs/heads/master/models/{'v3' if model_size == 'large' else 'v5'}/{model_file}"
            },
        )
        self.requestor = requestor
        self.model_size = model_size
        self.device = device
        self.download_path = os.path.join(MODEL_CACHE_DIR, self.model_name)
        self.runner: BaseModelRunner | None = None
        files_names = list(self.download_urls.keys())
        if not all(
            os.path.exists(os.path.join(self.download_path, n)) for n in files_names
        ):
            logger.debug(f"starting model download for {self.model_name}")
            self.downloader = ModelDownloader(
                model_name=self.model_name,
                download_path=self.download_path,
                file_names=files_names,
                download_func=self._download_model,
            )
            self.downloader.ensure_model_files()
        else:
            self.downloader = None
            ModelDownloader.mark_files_state(
                self.requestor,
                self.model_name,
                files_names,
                ModelStatusTypesEnum.downloaded,
            )
            self._load_model_and_utils()
            logger.debug(f"models are already downloaded for {self.model_name}")

    def _load_model_and_utils(self):
        if self.runner is None:
            if self.downloader:
                self.downloader.wait_for_download()

            self.runner = get_optimized_runner(
                os.path.join(self.download_path, self.model_file),
                self.device,
                model_type=EnrichmentModelTypeEnum.paddleocr.value,
            )

    def _preprocess_inputs(self, raw_inputs):
        preprocessed = []
        for x in raw_inputs:
            preprocessed.append(x)
        return [{"x": preprocessed[0]}]

class PaddleOCRClassification(BaseEmbedding):
    def __init__(
        self,
        model_size: str,
        requestor: InterProcessRequestor,
        device: str = "AUTO",
    ):
        super().__init__(
            model_name="paddleocr-onnx",
            model_file="classification.onnx",
            download_urls={
                "classification.onnx": "https://github.com/hawkeye217/paddleocr-onnx/raw/refs/heads/master/models/classification.onnx"
            },
        )
        self.requestor = requestor
        self.model_size = model_size
        self.device = device
        self.download_path = os.path.join(MODEL_CACHE_DIR, self.model_name)
        self.runner: BaseModelRunner | None = None
        files_names = list(self.download_urls.keys())
        if not all(
            os.path.exists(os.path.join(self.download_path, n)) for n in files_names
        ):
            logger.debug(f"starting model download for {self.model_name}")
            self.downloader = ModelDownloader(
                model_name=self.model_name,
                download_path=self.download_path,
                file_names=files_names,
                download_func=self._download_model,
            )
            self.downloader.ensure_model_files()
        else:
            self.downloader = None
            ModelDownloader.mark_files_state(
                self.requestor,
                self.model_name,
                files_names,
                ModelStatusTypesEnum.downloaded,
            )
            self._load_model_and_utils()
            logger.debug(f"models are already downloaded for {self.model_name}")

    def _load_model_and_utils(self):
        if self.runner is None:
            if self.downloader:
                self.downloader.wait_for_download()

            self.runner = get_optimized_runner(
                os.path.join(self.download_path, self.model_file),
                self.device,
                model_type=EnrichmentModelTypeEnum.paddleocr.value,
            )

    def _preprocess_inputs(self, raw_inputs):
        processed = []
        for img in raw_inputs:
            processed.append({"x": img})
        return processed

class PaddleOCRRecognition(BaseEmbedding):
    def __init__(
        self,
        model_size: str,
        requestor: InterProcessRequestor,
        device: str = "AUTO",
    ):
        super().__init__(
            model_name="paddleocr-onnx",
            model_file="recognition_v4.onnx",
            download_urls={
                "recognition_v4.onnx": "https://github.com/hawkeye217/paddleocr-onnx/raw/refs/heads/master/models/v4/recognition_v4.onnx",
                "ppocr_keys_v1.txt": "https://github.com/hawkeye217/paddleocr-onnx/raw/refs/heads/master/models/v4/ppocr_keys_v1.txt",
            },
        )
        self.requestor = requestor
        self.model_size = model_size
        self.device = device
        self.download_path = os.path.join(MODEL_CACHE_DIR, self.model_name)
        self.runner: BaseModelRunner | None = None
        files_names = list(self.download_urls.keys())
        if not all(
            os.path.exists(os.path.join(self.download_path, n)) for n in files_names
        ):
            logger.debug(f"starting model download for {self.model_name}")
            self.downloader = ModelDownloader(
                model_name=self.model_name,
                download_path=self.download_path,
                file_names=files_names,
                download_func=self._download_model,
            )
            self.downloader.ensure_model_files()
        else:
            self.downloader = None
            ModelDownloader.mark_files_state(
                self.requestor,
                self.model_name,
                files_names,
                ModelStatusTypesEnum.downloaded,
            )
            self._load_model_and_utils()
            logger.debug(f"models are already downloaded for {self.model_name}")

    def _load_model_and_utils(self):
        if self.runner is None:
            if self.downloader:
                self.downloader.wait_for_download()

            self.runner = get_optimized_runner(
                os.path.join(self.download_path, self.model_file),
                self.device,
                model_type=EnrichmentModelTypeEnum.paddleocr.value,
            )

    def _preprocess_inputs(self, raw_inputs):
        processed = []
        for img in raw_inputs:
            processed.append({"x": img})
        return processed

class LicensePlateDetector(BaseEmbedding):
    def __init__(
        self,
        model_size: str,
        requestor: InterProcessRequestor,
        device: str = "AUTO",
    ):
        super().__init__(
            model_name="yolov9_license_plate",
            model_file="yolov9-256-license-plates.onnx",
            download_urls={
                "yolov9-256-license-plates.onnx": "https://github.com/hawkeye217/yolov9-license-plates/raw/refs/heads/master/models/yolov9-256-license-plates.onnx"
            },
        )

        self.requestor = requestor
        self.model_size = model_size
        self.device = device
        self.download_path = os.path.join(MODEL_CACHE_DIR, self.model_name)
        self.runner: BaseModelRunner | None = None
        files_names = list(self.download_urls.keys())
        if not all(
            os.path.exists(os.path.join(self.download_path, n)) for n in files_names
        ):
            logger.debug(f"starting model download for {self.model_name}")
            self.downloader = ModelDownloader(
                model_name=self.model_name,
                download_path=self.download_path,
                file_names=files_names,
                download_func=self._download_model,
            )
            self.downloader.ensure_model_files()
        else:
            self.downloader = None
            ModelDownloader.mark_files_state(
                self.requestor,
                self.model_name,
                files_names,
                ModelStatusTypesEnum.downloaded,
            )
            self._load_model_and_utils()
            logger.debug(f"models are already downloaded for {self.model_name}")

    def _load_model_and_utils(self):
        if self.runner is None:
            if self.downloader:
                self.downloader.wait_for_download()

            self.runner = get_optimized_runner(
                os.path.join(self.download_path, self.model_file),
                self.device,
                model_type="yolov9",
            )

    def _preprocess_inputs(self, raw_inputs):
        if isinstance(raw_inputs, list):
            raise ValueError("License plate embedding does not support batch inputs.")

        img = raw_inputs
        height, width, channels = img.shape

        # Resize maintaining aspect ratio
        if width > height:
            new_height = int(((height / width) * LPR_EMBEDDING_SIZE) // 4 * 4)
            img = cv2.resize(img, (LPR_EMBEDDING_SIZE, new_height))
        else:
            new_width = int(((width / height) * LPR_EMBEDDING_SIZE) // 4 * 4)
            img = cv2.resize(img, (new_width, LPR_EMBEDDING_SIZE))

        # Get new dimensions after resize
        og_h, og_w, channels = img.shape

        # Create black square frame
        frame = np.full(
            (LPR_EMBEDDING_SIZE, LPR_EMBEDDING_SIZE, channels),
            (0, 0, 0),
            dtype=np.float32,
        )

        # Center the resized image in the square frame
        x_center = (LPR_EMBEDDING_SIZE - og_w) // 2
        y_center = (LPR_EMBEDDING_SIZE - og_h) // 2
        frame[y_center : y_center + og_h, x_center : x_center + og_w] = img

        # Normalize to 0-1
        frame = frame / 255.0

        # Convert from HWC to CHW format and add batch dimension
        frame = np.transpose(frame, (2, 0, 1))
        frame = np.expand_dims(frame, axis=0)
        return [{"images": frame}]
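For orientation, a rough sketch of constructing the four wrappers above side by side, assuming an InterProcessRequestor instance is already available; the surrounding LPR pipeline that actually feeds frames through these models lives elsewhere in Frigate and is not shown here. The helper name and its return shape are illustrative assumptions.

# Illustrative only: build the detection/classification/recognition wrappers
# together. Placed alongside the module above so the class names resolve.
def build_lpr_models(requestor: InterProcessRequestor, model_size: str = "small"):
    # Device selection is left to "AUTO" (the constructors' default).
    return {
        "plate_detector": LicensePlateDetector(model_size, requestor),
        "text_detection": PaddleOCRDetection(model_size, requestor),
        "classification": PaddleOCRClassification(model_size, requestor),
        "recognition": PaddleOCRRecognition(model_size, requestor),
    }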