mirror of
https://github.com/blakeblackshear/frigate.git
synced 2026-03-07 02:18:07 +01:00
Refactor face recognition (#17368)
* Refactor face recognition to allow for running lbph or embedding * Cleanup * Use weighted average for faces * Set correct url * Cleanup * Update docs * Update docs * Use scipy trimmed mean * Normalize * Handle color and gray landmark detection * Upgrade to new arcface model * Implement sigmoid function * Rename * Rename to arcface * Fix * Add face recognition model size to ui config * Update toast
This commit is contained in:
308
frigate/data_processing/common/face/model.py
Normal file
308
frigate/data_processing/common/face/model.py
Normal file
@@ -0,0 +1,308 @@
|
||||
import logging
|
||||
import os
|
||||
from abc import ABC, abstractmethod
|
||||
|
||||
import cv2
|
||||
import numpy as np
|
||||
from scipy import stats
|
||||
|
||||
from frigate.config import FrigateConfig
|
||||
from frigate.const import MODEL_CACHE_DIR
|
||||
from frigate.embeddings.onnx.facenet import ArcfaceEmbedding
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class FaceRecognizer(ABC):
|
||||
"""Face recognition runner."""
|
||||
|
||||
def __init__(self, config: FrigateConfig) -> None:
|
||||
self.config = config
|
||||
self.landmark_detector = cv2.face.createFacemarkLBF()
|
||||
self.landmark_detector.loadModel(
|
||||
os.path.join(MODEL_CACHE_DIR, "facedet/landmarkdet.yaml")
|
||||
)
|
||||
|
||||
@abstractmethod
|
||||
def build(self) -> None:
|
||||
"""Build face recognition model."""
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def clear(self) -> None:
|
||||
"""Clear current built model."""
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def classify(self, face_image: np.ndarray) -> tuple[str, float] | None:
|
||||
pass
|
||||
|
||||
def align_face(
|
||||
self,
|
||||
image: np.ndarray,
|
||||
output_width: int,
|
||||
output_height: int,
|
||||
) -> np.ndarray:
|
||||
# landmark is run on grayscale images
|
||||
|
||||
if image.ndim == 3:
|
||||
land_image = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
|
||||
else:
|
||||
land_image = image
|
||||
|
||||
_, lands = self.landmark_detector.fit(
|
||||
land_image, np.array([(0, 0, land_image.shape[1], land_image.shape[0])])
|
||||
)
|
||||
landmarks: np.ndarray = lands[0][0]
|
||||
|
||||
# get landmarks for eyes
|
||||
leftEyePts = landmarks[42:48]
|
||||
rightEyePts = landmarks[36:42]
|
||||
|
||||
# compute the center of mass for each eye
|
||||
leftEyeCenter = leftEyePts.mean(axis=0).astype("int")
|
||||
rightEyeCenter = rightEyePts.mean(axis=0).astype("int")
|
||||
|
||||
# compute the angle between the eye centroids
|
||||
dY = rightEyeCenter[1] - leftEyeCenter[1]
|
||||
dX = rightEyeCenter[0] - leftEyeCenter[0]
|
||||
angle = np.degrees(np.arctan2(dY, dX)) - 180
|
||||
|
||||
# compute the desired right eye x-coordinate based on the
|
||||
# desired x-coordinate of the left eye
|
||||
desiredRightEyeX = 1.0 - 0.35
|
||||
|
||||
# determine the scale of the new resulting image by taking
|
||||
# the ratio of the distance between eyes in the *current*
|
||||
# image to the ratio of distance between eyes in the
|
||||
# *desired* image
|
||||
dist = np.sqrt((dX**2) + (dY**2))
|
||||
desiredDist = desiredRightEyeX - 0.35
|
||||
desiredDist *= output_width
|
||||
scale = desiredDist / dist
|
||||
|
||||
# compute center (x, y)-coordinates (i.e., the median point)
|
||||
# between the two eyes in the input image
|
||||
# grab the rotation matrix for rotating and scaling the face
|
||||
eyesCenter = (
|
||||
int((leftEyeCenter[0] + rightEyeCenter[0]) // 2),
|
||||
int((leftEyeCenter[1] + rightEyeCenter[1]) // 2),
|
||||
)
|
||||
M = cv2.getRotationMatrix2D(eyesCenter, angle, scale)
|
||||
|
||||
# update the translation component of the matrix
|
||||
tX = output_width * 0.5
|
||||
tY = output_height * 0.35
|
||||
M[0, 2] += tX - eyesCenter[0]
|
||||
M[1, 2] += tY - eyesCenter[1]
|
||||
|
||||
# apply the affine transformation
|
||||
return cv2.warpAffine(
|
||||
image, M, (output_width, output_height), flags=cv2.INTER_CUBIC
|
||||
)
|
||||
|
||||
def get_blur_factor(self, input: np.ndarray) -> float:
|
||||
"""Calculates the factor for the confidence based on the blur of the image."""
|
||||
if not self.config.face_recognition.blur_confidence_filter:
|
||||
return 1.0
|
||||
|
||||
variance = cv2.Laplacian(input, cv2.CV_64F).var()
|
||||
|
||||
if variance < 60: # image is very blurry
|
||||
return 0.96
|
||||
elif variance < 70: # image moderately blurry
|
||||
return 0.98
|
||||
elif variance < 80: # image is slightly blurry
|
||||
return 0.99
|
||||
else:
|
||||
return 1.0
|
||||
|
||||
|
||||
class LBPHRecognizer(FaceRecognizer):
|
||||
def __init__(self, config: FrigateConfig):
|
||||
super().__init__(config)
|
||||
self.label_map: dict[int, str] = {}
|
||||
self.recognizer: cv2.face.LBPHFaceRecognizer | None = None
|
||||
|
||||
def clear(self) -> None:
|
||||
self.face_recognizer = None
|
||||
self.label_map = {}
|
||||
|
||||
def build(self):
|
||||
if not self.landmark_detector:
|
||||
return None
|
||||
|
||||
labels = []
|
||||
faces = []
|
||||
idx = 0
|
||||
|
||||
dir = "/media/frigate/clips/faces"
|
||||
for name in os.listdir(dir):
|
||||
if name == "train":
|
||||
continue
|
||||
|
||||
face_folder = os.path.join(dir, name)
|
||||
|
||||
if not os.path.isdir(face_folder):
|
||||
continue
|
||||
|
||||
self.label_map[idx] = name
|
||||
for image in os.listdir(face_folder):
|
||||
img = cv2.imread(os.path.join(face_folder, image))
|
||||
|
||||
if img is None:
|
||||
continue
|
||||
|
||||
img = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
|
||||
img = self.align_face(img, img.shape[1], img.shape[0])
|
||||
faces.append(img)
|
||||
labels.append(idx)
|
||||
|
||||
idx += 1
|
||||
|
||||
if not faces:
|
||||
return
|
||||
|
||||
self.recognizer: cv2.face.LBPHFaceRecognizer = (
|
||||
cv2.face.LBPHFaceRecognizer_create(
|
||||
radius=2, threshold=(1 - self.config.face_recognition.min_score) * 1000
|
||||
)
|
||||
)
|
||||
self.recognizer.train(faces, np.array(labels))
|
||||
|
||||
def classify(self, face_image: np.ndarray) -> tuple[str, float] | None:
|
||||
if not self.landmark_detector:
|
||||
return None
|
||||
|
||||
if not self.label_map or not self.recognizer:
|
||||
self.build()
|
||||
|
||||
if not self.recognizer:
|
||||
return None
|
||||
|
||||
# face recognition is best run on grayscale images
|
||||
img = cv2.cvtColor(face_image, cv2.COLOR_BGR2GRAY)
|
||||
|
||||
# get blur factor before aligning face
|
||||
blur_factor = self.get_blur_factor(img)
|
||||
logger.debug(f"face detected with bluriness {blur_factor}")
|
||||
|
||||
# align face and run recognition
|
||||
img = self.align_face(img, img.shape[1], img.shape[0])
|
||||
index, distance = self.recognizer.predict(img)
|
||||
|
||||
if index == -1:
|
||||
return None
|
||||
|
||||
score = (1.0 - (distance / 1000)) * blur_factor
|
||||
return self.label_map[index], round(score, 2)
|
||||
|
||||
|
||||
class ArcFaceRecognizer(FaceRecognizer):
|
||||
def __init__(self, config: FrigateConfig):
|
||||
super().__init__(config)
|
||||
self.mean_embs: dict[int, np.ndarray] = {}
|
||||
self.face_embedder: ArcfaceEmbedding = ArcfaceEmbedding()
|
||||
|
||||
def clear(self) -> None:
|
||||
self.mean_embs = {}
|
||||
|
||||
def build(self):
|
||||
if not self.landmark_detector:
|
||||
return None
|
||||
|
||||
face_embeddings_map: dict[str, list[np.ndarray]] = {}
|
||||
idx = 0
|
||||
|
||||
dir = "/media/frigate/clips/faces"
|
||||
for name in os.listdir(dir):
|
||||
if name == "train":
|
||||
continue
|
||||
|
||||
face_folder = os.path.join(dir, name)
|
||||
|
||||
if not os.path.isdir(face_folder):
|
||||
continue
|
||||
|
||||
face_embeddings_map[name] = []
|
||||
for image in os.listdir(face_folder):
|
||||
img = cv2.imread(os.path.join(face_folder, image))
|
||||
|
||||
if img is None:
|
||||
continue
|
||||
|
||||
img = self.align_face(img, img.shape[1], img.shape[0])
|
||||
emb = self.face_embedder([img])[0].squeeze()
|
||||
face_embeddings_map[name].append(emb)
|
||||
|
||||
idx += 1
|
||||
|
||||
if not face_embeddings_map:
|
||||
return
|
||||
|
||||
for name, embs in face_embeddings_map.items():
|
||||
self.mean_embs[name] = stats.trim_mean(embs, 0.15)
|
||||
|
||||
def similarity_to_confidence(
|
||||
self, cosine_similarity: float, median=0.3, range_width=0.6, slope_factor=12
|
||||
):
|
||||
"""
|
||||
Default sigmoid function to map cosine similarity to confidence.
|
||||
|
||||
Args:
|
||||
cosine_similarity (float): The input cosine similarity.
|
||||
median (float): Assumed median of cosine similarity distribution.
|
||||
range_width (float): Assumed range of cosine similarity distribution (90th percentile - 10th percentile).
|
||||
slope_factor (float): Adjusts the steepness of the curve.
|
||||
|
||||
Returns:
|
||||
float: The confidence score.
|
||||
"""
|
||||
|
||||
# Calculate slope and bias
|
||||
slope = slope_factor / range_width
|
||||
bias = median
|
||||
|
||||
# Calculate confidence
|
||||
confidence = 1 / (1 + np.exp(-slope * (cosine_similarity - bias)))
|
||||
return confidence
|
||||
|
||||
def classify(self, face_image):
|
||||
if not self.landmark_detector:
|
||||
return None
|
||||
|
||||
if not self.mean_embs:
|
||||
self.build()
|
||||
|
||||
if not self.mean_embs:
|
||||
return None
|
||||
|
||||
# face recognition is best run on grayscale images
|
||||
|
||||
# get blur factor before aligning face
|
||||
blur_factor = self.get_blur_factor(face_image)
|
||||
logger.debug(f"face detected with bluriness {blur_factor}")
|
||||
|
||||
# align face and run recognition
|
||||
img = self.align_face(face_image, face_image.shape[1], face_image.shape[0])
|
||||
embedding = self.face_embedder([img])[0].squeeze()
|
||||
|
||||
score = 0
|
||||
label = ""
|
||||
|
||||
for name, mean_emb in self.mean_embs.items():
|
||||
dot_product = np.dot(embedding, mean_emb)
|
||||
magnitude_A = np.linalg.norm(embedding)
|
||||
magnitude_B = np.linalg.norm(mean_emb)
|
||||
|
||||
cosine_similarity = dot_product / (magnitude_A * magnitude_B)
|
||||
confidence = self.similarity_to_confidence(cosine_similarity)
|
||||
|
||||
if cosine_similarity > score:
|
||||
score = confidence
|
||||
label = name
|
||||
|
||||
if score < self.config.face_recognition.min_score:
|
||||
return None
|
||||
|
||||
return label, round(score * blur_factor, 2)
|
||||
@@ -19,6 +19,11 @@ from frigate.comms.event_metadata_updater import (
|
||||
)
|
||||
from frigate.config import FrigateConfig
|
||||
from frigate.const import FACE_DIR, MODEL_CACHE_DIR
|
||||
from frigate.data_processing.common.face.model import (
|
||||
ArcFaceRecognizer,
|
||||
FaceRecognizer,
|
||||
LBPHRecognizer,
|
||||
)
|
||||
from frigate.util.image import area
|
||||
|
||||
from ..types import DataProcessorMetrics
|
||||
@@ -31,6 +36,36 @@ MAX_DETECTION_HEIGHT = 1080
|
||||
MIN_MATCHING_FACES = 2
|
||||
|
||||
|
||||
def weighted_average_by_area(results_list: list[tuple[str, float, int]]):
|
||||
if len(results_list) < 3:
|
||||
return "unknown", 0.0
|
||||
|
||||
score_count = {}
|
||||
weighted_scores = {}
|
||||
total_face_areas = {}
|
||||
|
||||
for name, score, face_area in results_list:
|
||||
if name not in weighted_scores:
|
||||
score_count[name] = 1
|
||||
weighted_scores[name] = 0.0
|
||||
total_face_areas[name] = 0.0
|
||||
else:
|
||||
score_count[name] += 1
|
||||
|
||||
weighted_scores[name] += score * face_area
|
||||
total_face_areas[name] += face_area
|
||||
|
||||
prominent_name = max(score_count)
|
||||
|
||||
# if a single name is not prominent in the history then we are not confident
|
||||
if score_count[prominent_name] / len(results_list) < 0.65:
|
||||
return "unknown", 0.0
|
||||
|
||||
return prominent_name, weighted_scores[prominent_name] / total_face_areas[
|
||||
prominent_name
|
||||
]
|
||||
|
||||
|
||||
class FaceRealTimeProcessor(RealTimeProcessorApi):
|
||||
def __init__(
|
||||
self,
|
||||
@@ -42,10 +77,9 @@ class FaceRealTimeProcessor(RealTimeProcessorApi):
|
||||
self.face_config = config.face_recognition
|
||||
self.sub_label_publisher = sub_label_publisher
|
||||
self.face_detector: cv2.FaceDetectorYN = None
|
||||
self.landmark_detector: cv2.face.FacemarkLBF = None
|
||||
self.recognizer: cv2.face.LBPHFaceRecognizer = None
|
||||
self.requires_face_detection = "face" not in self.config.objects.all_objects
|
||||
self.detected_faces: dict[str, float] = {}
|
||||
self.person_face_history: dict[str, list[tuple[str, float, int]]] = {}
|
||||
self.recognizer: FaceRecognizer | None = None
|
||||
|
||||
download_path = os.path.join(MODEL_CACHE_DIR, "facedet")
|
||||
self.model_files = {
|
||||
@@ -72,7 +106,13 @@ class FaceRealTimeProcessor(RealTimeProcessorApi):
|
||||
self.__build_detector()
|
||||
|
||||
self.label_map: dict[int, str] = {}
|
||||
self.__build_classifier()
|
||||
|
||||
if self.face_config.model_size == "small":
|
||||
self.recognizer = LBPHRecognizer(self.config)
|
||||
else:
|
||||
self.recognizer = ArcFaceRecognizer(self.config)
|
||||
|
||||
self.recognizer.build()
|
||||
|
||||
def __download_models(self, path: str) -> None:
|
||||
try:
|
||||
@@ -92,126 +132,6 @@ class FaceRealTimeProcessor(RealTimeProcessorApi):
|
||||
score_threshold=0.5,
|
||||
nms_threshold=0.3,
|
||||
)
|
||||
self.landmark_detector = cv2.face.createFacemarkLBF()
|
||||
self.landmark_detector.loadModel(
|
||||
os.path.join(MODEL_CACHE_DIR, "facedet/landmarkdet.yaml")
|
||||
)
|
||||
|
||||
def __build_classifier(self) -> None:
|
||||
if not self.landmark_detector:
|
||||
return None
|
||||
|
||||
labels = []
|
||||
faces = []
|
||||
|
||||
dir = "/media/frigate/clips/faces"
|
||||
for idx, name in enumerate(os.listdir(dir)):
|
||||
if name == "train":
|
||||
continue
|
||||
|
||||
face_folder = os.path.join(dir, name)
|
||||
|
||||
if not os.path.isdir(face_folder):
|
||||
continue
|
||||
|
||||
self.label_map[idx] = name
|
||||
for image in os.listdir(face_folder):
|
||||
img = cv2.imread(os.path.join(face_folder, image))
|
||||
|
||||
if img is None:
|
||||
continue
|
||||
|
||||
img = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
|
||||
img = self.__align_face(img, img.shape[1], img.shape[0])
|
||||
faces.append(img)
|
||||
labels.append(idx)
|
||||
|
||||
if not faces:
|
||||
return
|
||||
|
||||
self.recognizer: cv2.face.LBPHFaceRecognizer = (
|
||||
cv2.face.LBPHFaceRecognizer_create(
|
||||
radius=2, threshold=(1 - self.face_config.min_score) * 1000
|
||||
)
|
||||
)
|
||||
self.recognizer.train(faces, np.array(labels))
|
||||
|
||||
def __align_face(
|
||||
self,
|
||||
image: np.ndarray,
|
||||
output_width: int,
|
||||
output_height: int,
|
||||
) -> np.ndarray:
|
||||
_, lands = self.landmark_detector.fit(
|
||||
image, np.array([(0, 0, image.shape[1], image.shape[0])])
|
||||
)
|
||||
landmarks: np.ndarray = lands[0][0]
|
||||
|
||||
# get landmarks for eyes
|
||||
leftEyePts = landmarks[42:48]
|
||||
rightEyePts = landmarks[36:42]
|
||||
|
||||
# compute the center of mass for each eye
|
||||
leftEyeCenter = leftEyePts.mean(axis=0).astype("int")
|
||||
rightEyeCenter = rightEyePts.mean(axis=0).astype("int")
|
||||
|
||||
# compute the angle between the eye centroids
|
||||
dY = rightEyeCenter[1] - leftEyeCenter[1]
|
||||
dX = rightEyeCenter[0] - leftEyeCenter[0]
|
||||
angle = np.degrees(np.arctan2(dY, dX)) - 180
|
||||
|
||||
# compute the desired right eye x-coordinate based on the
|
||||
# desired x-coordinate of the left eye
|
||||
desiredRightEyeX = 1.0 - 0.35
|
||||
|
||||
# determine the scale of the new resulting image by taking
|
||||
# the ratio of the distance between eyes in the *current*
|
||||
# image to the ratio of distance between eyes in the
|
||||
# *desired* image
|
||||
dist = np.sqrt((dX**2) + (dY**2))
|
||||
desiredDist = desiredRightEyeX - 0.35
|
||||
desiredDist *= output_width
|
||||
scale = desiredDist / dist
|
||||
|
||||
# compute center (x, y)-coordinates (i.e., the median point)
|
||||
# between the two eyes in the input image
|
||||
# grab the rotation matrix for rotating and scaling the face
|
||||
eyesCenter = (
|
||||
int((leftEyeCenter[0] + rightEyeCenter[0]) // 2),
|
||||
int((leftEyeCenter[1] + rightEyeCenter[1]) // 2),
|
||||
)
|
||||
M = cv2.getRotationMatrix2D(eyesCenter, angle, scale)
|
||||
|
||||
# update the translation component of the matrix
|
||||
tX = output_width * 0.5
|
||||
tY = output_height * 0.35
|
||||
M[0, 2] += tX - eyesCenter[0]
|
||||
M[1, 2] += tY - eyesCenter[1]
|
||||
|
||||
# apply the affine transformation
|
||||
return cv2.warpAffine(
|
||||
image, M, (output_width, output_height), flags=cv2.INTER_CUBIC
|
||||
)
|
||||
|
||||
def __get_blur_factor(self, input: np.ndarray) -> float:
|
||||
"""Calculates the factor for the confidence based on the blur of the image."""
|
||||
if not self.face_config.blur_confidence_filter:
|
||||
return 1.0
|
||||
|
||||
variance = cv2.Laplacian(input, cv2.CV_64F).var()
|
||||
|
||||
if variance < 60: # image is very blurry
|
||||
return 0.96
|
||||
elif variance < 70: # image moderately blurry
|
||||
return 0.98
|
||||
elif variance < 80: # image is slightly blurry
|
||||
return 0.99
|
||||
else:
|
||||
return 1.0
|
||||
|
||||
def __clear_classifier(self) -> None:
|
||||
self.face_recognizer = None
|
||||
self.label_map = {}
|
||||
|
||||
def __detect_face(
|
||||
self, input: np.ndarray, threshold: float
|
||||
@@ -254,33 +174,6 @@ class FaceRealTimeProcessor(RealTimeProcessorApi):
|
||||
|
||||
return face
|
||||
|
||||
def __classify_face(self, face_image: np.ndarray) -> tuple[str, float] | None:
|
||||
if not self.landmark_detector:
|
||||
return None
|
||||
|
||||
if not self.label_map or not self.recognizer:
|
||||
self.__build_classifier()
|
||||
|
||||
if not self.recognizer:
|
||||
return None
|
||||
|
||||
# face recognition is best run on grayscale images
|
||||
img = cv2.cvtColor(face_image, cv2.COLOR_BGR2GRAY)
|
||||
|
||||
# get blur factor before aligning face
|
||||
blur_factor = self.__get_blur_factor(img)
|
||||
logger.debug(f"face detected with bluriness {blur_factor}")
|
||||
|
||||
# align face and run recognition
|
||||
img = self.__align_face(img, img.shape[1], img.shape[0])
|
||||
index, distance = self.recognizer.predict(img)
|
||||
|
||||
if index == -1:
|
||||
return None
|
||||
|
||||
score = (1.0 - (distance / 1000)) * blur_factor
|
||||
return self.label_map[index], round(score, 2)
|
||||
|
||||
def __update_metrics(self, duration: float) -> None:
|
||||
self.metrics.face_rec_fps.value = (
|
||||
self.metrics.face_rec_fps.value * 9 + duration
|
||||
@@ -301,7 +194,7 @@ class FaceRealTimeProcessor(RealTimeProcessorApi):
|
||||
|
||||
# don't overwrite sub label for objects that have a sub label
|
||||
# that is not a face
|
||||
if obj_data.get("sub_label") and id not in self.detected_faces:
|
||||
if obj_data.get("sub_label") and id not in self.person_face_history:
|
||||
logger.debug(
|
||||
f"Not processing face due to existing sub label: {obj_data.get('sub_label')}."
|
||||
)
|
||||
@@ -370,53 +263,46 @@ class FaceRealTimeProcessor(RealTimeProcessorApi):
|
||||
max(0, face_box[0]) : min(frame.shape[1], face_box[2]),
|
||||
]
|
||||
|
||||
res = self.__classify_face(face_frame)
|
||||
res = self.recognizer.classify(face_frame)
|
||||
|
||||
if not res:
|
||||
self.__update_metrics(datetime.datetime.now().timestamp() - start)
|
||||
return
|
||||
|
||||
sub_label, score = res
|
||||
|
||||
# calculate the overall face score as the probability * area of face
|
||||
# this will help to reduce false positives from small side-angle faces
|
||||
# if a large front-on face image may have scored slightly lower but
|
||||
# is more likely to be accurate due to the larger face area
|
||||
face_score = round(score * face_frame.shape[0] * face_frame.shape[1], 2)
|
||||
|
||||
logger.debug(
|
||||
f"Detected best face for person as: {sub_label} with probability {score} and overall face score {face_score}"
|
||||
f"Detected best face for person as: {sub_label} with probability {score}"
|
||||
)
|
||||
|
||||
if self.config.face_recognition.save_attempts:
|
||||
# write face to library
|
||||
folder = os.path.join(FACE_DIR, "train")
|
||||
file = os.path.join(folder, f"{id}-{sub_label}-{score}-{face_score}.webp")
|
||||
file = os.path.join(folder, f"{id}-{sub_label}-{score}-0.webp")
|
||||
os.makedirs(folder, exist_ok=True)
|
||||
cv2.imwrite(file, face_frame)
|
||||
|
||||
if score < self.config.face_recognition.recognition_threshold:
|
||||
logger.debug(
|
||||
f"Recognized face distance {score} is less than threshold {self.config.face_recognition.recognition_threshold}"
|
||||
)
|
||||
self.__update_metrics(datetime.datetime.now().timestamp() - start)
|
||||
return
|
||||
if id not in self.person_face_history:
|
||||
self.person_face_history[id] = []
|
||||
|
||||
if id in self.detected_faces and face_score <= self.detected_faces[id]:
|
||||
logger.debug(
|
||||
f"Recognized face distance {score} and overall score {face_score} is less than previous overall face score ({self.detected_faces.get(id)})."
|
||||
)
|
||||
self.__update_metrics(datetime.datetime.now().timestamp() - start)
|
||||
return
|
||||
|
||||
self.sub_label_publisher.publish(
|
||||
EventMetadataTypeEnum.sub_label, (id, sub_label, score)
|
||||
self.person_face_history[id].append(
|
||||
(sub_label, score, face_frame.shape[0] * face_frame.shape[1])
|
||||
)
|
||||
self.detected_faces[id] = face_score
|
||||
(weighted_sub_label, weighted_score) = weighted_average_by_area(
|
||||
self.person_face_history[id]
|
||||
)
|
||||
|
||||
if weighted_score >= self.face_config.recognition_threshold:
|
||||
self.sub_label_publisher.publish(
|
||||
EventMetadataTypeEnum.sub_label,
|
||||
(id, weighted_sub_label, weighted_score),
|
||||
)
|
||||
|
||||
self.__update_metrics(datetime.datetime.now().timestamp() - start)
|
||||
|
||||
def handle_request(self, topic, request_data) -> dict[str, any] | None:
|
||||
if topic == EmbeddingsRequestEnum.clear_face_classifier.value:
|
||||
self.__clear_classifier()
|
||||
self.recognizer.clear()
|
||||
elif topic == EmbeddingsRequestEnum.recognize_face.value:
|
||||
img = cv2.imdecode(
|
||||
np.frombuffer(base64.b64decode(request_data["image"]), dtype=np.uint8),
|
||||
@@ -431,7 +317,7 @@ class FaceRealTimeProcessor(RealTimeProcessorApi):
|
||||
return {"message": "No face was detected.", "success": False}
|
||||
|
||||
face = img[face_box[1] : face_box[3], face_box[0] : face_box[2]]
|
||||
res = self.__classify_face(face)
|
||||
res = self.recognizer.classify(face)
|
||||
|
||||
if not res:
|
||||
return {"success": False, "message": "No face was recognized."}
|
||||
@@ -480,7 +366,7 @@ class FaceRealTimeProcessor(RealTimeProcessorApi):
|
||||
with open(file, "wb") as output:
|
||||
output.write(thumbnail.tobytes())
|
||||
|
||||
self.__clear_classifier()
|
||||
self.recognizer.clear()
|
||||
return {
|
||||
"message": "Successfully registered face.",
|
||||
"success": True,
|
||||
@@ -500,7 +386,7 @@ class FaceRealTimeProcessor(RealTimeProcessorApi):
|
||||
"success": False,
|
||||
}
|
||||
|
||||
res = self.__classify_face(img)
|
||||
res = self.recognizer.classify(img)
|
||||
|
||||
if not res:
|
||||
return
|
||||
@@ -527,5 +413,5 @@ class FaceRealTimeProcessor(RealTimeProcessorApi):
|
||||
os.unlink(os.path.join(folder, files[-1]))
|
||||
|
||||
def expire_object(self, object_id: str):
|
||||
if object_id in self.detected_faces:
|
||||
self.detected_faces.pop(object_id)
|
||||
if object_id in self.person_face_history:
|
||||
self.person_face_history.pop(object_id)
|
||||
|
||||
Reference in New Issue
Block a user