"""Facenet Embeddings.""" import logging import os import numpy as np from frigate.const import MODEL_CACHE_DIR from frigate.util.downloader import ModelDownloader from .base_embedding import BaseEmbedding from .runner import ONNXModelRunner try: from tflite_runtime.interpreter import Interpreter except ModuleNotFoundError: from tensorflow.lite.python.interpreter import Interpreter logger = logging.getLogger(__name__) ARCFACE_INPUT_SIZE = 112 FACENET_INPUT_SIZE = 160 class FaceNetEmbedding(BaseEmbedding): def __init__(self): super().__init__( model_name="facedet", model_file="facenet.tflite", download_urls={ "facenet.tflite": "https://github.com/NickM-27/facenet-onnx/releases/download/v1.0/facenet.tflite", }, ) self.download_path = os.path.join(MODEL_CACHE_DIR, self.model_name) self.tokenizer = None self.feature_extractor = None self.runner = None files_names = list(self.download_urls.keys()) if not all( os.path.exists(os.path.join(self.download_path, n)) for n in files_names ): logger.debug(f"starting model download for {self.model_name}") self.downloader = ModelDownloader( model_name=self.model_name, download_path=self.download_path, file_names=files_names, download_func=self._download_model, ) self.downloader.ensure_model_files() else: self.downloader = None self._load_model_and_utils() logger.debug(f"models are already downloaded for {self.model_name}") def _load_model_and_utils(self): if self.runner is None: if self.downloader: self.downloader.wait_for_download() self.runner = Interpreter( model_path=os.path.join(MODEL_CACHE_DIR, "facedet/facenet.tflite"), num_threads=2, ) self.runner.allocate_tensors() self.tensor_input_details = self.runner.get_input_details() self.tensor_output_details = self.runner.get_output_details() def _preprocess_inputs(self, raw_inputs): pil = self._process_image(raw_inputs[0]) # handle images larger than input size width, height = pil.size if width != FACENET_INPUT_SIZE or height != FACENET_INPUT_SIZE: if width > height: new_height = int(((height / width) * FACENET_INPUT_SIZE) // 4 * 4) pil = pil.resize((FACENET_INPUT_SIZE, new_height)) else: new_width = int(((width / height) * FACENET_INPUT_SIZE) // 4 * 4) pil = pil.resize((new_width, FACENET_INPUT_SIZE)) og = np.array(pil).astype(np.float32) # Image must be FACE_EMBEDDING_SIZExFACE_EMBEDDING_SIZE og_h, og_w, channels = og.shape frame = np.zeros( (FACENET_INPUT_SIZE, FACENET_INPUT_SIZE, channels), dtype=np.float32 ) # compute center offset x_center = (FACENET_INPUT_SIZE - og_w) // 2 y_center = (FACENET_INPUT_SIZE - og_h) // 2 # copy img image into center of result image frame[y_center : y_center + og_h, x_center : x_center + og_w] = og # run facenet normalization frame = (frame / 127.5) - 1.0 frame = np.expand_dims(frame, axis=0) return frame def __call__(self, inputs): self._load_model_and_utils() processed = self._preprocess_inputs(inputs) self.runner.set_tensor(self.tensor_input_details[0]["index"], processed) self.runner.invoke() return self.runner.get_tensor(self.tensor_output_details[0]["index"]) class ArcfaceEmbedding(BaseEmbedding): def __init__(self): super().__init__( model_name="facedet", model_file="arcface.onnx", download_urls={ "arcface.onnx": "https://github.com/NickM-27/facenet-onnx/releases/download/v1.0/arcface.onnx", }, ) self.download_path = os.path.join(MODEL_CACHE_DIR, self.model_name) self.tokenizer = None self.feature_extractor = None self.runner = None files_names = list(self.download_urls.keys()) if not all( os.path.exists(os.path.join(self.download_path, n)) for n in files_names 
        ):
            logger.debug(f"starting model download for {self.model_name}")
            self.downloader = ModelDownloader(
                model_name=self.model_name,
                download_path=self.download_path,
                file_names=files_names,
                download_func=self._download_model,
            )
            self.downloader.ensure_model_files()
        else:
            self.downloader = None
            self._load_model_and_utils()
            logger.debug(f"models are already downloaded for {self.model_name}")

    def _load_model_and_utils(self):
        if self.runner is None:
            if self.downloader:
                self.downloader.wait_for_download()

            self.runner = ONNXModelRunner(
                os.path.join(self.download_path, self.model_file),
                "GPU",
            )

    def _preprocess_inputs(self, raw_inputs):
        pil = self._process_image(raw_inputs[0])

        # resize images that do not already match the input size,
        # preserving aspect ratio
        width, height = pil.size
        if width != ARCFACE_INPUT_SIZE or height != ARCFACE_INPUT_SIZE:
            if width > height:
                new_height = int(((height / width) * ARCFACE_INPUT_SIZE) // 4 * 4)
                pil = pil.resize((ARCFACE_INPUT_SIZE, new_height))
            else:
                new_width = int(((width / height) * ARCFACE_INPUT_SIZE) // 4 * 4)
                pil = pil.resize((new_width, ARCFACE_INPUT_SIZE))

        og = np.array(pil).astype(np.float32)

        # image must be ARCFACE_INPUT_SIZE x ARCFACE_INPUT_SIZE
        og_h, og_w, channels = og.shape
        frame = np.zeros(
            (ARCFACE_INPUT_SIZE, ARCFACE_INPUT_SIZE, channels), dtype=np.float32
        )

        # compute center offset
        x_center = (ARCFACE_INPUT_SIZE - og_w) // 2
        y_center = (ARCFACE_INPUT_SIZE - og_h) // 2

        # copy image into center of result image
        frame[y_center : y_center + og_h, x_center : x_center + og_w] = og

        # run arcface normalization: map [0, 255] to [-1.0, 1.0]
        frame = (frame / 127.5) - 1.0

        # convert HWC to NCHW for the ONNX model
        frame = np.transpose(frame, (2, 0, 1))
        frame = np.expand_dims(frame, axis=0)
        return [{"data": frame}]
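

if __name__ == "__main__":
    # Minimal usage sketch, illustrative only. It assumes that
    # BaseEmbedding._process_image (defined in base_embedding.py, not shown
    # here) accepts an already-loaded PIL.Image, and "face.jpg" is a
    # placeholder path to a pre-cropped face image.
    from PIL import Image

    face_crop = Image.open("face.jpg").convert("RGB")
    facenet = FaceNetEmbedding()
    vector = facenet([face_crop])
    # the returned array holds one embedding row for the input face
    print(vector.shape)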