"""Model Utils"""

import logging
import os
from typing import Any

import cv2
import numpy as np
import onnxruntime as ort
from playhouse.sqliteq import SqliteQueueDatabase

from frigate.config.semantic_search import FaceRecognitionConfig
from frigate.const import MODEL_CACHE_DIR

try:
    import openvino as ov
except ImportError:
    # openvino is not included
    pass

logger = logging.getLogger(__name__)

MIN_MATCHING_FACES = 2


def get_ort_providers(
    force_cpu: bool = False, device: str = "AUTO", requires_fp16: bool = False
) -> tuple[list[str], list[dict[str, Any]]]:
    """Build the ONNX Runtime provider list and the matching provider options.

    Args:
        force_cpu: When true, only ``CPUExecutionProvider`` is returned.
        device: Device selector. A purely numeric string is used as the CUDA
            device id, ``"Tensorrt"`` opts in to the TensorRT provider, and the
            value is passed through as ``device_type`` for OpenVINO.
        requires_fp16: Whether the model needs FP16; only consulted for
            TensorRT, where the ``USE_FP_16`` environment variable can still
            disable it by being set to ``"False"``.

    Returns:
        A ``(providers, options)`` tuple. Both lists have the same length and
        ``options[i]`` is the option dict for ``providers[i]``.
    """
    if force_cpu:
        return (
            ["CPUExecutionProvider"],
            [
                {
                    "enable_cpu_mem_arena": False,
                }
            ],
        )

    providers = []
    options = []

    for provider in ort.get_available_providers():
        if provider == "CUDAExecutionProvider":
            device_id = 0 if not device.isdigit() else int(device)
            providers.append(provider)
            options.append(
                {
                    "arena_extend_strategy": "kSameAsRequested",
                    "device_id": device_id,
                }
            )
        elif provider == "TensorrtExecutionProvider":
            # TensorrtExecutionProvider uses too much memory without options to control it
            # so it is not enabled by default
            if device != "Tensorrt":
                continue

            os.makedirs("/config/model_cache/tensorrt/ort/trt-engines", exist_ok=True)
            providers.append(provider)
            options.append(
                {
                    # device is the literal "Tensorrt" here, so it can never be
                    # a numeric device index; the id is always 0.
                    "device_id": 0,
                    "trt_fp16_enable": requires_fp16
                    and os.environ.get("USE_FP_16", "True") != "False",
                    "trt_timing_cache_enable": True,
                    "trt_engine_cache_enable": True,
                    "trt_timing_cache_path": "/config/model_cache/tensorrt/ort",
                    "trt_engine_cache_path": "/config/model_cache/tensorrt/ort/trt-engines",
                }
            )
        elif provider == "OpenVINOExecutionProvider":
            os.makedirs("/config/model_cache/openvino/ort", exist_ok=True)
            providers.append(provider)
            options.append(
                {
                    "arena_extend_strategy": "kSameAsRequested",
                    "cache_dir": "/config/model_cache/openvino/ort",
                    "device_type": device,
                }
            )
        elif provider == "CPUExecutionProvider":
            providers.append(provider)
            options.append(
                {
                    "enable_cpu_mem_arena": False,
                }
            )
        else:
            # Unknown providers are kept, with no provider-specific options.
            providers.append(provider)
            options.append({})

    return (providers, options)


class ONNXModelRunner:
    """Run onnx models optimally based on available hardware."""

    def __init__(self, model_path: str, device: str, requires_fp16: bool = False):
        """Load the model with OpenVINO when available, else with ONNX Runtime.

        Args:
            model_path: Path of the model file to load.
            device: Device selector; ``"CPU"`` forces the CPU provider.
            requires_fp16: Forwarded to ``get_ort_providers``.
        """
        self.model_path = model_path
        self.ort: ort.InferenceSession = None
        self.ov: ov.Core = None
        providers, options = get_ort_providers(device == "CPU", device, requires_fp16)
        self.interpreter = None

        if "OpenVINOExecutionProvider" in providers:
            try:
                # use OpenVINO directly
                self.type = "ov"
                self.ov = ov.Core()
                self.ov.set_property(
                    {ov.properties.cache_dir: "/config/model_cache/openvino"}
                )
                self.interpreter = self.ov.compile_model(
                    model=model_path, device_name=device
                )
            except Exception as e:
                # Any failure (including openvino not being importable) falls
                # through to the ONNX Runtime path below.
                logger.warning(
                    f"OpenVINO failed to build model, using CPU instead: {e}"
                )
                self.interpreter = None

        # Use ONNXRuntime
        if self.interpreter is None:
            self.type = "ort"
            self.ort = ort.InferenceSession(
                model_path,
                providers=providers,
                provider_options=options,
            )

    def get_input_names(self) -> list[str]:
        """Return the names of the model inputs for the active backend."""
        if self.type == "ov":
            input_names = []

            for model_input in self.interpreter.inputs:
                input_names.extend(model_input.names)

            return input_names
        elif self.type == "ort":
            return [model_input.name for model_input in self.ort.get_inputs()]

    def run(self, input: dict[str, Any]) -> Any:
        """Run inference on ``input`` (a name -> value mapping).

        For OpenVINO the values are wrapped in a single ``ov.Tensor`` and the
        result is a one-element list holding the output tensor data; for ONNX
        Runtime the mapping is passed to ``InferenceSession.run`` unchanged.
        """
        if self.type == "ov":
            infer_request = self.interpreter.create_infer_request()
            input_tensor = list(input.values())

            if len(input_tensor) == 1:
                input_tensor = ov.Tensor(array=input_tensor[0])
            else:
                input_tensor = ov.Tensor(array=input_tensor)

            infer_request.infer(input_tensor)
            return [infer_request.get_output_tensor().data]
        elif self.type == "ort":
            return self.ort.run(None, input)


class FaceClassificationModel:
    """Detect faces and classify them with an OpenCV LBPH recognizer.

    The recognizer is trained from the images found in the sub-folders of
    ``/media/frigate/clips/faces``; each folder name is used as the label.
    """

    def __init__(self, config: FaceRecognitionConfig, db: SqliteQueueDatabase):
        """Set up the detector (downloading its files if needed) and classifier.

        Args:
            config: Face recognition config; ``min_score`` sets the recognizer
                threshold.
            db: Database handle, stored on the instance.
        """
        self.config = config
        self.db = db
        self.face_detector: cv2.FaceDetectorYN = None
        self.landmark_detector: cv2.face.FacemarkLBF = None
        # Not used by this class; kept because it was part of the original
        # attribute set. The trained model lives in ``self.recognizer``.
        self.face_recognizer: cv2.face.LBPHFaceRecognizer = None
        # Initialised up front so the attribute exists even when the classifier
        # cannot be built yet (detector files missing or no training images).
        self.recognizer: cv2.face.LBPHFaceRecognizer = None
        self.label_map: dict[int, str] = {}

        # Directory the detector files are downloaded to and loaded from.
        self.download_path = os.path.join(MODEL_CACHE_DIR, "facedet")
        self.model_files = {
            "facedet.onnx": "https://github.com/NickM-27/facenet-onnx/releases/download/v1.0/facedet.onnx",
            "landmarkdet.yaml": "https://github.com/NickM-27/facenet-onnx/releases/download/v1.0/landmarkdet.yaml",
        }

        files_present = all(
            os.path.exists(os.path.join(self.download_path, n))
            for n in self.model_files.keys()
        )

        if files_present:
            self.__build_detector()
        else:
            # conditionally import ModelDownloader
            from frigate.util.downloader import ModelDownloader

            self.downloader = ModelDownloader(
                model_name="facedet",
                download_path=self.download_path,
                file_names=self.model_files.keys(),
                download_func=self.__download_models,
                complete_func=self.__build_detector,
            )
            self.downloader.ensure_model_files()

        self.__build_classifier()

    def __download_models(self, path: str) -> None:
        """Download the model file whose basename matches ``path``.

        Failures are logged rather than raised.
        """
        try:
            file_name = os.path.basename(path)
            # conditionally import ModelDownloader
            from frigate.util.downloader import ModelDownloader

            ModelDownloader.download_from_url(self.model_files[file_name], path)
        except Exception as e:
            logger.error(f"Failed to download {path}: {e}")

    def __build_detector(self) -> None:
        """Create the face detector and landmark detector from the model files."""
        # Load from the same directory the files are downloaded into instead of
        # a separately hard-coded path.
        self.face_detector = cv2.FaceDetectorYN.create(
            os.path.join(self.download_path, "facedet.onnx"),
            config="",
            input_size=(320, 320),
            score_threshold=0.8,
            nms_threshold=0.3,
        )
        self.landmark_detector = cv2.face.createFacemarkLBF()
        self.landmark_detector.loadModel(
            os.path.join(self.download_path, "landmarkdet.yaml")
        )

    def __build_classifier(self) -> None:
        """Train the LBPH recognizer from the images in the faces directory.

        Does nothing when the landmark detector is not ready, the faces
        directory does not exist, or no readable training image is found. The
        recognizer and label map are only replaced after training succeeds.
        """
        if not self.landmark_detector:
            return None

        faces_dir = "/media/frigate/clips/faces"

        if not os.path.isdir(faces_dir):
            # Nothing has been registered yet; os.listdir would raise.
            return None

        labels = []
        faces = []
        label_map: dict[int, str] = {}

        for idx, name in enumerate(os.listdir(faces_dir)):
            if name == "train":
                continue

            face_folder = os.path.join(faces_dir, name)

            if not os.path.isdir(face_folder):
                continue

            label_map[idx] = name

            for image in os.listdir(face_folder):
                img = cv2.imread(os.path.join(face_folder, image))

                if img is None:
                    # not an image OpenCV can read
                    continue

                img = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
                img = self.__align_face(img, img.shape[1], img.shape[0])
                faces.append(img)
                labels.append(idx)

        if not faces:
            # Training on an empty set fails; leave the classifier unbuilt.
            return None

        recognizer = cv2.face.LBPHFaceRecognizer_create(
            radius=2, threshold=(1 - self.config.min_score) * 1000
        )
        recognizer.train(faces, np.array(labels))

        self.recognizer = recognizer
        self.label_map = label_map

    def __align_face(
        self,
        image: np.ndarray,
        output_width: int,
        output_height: int,
    ) -> np.ndarray:
        """Rotate, scale and translate ``image`` based on the detected eyes.

        Landmarks are fitted over the whole image; the eye centroids give the
        rotation angle and scale, and the point between the eyes is moved to
        ``(0.5 * output_width, 0.35 * output_height)``.

        Returns:
            The warped image of size ``(output_width, output_height)``.
        """
        _, lands = self.landmark_detector.fit(
            image, np.array([(0, 0, image.shape[1], image.shape[0])])
        )
        landmarks = lands[0][0]

        # get landmarks for eyes
        leftEyePts = landmarks[42:48]
        rightEyePts = landmarks[36:42]

        # compute the center of mass for each eye
        leftEyeCenter = leftEyePts.mean(axis=0).astype("int")
        rightEyeCenter = rightEyePts.mean(axis=0).astype("int")

        # compute the angle between the eye centroids
        dY = rightEyeCenter[1] - leftEyeCenter[1]
        dX = rightEyeCenter[0] - leftEyeCenter[0]
        angle = np.degrees(np.arctan2(dY, dX)) - 180

        # compute the desired right eye x-coordinate based on the
        # desired x-coordinate of the left eye
        desiredRightEyeX = 1.0 - 0.35

        # determine the scale of the new resulting image by taking
        # the ratio of the distance between eyes in the *current*
        # image to the ratio of distance between eyes in the
        # *desired* image
        dist = np.sqrt((dX**2) + (dY**2))
        desiredDist = desiredRightEyeX - 0.35
        desiredDist *= output_width
        scale = desiredDist / dist

        # compute center (x, y)-coordinates (i.e., the median point)
        # between the two eyes in the input image
        # grab the rotation matrix for rotating and scaling the face
        eyesCenter = (
            int((leftEyeCenter[0] + rightEyeCenter[0]) // 2),
            int((leftEyeCenter[1] + rightEyeCenter[1]) // 2),
        )
        M = cv2.getRotationMatrix2D(eyesCenter, angle, scale)

        # update the translation component of the matrix
        tX = output_width * 0.5
        tY = output_height * 0.35
        M[0, 2] += tX - eyesCenter[0]
        M[1, 2] += tY - eyesCenter[1]

        # apply the affine transformation
        return cv2.warpAffine(
            image, M, (output_width, output_height), flags=cv2.INTER_CUBIC
        )

    def clear_classifier(self) -> None:
        """Drop the trained recognizer and labels.

        The next ``classify_face`` call rebuilds them from disk.
        """
        self.recognizer = None
        self.label_map = {}

    def detect_faces(self, input: np.ndarray) -> tuple[int, cv2.typing.MatLike] | None:
        """Run the face detector on ``input``.

        Returns:
            The result of ``FaceDetectorYN.detect``, or ``None`` when the
            detector has not been built yet.
        """
        if not self.face_detector:
            return None

        # The detector input size is set from the image's width and height.
        self.face_detector.setInputSize((input.shape[1], input.shape[0]))
        return self.face_detector.detect(input)

    def classify_face(self, face_image: np.ndarray) -> tuple[str, float] | None:
        """Classify a BGR face crop.

        Returns:
            ``(label, score)`` with the score rounded to two decimals, or
            ``None`` when the detector is not ready, no classifier could be
            trained, or the recognizer reports no match (index ``-1``).
        """
        if not self.landmark_detector:
            return None

        if self.recognizer is None or not self.label_map:
            self.__build_classifier()

        if self.recognizer is None:
            # Still nothing to compare against (no training images).
            return None

        img = cv2.cvtColor(face_image, cv2.COLOR_BGR2GRAY)
        img = self.__align_face(img, img.shape[1], img.shape[0])
        index, distance = self.recognizer.predict(img)

        if index == -1:
            return None

        score = 1.0 - (distance / 1000)
        return self.label_map[index], round(score, 2)