Use opencv instead of face embedding

This commit is contained in:
Nicolas Mowen 2024-11-21 12:59:43 -07:00
parent 8b8df6d978
commit 51a28d3027
3 changed files with 32 additions and 84 deletions

View File

@ -24,7 +24,7 @@ class SemanticSearchConfig(FrigateBaseModel):
class FaceRecognitionConfig(FrigateBaseModel): class FaceRecognitionConfig(FrigateBaseModel):
enabled: bool = Field(default=False, title="Enable face recognition.") enabled: bool = Field(default=False, title="Enable face recognition.")
threshold: float = Field( threshold: float = Field(
default=0.9, title="Face similarity score required to be considered a match." default=170, title="minimum face distance score required to be considered a match."
) )
min_area: int = Field( min_area: int = Field(
default=500, title="Min area of face box to consider running face recognition." default=500, title="Min area of face box to consider running face recognition."

View File

@ -451,29 +451,20 @@ class EmbeddingMaintainer(threading.Thread):
), ),
] ]
ret, webp = cv2.imencode( res = self.face_classifier.classify_face(face_frame)
".webp", face_frame, [int(cv2.IMWRITE_WEBP_QUALITY), 100]
)
if not ret:
logger.debug("Not processing face due to error creating cropped image.")
return
embedding = self.embeddings.embed_face("unknown", webp.tobytes(), upsert=False)
res = self.face_classifier.classify_face(embedding)
if not res: if not res:
return return
sub_label, score = res sub_label, distance = res
logger.debug( logger.debug(
f"Detected best face for person as: {sub_label} with score {score}" f"Detected best face for person as: {sub_label} with distance {distance}"
) )
if id in self.detected_faces and score <= self.detected_faces[id]: if id in self.detected_faces and distance >= self.detected_faces[id]:
logger.debug( logger.debug(
f"Recognized face score {score} is less than previous face score ({self.detected_faces.get(id)})." f"Recognized face distance {distance} is greater than previous face distance ({self.detected_faces.get(id)})."
) )
return return
@ -482,12 +473,11 @@ class EmbeddingMaintainer(threading.Thread):
json={ json={
"camera": obj_data.get("camera"), "camera": obj_data.get("camera"),
"subLabel": sub_label, "subLabel": sub_label,
"subLabelScore": score,
}, },
) )
if resp.status_code == 200: if resp.status_code == 200:
self.detected_faces[id] = score self.detected_faces[id] = distance
def _detect_license_plate(self, input: np.ndarray) -> tuple[int, int, int, int]: def _detect_license_plate(self, input: np.ndarray) -> tuple[int, int, int, int]:
"""Return the dimensions of the input image as [x, y, width, height].""" """Return the dimensions of the input image as [x, y, width, height]."""

View File

@ -4,6 +4,7 @@ import logging
import os import os
from typing import Any, Optional from typing import Any, Optional
import cv2
import numpy as np import numpy as np
import onnxruntime as ort import onnxruntime as ort
from playhouse.sqliteq import SqliteQueueDatabase from playhouse.sqliteq import SqliteQueueDatabase
@ -164,81 +165,38 @@ class FaceClassificationModel:
def __init__(self, config: FaceRecognitionConfig, db: SqliteQueueDatabase): def __init__(self, config: FaceRecognitionConfig, db: SqliteQueueDatabase):
self.config = config self.config = config
self.db = db self.db = db
self.labeler: Optional[LabelEncoder] = None self.recognizer = cv2.face.LBPHFaceRecognizer_create(radius=4, threshold=config.threshold)
self.classifier: Optional[SVC] = None self.label_map: dict[int, str] = {}
self.embedding_query = f"""
SELECT
id,
distance
FROM vec_faces
WHERE face_embedding MATCH ?
AND k = {MIN_MATCHING_FACES} ORDER BY distance
"""
def __build_classifier(self) -> None: def __build_classifier(self) -> None:
faces: list[tuple[str, bytes]] = self.db.execute_sql( labels = []
"SELECT id, face_embedding FROM vec_faces" faces = []
).fetchall()
embeddings = np.array([deserialize(f[1]) for f in faces]) dir = "/media/frigate/clips/faces"
self.labeler = LabelEncoder() for idx, name in enumerate(os.listdir(dir)):
norms = Normalizer(norm="l2").transform(embeddings) self.label_map[idx] = name
labels = self.labeler.fit_transform([f[0].split("-")[0] for f in faces]) face_folder = os.path.join(dir, name)
self.classifier = SVC( for image in os.listdir(face_folder):
kernel="linear", probability=True, decision_function_shape="ovo" img = cv2.imread(os.path.join(face_folder, image))
) gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
self.classifier.fit(norms, labels) equ = cv2.equalizeHist(gray)
faces.append(equ)
labels.append(idx)
self.recognizer.train(faces, np.array(labels))
def clear_classifier(self) -> None: def clear_classifier(self) -> None:
self.classifier = None self.classifier = None
self.labeler = None self.labeler = None
def classify_face(self, embedding: np.ndarray) -> Optional[tuple[str, float]]: def classify_face(self, face_image: np.ndarray) -> Optional[tuple[str, float]]:
best_faces = self.db.execute_sql( if not self.label_map:
self.embedding_query, [serialize(embedding)]
).fetchall()
logger.debug(f"Face embedding match: {best_faces}")
if not best_faces or len(best_faces) < MIN_MATCHING_FACES:
logger.debug(
f"{len(best_faces)} < {MIN_MATCHING_FACES} min required faces."
)
return None
sub_label = str(best_faces[0][0]).split("-")[0]
avg_score = 0
# check that the cosine similarity is close enough to match the face
for face in best_faces:
score = 1.0 - face[1]
if face[0].split("-")[0] != sub_label:
logger.debug("Detected multiple faces, result is not valid.")
return None
avg_score += score
avg_score = round(avg_score / MIN_MATCHING_FACES, 2)
if avg_score < self.config.threshold:
logger.debug(
f"Recognized face score {avg_score} is less than threshold ({self.config.threshold}))."
)
return None
if not self.classifier:
self.__build_classifier() self.__build_classifier()
cosine_index = self.labeler.transform([sub_label])[0] index, distance = self.recognizer.predict(cv2.equalizeHist(cv2.cvtColor(face_image, cv2.COLOR_BGR2GRAY)))
probabilities: np.ndarray = self.classifier.predict_proba([embedding])[0]
svc_probability = max(probabilities)
logger.debug(
f"SVC face classification probability: {svc_probability} and index match: {cosine_index} / {np.where(probabilities == svc_probability)[0]}"
)
if cosine_index == np.where(probabilities == svc_probability)[0]:
return (
sub_label,
min(avg_score, svc_probability),
)
if index == -1:
return None return None
return self.label_map[index], distance