diff --git a/frigate/embeddings/maintainer.py b/frigate/embeddings/maintainer.py index 303690b5c..8e188f899 100644 --- a/frigate/embeddings/maintainer.py +++ b/frigate/embeddings/maintainer.py @@ -69,7 +69,9 @@ class EmbeddingMaintainer(threading.Thread): self.requires_face_detection = "face" not in self.config.objects.all_objects self.detected_faces: dict[str, float] = {} self.face_classifier = ( - FaceClassificationModel(db) if self.face_recognition_enabled else None + FaceClassificationModel(self.config.face_recognition, db) + if self.face_recognition_enabled + else None ) # create communication for updating event descriptions @@ -201,7 +203,9 @@ class EmbeddingMaintainer(threading.Thread): # Create our own thumbnail based on the bounding box and the frame time try: - yuv_frame = self.frame_manager.get(frame_name, camera_config.frame_shape_yuv) + yuv_frame = self.frame_manager.get( + frame_name, camera_config.frame_shape_yuv + ) except FileNotFoundError: pass @@ -467,11 +471,9 @@ class EmbeddingMaintainer(threading.Thread): f"Detected best face for person as: {sub_label} with score {score}" ) - if score < self.config.face_recognition.threshold or ( - id in self.detected_faces and score <= self.detected_faces[id] - ): + if id in self.detected_faces and score <= self.detected_faces[id]: logger.debug( - f"Recognized face score {score} is less than threshold ({self.config.face_recognition.threshold}) / previous face score ({self.detected_faces.get(id)})." + f"Recognized face score {score} is less than previous face score ({self.detected_faces.get(id)})." ) return diff --git a/frigate/util/model.py b/frigate/util/model.py index d43320006..6badb8547 100644 --- a/frigate/util/model.py +++ b/frigate/util/model.py @@ -10,7 +10,8 @@ from playhouse.sqliteq import SqliteQueueDatabase from sklearn.preprocessing import LabelEncoder, Normalizer from sklearn.svm import SVC -from frigate.util.builtin import deserialize +from frigate.config.semantic_search import FaceRecognitionConfig +from frigate.util.builtin import deserialize, serialize try: import openvino as ov @@ -21,6 +22,9 @@ except ImportError: logger = logging.getLogger(__name__) +MIN_MATCHING_FACES = 2 + + def get_ort_providers( force_cpu: bool = False, device: str = "AUTO", requires_fp16: bool = False ) -> tuple[list[str], list[dict[str, any]]]: @@ -157,10 +161,19 @@ class ONNXModelRunner: class FaceClassificationModel: - def __init__(self, db: SqliteQueueDatabase): + def __init__(self, config: FaceRecognitionConfig, db: SqliteQueueDatabase): + self.config = config self.db = db self.labeler: Optional[LabelEncoder] = None self.classifier: Optional[SVC] = None + self.embedding_query = f""" + SELECT + id, + distance + FROM vec_faces + WHERE face_embedding MATCH ? + AND k = {MIN_MATCHING_FACES} ORDER BY distance + """ def __build_classifier(self) -> None: faces: list[tuple[str, bytes]] = self.db.execute_sql( @@ -170,7 +183,9 @@ class FaceClassificationModel: self.labeler = LabelEncoder() norms = Normalizer(norm="l2").transform(embeddings) labels = self.labeler.fit_transform([f[0].split("-")[0] for f in faces]) - self.classifier = SVC(kernel="linear", probability=True) + self.classifier = SVC( + kernel="linear", probability=True, decision_function_shape="ovo" + ) self.classifier.fit(norms, labels) def clear_classifier(self) -> None: @@ -178,17 +193,50 @@ class FaceClassificationModel: self.labeler = None def classify_face(self, embedding: np.ndarray) -> Optional[tuple[str, float]]: + best_faces = self.db.execute_sql( + self.embedding_query, [serialize(embedding)] + ).fetchall() + logger.debug(f"Face embedding match: {best_faces}") + + if not best_faces or len(best_faces) < MIN_MATCHING_FACES: + logger.debug( + f"{len(best_faces)} < {MIN_MATCHING_FACES} min required faces." + ) + return None + + sub_label = str(best_faces[0][0]).split("-")[0] + avg_score = 0 + + # check that the cosine similarity is close enough to match the face + for face in best_faces: + score = 1.0 - face[1] + + if face[0].split("-")[0] != sub_label: + logger.debug("Detected multiple faces, result is not valid.") + return None + + avg_score += score + + avg_score = round(avg_score / MIN_MATCHING_FACES, 2) + + if avg_score < self.config.threshold: + logger.debug( + f"Recognized face score {avg_score} is less than threshold ({self.config.threshold}))." + ) + return None + if not self.classifier: self.__build_classifier() - res = self.classifier.predict([embedding]) + cosine_index = self.labeler.transform([sub_label])[0] + probabilities: list[float] = self.classifier.predict_proba([embedding])[0] + svc_probability = max(probabilities) + logger.debug(f"SVC face classification probability: {svc_probability} and index match: {cosine_index} / {probabilities.index(svc_probability)}") - if res is None: - return None + if cosine_index == probabilities.index(svc_probability): + return ( + sub_label, + min(avg_score, svc_probability), + ) - label = res[0] - probabilities = self.classifier.predict_proba([embedding])[0] - return ( - self.labeler.inverse_transform([label])[0], - round(probabilities[label], 2), - ) + return None