mirror of
https://github.com/blakeblackshear/frigate.git
synced 2024-11-21 19:07:46 +01:00
Validate faces using cosine distance and SVC
This commit is contained in:
parent
6e41fe6132
commit
2d4f5ecd6f
@ -69,7 +69,9 @@ class EmbeddingMaintainer(threading.Thread):
|
|||||||
self.requires_face_detection = "face" not in self.config.objects.all_objects
|
self.requires_face_detection = "face" not in self.config.objects.all_objects
|
||||||
self.detected_faces: dict[str, float] = {}
|
self.detected_faces: dict[str, float] = {}
|
||||||
self.face_classifier = (
|
self.face_classifier = (
|
||||||
FaceClassificationModel(db) if self.face_recognition_enabled else None
|
FaceClassificationModel(self.config.face_recognition, db)
|
||||||
|
if self.face_recognition_enabled
|
||||||
|
else None
|
||||||
)
|
)
|
||||||
|
|
||||||
# create communication for updating event descriptions
|
# create communication for updating event descriptions
|
||||||
@ -201,7 +203,9 @@ class EmbeddingMaintainer(threading.Thread):
|
|||||||
|
|
||||||
# Create our own thumbnail based on the bounding box and the frame time
|
# Create our own thumbnail based on the bounding box and the frame time
|
||||||
try:
|
try:
|
||||||
yuv_frame = self.frame_manager.get(frame_name, camera_config.frame_shape_yuv)
|
yuv_frame = self.frame_manager.get(
|
||||||
|
frame_name, camera_config.frame_shape_yuv
|
||||||
|
)
|
||||||
except FileNotFoundError:
|
except FileNotFoundError:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
@ -467,11 +471,9 @@ class EmbeddingMaintainer(threading.Thread):
|
|||||||
f"Detected best face for person as: {sub_label} with score {score}"
|
f"Detected best face for person as: {sub_label} with score {score}"
|
||||||
)
|
)
|
||||||
|
|
||||||
if score < self.config.face_recognition.threshold or (
|
if id in self.detected_faces and score <= self.detected_faces[id]:
|
||||||
id in self.detected_faces and score <= self.detected_faces[id]
|
|
||||||
):
|
|
||||||
logger.debug(
|
logger.debug(
|
||||||
f"Recognized face score {score} is less than threshold ({self.config.face_recognition.threshold}) / previous face score ({self.detected_faces.get(id)})."
|
f"Recognized face score {score} is less than previous face score ({self.detected_faces.get(id)})."
|
||||||
)
|
)
|
||||||
return
|
return
|
||||||
|
|
||||||
|
@ -10,7 +10,8 @@ from playhouse.sqliteq import SqliteQueueDatabase
|
|||||||
from sklearn.preprocessing import LabelEncoder, Normalizer
|
from sklearn.preprocessing import LabelEncoder, Normalizer
|
||||||
from sklearn.svm import SVC
|
from sklearn.svm import SVC
|
||||||
|
|
||||||
from frigate.util.builtin import deserialize
|
from frigate.config.semantic_search import FaceRecognitionConfig
|
||||||
|
from frigate.util.builtin import deserialize, serialize
|
||||||
|
|
||||||
try:
|
try:
|
||||||
import openvino as ov
|
import openvino as ov
|
||||||
@ -21,6 +22,9 @@ except ImportError:
|
|||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
MIN_MATCHING_FACES = 2
|
||||||
|
|
||||||
|
|
||||||
def get_ort_providers(
|
def get_ort_providers(
|
||||||
force_cpu: bool = False, device: str = "AUTO", requires_fp16: bool = False
|
force_cpu: bool = False, device: str = "AUTO", requires_fp16: bool = False
|
||||||
) -> tuple[list[str], list[dict[str, any]]]:
|
) -> tuple[list[str], list[dict[str, any]]]:
|
||||||
@ -157,10 +161,19 @@ class ONNXModelRunner:
|
|||||||
|
|
||||||
|
|
||||||
class FaceClassificationModel:
|
class FaceClassificationModel:
|
||||||
def __init__(self, db: SqliteQueueDatabase):
|
def __init__(self, config: FaceRecognitionConfig, db: SqliteQueueDatabase):
|
||||||
|
self.config = config
|
||||||
self.db = db
|
self.db = db
|
||||||
self.labeler: Optional[LabelEncoder] = None
|
self.labeler: Optional[LabelEncoder] = None
|
||||||
self.classifier: Optional[SVC] = None
|
self.classifier: Optional[SVC] = None
|
||||||
|
self.embedding_query = f"""
|
||||||
|
SELECT
|
||||||
|
id,
|
||||||
|
distance
|
||||||
|
FROM vec_faces
|
||||||
|
WHERE face_embedding MATCH ?
|
||||||
|
AND k = {MIN_MATCHING_FACES} ORDER BY distance
|
||||||
|
"""
|
||||||
|
|
||||||
def __build_classifier(self) -> None:
|
def __build_classifier(self) -> None:
|
||||||
faces: list[tuple[str, bytes]] = self.db.execute_sql(
|
faces: list[tuple[str, bytes]] = self.db.execute_sql(
|
||||||
@ -170,7 +183,9 @@ class FaceClassificationModel:
|
|||||||
self.labeler = LabelEncoder()
|
self.labeler = LabelEncoder()
|
||||||
norms = Normalizer(norm="l2").transform(embeddings)
|
norms = Normalizer(norm="l2").transform(embeddings)
|
||||||
labels = self.labeler.fit_transform([f[0].split("-")[0] for f in faces])
|
labels = self.labeler.fit_transform([f[0].split("-")[0] for f in faces])
|
||||||
self.classifier = SVC(kernel="linear", probability=True)
|
self.classifier = SVC(
|
||||||
|
kernel="linear", probability=True, decision_function_shape="ovo"
|
||||||
|
)
|
||||||
self.classifier.fit(norms, labels)
|
self.classifier.fit(norms, labels)
|
||||||
|
|
||||||
def clear_classifier(self) -> None:
|
def clear_classifier(self) -> None:
|
||||||
@ -178,17 +193,50 @@ class FaceClassificationModel:
|
|||||||
self.labeler = None
|
self.labeler = None
|
||||||
|
|
||||||
def classify_face(self, embedding: np.ndarray) -> Optional[tuple[str, float]]:
|
def classify_face(self, embedding: np.ndarray) -> Optional[tuple[str, float]]:
|
||||||
|
best_faces = self.db.execute_sql(
|
||||||
|
self.embedding_query, [serialize(embedding)]
|
||||||
|
).fetchall()
|
||||||
|
logger.debug(f"Face embedding match: {best_faces}")
|
||||||
|
|
||||||
|
if not best_faces or len(best_faces) < MIN_MATCHING_FACES:
|
||||||
|
logger.debug(
|
||||||
|
f"{len(best_faces)} < {MIN_MATCHING_FACES} min required faces."
|
||||||
|
)
|
||||||
|
return None
|
||||||
|
|
||||||
|
sub_label = str(best_faces[0][0]).split("-")[0]
|
||||||
|
avg_score = 0
|
||||||
|
|
||||||
|
# check that the cosine similarity is close enough to match the face
|
||||||
|
for face in best_faces:
|
||||||
|
score = 1.0 - face[1]
|
||||||
|
|
||||||
|
if face[0].split("-")[0] != sub_label:
|
||||||
|
logger.debug("Detected multiple faces, result is not valid.")
|
||||||
|
return None
|
||||||
|
|
||||||
|
avg_score += score
|
||||||
|
|
||||||
|
avg_score = round(avg_score / MIN_MATCHING_FACES, 2)
|
||||||
|
|
||||||
|
if avg_score < self.config.threshold:
|
||||||
|
logger.debug(
|
||||||
|
f"Recognized face score {avg_score} is less than threshold ({self.config.threshold}))."
|
||||||
|
)
|
||||||
|
return None
|
||||||
|
|
||||||
if not self.classifier:
|
if not self.classifier:
|
||||||
self.__build_classifier()
|
self.__build_classifier()
|
||||||
|
|
||||||
res = self.classifier.predict([embedding])
|
cosine_index = self.labeler.transform([sub_label])[0]
|
||||||
|
probabilities: list[float] = self.classifier.predict_proba([embedding])[0]
|
||||||
|
svc_probability = max(probabilities)
|
||||||
|
logger.debug(f"SVC face classification probability: {svc_probability} and index match: {cosine_index} / {probabilities.index(svc_probability)}")
|
||||||
|
|
||||||
if res is None:
|
if cosine_index == probabilities.index(svc_probability):
|
||||||
return None
|
|
||||||
|
|
||||||
label = res[0]
|
|
||||||
probabilities = self.classifier.predict_proba([embedding])[0]
|
|
||||||
return (
|
return (
|
||||||
self.labeler.inverse_transform([label])[0],
|
sub_label,
|
||||||
round(probabilities[label], 2),
|
min(avg_score, svc_probability),
|
||||||
)
|
)
|
||||||
|
|
||||||
|
return None
|
||||||
|
Loading…
Reference in New Issue
Block a user