diff --git a/frigate/__main__.py b/frigate/__main__.py index d8cbae3ee..4143f7ae6 100644 --- a/frigate/__main__.py +++ b/frigate/__main__.py @@ -87,7 +87,7 @@ def main() -> None: if current != full_config: print(f"Line # : {line_number}") print(f"Key : {' -> '.join(map(str, error_path))}") - print(f"Value : {error.get('input','-')}") + print(f"Value : {error.get('input', '-')}") print(f"Message : {error.get('msg', error.get('type', 'Unknown'))}\n") print("*************************************************************") diff --git a/frigate/api/classification.py b/frigate/api/classification.py index 6405516e0..3c505d367 100644 --- a/frigate/api/classification.py +++ b/frigate/api/classification.py @@ -39,16 +39,28 @@ def get_faces(): @router.post("/faces/{name}") async def register_face(request: Request, name: str, file: UploadFile): + if not request.app.frigate_config.face_recognition.enabled: + return JSONResponse( + status_code=400, + content={"message": "Face recognition is not enabled.", "success": False}, + ) + context: EmbeddingsContext = request.app.embeddings - context.register_face(name, await file.read()) + result = context.register_face(name, await file.read()) return JSONResponse( - status_code=200, - content={"success": True, "message": "Successfully registered face."}, + status_code=200 if result.get("success", True) else 400, + content=result, ) @router.post("/faces/train/{name}/classify") -def train_face(name: str, body: dict = None): +def train_face(request: Request, name: str, body: dict = None): + if not request.app.frigate_config.face_recognition.enabled: + return JSONResponse( + status_code=400, + content={"message": "Face recognition is not enabled.", "success": False}, + ) + json: dict[str, any] = body or {} training_file = os.path.join( FACE_DIR, f"train/{sanitize_filename(json.get('training_file', ''))}" @@ -82,6 +94,12 @@ def train_face(name: str, body: dict = None): @router.post("/faces/{name}/delete") def deregister_faces(request: Request, name: str, body: dict = None): + if not request.app.frigate_config.face_recognition.enabled: + return JSONResponse( + status_code=400, + content={"message": "Face recognition is not enabled.", "success": False}, + ) + json: dict[str, any] = body or {} list_of_ids = json.get("ids", "") diff --git a/frigate/app.py b/frigate/app.py index ad5d167c8..e3f2f9d7f 100644 --- a/frigate/app.py +++ b/frigate/app.py @@ -41,7 +41,6 @@ from frigate.const import ( ) from frigate.db.sqlitevecq import SqliteVecQueueDatabase from frigate.embeddings import EmbeddingsContext, manage_embeddings -from frigate.embeddings.types import EmbeddingsMetrics from frigate.events.audio import AudioProcessor from frigate.events.cleanup import EventCleanup from frigate.events.external import ExternalEventProcessor @@ -60,6 +59,7 @@ from frigate.models import ( from frigate.object_detection import ObjectDetectProcess from frigate.object_processing import TrackedObjectProcessor from frigate.output.output import output_frames +from frigate.postprocessing.types import PostProcessingMetrics from frigate.ptz.autotrack import PtzAutoTrackerThread from frigate.ptz.onvif import OnvifController from frigate.record.cleanup import RecordingCleanup @@ -90,8 +90,8 @@ class FrigateApp: self.detection_shms: list[mp.shared_memory.SharedMemory] = [] self.log_queue: Queue = mp.Queue() self.camera_metrics: dict[str, CameraMetrics] = {} - self.embeddings_metrics: EmbeddingsMetrics | None = ( - EmbeddingsMetrics() if config.semantic_search.enabled else None + self.embeddings_metrics: 
PostProcessingMetrics | None = ( + PostProcessingMetrics() if config.semantic_search.enabled else None ) self.ptz_metrics: dict[str, PTZMetrics] = {} self.processes: dict[str, int] = {} diff --git a/frigate/embeddings/__init__.py b/frigate/embeddings/__init__.py index 43da686ce..d75d88500 100644 --- a/frigate/embeddings/__init__.py +++ b/frigate/embeddings/__init__.py @@ -20,14 +20,14 @@ from frigate.models import Event from frigate.util.builtin import serialize from frigate.util.services import listen +from ..postprocessing.types import PostProcessingMetrics from .maintainer import EmbeddingMaintainer -from .types import EmbeddingsMetrics from .util import ZScoreNormalization logger = logging.getLogger(__name__) -def manage_embeddings(config: FrigateConfig, metrics: EmbeddingsMetrics) -> None: +def manage_embeddings(config: FrigateConfig, metrics: PostProcessingMetrics) -> None: # Only initialize embeddings if semantic search is enabled if not config.semantic_search.enabled: return @@ -192,8 +192,8 @@ class EmbeddingsContext: return results - def register_face(self, face_name: str, image_data: bytes) -> None: - self.requestor.send_data( + def register_face(self, face_name: str, image_data: bytes) -> dict[str, any]: + return self.requestor.send_data( EmbeddingsRequestEnum.register_face.value, { "face_name": face_name, diff --git a/frigate/embeddings/embeddings.py b/frigate/embeddings/embeddings.py index 376ae4713..3f046d0c3 100644 --- a/frigate/embeddings/embeddings.py +++ b/frigate/embeddings/embeddings.py @@ -21,8 +21,8 @@ from frigate.models import Event from frigate.types import ModelStatusTypesEnum from frigate.util.builtin import serialize +from ..postprocessing.types import PostProcessingMetrics from .functions.onnx import GenericONNXEmbedding, ModelTypeEnum -from .types import EmbeddingsMetrics logger = logging.getLogger(__name__) @@ -65,7 +65,7 @@ class Embeddings: self, config: FrigateConfig, db: SqliteVecQueueDatabase, - metrics: EmbeddingsMetrics, + metrics: PostProcessingMetrics, ) -> None: self.config = config self.db = db diff --git a/frigate/embeddings/maintainer.py b/frigate/embeddings/maintainer.py index 6aa503624..5eb06358d 100644 --- a/frigate/embeddings/maintainer.py +++ b/frigate/embeddings/maintainer.py @@ -4,9 +4,7 @@ import base64 import datetime import logging import os -import random import re -import string import threading from multiprocessing.synchronize import Event as MpEvent from pathlib import Path @@ -28,7 +26,6 @@ from frigate.comms.inter_process import InterProcessRequestor from frigate.config import FrigateConfig from frigate.const import ( CLIPS_DIR, - FACE_DIR, FRIGATE_LOCALHOST, UPDATE_EVENT_DESCRIPTION, ) @@ -36,13 +33,14 @@ from frigate.embeddings.lpr.lpr import LicensePlateRecognition from frigate.events.types import EventTypeEnum from frigate.genai import get_genai_client from frigate.models import Event +from frigate.postprocessing.face_processor import FaceProcessor +from frigate.postprocessing.processor_api import ProcessorApi from frigate.types import TrackedObjectUpdateTypesEnum from frigate.util.builtin import serialize from frigate.util.image import SharedMemoryFrameManager, area, calculate_region -from frigate.util.model import FaceClassificationModel +from ..postprocessing.types import PostProcessingMetrics from .embeddings import Embeddings -from .types import EmbeddingsMetrics logger = logging.getLogger(__name__) @@ -56,7 +54,7 @@ class EmbeddingMaintainer(threading.Thread): self, db: SqliteQueueDatabase, config: FrigateConfig, - 
metrics: EmbeddingsMetrics, + metrics: PostProcessingMetrics, stop_event: MpEvent, ) -> None: super().__init__(name="embeddings_maintainer") @@ -75,16 +73,10 @@ class EmbeddingMaintainer(threading.Thread): ) self.embeddings_responder = EmbeddingsResponder() self.frame_manager = SharedMemoryFrameManager() + self.processors: list[ProcessorApi] = [] - # set face recognition conditions - self.face_recognition_enabled = self.config.face_recognition.enabled - self.requires_face_detection = "face" not in self.config.objects.all_objects - self.detected_faces: dict[str, float] = {} - self.face_classifier = ( - FaceClassificationModel(self.config.face_recognition, db) - if self.face_recognition_enabled - else None - ) + if self.config.face_recognition.enabled: + self.processors.append(FaceProcessor(self.config, metrics)) # create communication for updating event descriptions self.requestor = InterProcessRequestor() @@ -142,46 +134,12 @@ class EmbeddingMaintainer(threading.Thread): self.embeddings.embed_description("", data, upsert=False), pack=False, ) - elif topic == EmbeddingsRequestEnum.register_face.value: - if not self.face_recognition_enabled: - return False + else: + for processor in self.processors: + resp = processor.handle_request(data) - rand_id = "".join( - random.choices(string.ascii_lowercase + string.digits, k=6) - ) - label = data["face_name"] - id = f"{label}-{rand_id}" - - if data.get("cropped"): - pass - else: - img = cv2.imdecode( - np.frombuffer( - base64.b64decode(data["image"]), dtype=np.uint8 - ), - cv2.IMREAD_COLOR, - ) - face_box = self._detect_face(img) - - if not face_box: - return False - - face = img[face_box[1] : face_box[3], face_box[0] : face_box[2]] - ret, thumbnail = cv2.imencode( - ".webp", face, [int(cv2.IMWRITE_WEBP_QUALITY), 100] - ) - - # write face to library - folder = os.path.join(FACE_DIR, label) - file = os.path.join(folder, f"{id}.webp") - os.makedirs(folder, exist_ok=True) - - # save face image - with open(file, "wb") as output: - output.write(thumbnail.tobytes()) - - self.face_classifier.clear_classifier() - return True + if resp is not None: + return resp except Exception as e: logger.error(f"Unable to handle embeddings request {e}") @@ -204,8 +162,8 @@ class EmbeddingMaintainer(threading.Thread): # no need to process updated objects if face recognition, lpr, genai are disabled if ( not camera_config.genai.enabled - and not self.face_recognition_enabled and not self.lpr_config.enabled + and len(self.processors) == 0 ): return @@ -223,15 +181,8 @@ class EmbeddingMaintainer(threading.Thread): ) return - if self.face_recognition_enabled: - start = datetime.datetime.now().timestamp() - processed = self._process_face(data, yuv_frame) - - if processed: - duration = datetime.datetime.now().timestamp() - start - self.metrics.face_rec_fps.value = ( - self.metrics.face_rec_fps.value * 9 + duration - ) / 10 + for processor in self.processors: + processor.process_frame(data, yuv_frame) if self.lpr_config.enabled: start = datetime.datetime.now().timestamp() @@ -271,8 +222,8 @@ class EmbeddingMaintainer(threading.Thread): event_id, camera, updated_db = ended camera_config = self.config.cameras[camera] - if event_id in self.detected_faces: - self.detected_faces.pop(event_id) + for processor in self.processors: + processor.expire_object(event_id) if event_id in self.detected_license_plates: self.detected_license_plates.pop(event_id) @@ -399,150 +350,6 @@ class EmbeddingMaintainer(threading.Thread): if event_id: self.handle_regenerate_description(event_id, source) - 
def _detect_face(self, input: np.ndarray) -> tuple[int, int, int, int]: - """Detect faces in input image.""" - faces = self.face_classifier.detect_faces(input) - - if faces is None or faces[1] is None: - return None - - face = None - - for _, potential_face in enumerate(faces[1]): - raw_bbox = potential_face[0:4].astype(np.uint16) - x: int = max(raw_bbox[0], 0) - y: int = max(raw_bbox[1], 0) - w: int = raw_bbox[2] - h: int = raw_bbox[3] - bbox = (x, y, x + w, y + h) - - if face is None or area(bbox) > area(face): - face = bbox - - return face - - def _process_face(self, obj_data: dict[str, any], frame: np.ndarray) -> bool: - """Look for faces in image.""" - id = obj_data["id"] - - # don't run for non person objects - if obj_data.get("label") != "person": - logger.debug("Not a processing face for non person object.") - return False - - # don't overwrite sub label for objects that have a sub label - # that is not a face - if obj_data.get("sub_label") and id not in self.detected_faces: - logger.debug( - f"Not processing face due to existing sub label: {obj_data.get('sub_label')}." - ) - return False - - face: Optional[dict[str, any]] = None - - if self.requires_face_detection: - logger.debug("Running manual face detection.") - person_box = obj_data.get("box") - - if not person_box: - return False - - rgb = cv2.cvtColor(frame, cv2.COLOR_YUV2RGB_I420) - left, top, right, bottom = person_box - person = rgb[top:bottom, left:right] - face_box = self._detect_face(person) - - if not face_box: - logger.debug("Detected no faces for person object.") - return False - - face_frame = person[ - max(0, face_box[1]) : min(frame.shape[0], face_box[3]), - max(0, face_box[0]) : min(frame.shape[1], face_box[2]), - ] - face_frame = cv2.cvtColor(face_frame, cv2.COLOR_RGB2BGR) - else: - # don't run for object without attributes - if not obj_data.get("current_attributes"): - logger.debug("No attributes to parse.") - return False - - attributes: list[dict[str, any]] = obj_data.get("current_attributes", []) - for attr in attributes: - if attr.get("label") != "face": - continue - - if face is None or attr.get("score", 0.0) > face.get("score", 0.0): - face = attr - - # no faces detected in this frame - if not face: - return False - - face_box = face.get("box") - - # check that face is valid - if not face_box or area(face_box) < self.config.face_recognition.min_area: - logger.debug(f"Invalid face box {face}") - return False - - face_frame = cv2.cvtColor(frame, cv2.COLOR_YUV2BGR_I420) - - face_frame = face_frame[ - max(0, face_box[1]) : min(frame.shape[0], face_box[3]), - max(0, face_box[0]) : min(frame.shape[1], face_box[2]), - ] - - res = self.face_classifier.classify_face(face_frame) - - if not res: - return False - - sub_label, score = res - - # calculate the overall face score as the probability * area of face - # this will help to reduce false positives from small side-angle faces - # if a large front-on face image may have scored slightly lower but - # is more likely to be accurate due to the larger face area - face_score = round(score * face_frame.shape[0] * face_frame.shape[1], 2) - - logger.debug( - f"Detected best face for person as: {sub_label} with probability {score} and overall face score {face_score}" - ) - - if self.config.face_recognition.save_attempts: - # write face to library - folder = os.path.join(FACE_DIR, "train") - file = os.path.join(folder, f"{id}-{sub_label}-{score}-{face_score}.webp") - os.makedirs(folder, exist_ok=True) - cv2.imwrite(file, face_frame) - - if score < 
self.config.face_recognition.threshold: - logger.debug( - f"Recognized face distance {score} is less than threshold {self.config.face_recognition.threshold}" - ) - return True - - if id in self.detected_faces and face_score <= self.detected_faces[id]: - logger.debug( - f"Recognized face distance {score} and overall score {face_score} is less than previous overall face score ({self.detected_faces.get(id)})." - ) - return True - - resp = requests.post( - f"{FRIGATE_LOCALHOST}/api/events/{id}/sub_label", - json={ - "camera": obj_data.get("camera"), - "subLabel": sub_label, - "subLabelScore": score, - }, - ) - - if resp.status_code == 200: - self.detected_faces[id] = face_score - - return True - def _detect_license_plate(self, input: np.ndarray) -> tuple[int, int, int, int]: """Return the dimensions of the input image as [x, y, width, height].""" height, width = input.shape[:2] diff --git a/frigate/postprocessing/face_processor.py b/frigate/postprocessing/face_processor.py new file mode 100644 index 000000000..a75158eb2 --- /dev/null +++ b/frigate/postprocessing/face_processor.py @@ -0,0 +1,398 @@ +"""Handle processing images for face detection and recognition.""" + +import base64 +import datetime +import logging +import os +import random +import string +from typing import Optional + +import cv2 +import numpy as np +import requests + +from frigate.config import FrigateConfig +from frigate.const import FACE_DIR, FRIGATE_LOCALHOST, MODEL_CACHE_DIR +from frigate.util.image import area + +from .processor_api import ProcessorApi +from .types import PostProcessingMetrics + +logger = logging.getLogger(__name__) + + +MIN_MATCHING_FACES = 2 + + +class FaceProcessor(ProcessorApi): + def __init__(self, config: FrigateConfig, metrics: PostProcessingMetrics): + super().__init__(config, metrics) + self.face_config = config.face_recognition + self.face_detector: cv2.FaceDetectorYN = None + self.landmark_detector: cv2.face.FacemarkLBF = None + self.face_recognizer: cv2.face.LBPHFaceRecognizer = None + self.requires_face_detection = "face" not in self.config.objects.all_objects + self.detected_faces: dict[str, float] = {} + + download_path = os.path.join(MODEL_CACHE_DIR, "facedet") + self.model_files = { + "facedet.onnx": "https://github.com/NickM-27/facenet-onnx/releases/download/v1.0/facedet.onnx", + "landmarkdet.yaml": "https://github.com/NickM-27/facenet-onnx/releases/download/v1.0/landmarkdet.yaml", + } + + if not all( + os.path.exists(os.path.join(download_path, n)) + for n in self.model_files.keys() + ): + # conditionally import ModelDownloader + from frigate.util.downloader import ModelDownloader + + self.downloader = ModelDownloader( + model_name="facedet", + download_path=download_path, + file_names=self.model_files.keys(), + download_func=self.__download_models, + complete_func=self.__build_detector, + ) + self.downloader.ensure_model_files() + else: + self.__build_detector() + + self.label_map: dict[int, str] = {} + self.__build_classifier() + + def __download_models(self, path: str) -> None: + try: + file_name = os.path.basename(path) + # conditionally import ModelDownloader + from frigate.util.downloader import ModelDownloader + + ModelDownloader.download_from_url(self.model_files[file_name], path) + except Exception as e: + logger.error(f"Failed to download {path}: {e}") + + def __build_detector(self) -> None: + self.face_detector = cv2.FaceDetectorYN.create( + "/config/model_cache/facedet/facedet.onnx", + config="", + input_size=(320, 320), + score_threshold=0.8, + nms_threshold=0.3, + ) + 
self.landmark_detector = cv2.face.createFacemarkLBF() + self.landmark_detector.loadModel("/config/model_cache/facedet/landmarkdet.yaml") + + def __build_classifier(self) -> None: + if not self.landmark_detector: + return None + + labels = [] + faces = [] + + dir = "/media/frigate/clips/faces" + for idx, name in enumerate(os.listdir(dir)): + if name == "train": + continue + + face_folder = os.path.join(dir, name) + + if not os.path.isdir(face_folder): + continue + + self.label_map[idx] = name + for image in os.listdir(face_folder): + img = cv2.imread(os.path.join(face_folder, image)) + + if img is None: + continue + + img = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY) + img = self.__align_face(img, img.shape[1], img.shape[0]) + faces.append(img) + labels.append(idx) + + self.recognizer: cv2.face.LBPHFaceRecognizer = ( + cv2.face.LBPHFaceRecognizer_create( + radius=2, threshold=(1 - self.face_config.min_score) * 1000 + ) + ) + self.recognizer.train(faces, np.array(labels)) + + def __align_face( + self, + image: np.ndarray, + output_width: int, + output_height: int, + ) -> np.ndarray: + _, lands = self.landmark_detector.fit( + image, np.array([(0, 0, image.shape[1], image.shape[0])]) + ) + landmarks: np.ndarray = lands[0][0] + + # get landmarks for eyes + leftEyePts = landmarks[42:48] + rightEyePts = landmarks[36:42] + + # compute the center of mass for each eye + leftEyeCenter = leftEyePts.mean(axis=0).astype("int") + rightEyeCenter = rightEyePts.mean(axis=0).astype("int") + + # compute the angle between the eye centroids + dY = rightEyeCenter[1] - leftEyeCenter[1] + dX = rightEyeCenter[0] - leftEyeCenter[0] + angle = np.degrees(np.arctan2(dY, dX)) - 180 + + # compute the desired right eye x-coordinate based on the + # desired x-coordinate of the left eye + desiredRightEyeX = 1.0 - 0.35 + + # determine the scale of the new resulting image by taking + # the ratio of the distance between eyes in the *current* + # image to the ratio of distance between eyes in the + # *desired* image + dist = np.sqrt((dX**2) + (dY**2)) + desiredDist = desiredRightEyeX - 0.35 + desiredDist *= output_width + scale = desiredDist / dist + + # compute center (x, y)-coordinates (i.e., the median point) + # between the two eyes in the input image + # grab the rotation matrix for rotating and scaling the face + eyesCenter = ( + int((leftEyeCenter[0] + rightEyeCenter[0]) // 2), + int((leftEyeCenter[1] + rightEyeCenter[1]) // 2), + ) + M = cv2.getRotationMatrix2D(eyesCenter, angle, scale) + + # update the translation component of the matrix + tX = output_width * 0.5 + tY = output_height * 0.35 + M[0, 2] += tX - eyesCenter[0] + M[1, 2] += tY - eyesCenter[1] + + # apply the affine transformation + return cv2.warpAffine( + image, M, (output_width, output_height), flags=cv2.INTER_CUBIC + ) + + def __clear_classifier(self) -> None: + self.face_recognizer = None + self.label_map = {} + + def __detect_face(self, input: np.ndarray) -> tuple[int, int, int, int]: + """Detect faces in input image.""" + if not self.face_detector: + return None + + self.face_detector.setInputSize((input.shape[1], input.shape[0])) + faces = self.face_detector.detect(input) + + if faces is None or faces[1] is None: + return None + + face = None + + for _, potential_face in enumerate(faces[1]): + raw_bbox = potential_face[0:4].astype(np.uint16) + x: int = max(raw_bbox[0], 0) + y: int = max(raw_bbox[1], 0) + w: int = raw_bbox[2] + h: int = raw_bbox[3] + bbox = (x, y, x + w, y + h) + + if face is None or area(bbox) > area(face): + face = bbox + + return face + + 
def __classify_face(self, face_image: np.ndarray) -> tuple[str, float] | None: + if not self.landmark_detector: + return None + + if not self.label_map: + self.__build_classifier() + + img = cv2.cvtColor(face_image, cv2.COLOR_BGR2GRAY) + img = self.__align_face(img, img.shape[1], img.shape[0]) + index, distance = self.recognizer.predict(img) + + if index == -1: + return None + + score = 1.0 - (distance / 1000) + return self.label_map[index], round(score, 2) + + def __update_metrics(self, duration: float) -> None: + self.metrics.face_rec_fps.value = ( + self.metrics.face_rec_fps.value * 9 + duration + ) / 10 + + def process_frame(self, obj_data: dict[str, any], frame: np.ndarray): + """Look for faces in image.""" + start = datetime.datetime.now().timestamp() + id = obj_data["id"] + + # don't run for non person objects + if obj_data.get("label") != "person": + logger.debug("Not a processing face for non person object.") + return + + # don't overwrite sub label for objects that have a sub label + # that is not a face + if obj_data.get("sub_label") and id not in self.detected_faces: + logger.debug( + f"Not processing face due to existing sub label: {obj_data.get('sub_label')}." + ) + return + + face: Optional[dict[str, any]] = None + + if self.requires_face_detection: + logger.debug("Running manual face detection.") + person_box = obj_data.get("box") + + if not person_box: + return + + rgb = cv2.cvtColor(frame, cv2.COLOR_YUV2RGB_I420) + left, top, right, bottom = person_box + person = rgb[top:bottom, left:right] + face_box = self.__detect_face(person) + + if not face_box: + logger.debug("Detected no faces for person object.") + return + + face_frame = person[ + max(0, face_box[1]) : min(frame.shape[0], face_box[3]), + max(0, face_box[0]) : min(frame.shape[1], face_box[2]), + ] + face_frame = cv2.cvtColor(face_frame, cv2.COLOR_RGB2BGR) + else: + # don't run for object without attributes + if not obj_data.get("current_attributes"): + logger.debug("No attributes to parse.") + return + + attributes: list[dict[str, any]] = obj_data.get("current_attributes", []) + for attr in attributes: + if attr.get("label") != "face": + continue + + if face is None or attr.get("score", 0.0) > face.get("score", 0.0): + face = attr + + # no faces detected in this frame + if not face: + return + + face_box = face.get("box") + + # check that face is valid + if not face_box or area(face_box) < self.config.face_recognition.min_area: + logger.debug(f"Invalid face box {face}") + return + + face_frame = cv2.cvtColor(frame, cv2.COLOR_YUV2BGR_I420) + + face_frame = face_frame[ + max(0, face_box[1]) : min(frame.shape[0], face_box[3]), + max(0, face_box[0]) : min(frame.shape[1], face_box[2]), + ] + + res = self.__classify_face(face_frame) + + if not res: + return + + sub_label, score = res + + # calculate the overall face score as the probability * area of face + # this will help to reduce false positives from small side-angle faces + # if a large front-on face image may have scored slightly lower but + # is more likely to be accurate due to the larger face area + face_score = round(score * face_frame.shape[0] * face_frame.shape[1], 2) + + logger.debug( + f"Detected best face for person as: {sub_label} with probability {score} and overall face score {face_score}" + ) + + if self.config.face_recognition.save_attempts: + # write face to library + folder = os.path.join(FACE_DIR, "train") + file = os.path.join(folder, f"{id}-{sub_label}-{score}-{face_score}.webp") + os.makedirs(folder, exist_ok=True) + cv2.imwrite(file, 
face_frame) + + if score < self.config.face_recognition.threshold: + logger.debug( + f"Recognized face distance {score} is less than threshold {self.config.face_recognition.threshold}" + ) + self.__update_metrics(datetime.datetime.now().timestamp() - start) + return + + if id in self.detected_faces and face_score <= self.detected_faces[id]: + logger.debug( + f"Recognized face distance {score} and overall score {face_score} is less than previous overall face score ({self.detected_faces.get(id)})." + ) + self.__update_metrics(datetime.datetime.now().timestamp() - start) + return + + resp = requests.post( + f"{FRIGATE_LOCALHOST}/api/events/{id}/sub_label", + json={ + "camera": obj_data.get("camera"), + "subLabel": sub_label, + "subLabelScore": score, + }, + ) + + if resp.status_code == 200: + self.detected_faces[id] = face_score + + self.__update_metrics(datetime.datetime.now().timestamp() - start) + + def handle_request(self, request_data) -> dict[str, any] | None: + rand_id = "".join(random.choices(string.ascii_lowercase + string.digits, k=6)) + label = request_data["face_name"] + id = f"{label}-{rand_id}" + + if request_data.get("cropped"): + thumbnail = request_data["image"] + else: + img = cv2.imdecode( + np.frombuffer(base64.b64decode(request_data["image"]), dtype=np.uint8), + cv2.IMREAD_COLOR, + ) + face_box = self.__detect_face(img) + + if not face_box: + return { + "message": "No face was detected.", + "success": False, + } + + face = img[face_box[1] : face_box[3], face_box[0] : face_box[2]] + ret, thumbnail = cv2.imencode( + ".webp", face, [int(cv2.IMWRITE_WEBP_QUALITY), 100] + ) + + # write face to library + folder = os.path.join(FACE_DIR, label) + file = os.path.join(folder, f"{id}.webp") + os.makedirs(folder, exist_ok=True) + + # save face image + with open(file, "wb") as output: + output.write(thumbnail.tobytes()) + + self.__clear_classifier() + return { + "message": "Successfully registered face.", + "success": True, + } + + def expire_object(self, object_id: str): + if object_id in self.detected_faces: + self.detected_faces.pop(object_id) diff --git a/frigate/postprocessing/processor_api.py b/frigate/postprocessing/processor_api.py new file mode 100644 index 000000000..974b6f1ee --- /dev/null +++ b/frigate/postprocessing/processor_api.py @@ -0,0 +1,52 @@ +import logging +from abc import ABC, abstractmethod + +import numpy as np + +from frigate.config import FrigateConfig + +from .types import PostProcessingMetrics + +logger = logging.getLogger(__name__) + + +class ProcessorApi(ABC): + @abstractmethod + def __init__(self, config: FrigateConfig, metrics: PostProcessingMetrics) -> None: + self.config = config + self.metrics = metrics + pass + + @abstractmethod + def process_frame(self, obj_data: dict[str, any], frame: np.ndarray) -> None: + """Processes the frame with object data. + Args: + obj_data (dict): containing data about focused object in frame. + frame (ndarray): full yuv frame. + + Returns: + None. + """ + pass + + @abstractmethod + def handle_request(self, request_data: dict[str, any]) -> dict[str, any] | None: + """Handle metadata requests. + Args: + request_data (dict): containing data about requested change to process. + + Returns: + None if request was not handled, otherwise return response. + """ + pass + + @abstractmethod + def expire_object(self, object_id: str) -> None: + """Handle objects that are no longer detected. + Args: + object_id (str): id of object that is no longer detected. + + Returns: + None. 
+ """ + pass diff --git a/frigate/embeddings/types.py b/frigate/postprocessing/types.py similarity index 94% rename from frigate/embeddings/types.py rename to frigate/postprocessing/types.py index bd994246c..464658219 100644 --- a/frigate/embeddings/types.py +++ b/frigate/postprocessing/types.py @@ -4,7 +4,7 @@ import multiprocessing as mp from multiprocessing.sharedctypes import Synchronized -class EmbeddingsMetrics: +class PostProcessingMetrics: image_embeddings_fps: Synchronized text_embeddings_sps: Synchronized face_rec_fps: Synchronized diff --git a/frigate/stats/util.py b/frigate/stats/util.py index d62ac2ee4..ec1bc0683 100644 --- a/frigate/stats/util.py +++ b/frigate/stats/util.py @@ -14,8 +14,8 @@ from requests.exceptions import RequestException from frigate.camera import CameraMetrics from frigate.config import FrigateConfig from frigate.const import CACHE_DIR, CLIPS_DIR, RECORD_DIR -from frigate.embeddings.types import EmbeddingsMetrics from frigate.object_detection import ObjectDetectProcess +from frigate.postprocessing.types import PostProcessingMetrics from frigate.types import StatsTrackingTypes from frigate.util.services import ( get_amd_gpu_stats, @@ -52,7 +52,7 @@ def get_latest_version(config: FrigateConfig) -> str: def stats_init( config: FrigateConfig, camera_metrics: dict[str, CameraMetrics], - embeddings_metrics: EmbeddingsMetrics | None, + embeddings_metrics: PostProcessingMetrics | None, detectors: dict[str, ObjectDetectProcess], processes: dict[str, int], ) -> StatsTrackingTypes: diff --git a/frigate/types.py b/frigate/types.py index 7c32646cc..f375430e2 100644 --- a/frigate/types.py +++ b/frigate/types.py @@ -2,13 +2,13 @@ from enum import Enum from typing import TypedDict from frigate.camera import CameraMetrics -from frigate.embeddings.types import EmbeddingsMetrics from frigate.object_detection import ObjectDetectProcess +from frigate.postprocessing.types import PostProcessingMetrics class StatsTrackingTypes(TypedDict): camera_metrics: dict[str, CameraMetrics] - embeddings_metrics: EmbeddingsMetrics | None + embeddings_metrics: PostProcessingMetrics | None detectors: dict[str, ObjectDetectProcess] started: int latest_frigate_version: str diff --git a/frigate/util/model.py b/frigate/util/model.py index 0e990426a..ce2c9538c 100644 --- a/frigate/util/model.py +++ b/frigate/util/model.py @@ -4,13 +4,7 @@ import logging import os from typing import Any -import cv2 -import numpy as np import onnxruntime as ort -from playhouse.sqliteq import SqliteQueueDatabase - -from frigate.config.semantic_search import FaceRecognitionConfig -from frigate.const import MODEL_CACHE_DIR try: import openvino as ov @@ -21,9 +15,6 @@ except ImportError: logger = logging.getLogger(__name__) -MIN_MATCHING_FACES = 2 - - def get_ort_providers( force_cpu: bool = False, device: str = "AUTO", requires_fp16: bool = False ) -> tuple[list[str], list[dict[str, any]]]: @@ -157,181 +148,3 @@ class ONNXModelRunner: return [infer_request.get_output_tensor().data] elif self.type == "ort": return self.ort.run(None, input) - - -class FaceClassificationModel: - def __init__(self, config: FaceRecognitionConfig, db: SqliteQueueDatabase): - self.config = config - self.db = db - self.face_detector: cv2.FaceDetectorYN = None - self.landmark_detector: cv2.face.FacemarkLBF = None - self.face_recognizer: cv2.face.LBPHFaceRecognizer = None - - download_path = os.path.join(MODEL_CACHE_DIR, "facedet") - self.model_files = { - "facedet.onnx": 
"https://github.com/NickM-27/facenet-onnx/releases/download/v1.0/facedet.onnx", - "landmarkdet.yaml": "https://github.com/NickM-27/facenet-onnx/releases/download/v1.0/landmarkdet.yaml", - } - - if not all( - os.path.exists(os.path.join(download_path, n)) - for n in self.model_files.keys() - ): - # conditionally import ModelDownloader - from frigate.util.downloader import ModelDownloader - - self.downloader = ModelDownloader( - model_name="facedet", - download_path=download_path, - file_names=self.model_files.keys(), - download_func=self.__download_models, - complete_func=self.__build_detector, - ) - self.downloader.ensure_model_files() - else: - self.__build_detector() - - self.label_map: dict[int, str] = {} - self.__build_classifier() - - def __download_models(self, path: str) -> None: - try: - file_name = os.path.basename(path) - # conditionally import ModelDownloader - from frigate.util.downloader import ModelDownloader - - ModelDownloader.download_from_url(self.model_files[file_name], path) - except Exception as e: - logger.error(f"Failed to download {path}: {e}") - - def __build_detector(self) -> None: - self.face_detector = cv2.FaceDetectorYN.create( - "/config/model_cache/facedet/facedet.onnx", - config="", - input_size=(320, 320), - score_threshold=0.8, - nms_threshold=0.3, - ) - self.landmark_detector = cv2.face.createFacemarkLBF() - self.landmark_detector.loadModel("/config/model_cache/facedet/landmarkdet.yaml") - - def __build_classifier(self) -> None: - if not self.landmark_detector: - return None - - labels = [] - faces = [] - - dir = "/media/frigate/clips/faces" - for idx, name in enumerate(os.listdir(dir)): - if name == "train": - continue - - face_folder = os.path.join(dir, name) - - if not os.path.isdir(face_folder): - continue - - self.label_map[idx] = name - for image in os.listdir(face_folder): - img = cv2.imread(os.path.join(face_folder, image)) - - if img is None: - continue - - img = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY) - img = self.__align_face(img, img.shape[1], img.shape[0]) - faces.append(img) - labels.append(idx) - - self.recognizer: cv2.face.LBPHFaceRecognizer = ( - cv2.face.LBPHFaceRecognizer_create( - radius=2, threshold=(1 - self.config.min_score) * 1000 - ) - ) - self.recognizer.train(faces, np.array(labels)) - - def __align_face( - self, - image: np.ndarray, - output_width: int, - output_height: int, - ) -> np.ndarray: - _, lands = self.landmark_detector.fit( - image, np.array([(0, 0, image.shape[1], image.shape[0])]) - ) - landmarks = lands[0][0] - - # get landmarks for eyes - leftEyePts = landmarks[42:48] - rightEyePts = landmarks[36:42] - - # compute the center of mass for each eye - leftEyeCenter = leftEyePts.mean(axis=0).astype("int") - rightEyeCenter = rightEyePts.mean(axis=0).astype("int") - - # compute the angle between the eye centroids - dY = rightEyeCenter[1] - leftEyeCenter[1] - dX = rightEyeCenter[0] - leftEyeCenter[0] - angle = np.degrees(np.arctan2(dY, dX)) - 180 - - # compute the desired right eye x-coordinate based on the - # desired x-coordinate of the left eye - desiredRightEyeX = 1.0 - 0.35 - - # determine the scale of the new resulting image by taking - # the ratio of the distance between eyes in the *current* - # image to the ratio of distance between eyes in the - # *desired* image - dist = np.sqrt((dX**2) + (dY**2)) - desiredDist = desiredRightEyeX - 0.35 - desiredDist *= output_width - scale = desiredDist / dist - - # compute center (x, y)-coordinates (i.e., the median point) - # between the two eyes in the input image - # grab 
the rotation matrix for rotating and scaling the face - eyesCenter = ( - int((leftEyeCenter[0] + rightEyeCenter[0]) // 2), - int((leftEyeCenter[1] + rightEyeCenter[1]) // 2), - ) - M = cv2.getRotationMatrix2D(eyesCenter, angle, scale) - - # update the translation component of the matrix - tX = output_width * 0.5 - tY = output_height * 0.35 - M[0, 2] += tX - eyesCenter[0] - M[1, 2] += tY - eyesCenter[1] - - # apply the affine transformation - return cv2.warpAffine( - image, M, (output_width, output_height), flags=cv2.INTER_CUBIC - ) - - def clear_classifier(self) -> None: - self.face_recognizer = None - self.label_map = {} - - def detect_faces(self, input: np.ndarray) -> tuple[int, cv2.typing.MatLike] | None: - if not self.face_detector: - return None - - self.face_detector.setInputSize((input.shape[1], input.shape[0])) - return self.face_detector.detect(input) - - def classify_face(self, face_image: np.ndarray) -> tuple[str, float] | None: - if not self.landmark_detector: - return None - - if not self.label_map: - self.__build_classifier() - - img = cv2.cvtColor(face_image, cv2.COLOR_BGR2GRAY) - img = self.__align_face(img, img.shape[1], img.shape[0]) - index, distance = self.recognizer.predict(img) - - if index == -1: - return None - - score = 1.0 - (distance / 1000) - return self.label_map[index], round(score, 2)
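For reference, a minimal sketch (not part of the diff above) of how an additional post-processor could plug into the ProcessorApi interface introduced in frigate/postprocessing/processor_api.py. The class name ExampleProcessor and its logging-only behavior are hypothetical; the sketch only assumes the package layout, ProcessorApi signatures, and PostProcessingMetrics type added by this change.

import logging

import numpy as np

from frigate.config import FrigateConfig
from frigate.postprocessing.processor_api import ProcessorApi
from frigate.postprocessing.types import PostProcessingMetrics

logger = logging.getLogger(__name__)


class ExampleProcessor(ProcessorApi):
    """Hypothetical processor that only records person scores; illustrates the contract."""

    def __init__(self, config: FrigateConfig, metrics: PostProcessingMetrics) -> None:
        super().__init__(config, metrics)
        self.seen_objects: dict[str, float] = {}

    def process_frame(self, obj_data: dict[str, any], frame: np.ndarray) -> None:
        # called by EmbeddingMaintainer for every updated tracked object
        if obj_data.get("label") != "person":
            return

        self.seen_objects[obj_data["id"]] = obj_data.get("score", 0.0)

    def handle_request(self, request_data: dict[str, any]) -> dict[str, any] | None:
        # return None so the maintainer falls through to the next processor
        return None

    def expire_object(self, object_id: str) -> None:
        # called once when the tracked object ends
        self.seen_objects.pop(object_id, None)

A processor like this would be registered the same way FaceProcessor is in EmbeddingMaintainer.__init__, e.g. self.processors.append(ExampleProcessor(self.config, metrics)); the existing process_frame, handle_request, and expire_object dispatch loops would then pick it up without further changes.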