blakeblackshear.frigate/frigate/util/model.py

"""Model Utils"""
import logging
import os
from typing import Any, Optional
import cv2
import numpy as np
import onnxruntime as ort
from playhouse.sqliteq import SqliteQueueDatabase
from frigate.config.semantic_search import FaceRecognitionConfig
try:
import openvino as ov
except ImportError:
# openvino is not included
pass

logger = logging.getLogger(__name__)

MIN_MATCHING_FACES = 2


def get_ort_providers(
force_cpu: bool = False, device: str = "AUTO", requires_fp16: bool = False
) -> tuple[list[str], list[dict[str, Any]]]:
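    """Return matched lists of ONNX Runtime execution providers and the
    per-provider option dicts appropriate for the requested device."""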
if force_cpu:
return (
["CPUExecutionProvider"],
[
{
"enable_cpu_mem_arena": False,
}
],
)
providers = []
options = []
for provider in ort.get_available_providers():
if provider == "CUDAExecutionProvider":
device_id = 0 if not device.isdigit() else int(device)
providers.append(provider)
options.append(
{
"arena_extend_strategy": "kSameAsRequested",
"device_id": device_id,
}
)
elif provider == "TensorrtExecutionProvider":
# TensorrtExecutionProvider uses too much memory without options to control it
# so it is not enabled by default
if device == "Tensorrt":
os.makedirs(
"/config/model_cache/tensorrt/ort/trt-engines", exist_ok=True
)
device_id = 0 if not device.isdigit() else int(device)
providers.append(provider)
options.append(
{
"device_id": device_id,
"trt_fp16_enable": requires_fp16
and os.environ.get("USE_FP_16", "True") != "False",
"trt_timing_cache_enable": True,
"trt_engine_cache_enable": True,
"trt_timing_cache_path": "/config/model_cache/tensorrt/ort",
"trt_engine_cache_path": "/config/model_cache/tensorrt/ort/trt-engines",
}
)
else:
continue
elif provider == "OpenVINOExecutionProvider":
os.makedirs("/config/model_cache/openvino/ort", exist_ok=True)
providers.append(provider)
options.append(
{
"arena_extend_strategy": "kSameAsRequested",
"cache_dir": "/config/model_cache/openvino/ort",
"device_type": device,
}
)
elif provider == "CPUExecutionProvider":
providers.append(provider)
options.append(
{
"enable_cpu_mem_arena": False,
}
)
else:
providers.append(provider)
options.append({})
return (providers, options)
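
# Example usage (illustrative sketch only; "my_model.onnx" is a placeholder
# path, not a file from this codebase):
#
#   providers, options = get_ort_providers(device="AUTO")
#   session = ort.InferenceSession(
#       "my_model.onnx", providers=providers, provider_options=options
#   )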


class ONNXModelRunner:
    """Run onnx models optimally based on available hardware."""

    def __init__(self, model_path: str, device: str, requires_fp16: bool = False):
        self.model_path = model_path
        self.ort: Optional[ort.InferenceSession] = None
        self.ov: Optional["ov.Core"] = None  # openvino may not be installed
providers, options = get_ort_providers(device == "CPU", device, requires_fp16)
self.interpreter = None
if "OpenVINOExecutionProvider" in providers:
try:
# use OpenVINO directly
self.type = "ov"
self.ov = ov.Core()
self.ov.set_property(
{ov.properties.cache_dir: "/config/model_cache/openvino"}
)
self.interpreter = self.ov.compile_model(
model=model_path, device_name=device
)
except Exception as e:
logger.warning(
f"OpenVINO failed to build model, using CPU instead: {e}"
)
self.interpreter = None

        # fall back to ONNX Runtime when OpenVINO is unavailable or failed
        if self.interpreter is None:
self.type = "ort"
self.ort = ort.InferenceSession(
model_path,
providers=providers,
provider_options=options,
)

    def get_input_names(self) -> list[str]:
if self.type == "ov":
input_names = []
for input in self.interpreter.inputs:
input_names.extend(input.names)
return input_names
elif self.type == "ort":
return [input.name for input in self.ort.get_inputs()]

    def run(self, input: dict[str, Any]) -> Any:
if self.type == "ov":
infer_request = self.interpreter.create_infer_request()
input_tensor = list(input.values())
if len(input_tensor) == 1:
input_tensor = ov.Tensor(array=input_tensor[0])
else:
input_tensor = ov.Tensor(array=input_tensor)
infer_request.infer(input_tensor)
return [infer_request.get_output_tensor().data]
elif self.type == "ort":
return self.ort.run(None, input)
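
# Example usage (illustrative sketch only; the model path and input shape are
# placeholders, not values from this codebase):
#
#   runner = ONNXModelRunner("/config/model_cache/example.onnx", device="AUTO")
#   name = runner.get_input_names()[0]
#   outputs = runner.run({name: np.zeros((1, 3, 224, 224), dtype=np.float32)})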


class FaceClassificationModel:
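    """Classify cropped faces with OpenCV's LBPH recognizer, trained from the
    images saved under /media/frigate/clips/faces."""
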
def __init__(self, config: FaceRecognitionConfig, db: SqliteQueueDatabase):
self.config = config
self.db = db
self.landmark_detector = cv2.face.createFacemarkLBF()
self.landmark_detector.loadModel("/config/model_cache/facedet/landmarkdet.yaml")
self.recognizer: cv2.face.LBPHFaceRecognizer = (
cv2.face.LBPHFaceRecognizer_create(
radius=2, threshold=(1 - config.threshold) * 1000
)
)
self.label_map: dict[int, str] = {}
self.__build_classifier()

    def __build_classifier(self) -> None:
        labels = []
        faces = []

        faces_dir = "/media/frigate/clips/faces"
        for idx, name in enumerate(os.listdir(faces_dir)):
            if name == "debug":
                # the debug folder holds saved face images, not training data
                continue

            self.label_map[idx] = name
            face_folder = os.path.join(faces_dir, name)
            for image in os.listdir(face_folder):
                img = cv2.imread(os.path.join(face_folder, image))

                if img is None:
                    # skip files that OpenCV cannot decode as images
                    continue

                img = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
                img = self.__align_face(img, img.shape[1], img.shape[0])
                faces.append(img)
                labels.append(idx)

        self.recognizer.train(faces, np.array(labels))

    def __align_face(
self,
image: np.ndarray,
output_width: int,
output_height: int,
) -> np.ndarray:
_, lands = self.landmark_detector.fit(
image, np.array([(0, 0, image.shape[1], image.shape[0])])
)
landmarks = lands[0][0]
# get landmarks for eyes
leftEyePts = landmarks[42:48]
rightEyePts = landmarks[36:42]
# compute the center of mass for each eye
leftEyeCenter = leftEyePts.mean(axis=0).astype("int")
rightEyeCenter = rightEyePts.mean(axis=0).astype("int")
# compute the angle between the eye centroids
dY = rightEyeCenter[1] - leftEyeCenter[1]
dX = rightEyeCenter[0] - leftEyeCenter[0]
angle = np.degrees(np.arctan2(dY, dX)) - 180
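        # the landmark ordering puts the right-eye center at the smaller
        # x-coordinate, so the eye-to-eye vector points left; subtracting
        # 180 degrees expresses the roll relative to horizontal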
# compute the desired right eye x-coordinate based on the
# desired x-coordinate of the left eye
desiredRightEyeX = 1.0 - 0.35
# determine the scale of the new resulting image by taking
# the ratio of the distance between eyes in the *current*
# image to the ratio of distance between eyes in the
# *desired* image
dist = np.sqrt((dX**2) + (dY**2))
desiredDist = desiredRightEyeX - 0.35
desiredDist *= output_width
scale = desiredDist / dist
        # compute the center (x, y)-coordinates (i.e., the median point)
        # between the two eyes in the input image
        eyesCenter = (
            int((leftEyeCenter[0] + rightEyeCenter[0]) // 2),
            int((leftEyeCenter[1] + rightEyeCenter[1]) // 2),
        )

        # grab the rotation matrix for rotating and scaling the face
        M = cv2.getRotationMatrix2D(eyesCenter, angle, scale)
# update the translation component of the matrix
tX = output_width * 0.5
tY = output_height * 0.35
M[0, 2] += tX - eyesCenter[0]
M[1, 2] += tY - eyesCenter[1]
# apply the affine transformation
return cv2.warpAffine(
image, M, (output_width, output_height), flags=cv2.INTER_CUBIC
)

    def clear_classifier(self) -> None:
        # clearing the label map causes classify_face to rebuild the
        # classifier from the faces directory on its next call
        self.label_map = {}

    def classify_face(self, face_image: np.ndarray) -> Optional[tuple[str, float]]:
if not self.label_map:
self.__build_classifier()
img = cv2.cvtColor(face_image, cv2.COLOR_BGR2GRAY)
img = self.__align_face(img, img.shape[1], img.shape[0])
index, distance = self.recognizer.predict(img)
if index == -1:
return None
score = 1.0 - (distance / 1000)
return self.label_map[index], round(score, 2)
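
# Example usage (illustrative sketch only; `face_config`, `database`, and
# `face_crop_bgr` stand in for objects supplied by Frigate's runtime):
#
#   model = FaceClassificationModel(face_config, database)
#   result = model.classify_face(face_crop_bgr)  # BGR crop of a detected face
#   if result is not None:
#       name, score = result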