blakeblackshear.frigate/frigate/util/model.py

201 lines
6.5 KiB
Python
Raw Normal View History

"""Model Utils"""
import logging
import os
from typing import Any, Optional
2024-11-21 20:59:43 +01:00
import cv2
import numpy as np
import onnxruntime as ort
from playhouse.sqliteq import SqliteQueueDatabase
from frigate.config.semantic_search import FaceRecognitionConfig
try:
import openvino as ov
except ImportError:
# openvino is not included
pass
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
# NOTE(review): not referenced by any definition visible in this file;
# presumably a minimum count of matching faces used elsewhere -- confirm.
MIN_MATCHING_FACES = 2
def get_ort_providers(
    force_cpu: bool = False, device: str = "AUTO", requires_fp16: bool = False
) -> tuple[list[str], list[dict[str, Any]]]:
    """Choose ONNX Runtime execution providers and their option dicts.

    Args:
        force_cpu: When True, provider discovery is skipped and only the CPU
            execution provider is returned.
        device: Device selector. A string of digits is used as the CUDA
            device id (anything else selects device 0), the literal
            ``"Tensorrt"`` opts in to the TensorRT provider, and the value is
            passed through unchanged as ``device_type`` for OpenVINO.
        requires_fp16: Whether the model wants FP16. For TensorRT this is
            combined with the ``USE_FP_16`` environment variable to decide
            ``trt_fp16_enable``.

    Returns:
        A ``(providers, options)`` tuple of two parallel lists:
        ``options[i]`` is the option dict for ``providers[i]``.
    """
    if force_cpu:
        return (
            ["CPUExecutionProvider"],
            [
                {
                    "enable_cpu_mem_arena": False,
                }
            ],
        )

    providers = []
    options = []

    for provider in ort.get_available_providers():
        if provider == "CUDAExecutionProvider":
            device_id = 0 if not device.isdigit() else int(device)
            providers.append(provider)
            options.append(
                {
                    "arena_extend_strategy": "kSameAsRequested",
                    "device_id": device_id,
                }
            )
        elif provider == "TensorrtExecutionProvider":
            # TensorrtExecutionProvider uses too much memory without options to control it
            # so it is not enabled by default
            if device != "Tensorrt":
                continue

            os.makedirs("/config/model_cache/tensorrt/ort/trt-engines", exist_ok=True)
            providers.append(provider)
            options.append(
                {
                    # device is the literal "Tensorrt" here, so it can never
                    # carry a numeric id; the first device is always used.
                    "device_id": 0,
                    "trt_fp16_enable": requires_fp16
                    and os.environ.get("USE_FP_16", "True") != "False",
                    "trt_timing_cache_enable": True,
                    "trt_engine_cache_enable": True,
                    "trt_timing_cache_path": "/config/model_cache/tensorrt/ort",
                    "trt_engine_cache_path": "/config/model_cache/tensorrt/ort/trt-engines",
                }
            )
        elif provider == "OpenVINOExecutionProvider":
            os.makedirs("/config/model_cache/openvino/ort", exist_ok=True)
            providers.append(provider)
            options.append(
                {
                    "arena_extend_strategy": "kSameAsRequested",
                    "cache_dir": "/config/model_cache/openvino/ort",
                    "device_type": device,
                }
            )
        elif provider == "CPUExecutionProvider":
            providers.append(provider)
            options.append(
                {
                    "enable_cpu_mem_arena": False,
                }
            )
        else:
            # Any other available provider is enabled with default options.
            providers.append(provider)
            options.append({})

    return (providers, options)
class ONNXModelRunner:
    """Run an ONNX model via OpenVINO when possible, else via ONNX Runtime."""

    def __init__(self, model_path: str, device: str, requires_fp16: bool = False):
        """Load ``model_path`` on ``device``.

        ``self.type`` ends up as ``"ov"`` when the model was compiled by
        OpenVINO directly, or ``"ort"`` when an ONNX Runtime session is used.
        """
        self.model_path = model_path
        self.ort: ort.InferenceSession = None
        self.ov: ov.Core = None
        self.interpreter = None

        # A device of "CPU" forces the CPU-only provider list.
        providers, options = get_ort_providers(device == "CPU", device, requires_fp16)

        if "OpenVINOExecutionProvider" in providers:
            # use OpenVINO directly
            self.type = "ov"
            try:
                core = ov.Core()
                self.ov = core
                core.set_property(
                    {ov.properties.cache_dir: "/config/model_cache/openvino"}
                )
                self.interpreter = core.compile_model(
                    model=model_path, device_name=device
                )
            except Exception as err:
                logger.warning(
                    f"OpenVINO failed to build model, using CPU instead: {err}"
                )
                self.interpreter = None

        if self.interpreter is not None:
            return

        # Use ONNXRuntime
        self.type = "ort"
        self.ort = ort.InferenceSession(
            model_path,
            providers=providers,
            provider_options=options,
        )

    def get_input_names(self) -> list[str]:
        """Return the names of the model's inputs for the active backend."""
        if self.type == "ort":
            return [model_input.name for model_input in self.ort.get_inputs()]

        if self.type == "ov":
            # Each OpenVINO input may expose several names; flatten them all.
            return [
                name
                for model_input in self.interpreter.inputs
                for name in model_input.names
            ]

        return None

    def run(self, input: dict[str, Any]) -> Any:
        """Run inference on ``input`` (a mapping of input name to value)."""
        if self.type == "ort":
            return self.ort.run(None, input)

        if self.type == "ov":
            request = self.interpreter.create_infer_request()
            values = list(input.values())
            # A single input is wrapped on its own; otherwise the whole list
            # of values is handed to the tensor constructor.
            payload = values[0] if len(values) == 1 else values
            tensor = ov.Tensor(array=payload)
            request.infer(tensor)
            return [request.get_output_tensor().data]

        return None
class FaceClassificationModel:
    """Recognize faces with an OpenCV LBPH recognizer trained from disk.

    Training images are read from ``/media/frigate/clips/faces/<name>/``;
    each sub-folder name becomes the label returned for its images.
    """

    def __init__(self, config: FaceRecognitionConfig, db: SqliteQueueDatabase):
        """Create an untrained recognizer; training happens lazily.

        Args:
            config: Face recognition settings; ``config.threshold`` is
                converted into the recognizer's distance threshold.
            db: Database handle, stored on the instance.
        """
        self.config = config
        self.db = db
        # The recognizer threshold is a distance, so the configured score is
        # inverted and scaled by 1000; classify_face applies the inverse
        # mapping to turn a distance back into a score.
        self.recognizer = cv2.face.LBPHFaceRecognizer_create(
            radius=4, threshold=(1 - config.threshold) * 1000
        )
        # Maps the integer label used for training to the folder (face) name.
        # An empty map means "not trained yet".
        self.label_map: dict[int, str] = {}

    def __build_classifier(self) -> None:
        """Train the recognizer from the face images stored on disk.

        Leaves ``self.label_map`` empty (and the recognizer untouched) when
        the faces directory is missing or contains no readable image.
        """
        faces_dir = "/media/frigate/clips/faces"

        if not os.path.isdir(faces_dir):
            logger.debug(f"Face directory {faces_dir} does not exist")
            return

        label_map: dict[int, str] = {}
        labels = []
        faces = []

        for idx, name in enumerate(os.listdir(faces_dir)):
            face_folder = os.path.join(faces_dir, name)

            # Stray files next to the per-person folders are not faces.
            if not os.path.isdir(face_folder):
                continue

            label_map[idx] = name

            for image in os.listdir(face_folder):
                image_path = os.path.join(face_folder, image)
                img = cv2.imread(image_path)

                # imread yields None instead of raising on unreadable files.
                if img is None:
                    logger.warning(f"Unable to read face image {image_path}")
                    continue

                gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
                faces.append(cv2.equalizeHist(gray))
                labels.append(idx)

        # Training on an empty set would raise, so stay untrained instead.
        if not faces:
            return

        self.recognizer.train(faces, np.array(labels))
        # Publish the labels only after training succeeded.
        self.label_map = label_map

    def clear_classifier(self) -> None:
        """Forget the trained labels so the next classification retrains."""
        # Legacy attributes; nothing visible in this class reads them.
        self.classifier = None
        self.labeler = None
        # An empty label map is what triggers a rebuild in classify_face.
        self.label_map = {}

    def classify_face(self, face_image: np.ndarray) -> Optional[tuple[str, float]]:
        """Identify the face in a BGR image.

        Args:
            face_image: BGR image of a face (it is converted with
                ``cv2.COLOR_BGR2GRAY`` before prediction).

        Returns:
            ``(name, score)`` with the score rounded to two decimals, or
            ``None`` when no training data is available or the recognizer
            reports no match (label ``-1``).
        """
        if not self.label_map:
            self.__build_classifier()

            # Still nothing to compare against: no faces have been stored.
            if not self.label_map:
                return None

        gray = cv2.cvtColor(face_image, cv2.COLOR_BGR2GRAY)
        index, distance = self.recognizer.predict(cv2.equalizeHist(gray))

        if index == -1:
            return None

        score = 1.0 - (distance / 1000)
        return self.label_map[index], round(score, 2)