diff --git a/frigate/config/semantic_search.py b/frigate/config/semantic_search.py
index f5e881e4e..d95468f09 100644
--- a/frigate/config/semantic_search.py
+++ b/frigate/config/semantic_search.py
@@ -24,7 +24,7 @@ class SemanticSearchConfig(FrigateBaseModel):
 class FaceRecognitionConfig(FrigateBaseModel):
     enabled: bool = Field(default=False, title="Enable face recognition.")
     threshold: float = Field(
-        default=0.9, title="Face similarity score required to be considered a match."
+        default=170, title="Maximum face distance for a face to be considered a match."
     )
     min_area: int = Field(
         default=500, title="Min area of face box to consider running face recognition."
diff --git a/frigate/embeddings/maintainer.py b/frigate/embeddings/maintainer.py
index 8e188f899..bebed8fb8 100644
--- a/frigate/embeddings/maintainer.py
+++ b/frigate/embeddings/maintainer.py
@@ -451,29 +451,20 @@ class EmbeddingMaintainer(threading.Thread):
             ),
         ]

-        ret, webp = cv2.imencode(
-            ".webp", face_frame, [int(cv2.IMWRITE_WEBP_QUALITY), 100]
-        )
-
-        if not ret:
-            logger.debug("Not processing face due to error creating cropped image.")
-            return
-
-        embedding = self.embeddings.embed_face("unknown", webp.tobytes(), upsert=False)
-        res = self.face_classifier.classify_face(embedding)
+        res = self.face_classifier.classify_face(face_frame)

         if not res:
             return

-        sub_label, score = res
+        sub_label, distance = res
         logger.debug(
-            f"Detected best face for person as: {sub_label} with score {score}"
+            f"Detected best face for person as: {sub_label} with distance {distance}"
         )

-        if id in self.detected_faces and score <= self.detected_faces[id]:
+        if id in self.detected_faces and distance >= self.detected_faces[id]:
             logger.debug(
-                f"Recognized face score {score} is less than previous face score ({self.detected_faces.get(id)})."
+                f"Recognized face distance {distance} is not closer than the previous face distance ({self.detected_faces.get(id)})."
             )
             return

@@ -482,12 +473,11 @@
             json={
                 "camera": obj_data.get("camera"),
                 "subLabel": sub_label,
-                "subLabelScore": score,
             },
         )

         if resp.status_code == 200:
-            self.detected_faces[id] = score
+            self.detected_faces[id] = distance

     def _detect_license_plate(self, input: np.ndarray) -> tuple[int, int, int, int]:
         """Return the dimensions of the input image as [x, y, width, height]."""
diff --git a/frigate/util/model.py b/frigate/util/model.py
index 8aa2545ce..748b1358d 100644
--- a/frigate/util/model.py
+++ b/frigate/util/model.py
@@ -4,6 +4,7 @@ import logging
 import os
 from typing import Any, Optional

+import cv2
 import numpy as np
 import onnxruntime as ort
 from playhouse.sqliteq import SqliteQueueDatabase
@@ -164,81 +165,38 @@ class FaceClassificationModel:
     def __init__(self, config: FaceRecognitionConfig, db: SqliteQueueDatabase):
         self.config = config
         self.db = db
-        self.labeler: Optional[LabelEncoder] = None
-        self.classifier: Optional[SVC] = None
-        self.embedding_query = f"""
-            SELECT
-                id,
-                distance
-            FROM vec_faces
-            WHERE face_embedding MATCH ?
-                AND k = {MIN_MATCHING_FACES} ORDER BY distance
-        """
+        self.recognizer = cv2.face.LBPHFaceRecognizer_create(
+            radius=4, threshold=config.threshold
+        )
+        self.label_map: dict[int, str] = {}

     def __build_classifier(self) -> None:
-        faces: list[tuple[str, bytes]] = self.db.execute_sql(
-            "SELECT id, face_embedding FROM vec_faces"
-        ).fetchall()
-        embeddings = np.array([deserialize(f[1]) for f in faces])
-        self.labeler = LabelEncoder()
-        norms = Normalizer(norm="l2").transform(embeddings)
-        labels = self.labeler.fit_transform([f[0].split("-")[0] for f in faces])
-        self.classifier = SVC(
-            kernel="linear", probability=True, decision_function_shape="ovo"
-        )
-        self.classifier.fit(norms, labels)
+        labels = []
+        faces = []
+
+        base_dir = "/media/frigate/clips/faces"
+        for idx, name in enumerate(os.listdir(base_dir)):
+            self.label_map[idx] = name
+            face_folder = os.path.join(base_dir, name)
+            for image in os.listdir(face_folder):
+                img = cv2.imread(os.path.join(face_folder, image))
+
+                # skip files that opencv cannot decode as images
+                if img is None:
+                    continue
+
+                gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
+                equ = cv2.equalizeHist(gray)
+                faces.append(equ)
+                labels.append(idx)
+
+        self.recognizer.train(faces, np.array(labels))

     def clear_classifier(self) -> None:
-        self.classifier = None
-        self.labeler = None
+        self.label_map = {}

-    def classify_face(self, embedding: np.ndarray) -> Optional[tuple[str, float]]:
-        best_faces = self.db.execute_sql(
-            self.embedding_query, [serialize(embedding)]
-        ).fetchall()
-        logger.debug(f"Face embedding match: {best_faces}")
-
-        if not best_faces or len(best_faces) < MIN_MATCHING_FACES:
-            logger.debug(
-                f"{len(best_faces)} < {MIN_MATCHING_FACES} min required faces."
-            )
-            return None
-
-        sub_label = str(best_faces[0][0]).split("-")[0]
-        avg_score = 0
-
-        # check that the cosine similarity is close enough to match the face
-        for face in best_faces:
-            score = 1.0 - face[1]
-
-            if face[0].split("-")[0] != sub_label:
-                logger.debug("Detected multiple faces, result is not valid.")
-                return None
-
-            avg_score += score
-
-        avg_score = round(avg_score / MIN_MATCHING_FACES, 2)
-
-        if avg_score < self.config.threshold:
-            logger.debug(
-                f"Recognized face score {avg_score} is less than threshold ({self.config.threshold}))."
-            )
-            return None
-
-        if not self.classifier:
+    def classify_face(self, face_image: np.ndarray) -> Optional[tuple[str, float]]:
+        if not self.label_map:
             self.__build_classifier()

-        cosine_index = self.labeler.transform([sub_label])[0]
-        probabilities: np.ndarray = self.classifier.predict_proba([embedding])[0]
-        svc_probability = max(probabilities)
-        logger.debug(
-            f"SVC face classification probability: {svc_probability} and index match: {cosine_index} / {np.where(probabilities == svc_probability)[0]}"
-        )
+        # LBPH returns (label, distance); a lower distance is a closer match,
+        # and label -1 means the best distance exceeded the configured threshold.
+        index, distance = self.recognizer.predict(
+            cv2.equalizeHist(cv2.cvtColor(face_image, cv2.COLOR_BGR2GRAY))
+        )

-        if cosine_index == np.where(probabilities == svc_probability)[0]:
-            return (
-                sub_label,
-                min(avg_score, svc_probability),
-            )
+        if index == -1:
+            return None
+
+        return self.label_map[index], distance
-
-        return None
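For reviewers who want to exercise the new recognizer outside Frigate, here is a minimal standalone sketch of the LBPH flow this patch introduces. It is a sketch under assumptions, not part of the patch: it assumes opencv-contrib-python is installed (cv2.face lives in the contrib modules), reuses the /media/frigate/clips/faces layout that __build_classifier reads (one subfolder per person name), and face_crop.jpg is a hypothetical input image.

import os

import cv2
import numpy as np


def preprocess(img: np.ndarray) -> np.ndarray:
    # mirror the patch's preprocessing: grayscale then histogram equalization
    return cv2.equalizeHist(cv2.cvtColor(img, cv2.COLOR_BGR2GRAY))


def train(root: str, threshold: float):
    # threshold is an upper bound on distance: any prediction farther away
    # than this comes back from predict() with label -1
    recognizer = cv2.face.LBPHFaceRecognizer_create(radius=4, threshold=threshold)
    label_map: dict[int, str] = {}
    faces, labels = [], []

    for idx, name in enumerate(sorted(os.listdir(root))):
        label_map[idx] = name
        folder = os.path.join(root, name)
        for filename in os.listdir(folder):
            img = cv2.imread(os.path.join(folder, filename))
            if img is None:  # skip files opencv cannot decode
                continue
            faces.append(preprocess(img))
            labels.append(idx)

    recognizer.train(faces, np.array(labels))
    return recognizer, label_map


if __name__ == "__main__":
    recognizer, label_map = train("/media/frigate/clips/faces", threshold=170.0)
    crop = cv2.imread("face_crop.jpg")  # hypothetical cropped face image
    index, distance = recognizer.predict(preprocess(crop))
    if index == -1:
        print("no match within threshold")
    else:
        print(f"matched {label_map[index]} at distance {distance:.1f}")

Because LBPH reports a distance rather than a similarity, lower is better; that is why maintainer.py flips the comparison from "score <= previous" to "distance >= previous" when deciding whether a new result beats the stored one, and why the config threshold becomes a maximum allowed distance instead of a minimum similarity score.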