blakeblackshear.frigate/frigate/data_processing/real_time/face_processor.py

"""Handle processing images for face detection and recognition."""
import base64
import datetime
import logging
import os
import random
import string
from typing import Any, Optional
import cv2
import numpy as np
import requests
from frigate.config import FrigateConfig
from frigate.const import FACE_DIR, FRIGATE_LOCALHOST, MODEL_CACHE_DIR
from frigate.util.image import area
from ..types import DataProcessorMetrics
from .api import RealTimeProcessorApi
logger = logging.getLogger(__name__)
MIN_MATCHING_FACES = 2
class FaceProcessor(RealTimeProcessorApi):
def __init__(self, config: FrigateConfig, metrics: DataProcessorMetrics):
super().__init__(config, metrics)
self.face_config = config.face_recognition
        self.face_detector: Optional[cv2.FaceDetectorYN] = None
        self.landmark_detector: Optional[cv2.face.FacemarkLBF] = None
        self.face_recognizer: Optional[cv2.face.LBPHFaceRecognizer] = None
self.requires_face_detection = "face" not in self.config.objects.all_objects
self.detected_faces: dict[str, float] = {}
download_path = os.path.join(MODEL_CACHE_DIR, "facedet")
self.model_files = {
"facedet.onnx": "https://github.com/NickM-27/facenet-onnx/releases/download/v1.0/facedet.onnx",
"landmarkdet.yaml": "https://github.com/NickM-27/facenet-onnx/releases/download/v1.0/landmarkdet.yaml",
}
if not all(
os.path.exists(os.path.join(download_path, n))
for n in self.model_files.keys()
):
# conditionally import ModelDownloader
from frigate.util.downloader import ModelDownloader
self.downloader = ModelDownloader(
model_name="facedet",
download_path=download_path,
file_names=self.model_files.keys(),
download_func=self.__download_models,
complete_func=self.__build_detector,
)
self.downloader.ensure_model_files()
else:
self.__build_detector()
self.label_map: dict[int, str] = {}
self.__build_classifier()
def __download_models(self, path: str) -> None:
try:
file_name = os.path.basename(path)
# conditionally import ModelDownloader
from frigate.util.downloader import ModelDownloader
ModelDownloader.download_from_url(self.model_files[file_name], path)
except Exception as e:
logger.error(f"Failed to download {path}: {e}")
    def __build_detector(self) -> None:
        self.face_detector = cv2.FaceDetectorYN.create(
            os.path.join(MODEL_CACHE_DIR, "facedet/facedet.onnx"),
            config="",
            input_size=(320, 320),
            score_threshold=0.8,
            nms_threshold=0.3,
        )
        self.landmark_detector = cv2.face.createFacemarkLBF()
        self.landmark_detector.loadModel(
            os.path.join(MODEL_CACHE_DIR, "facedet/landmarkdet.yaml")
        )
def __build_classifier(self) -> None:
if not self.landmark_detector:
            return
labels = []
faces = []
dir = "/media/frigate/clips/faces"
for idx, name in enumerate(os.listdir(dir)):
if name == "train":
continue
face_folder = os.path.join(dir, name)
if not os.path.isdir(face_folder):
continue
self.label_map[idx] = name
for image in os.listdir(face_folder):
img = cv2.imread(os.path.join(face_folder, image))
if img is None:
continue
img = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
img = self.__align_face(img, img.shape[1], img.shape[0])
faces.append(img)
labels.append(idx)
        if not faces:
            return
        self.face_recognizer: cv2.face.LBPHFaceRecognizer = (
            cv2.face.LBPHFaceRecognizer_create(
                radius=2, threshold=(1 - self.face_config.min_score) * 1000
            )
        )
        self.face_recognizer.train(faces, np.array(labels))
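    # Note on the LBPH threshold math above (a sketch with illustrative
    # numbers, not values from any real config): LBPH predict() returns a
    # *distance* (lower is better), which __classify_face maps to a score via
    # score = 1.0 - distance / 1000. The constructor's
    # threshold = (1 - min_score) * 1000 is the same mapping inverted, so
    # e.g. min_score = 0.8 becomes a distance threshold of 200, and a
    # predicted distance of 150 maps back to a score of 0.85.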
def __align_face(
self,
image: np.ndarray,
output_width: int,
output_height: int,
) -> np.ndarray:
_, lands = self.landmark_detector.fit(
image, np.array([(0, 0, image.shape[1], image.shape[0])])
)
landmarks: np.ndarray = lands[0][0]
# get landmarks for eyes
leftEyePts = landmarks[42:48]
rightEyePts = landmarks[36:42]
# compute the center of mass for each eye
leftEyeCenter = leftEyePts.mean(axis=0).astype("int")
rightEyeCenter = rightEyePts.mean(axis=0).astype("int")
# compute the angle between the eye centroids
dY = rightEyeCenter[1] - leftEyeCenter[1]
dX = rightEyeCenter[0] - leftEyeCenter[0]
angle = np.degrees(np.arctan2(dY, dX)) - 180
# compute the desired right eye x-coordinate based on the
# desired x-coordinate of the left eye
desiredRightEyeX = 1.0 - 0.35
        # determine the scale of the new resulting image by taking
        # the ratio of the distance between eyes in the *current*
        # image to the distance between eyes in the *desired* image
dist = np.sqrt((dX**2) + (dY**2))
desiredDist = desiredRightEyeX - 0.35
desiredDist *= output_width
scale = desiredDist / dist
        # compute the (x, y) midpoint between the two eyes
        # in the input image
        eyesCenter = (
            int((leftEyeCenter[0] + rightEyeCenter[0]) // 2),
            int((leftEyeCenter[1] + rightEyeCenter[1]) // 2),
        )
        # grab the rotation matrix for rotating and scaling the face
        M = cv2.getRotationMatrix2D(eyesCenter, angle, scale)
# update the translation component of the matrix
tX = output_width * 0.5
tY = output_height * 0.35
M[0, 2] += tX - eyesCenter[0]
M[1, 2] += tY - eyesCenter[1]
# apply the affine transformation
return cv2.warpAffine(
image, M, (output_width, output_height), flags=cv2.INTER_CUBIC
)
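    # Worked example for __align_face (illustrative numbers only): with eye
    # centers at (140, 120) and (100, 124), dX = -40 and dY = 4, so the roll
    # angle is degrees(atan2(4, -40)) - 180 ≈ -5.7°. The inter-eye distance is
    # sqrt(40² + 4²) ≈ 40.2 px; for output_width = 160 the desired distance is
    # (0.65 - 0.35) * 160 = 48 px, giving scale ≈ 48 / 40.2 ≈ 1.19. The affine
    # matrix M then rotates and scales about the eye midpoint (120, 122) and
    # translates it to (0.5 * width, 0.35 * height), so every aligned crop has
    # the eyes level, horizontally centered, and at a fixed height.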
def __clear_classifier(self) -> None:
self.face_recognizer = None
self.label_map = {}
    def __detect_face(
        self, input: np.ndarray
    ) -> Optional[tuple[int, int, int, int]]:
        """Detect faces in input image and return the largest as (x1, y1, x2, y2)."""
        if not self.face_detector:
            return None
        self.face_detector.setInputSize((input.shape[1], input.shape[0]))
        faces = self.face_detector.detect(input)
        if faces is None or faces[1] is None:
            return None
        face = None
        for potential_face in faces[1]:
            # cast to a signed type before clamping; casting a negative
            # coordinate straight to uint16 would wrap around instead
            raw_bbox = potential_face[0:4].astype(np.int32)
            x: int = max(raw_bbox[0], 0)
            y: int = max(raw_bbox[1], 0)
            w: int = raw_bbox[2]
            h: int = raw_bbox[3]
            bbox = (x, y, x + w, y + h)
            # keep the largest face found so far
            if face is None or area(bbox) > area(face):
                face = bbox
        return face
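    # A minimal usage sketch (hypothetical file name; name-mangled access is
    # shown only for illustration, since the method is private):
    #
    #   img = cv2.imread("person.jpg")
    #   box = processor._FaceProcessor__detect_face(img)
    #   if box is not None:
    #       face_crop = img[box[1]:box[3], box[0]:box[2]]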
    def __classify_face(self, face_image: np.ndarray) -> tuple[str, float] | None:
        if not self.landmark_detector:
            return None
        if not self.label_map:
            self.__build_classifier()
        if not self.face_recognizer:
            return None
        img = cv2.cvtColor(face_image, cv2.COLOR_BGR2GRAY)
        img = self.__align_face(img, img.shape[1], img.shape[0])
        index, distance = self.face_recognizer.predict(img)
        if index == -1:
            return None
        # map the LBPH distance back to a 0-1 score (lower distance = higher score)
        score = 1.0 - (distance / 1000)
        return self.label_map[index], round(score, 2)
def __update_metrics(self, duration: float) -> None:
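        # exponential moving average with alpha = 0.1 over recent processing
        # durations; note that despite the "fps" name, the stored value is
        # seconds per inference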
self.metrics.face_rec_fps.value = (
self.metrics.face_rec_fps.value * 9 + duration
) / 10
    def process_frame(self, obj_data: dict[str, Any], frame: np.ndarray) -> None:
"""Look for faces in image."""
start = datetime.datetime.now().timestamp()
id = obj_data["id"]
        # don't run for non-person objects
        if obj_data.get("label") != "person":
            logger.debug("Not processing face for non-person object.")
            return
# don't overwrite sub label for objects that have a sub label
# that is not a face
if obj_data.get("sub_label") and id not in self.detected_faces:
logger.debug(
f"Not processing face due to existing sub label: {obj_data.get('sub_label')}."
)
return
        face: Optional[dict[str, Any]] = None
if self.requires_face_detection:
logger.debug("Running manual face detection.")
person_box = obj_data.get("box")
if not person_box:
return
rgb = cv2.cvtColor(frame, cv2.COLOR_YUV2RGB_I420)
left, top, right, bottom = person_box
person = rgb[top:bottom, left:right]
face_box = self.__detect_face(person)
if not face_box:
logger.debug("Detected no faces for person object.")
return
            # face_box is relative to the person crop, so clamp against it
            face_frame = person[
                max(0, face_box[1]) : min(person.shape[0], face_box[3]),
                max(0, face_box[0]) : min(person.shape[1], face_box[2]),
            ]
            face_frame = cv2.cvtColor(face_frame, cv2.COLOR_RGB2BGR)
else:
            # don't run for objects without attributes
if not obj_data.get("current_attributes"):
logger.debug("No attributes to parse.")
return
attributes: list[dict[str, any]] = obj_data.get("current_attributes", [])
for attr in attributes:
if attr.get("label") != "face":
continue
if face is None or attr.get("score", 0.0) > face.get("score", 0.0):
face = attr
# no faces detected in this frame
if not face:
return
face_box = face.get("box")
# check that face is valid
if not face_box or area(face_box) < self.config.face_recognition.min_area:
logger.debug(f"Invalid face box {face}")
return
            face_frame = cv2.cvtColor(frame, cv2.COLOR_YUV2BGR_I420)
            # face_box is in full-frame coordinates; clamp against the
            # converted BGR frame rather than the taller I420 buffer
            face_frame = face_frame[
                max(0, face_box[1]) : min(face_frame.shape[0], face_box[3]),
                max(0, face_box[0]) : min(face_frame.shape[1], face_box[2]),
            ]
res = self.__classify_face(face_frame)
if not res:
return
sub_label, score = res
        # calculate the overall face score as the probability * area of the face;
        # this helps reduce false positives from small, side-angle faces, since
        # a large front-on face may score slightly lower yet is more likely to
        # be accurate due to its larger area
        face_score = round(score * face_frame.shape[0] * face_frame.shape[1], 2)
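        # e.g. (illustrative numbers) a 0.85 score on a 120 x 100 px crop
        # yields 0.85 * 120 * 100 = 10200.0, outranking a 0.9 score on a
        # 40 x 30 px crop (1080.0)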
logger.debug(
f"Detected best face for person as: {sub_label} with probability {score} and overall face score {face_score}"
)
if self.config.face_recognition.save_attempts:
# write face to library
folder = os.path.join(FACE_DIR, "train")
file = os.path.join(folder, f"{id}-{sub_label}-{score}-{face_score}.webp")
os.makedirs(folder, exist_ok=True)
cv2.imwrite(file, face_frame)
        if score < self.config.face_recognition.threshold:
            logger.debug(
                f"Recognized face score {score} is below the threshold {self.config.face_recognition.threshold}"
            )
self.__update_metrics(datetime.datetime.now().timestamp() - start)
return
        if id in self.detected_faces and face_score <= self.detected_faces[id]:
            logger.debug(
                f"Recognized face score {score} with overall score {face_score} does not improve on the previous overall face score ({self.detected_faces.get(id)})."
            )
self.__update_metrics(datetime.datetime.now().timestamp() - start)
return
resp = requests.post(
f"{FRIGATE_LOCALHOST}/api/events/{id}/sub_label",
json={
"camera": obj_data.get("camera"),
"subLabel": sub_label,
"subLabelScore": score,
},
)
if resp.status_code == 200:
self.detected_faces[id] = face_score
self.__update_metrics(datetime.datetime.now().timestamp() - start)
    def handle_request(self, request_data) -> dict[str, Any] | None:
rand_id = "".join(random.choices(string.ascii_lowercase + string.digits, k=6))
label = request_data["face_name"]
id = f"{label}-{rand_id}"
if request_data.get("cropped"):
thumbnail = request_data["image"]
else:
img = cv2.imdecode(
np.frombuffer(base64.b64decode(request_data["image"]), dtype=np.uint8),
cv2.IMREAD_COLOR,
)
face_box = self.__detect_face(img)
if not face_box:
return {
"message": "No face was detected.",
"success": False,
}
face = img[face_box[1] : face_box[3], face_box[0] : face_box[2]]
            ret, thumbnail = cv2.imencode(
                ".webp", face, [int(cv2.IMWRITE_WEBP_QUALITY), 100]
            )
            if not ret:
                return {
                    "message": "Failed to encode the face image.",
                    "success": False,
                }
# write face to library
folder = os.path.join(FACE_DIR, label)
file = os.path.join(folder, f"{id}.webp")
os.makedirs(folder, exist_ok=True)
# save face image
with open(file, "wb") as output:
output.write(thumbnail.tobytes())
self.__clear_classifier()
return {
"message": "Successfully registered face.",
"success": True,
}
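    # Illustrative request payload for handle_request (field names taken from
    # the code above; the image value is hypothetical):
    #
    #   {
    #       "face_name": "jane",
    #       "image": "<base64-encoded JPEG/PNG>",
    #       "cropped": False,  # optional; when True, "image" is saved as-is
    #   }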
    def expire_object(self, object_id: str) -> None:
        self.detected_faces.pop(object_id, None)