mirror of
https://github.com/blakeblackshear/frigate.git
synced 2025-02-05 00:15:51 +01:00
f513e3fb3d
* Add ability to view attempts * Improve UI * Cleanup * Correctly refresh ui when item is deleted * Select correct library by default * Add min score * Cleanup
270 lines
9.0 KiB
Python
270 lines
9.0 KiB
Python
"""Model Utils"""
|
|
|
|
import logging
|
|
import os
|
|
from typing import Any, Optional
|
|
|
|
import cv2
|
|
import numpy as np
|
|
import onnxruntime as ort
|
|
from playhouse.sqliteq import SqliteQueueDatabase
|
|
|
|
from frigate.config.semantic_search import FaceRecognitionConfig
|
|
|
|
try:
|
|
import openvino as ov
|
|
except ImportError:
|
|
# openvino is not included
|
|
pass
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
MIN_MATCHING_FACES = 2
|
|
|
|
|
|
def get_ort_providers(
|
|
force_cpu: bool = False, device: str = "AUTO", requires_fp16: bool = False
|
|
) -> tuple[list[str], list[dict[str, any]]]:
|
|
if force_cpu:
|
|
return (
|
|
["CPUExecutionProvider"],
|
|
[
|
|
{
|
|
"enable_cpu_mem_arena": False,
|
|
}
|
|
],
|
|
)
|
|
|
|
providers = []
|
|
options = []
|
|
|
|
for provider in ort.get_available_providers():
|
|
if provider == "CUDAExecutionProvider":
|
|
device_id = 0 if not device.isdigit() else int(device)
|
|
providers.append(provider)
|
|
options.append(
|
|
{
|
|
"arena_extend_strategy": "kSameAsRequested",
|
|
"device_id": device_id,
|
|
}
|
|
)
|
|
elif provider == "TensorrtExecutionProvider":
|
|
# TensorrtExecutionProvider uses too much memory without options to control it
|
|
# so it is not enabled by default
|
|
if device == "Tensorrt":
|
|
os.makedirs(
|
|
"/config/model_cache/tensorrt/ort/trt-engines", exist_ok=True
|
|
)
|
|
device_id = 0 if not device.isdigit() else int(device)
|
|
providers.append(provider)
|
|
options.append(
|
|
{
|
|
"device_id": device_id,
|
|
"trt_fp16_enable": requires_fp16
|
|
and os.environ.get("USE_FP_16", "True") != "False",
|
|
"trt_timing_cache_enable": True,
|
|
"trt_engine_cache_enable": True,
|
|
"trt_timing_cache_path": "/config/model_cache/tensorrt/ort",
|
|
"trt_engine_cache_path": "/config/model_cache/tensorrt/ort/trt-engines",
|
|
}
|
|
)
|
|
else:
|
|
continue
|
|
elif provider == "OpenVINOExecutionProvider":
|
|
os.makedirs("/config/model_cache/openvino/ort", exist_ok=True)
|
|
providers.append(provider)
|
|
options.append(
|
|
{
|
|
"arena_extend_strategy": "kSameAsRequested",
|
|
"cache_dir": "/config/model_cache/openvino/ort",
|
|
"device_type": device,
|
|
}
|
|
)
|
|
elif provider == "CPUExecutionProvider":
|
|
providers.append(provider)
|
|
options.append(
|
|
{
|
|
"enable_cpu_mem_arena": False,
|
|
}
|
|
)
|
|
else:
|
|
providers.append(provider)
|
|
options.append({})
|
|
|
|
return (providers, options)
|
|
|
|
|
|
class ONNXModelRunner:
|
|
"""Run onnx models optimally based on available hardware."""
|
|
|
|
def __init__(self, model_path: str, device: str, requires_fp16: bool = False):
|
|
self.model_path = model_path
|
|
self.ort: ort.InferenceSession = None
|
|
self.ov: ov.Core = None
|
|
providers, options = get_ort_providers(device == "CPU", device, requires_fp16)
|
|
self.interpreter = None
|
|
|
|
if "OpenVINOExecutionProvider" in providers:
|
|
try:
|
|
# use OpenVINO directly
|
|
self.type = "ov"
|
|
self.ov = ov.Core()
|
|
self.ov.set_property(
|
|
{ov.properties.cache_dir: "/config/model_cache/openvino"}
|
|
)
|
|
self.interpreter = self.ov.compile_model(
|
|
model=model_path, device_name=device
|
|
)
|
|
except Exception as e:
|
|
logger.warning(
|
|
f"OpenVINO failed to build model, using CPU instead: {e}"
|
|
)
|
|
self.interpreter = None
|
|
|
|
# Use ONNXRuntime
|
|
if self.interpreter is None:
|
|
self.type = "ort"
|
|
self.ort = ort.InferenceSession(
|
|
model_path,
|
|
providers=providers,
|
|
provider_options=options,
|
|
)
|
|
|
|
def get_input_names(self) -> list[str]:
|
|
if self.type == "ov":
|
|
input_names = []
|
|
|
|
for input in self.interpreter.inputs:
|
|
input_names.extend(input.names)
|
|
|
|
return input_names
|
|
elif self.type == "ort":
|
|
return [input.name for input in self.ort.get_inputs()]
|
|
|
|
def run(self, input: dict[str, Any]) -> Any:
|
|
if self.type == "ov":
|
|
infer_request = self.interpreter.create_infer_request()
|
|
input_tensor = list(input.values())
|
|
|
|
if len(input_tensor) == 1:
|
|
input_tensor = ov.Tensor(array=input_tensor[0])
|
|
else:
|
|
input_tensor = ov.Tensor(array=input_tensor)
|
|
|
|
infer_request.infer(input_tensor)
|
|
return [infer_request.get_output_tensor().data]
|
|
elif self.type == "ort":
|
|
return self.ort.run(None, input)
|
|
|
|
|
|
class FaceClassificationModel:
|
|
def __init__(self, config: FaceRecognitionConfig, db: SqliteQueueDatabase):
|
|
self.config = config
|
|
self.db = db
|
|
self.landmark_detector = cv2.face.createFacemarkLBF()
|
|
self.landmark_detector.loadModel("/config/model_cache/facedet/landmarkdet.yaml")
|
|
self.recognizer: cv2.face.LBPHFaceRecognizer = (
|
|
cv2.face.LBPHFaceRecognizer_create(
|
|
radius=2, threshold=(1 - config.min_score) * 1000
|
|
)
|
|
)
|
|
self.label_map: dict[int, str] = {}
|
|
self.__build_classifier()
|
|
|
|
def __build_classifier(self) -> None:
|
|
labels = []
|
|
faces = []
|
|
|
|
dir = "/media/frigate/clips/faces"
|
|
for idx, name in enumerate(os.listdir(dir)):
|
|
if name == "debug":
|
|
continue
|
|
|
|
self.label_map[idx] = name
|
|
face_folder = os.path.join(dir, name)
|
|
for image in os.listdir(face_folder):
|
|
img = cv2.imread(os.path.join(face_folder, image))
|
|
img = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
|
|
img = self.__align_face(img, img.shape[1], img.shape[0])
|
|
faces.append(img)
|
|
labels.append(idx)
|
|
|
|
self.recognizer.train(faces, np.array(labels))
|
|
|
|
def __align_face(
|
|
self,
|
|
image: np.ndarray,
|
|
output_width: int,
|
|
output_height: int,
|
|
) -> np.ndarray:
|
|
_, lands = self.landmark_detector.fit(
|
|
image, np.array([(0, 0, image.shape[1], image.shape[0])])
|
|
)
|
|
landmarks = lands[0][0]
|
|
|
|
# get landmarks for eyes
|
|
leftEyePts = landmarks[42:48]
|
|
rightEyePts = landmarks[36:42]
|
|
|
|
# compute the center of mass for each eye
|
|
leftEyeCenter = leftEyePts.mean(axis=0).astype("int")
|
|
rightEyeCenter = rightEyePts.mean(axis=0).astype("int")
|
|
|
|
# compute the angle between the eye centroids
|
|
dY = rightEyeCenter[1] - leftEyeCenter[1]
|
|
dX = rightEyeCenter[0] - leftEyeCenter[0]
|
|
angle = np.degrees(np.arctan2(dY, dX)) - 180
|
|
|
|
# compute the desired right eye x-coordinate based on the
|
|
# desired x-coordinate of the left eye
|
|
desiredRightEyeX = 1.0 - 0.35
|
|
|
|
# determine the scale of the new resulting image by taking
|
|
# the ratio of the distance between eyes in the *current*
|
|
# image to the ratio of distance between eyes in the
|
|
# *desired* image
|
|
dist = np.sqrt((dX**2) + (dY**2))
|
|
desiredDist = desiredRightEyeX - 0.35
|
|
desiredDist *= output_width
|
|
scale = desiredDist / dist
|
|
|
|
# compute center (x, y)-coordinates (i.e., the median point)
|
|
# between the two eyes in the input image
|
|
# grab the rotation matrix for rotating and scaling the face
|
|
eyesCenter = (
|
|
int((leftEyeCenter[0] + rightEyeCenter[0]) // 2),
|
|
int((leftEyeCenter[1] + rightEyeCenter[1]) // 2),
|
|
)
|
|
M = cv2.getRotationMatrix2D(eyesCenter, angle, scale)
|
|
|
|
# update the translation component of the matrix
|
|
tX = output_width * 0.5
|
|
tY = output_height * 0.35
|
|
M[0, 2] += tX - eyesCenter[0]
|
|
M[1, 2] += tY - eyesCenter[1]
|
|
|
|
# apply the affine transformation
|
|
return cv2.warpAffine(
|
|
image, M, (output_width, output_height), flags=cv2.INTER_CUBIC
|
|
)
|
|
|
|
def clear_classifier(self) -> None:
|
|
self.classifier = None
|
|
self.labeler = None
|
|
self.label_map = {}
|
|
|
|
def classify_face(self, face_image: np.ndarray) -> Optional[tuple[str, float]]:
|
|
if not self.label_map:
|
|
self.__build_classifier()
|
|
|
|
img = cv2.cvtColor(face_image, cv2.COLOR_BGR2GRAY)
|
|
img = self.__align_face(img, img.shape[1], img.shape[0])
|
|
index, distance = self.recognizer.predict(img)
|
|
|
|
if index == -1:
|
|
return None
|
|
|
|
score = 1.0 - (distance / 1000)
|
|
return self.label_map[index], round(score, 2)
|