Implement facenet tflite for small face recognition model (#17402)
parent 0947bffeef
commit 36446ceded
@@ -23,15 +23,15 @@ Frigate needs to first detect a `face` before it can recognize a face.
 
 Frigate has support for two face recognition model types:
 
-- **small**: Frigate will use CV2 Local Binary Pattern Face Recognizer to recognize faces, which runs locally on the CPU. This model is optimized for efficiency and is not as accurate.
-- **large**: Frigate will run a face embedding model, this model is optimized for accuracy. It is only recommended to be run when an integrated or dedicated GPU is available.
+- **small**: Frigate will run a FaceNet embedding model to recognize faces, which runs locally on the CPU. This model is optimized for efficiency and is not as accurate.
+- **large**: Frigate will run a large ArcFace embedding model that is optimized for accuracy. It is only recommended to be run when an integrated or dedicated GPU is available.
 
 In both cases a lightweight face landmark detection model is also used to align faces before running the recognition model.
 
 ## Minimum System Requirements
 
-The `small` model is optimized for efficiency and runs on the CPU, there are no significantly different system requirements.
-The `large` model is optimized for accuracy and an integrated or discrete GPU is highly recommended.
+The `small` model is optimized for efficiency and runs on the CPU; most CPUs should run the model efficiently.
+The `large` model is optimized for accuracy; an integrated or discrete GPU is highly recommended.
 
 ## Configuration
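Both model sizes now follow the same embedding-plus-similarity pattern described above: align the detected face crop, embed it, and compare the embedding against per-person mean embeddings by cosine similarity. A minimal sketch of that flow (the `align` and `embed` callables and the threshold are illustrative stand-ins, not Frigate's actual API):

```python
import numpy as np

def recognize(face_img: np.ndarray, known: dict[str, np.ndarray],
              align, embed, threshold: float = 0.5) -> tuple[str, float]:
    """Align a face crop, embed it, and return the best cosine-similarity match."""
    aligned = align(face_img)  # landmark-based alignment happens first
    emb = embed(aligned)       # FaceNet (small) or ArcFace (large) embedding
    best_name, best_sim = "unknown", 0.0
    for name, mean_emb in known.items():
        sim = float(np.dot(emb, mean_emb) /
                    (np.linalg.norm(emb) * np.linalg.norm(mean_emb)))
        if sim > best_sim:
            best_name, best_sim = name, sim
    return (best_name, best_sim) if best_sim >= threshold else ("unknown", best_sim)
```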
@@ -10,7 +10,7 @@ from scipy import stats
 
 from frigate.config import FrigateConfig
 from frigate.const import MODEL_CACHE_DIR
-from frigate.embeddings.onnx.face_embedding import ArcfaceEmbedding
+from frigate.embeddings.onnx.face_embedding import ArcfaceEmbedding, FaceNetEmbedding
 
 logger = logging.getLogger(__name__)
@@ -124,83 +124,140 @@ class FaceRecognizer(ABC):
         return 1.0
 
 
-class LBPHRecognizer(FaceRecognizer):
+def similarity_to_confidence(
+    cosine_similarity: float, median=0.3, range_width=0.6, slope_factor=12
+):
+    """
+    Default sigmoid function to map cosine similarity to confidence.
+
+    Args:
+        cosine_similarity (float): The input cosine similarity.
+        median (float): Assumed median of cosine similarity distribution.
+        range_width (float): Assumed range of cosine similarity distribution (90th percentile - 10th percentile).
+        slope_factor (float): Adjusts the steepness of the curve.
+
+    Returns:
+        float: The confidence score.
+    """
+
+    # Calculate slope and bias
+    slope = slope_factor / range_width
+    bias = median
+
+    # Calculate confidence
+    confidence = 1 / (1 + np.exp(-slope * (cosine_similarity - bias)))
+    return confidence
+
+
+class FaceNetRecognizer(FaceRecognizer):
     def __init__(self, config: FrigateConfig):
         super().__init__(config)
         self.label_map: dict[int, str] = {}
-        self.recognizer: cv2.face.LBPHFaceRecognizer | None = None
+        self.mean_embs: dict[int, np.ndarray] = {}
+        self.face_embedder: FaceNetEmbedding = FaceNetEmbedding()
+        self.model_builder_queue: queue.Queue | None = None
 
     def clear(self) -> None:
         self.face_recognizer = None
         self.label_map = {}
+        self.mean_embs = {}
+
+    def run_build_task(self) -> None:
+        self.model_builder_queue = queue.Queue()
+
+        def build_model():
+            face_embeddings_map: dict[str, list[np.ndarray]] = {}
+            idx = 0
+
+            dir = "/media/frigate/clips/faces"
+            for name in os.listdir(dir):
+                if name == "train":
+                    continue
+
+                face_folder = os.path.join(dir, name)
+
+                if not os.path.isdir(face_folder):
+                    continue
+
+                face_embeddings_map[name] = []
+                for image in os.listdir(face_folder):
+                    img = cv2.imread(os.path.join(face_folder, image))
+
+                    if img is None:
+                        continue
+
+                    img = self.align_face(img, img.shape[1], img.shape[0])
+                    emb = self.face_embedder([img])[0].squeeze()
+                    face_embeddings_map[name].append(emb)
+
+                idx += 1
+
+            self.model_builder_queue.put(face_embeddings_map)
+
+        thread = threading.Thread(target=build_model, daemon=True)
+        thread.start()
 
     def build(self):
         if not self.landmark_detector:
             self.init_landmark_detector()
             return None
 
-        labels = []
-        faces = []
-        idx = 0
-
-        dir = "/media/frigate/clips/faces"
-        for name in os.listdir(dir):
-            if name == "train":
-                continue
-
-            face_folder = os.path.join(dir, name)
-
-            if not os.path.isdir(face_folder):
-                continue
-
-            self.label_map[idx] = name
-            for image in os.listdir(face_folder):
-                img = cv2.imread(os.path.join(face_folder, image))
-
-                if img is None:
-                    continue
-
-                img = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
-                img = self.align_face(img, img.shape[1], img.shape[0])
-                faces.append(img)
-                labels.append(idx)
-
-            idx += 1
-
-        if not faces:
-            return
-
-        self.recognizer: cv2.face.LBPHFaceRecognizer = (
-            cv2.face.LBPHFaceRecognizer_create(radius=2, threshold=400)
-        )
-        self.recognizer.train(faces, np.array(labels))
+        if self.model_builder_queue is not None:
+            try:
+                face_embeddings_map: dict[str, list[np.ndarray]] = (
+                    self.model_builder_queue.get(timeout=0.1)
+                )
+                self.model_builder_queue = None
+            except queue.Empty:
+                return
+        else:
+            self.run_build_task()
+            return
+
+        if not face_embeddings_map:
+            return
+
+        for name, embs in face_embeddings_map.items():
+            if embs:
+                self.mean_embs[name] = stats.trim_mean(embs, 0.15)
+
+        logger.debug("Finished building FaceNet model")
 
-    def classify(self, face_image: np.ndarray) -> tuple[str, float] | None:
+    def classify(self, face_image):
         if not self.landmark_detector:
             return None
 
-        if not self.label_map or not self.recognizer:
+        if not self.mean_embs:
             self.build()
 
-        if not self.recognizer:
+        if not self.mean_embs:
             return None
 
-        # face recognition is best run on grayscale images
-        img = cv2.cvtColor(face_image, cv2.COLOR_BGR2GRAY)
-
         # get blur factor before aligning face
-        blur_factor = self.get_blur_factor(img)
-        logger.debug(f"face detected with bluriness {blur_factor}")
+        blur_factor = self.get_blur_factor(face_image)
+        logger.debug(f"face detected with blurriness {blur_factor}")
 
         # align face and run recognition
-        img = self.align_face(img, img.shape[1], img.shape[0])
-        index, distance = self.recognizer.predict(img)
+        img = self.align_face(face_image, face_image.shape[1], face_image.shape[0])
+        embedding = self.face_embedder([img])[0].squeeze()
 
-        if index == -1:
-            return None
+        score = 0
+        label = ""
 
-        score = (1.0 - (distance / 1000)) * blur_factor
-        return self.label_map[index], round(score, 2)
+        for name, mean_emb in self.mean_embs.items():
+            dot_product = np.dot(embedding, mean_emb)
+            magnitude_A = np.linalg.norm(embedding)
+            magnitude_B = np.linalg.norm(mean_emb)
+
+            cosine_similarity = dot_product / (magnitude_A * magnitude_B)
+            confidence = similarity_to_confidence(
+                cosine_similarity, median=0.5, range_width=0.6
+            )
+
+            if confidence > score:
+                score = confidence
+                label = name
+
+        return label, round(score * blur_factor, 2)
 
 
 class ArcFaceRecognizer(FaceRecognizer):
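Two details of the new `FaceNetRecognizer` are worth illustrating. Enrollment averages each person's embeddings with `stats.trim_mean(embs, 0.15)`, which discards the most extreme 15% of values at each end per dimension so a few bad crops don't skew the mean. Matching then maps cosine similarity through the logistic `similarity_to_confidence` curve, whose steepness is `slope_factor / range_width` (20 with these defaults). A quick numeric check under assumed toy data:

```python
import numpy as np
from scipy import stats

def similarity_to_confidence(cosine_similarity, median=0.3, range_width=0.6, slope_factor=12):
    slope = slope_factor / range_width
    return 1 / (1 + np.exp(-slope * (cosine_similarity - median)))

# Trimmed-mean enrollment: ten embeddings for one person, one outlier.
embs = [np.ones(4)] * 9 + [np.full(4, 10.0)]
print(stats.trim_mean(embs, 0.15))  # -> [1. 1. 1. 1.]; the outlier row is trimmed away

# Confidence mapping with the FaceNet call site's median=0.5, range_width=0.6:
for sim in (0.2, 0.5, 0.8):
    print(sim, round(float(similarity_to_confidence(sim, median=0.5)), 3))
# 0.2 -> 0.002, 0.5 -> 0.5, 0.8 -> 0.998
```

A similarity at the assumed median maps to 0.5 confidence, and values one `range_width` away saturate toward 0 or 1, which is the intent of the sigmoid shaping.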
@@ -274,30 +331,6 @@ class ArcFaceRecognizer(FaceRecognizer):
 
         logger.debug("Finished building ArcFace model")
 
-    def similarity_to_confidence(
-        self, cosine_similarity: float, median=0.3, range_width=0.6, slope_factor=12
-    ):
-        """
-        Default sigmoid function to map cosine similarity to confidence.
-
-        Args:
-            cosine_similarity (float): The input cosine similarity.
-            median (float): Assumed median of cosine similarity distribution.
-            range_width (float): Assumed range of cosine similarity distribution (90th percentile - 10th percentile).
-            slope_factor (float): Adjusts the steepness of the curve.
-
-        Returns:
-            float: The confidence score.
-        """
-
-        # Calculate slope and bias
-        slope = slope_factor / range_width
-        bias = median
-
-        # Calculate confidence
-        confidence = 1 / (1 + np.exp(-slope * (cosine_similarity - bias)))
-        return confidence
-
     def classify(self, face_image):
         if not self.landmark_detector:
             return None
@@ -312,7 +345,7 @@ class ArcFaceRecognizer(FaceRecognizer):
 
         # get blur factor before aligning face
         blur_factor = self.get_blur_factor(face_image)
-        logger.debug(f"face detected with bluriness {blur_factor}")
+        logger.debug(f"face detected with blurriness {blur_factor}")
 
         # align face and run recognition
         img = self.align_face(face_image, face_image.shape[1], face_image.shape[0])
@@ -327,7 +360,7 @@ class ArcFaceRecognizer(FaceRecognizer):
             magnitude_B = np.linalg.norm(mean_emb)
 
             cosine_similarity = dot_product / (magnitude_A * magnitude_B)
-            confidence = self.similarity_to_confidence(cosine_similarity)
+            confidence = similarity_to_confidence(cosine_similarity)
 
             if confidence > score:
                 score = confidence
@@ -21,8 +21,8 @@ from frigate.config import FrigateConfig
 from frigate.const import FACE_DIR, MODEL_CACHE_DIR
 from frigate.data_processing.common.face.model import (
     ArcFaceRecognizer,
+    FaceNetRecognizer,
     FaceRecognizer,
-    LBPHRecognizer,
 )
 from frigate.util.image import area
 
@@ -78,7 +78,7 @@ class FaceRealTimeProcessor(RealTimeProcessorApi):
         self.label_map: dict[int, str] = {}
 
         if self.face_config.model_size == "small":
-            self.recognizer = LBPHRecognizer(self.config)
+            self.recognizer = FaceNetRecognizer(self.config)
         else:
             self.recognizer = ArcFaceRecognizer(self.config)
@@ -412,10 +412,6 @@ class FaceRealTimeProcessor(RealTimeProcessorApi):
 
         prominent_name = max(score_count)
 
-        # if a single name is not prominent in the history then we are not confident
-        if score_count[prominent_name] / len(results_list) < 0.65:
-            return "unknown", 0.0
-
         return prominent_name, weighted_scores[prominent_name] / total_face_areas[
             prominent_name
         ]
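The surviving aggregation here is an area-weighted average: each recent recognition contributes its score weighted by the detected face's pixel area, so larger, typically sharper faces dominate the result. A sketch with hypothetical accumulators mirroring the names above:

```python
from collections import Counter, defaultdict

# Hypothetical recent results: (name, score, face_area) per frame.
results_list = [("alice", 0.91, 4000), ("alice", 0.85, 2500), ("bob", 0.60, 900)]

score_count: Counter = Counter()
weighted_scores: dict = defaultdict(float)
total_face_areas: dict = defaultdict(float)

for name, score, face_area in results_list:
    score_count[name] += 1
    weighted_scores[name] += score * face_area   # weight each score by face area
    total_face_areas[name] += face_area

prominent_name = max(score_count, key=score_count.get)  # most frequent name
print(prominent_name,
      round(weighted_scores[prominent_name] / total_face_areas[prominent_name], 3))
# alice 0.887
```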
@@ -236,6 +236,10 @@ class EmbeddingsContext:
         if len(os.listdir(folder)) == 0:
             os.rmdir(folder)
 
+        self.requestor.send_data(
+            EmbeddingsRequestEnum.clear_face_classifier.value, None
+        )
+
     def update_description(self, event_id: str, description: str) -> None:
         self.requestor.send_data(
             EmbeddingsRequestEnum.embed_description.value,
@@ -11,9 +11,105 @@ from frigate.util.downloader import ModelDownloader
 from .base_embedding import BaseEmbedding
 from .runner import ONNXModelRunner
 
+try:
+    from tflite_runtime.interpreter import Interpreter
+except ModuleNotFoundError:
+    from tensorflow.lite.python.interpreter import Interpreter
+
 logger = logging.getLogger(__name__)
 
-FACE_EMBEDDING_SIZE = 112
+ARCFACE_INPUT_SIZE = 112
+FACENET_INPUT_SIZE = 160
 
 
+class FaceNetEmbedding(BaseEmbedding):
+    def __init__(
+        self,
+        device: str = "AUTO",
+    ):
+        super().__init__(
+            model_name="facedet",
+            model_file="facenet.tflite",
+            download_urls={
+                "facenet.tflite": "https://github.com/NickM-27/facenet-onnx/releases/download/v1.0/facenet.tflite",
+            },
+        )
+        self.device = device
+        self.download_path = os.path.join(MODEL_CACHE_DIR, self.model_name)
+        self.tokenizer = None
+        self.feature_extractor = None
+        self.runner = None
+        files_names = list(self.download_urls.keys())
+
+        if not all(
+            os.path.exists(os.path.join(self.download_path, n)) for n in files_names
+        ):
+            logger.debug(f"starting model download for {self.model_name}")
+            self.downloader = ModelDownloader(
+                model_name=self.model_name,
+                download_path=self.download_path,
+                file_names=files_names,
+                download_func=self._download_model,
+            )
+            self.downloader.ensure_model_files()
+        else:
+            self.downloader = None
+            self._load_model_and_utils()
+            logger.debug(f"models are already downloaded for {self.model_name}")
+
+    def _load_model_and_utils(self):
+        if self.runner is None:
+            if self.downloader:
+                self.downloader.wait_for_download()
+
+            self.runner = Interpreter(
+                model_path=os.path.join(MODEL_CACHE_DIR, "facedet/facenet.tflite"),
+                num_threads=2,
+            )
+            self.runner.allocate_tensors()
+            self.tensor_input_details = self.runner.get_input_details()
+            self.tensor_output_details = self.runner.get_output_details()
+
+    def _preprocess_inputs(self, raw_inputs):
+        pil = self._process_image(raw_inputs[0])
+
+        # handle images larger than input size
+        width, height = pil.size
+        if width != FACENET_INPUT_SIZE or height != FACENET_INPUT_SIZE:
+            if width > height:
+                new_height = int(((height / width) * FACENET_INPUT_SIZE) // 4 * 4)
+                pil = pil.resize((FACENET_INPUT_SIZE, new_height))
+            else:
+                new_width = int(((width / height) * FACENET_INPUT_SIZE) // 4 * 4)
+                pil = pil.resize((new_width, FACENET_INPUT_SIZE))
+
+        og = np.array(pil).astype(np.float32)
+
+        # Image must be FACENET_INPUT_SIZE x FACENET_INPUT_SIZE
+        og_h, og_w, channels = og.shape
+        frame = np.zeros(
+            (FACENET_INPUT_SIZE, FACENET_INPUT_SIZE, channels), dtype=np.float32
+        )
+
+        # compute center offset
+        x_center = (FACENET_INPUT_SIZE - og_w) // 2
+        y_center = (FACENET_INPUT_SIZE - og_h) // 2
+
+        # copy image into center of result frame
+        frame[y_center : y_center + og_h, x_center : x_center + og_w] = og
+
+        # run facenet normalization
+        frame = (frame / 127.5) - 1.0
+
+        frame = np.expand_dims(frame, axis=0)
+        return frame
+
+    def __call__(self, inputs):
+        self._load_model_and_utils()
+        processed = self._preprocess_inputs(inputs)
+        self.runner.set_tensor(self.tensor_input_details[0]["index"], processed)
+        self.runner.invoke()
+        return self.runner.get_tensor(self.tensor_output_details[0]["index"])
+
+
 class ArcfaceEmbedding(BaseEmbedding):
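`FaceNetEmbedding._preprocess_inputs` letterboxes an arbitrary crop into the fixed 160x160 TFLite input: scale the long side to 160 (snapping the short side down to a multiple of 4), paste the result centered on a zero canvas, then map pixels from [0, 255] to [-1, 1]. A standalone sketch of the same transform, assuming a PIL image input:

```python
import numpy as np
from PIL import Image

FACENET_INPUT_SIZE = 160

def letterbox_facenet(img: Image.Image) -> np.ndarray:
    w, h = img.size
    if (w, h) != (FACENET_INPUT_SIZE, FACENET_INPUT_SIZE):
        if w > h:
            new_h = int(((h / w) * FACENET_INPUT_SIZE) // 4 * 4)  # e.g. 200x120 -> 160x96
            img = img.resize((FACENET_INPUT_SIZE, new_h))
        else:
            new_w = int(((w / h) * FACENET_INPUT_SIZE) // 4 * 4)
            img = img.resize((new_w, FACENET_INPUT_SIZE))

    arr = np.asarray(img, dtype=np.float32)
    h, w, c = arr.shape
    canvas = np.zeros((FACENET_INPUT_SIZE, FACENET_INPUT_SIZE, c), dtype=np.float32)
    y, x = (FACENET_INPUT_SIZE - h) // 2, (FACENET_INPUT_SIZE - w) // 2
    canvas[y : y + h, x : x + w] = arr   # center the resized crop on the zero canvas
    canvas = canvas / 127.5 - 1.0        # FaceNet normalization to [-1, 1]
    return canvas[None, ...]             # add batch dimension

batch = letterbox_facenet(Image.new("RGB", (200, 120)))
print(batch.shape)  # (1, 160, 160, 3)
```

The inference side in `__call__` is then the standard TFLite loop: `set_tensor` on the input index, `invoke()`, and `get_tensor` on the output index.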
@@ -66,25 +162,25 @@ class ArcfaceEmbedding(BaseEmbedding):
 
         # handle images larger than input size
         width, height = pil.size
-        if width != FACE_EMBEDDING_SIZE or height != FACE_EMBEDDING_SIZE:
+        if width != ARCFACE_INPUT_SIZE or height != ARCFACE_INPUT_SIZE:
             if width > height:
-                new_height = int(((height / width) * FACE_EMBEDDING_SIZE) // 4 * 4)
-                pil = pil.resize((FACE_EMBEDDING_SIZE, new_height))
+                new_height = int(((height / width) * ARCFACE_INPUT_SIZE) // 4 * 4)
+                pil = pil.resize((ARCFACE_INPUT_SIZE, new_height))
             else:
-                new_width = int(((width / height) * FACE_EMBEDDING_SIZE) // 4 * 4)
-                pil = pil.resize((new_width, FACE_EMBEDDING_SIZE))
+                new_width = int(((width / height) * ARCFACE_INPUT_SIZE) // 4 * 4)
+                pil = pil.resize((new_width, ARCFACE_INPUT_SIZE))
 
         og = np.array(pil).astype(np.float32)
 
         # Image must be FACE_EMBEDDING_SIZExFACE_EMBEDDING_SIZE
         og_h, og_w, channels = og.shape
         frame = np.zeros(
-            (FACE_EMBEDDING_SIZE, FACE_EMBEDDING_SIZE, channels), dtype=np.float32
+            (ARCFACE_INPUT_SIZE, ARCFACE_INPUT_SIZE, channels), dtype=np.float32
         )
 
         # compute center offset
-        x_center = (FACE_EMBEDDING_SIZE - og_w) // 2
-        y_center = (FACE_EMBEDDING_SIZE - og_h) // 2
+        x_center = (ARCFACE_INPUT_SIZE - og_w) // 2
+        y_center = (ARCFACE_INPUT_SIZE - og_h) // 2
 
         # copy img image into center of result image
         frame[y_center : y_center + og_h, x_center : x_center + og_w] = og
@@ -113,11 +113,11 @@
         "desc": "The size of the model used for face recognition.",
         "small": {
             "title": "small",
-            "desc": "Using <em>small</em> employs a Local Binary Pattern Histogram model via OpenCV that runs efficiently on most CPUs."
+            "desc": "Using <em>small</em> employs a FaceNet face embedding model that runs efficiently on most CPUs."
         },
         "large": {
             "title": "large",
-            "desc": "Using <em>large</em> employs an ArcFace Face embedding model and will automatically run on the GPU if applicable."
+            "desc": "Using <em>large</em> employs an ArcFace face embedding model and will automatically run on the GPU if applicable."
         }
     }
 },