Mirror of https://github.com/blakeblackshear/frigate.git (synced 2025-10-27 10:52:11 +01:00)
	Generalize postprocessing (#15931)
* Actually send result to face registration
* Define postprocessing api and move face processing to fit
* Standardize request handling
* Standardize handling of processors
* Rename processing metrics
* Cleanup
* Standardize object end
* Update to newer formatting
* One more
* One more
This commit is contained in:
parent 8430d5626a
commit eed292c73e
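What changed, in short: the embeddings maintainer no longer hard-codes face recognition. Post-processing is defined behind a ProcessorApi interface (added below in frigate/postprocessing/processor_api.py), the face logic moves into a FaceProcessor, and the maintainer dispatches frames, metadata requests, and object expiry to whichever processors are enabled. A self-contained sketch of the dispatch pattern, with a stub standing in for a real processor:

    # Runnable sketch of the dispatch pattern this commit introduces.
    # StubProcessor stands in for a real ProcessorApi implementation.
    class StubProcessor:
        def process_frame(self, obj_data, frame):
            pass  # a real processor analyzes the frame here

        def handle_request(self, data):
            # returning None means "not my request"
            if data.get("topic") == "stub":
                return {"success": True, "message": "handled"}
            return None

        def expire_object(self, object_id):
            pass  # clean up per-object state

    processors = [StubProcessor()]

    def on_request(data):
        for processor in processors:
            resp = processor.handle_request(data)
            if resp is not None:  # first processor that answers wins
                return resp

    print(on_request({"topic": "stub"}))  # {'success': True, 'message': 'handled'}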
				| @ -87,7 +87,7 @@ def main() -> None: | ||||
|             if current != full_config: | ||||
|                 print(f"Line #  : {line_number}") | ||||
|                 print(f"Key     : {' -> '.join(map(str, error_path))}") | ||||
|                 print(f"Value   : {error.get('input','-')}") | ||||
|                 print(f"Value   : {error.get('input', '-')}") | ||||
|             print(f"Message : {error.get('msg', error.get('type', 'Unknown'))}\n") | ||||
| 
 | ||||
|         print("*************************************************************") | ||||
|  | ||||
| @ -39,16 +39,28 @@ def get_faces(): | ||||
| 
 | ||||
| @router.post("/faces/{name}") | ||||
| async def register_face(request: Request, name: str, file: UploadFile): | ||||
|     context: EmbeddingsContext = request.app.embeddings | ||||
|     context.register_face(name, await file.read()) | ||||
|     return JSONResponse( | ||||
|         status_code=200, | ||||
|         content={"success": True, "message": "Successfully registered face."}, | ||||
|     ) | ||||
|     if not request.app.frigate_config.face_recognition.enabled: | ||||
|         return JSONResponse( | ||||
|             status_code=400, | ||||
|             content={"message": "Face recognition is not enabled.", "success": False}, | ||||
|         ) | ||||
|  | ||||
|     context: EmbeddingsContext = request.app.embeddings | ||||
|     result = context.register_face(name, await file.read()) | ||||
|     return JSONResponse( | ||||
|         status_code=200 if result.get("success", True) else 400, | ||||
|         content=result, | ||||
|     ) | ||||
| 
 | ||||
| 
 | ||||
| @router.post("/faces/train/{name}/classify") | ||||
| def train_face(name: str, body: dict = None): | ||||
| def train_face(request: Request, name: str, body: dict = None): | ||||
|     if not request.app.frigate_config.face_recognition.enabled: | ||||
|         return JSONResponse( | ||||
|             status_code=400, | ||||
|             content={"message": "Face recognition is not enabled.", "success": False}, | ||||
|         ) | ||||
| 
 | ||||
|     json: dict[str, any] = body or {} | ||||
|     training_file = os.path.join( | ||||
|         FACE_DIR, f"train/{sanitize_filename(json.get('training_file', ''))}" | ||||
| @ -82,6 +94,12 @@ def train_face(name: str, body: dict = None): | ||||
| 
 | ||||
| @router.post("/faces/{name}/delete") | ||||
| def deregister_faces(request: Request, name: str, body: dict = None): | ||||
|     if not request.app.frigate_config.face_recognition.enabled: | ||||
|         return JSONResponse( | ||||
|             status_code=400, | ||||
|             content={"message": "Face recognition is not enabled.", "success": False}, | ||||
|         ) | ||||
| 
 | ||||
|     json: dict[str, any] = body or {} | ||||
|     list_of_ids = json.get("ids", "") | ||||
| 
 | ||||
|  | ||||
| @ -41,7 +41,6 @@ from frigate.const import ( | ||||
| ) | ||||
| from frigate.db.sqlitevecq import SqliteVecQueueDatabase | ||||
| from frigate.embeddings import EmbeddingsContext, manage_embeddings | ||||
| from frigate.embeddings.types import EmbeddingsMetrics | ||||
| from frigate.events.audio import AudioProcessor | ||||
| from frigate.events.cleanup import EventCleanup | ||||
| from frigate.events.external import ExternalEventProcessor | ||||
| @ -60,6 +59,7 @@ from frigate.models import ( | ||||
| from frigate.object_detection import ObjectDetectProcess | ||||
| from frigate.object_processing import TrackedObjectProcessor | ||||
| from frigate.output.output import output_frames | ||||
| from frigate.postprocessing.types import PostProcessingMetrics | ||||
| from frigate.ptz.autotrack import PtzAutoTrackerThread | ||||
| from frigate.ptz.onvif import OnvifController | ||||
| from frigate.record.cleanup import RecordingCleanup | ||||
| @ -90,8 +90,8 @@ class FrigateApp: | ||||
|         self.detection_shms: list[mp.shared_memory.SharedMemory] = [] | ||||
|         self.log_queue: Queue = mp.Queue() | ||||
|         self.camera_metrics: dict[str, CameraMetrics] = {} | ||||
|         self.embeddings_metrics: EmbeddingsMetrics | None = ( | ||||
|             EmbeddingsMetrics() if config.semantic_search.enabled else None | ||||
|         self.embeddings_metrics: PostProcessingMetrics | None = ( | ||||
|             PostProcessingMetrics() if config.semantic_search.enabled else None | ||||
|         ) | ||||
|         self.ptz_metrics: dict[str, PTZMetrics] = {} | ||||
|         self.processes: dict[str, int] = {} | ||||
|  | ||||
| @ -20,14 +20,14 @@ from frigate.models import Event | ||||
| from frigate.util.builtin import serialize | ||||
| from frigate.util.services import listen | ||||
| 
 | ||||
| from ..postprocessing.types import PostProcessingMetrics | ||||
| from .maintainer import EmbeddingMaintainer | ||||
| from .types import EmbeddingsMetrics | ||||
| from .util import ZScoreNormalization | ||||
| 
 | ||||
| logger = logging.getLogger(__name__) | ||||
| 
 | ||||
| 
 | ||||
| def manage_embeddings(config: FrigateConfig, metrics: EmbeddingsMetrics) -> None: | ||||
| def manage_embeddings(config: FrigateConfig, metrics: PostProcessingMetrics) -> None: | ||||
|     # Only initialize embeddings if semantic search is enabled | ||||
|     if not config.semantic_search.enabled: | ||||
|         return | ||||
| @ -192,8 +192,8 @@ class EmbeddingsContext: | ||||
| 
 | ||||
|         return results | ||||
| 
 | ||||
|     def register_face(self, face_name: str, image_data: bytes) -> None: | ||||
|         self.requestor.send_data( | ||||
|     def register_face(self, face_name: str, image_data: bytes) -> dict[str, any]: | ||||
|         return self.requestor.send_data( | ||||
|             EmbeddingsRequestEnum.register_face.value, | ||||
|             { | ||||
|                 "face_name": face_name, | ||||
|  | ||||
| @ -21,8 +21,8 @@ from frigate.models import Event | ||||
| from frigate.types import ModelStatusTypesEnum | ||||
| from frigate.util.builtin import serialize | ||||
| 
 | ||||
| from ..postprocessing.types import PostProcessingMetrics | ||||
| from .functions.onnx import GenericONNXEmbedding, ModelTypeEnum | ||||
| from .types import EmbeddingsMetrics | ||||
| 
 | ||||
| logger = logging.getLogger(__name__) | ||||
| 
 | ||||
| @ -65,7 +65,7 @@ class Embeddings: | ||||
|         self, | ||||
|         config: FrigateConfig, | ||||
|         db: SqliteVecQueueDatabase, | ||||
|         metrics: EmbeddingsMetrics, | ||||
|         metrics: PostProcessingMetrics, | ||||
|     ) -> None: | ||||
|         self.config = config | ||||
|         self.db = db | ||||
|  | ||||
| @ -4,9 +4,7 @@ import base64 | ||||
| import datetime | ||||
| import logging | ||||
| import os | ||||
| import random | ||||
| import re | ||||
| import string | ||||
| import threading | ||||
| from multiprocessing.synchronize import Event as MpEvent | ||||
| from pathlib import Path | ||||
| @ -28,7 +26,6 @@ from frigate.comms.inter_process import InterProcessRequestor | ||||
| from frigate.config import FrigateConfig | ||||
| from frigate.const import ( | ||||
|     CLIPS_DIR, | ||||
|     FACE_DIR, | ||||
|     FRIGATE_LOCALHOST, | ||||
|     UPDATE_EVENT_DESCRIPTION, | ||||
| ) | ||||
| @ -36,13 +33,14 @@ from frigate.embeddings.lpr.lpr import LicensePlateRecognition | ||||
| from frigate.events.types import EventTypeEnum | ||||
| from frigate.genai import get_genai_client | ||||
| from frigate.models import Event | ||||
| from frigate.postprocessing.face_processor import FaceProcessor | ||||
| from frigate.postprocessing.processor_api import ProcessorApi | ||||
| from frigate.types import TrackedObjectUpdateTypesEnum | ||||
| from frigate.util.builtin import serialize | ||||
| from frigate.util.image import SharedMemoryFrameManager, area, calculate_region | ||||
| from frigate.util.model import FaceClassificationModel | ||||
| 
 | ||||
| from ..postprocessing.types import PostProcessingMetrics | ||||
| from .embeddings import Embeddings | ||||
| from .types import EmbeddingsMetrics | ||||
| 
 | ||||
| logger = logging.getLogger(__name__) | ||||
| 
 | ||||
| @ -56,7 +54,7 @@ class EmbeddingMaintainer(threading.Thread): | ||||
|         self, | ||||
|         db: SqliteQueueDatabase, | ||||
|         config: FrigateConfig, | ||||
|         metrics: EmbeddingsMetrics, | ||||
|         metrics: PostProcessingMetrics, | ||||
|         stop_event: MpEvent, | ||||
|     ) -> None: | ||||
|         super().__init__(name="embeddings_maintainer") | ||||
| @ -75,16 +73,10 @@ class EmbeddingMaintainer(threading.Thread): | ||||
|         ) | ||||
|         self.embeddings_responder = EmbeddingsResponder() | ||||
|         self.frame_manager = SharedMemoryFrameManager() | ||||
|         self.processors: list[ProcessorApi] = [] | ||||
| 
 | ||||
|         # set face recognition conditions | ||||
|         self.face_recognition_enabled = self.config.face_recognition.enabled | ||||
|         self.requires_face_detection = "face" not in self.config.objects.all_objects | ||||
|         self.detected_faces: dict[str, float] = {} | ||||
|         self.face_classifier = ( | ||||
|             FaceClassificationModel(self.config.face_recognition, db) | ||||
|             if self.face_recognition_enabled | ||||
|             else None | ||||
|         ) | ||||
|         if self.config.face_recognition.enabled: | ||||
|             self.processors.append(FaceProcessor(self.config, metrics)) | ||||
| 
 | ||||
|         # create communication for updating event descriptions | ||||
|         self.requestor = InterProcessRequestor() | ||||
| @ -142,46 +134,12 @@ class EmbeddingMaintainer(threading.Thread): | ||||
|                         self.embeddings.embed_description("", data, upsert=False), | ||||
|                         pack=False, | ||||
|                     ) | ||||
|                 elif topic == EmbeddingsRequestEnum.register_face.value: | ||||
|                     if not self.face_recognition_enabled: | ||||
|                         return False | ||||
|  | ||||
|                     rand_id = "".join( | ||||
|                         random.choices(string.ascii_lowercase + string.digits, k=6) | ||||
|                     ) | ||||
|                     label = data["face_name"] | ||||
|                     id = f"{label}-{rand_id}" | ||||
|  | ||||
|                     if data.get("cropped"): | ||||
|                         pass | ||||
|                     else: | ||||
|                         img = cv2.imdecode( | ||||
|                             np.frombuffer( | ||||
|                                 base64.b64decode(data["image"]), dtype=np.uint8 | ||||
|                             ), | ||||
|                             cv2.IMREAD_COLOR, | ||||
|                         ) | ||||
|                         face_box = self._detect_face(img) | ||||
|  | ||||
|                         if not face_box: | ||||
|                             return False | ||||
|  | ||||
|                         face = img[face_box[1] : face_box[3], face_box[0] : face_box[2]] | ||||
|                         ret, thumbnail = cv2.imencode( | ||||
|                             ".webp", face, [int(cv2.IMWRITE_WEBP_QUALITY), 100] | ||||
|                         ) | ||||
|  | ||||
|                     # write face to library | ||||
|                     folder = os.path.join(FACE_DIR, label) | ||||
|                     file = os.path.join(folder, f"{id}.webp") | ||||
|                     os.makedirs(folder, exist_ok=True) | ||||
|  | ||||
|                     # save face image | ||||
|                     with open(file, "wb") as output: | ||||
|                         output.write(thumbnail.tobytes()) | ||||
|  | ||||
|                 self.face_classifier.clear_classifier() | ||||
|                 return True | ||||
|                 else: | ||||
|                     for processor in self.processors: | ||||
|                         resp = processor.handle_request(data) | ||||
|  | ||||
|                         if resp is not None: | ||||
|                             return resp | ||||
|             except Exception as e: | ||||
|                 logger.error(f"Unable to handle embeddings request {e}") | ||||
| 
 | ||||
| @ -204,8 +162,8 @@ class EmbeddingMaintainer(threading.Thread): | ||||
|         # no need to process updated objects if face recognition, lpr, genai are disabled | ||||
|         if ( | ||||
|             not camera_config.genai.enabled | ||||
|             and not self.face_recognition_enabled | ||||
|             and not self.lpr_config.enabled | ||||
|             and len(self.processors) == 0 | ||||
|         ): | ||||
|             return | ||||
| 
 | ||||
| @ -223,15 +181,8 @@ class EmbeddingMaintainer(threading.Thread): | ||||
|             ) | ||||
|             return | ||||
| 
 | ||||
|         if self.face_recognition_enabled: | ||||
|             start = datetime.datetime.now().timestamp() | ||||
|             processed = self._process_face(data, yuv_frame) | ||||
| 
 | ||||
|             if processed: | ||||
|                 duration = datetime.datetime.now().timestamp() - start | ||||
|                 self.metrics.face_rec_fps.value = ( | ||||
|                     self.metrics.face_rec_fps.value * 9 + duration | ||||
|                 ) / 10 | ||||
|         for processor in self.processors: | ||||
|             processor.process_frame(data, yuv_frame) | ||||
| 
 | ||||
|         if self.lpr_config.enabled: | ||||
|             start = datetime.datetime.now().timestamp() | ||||
| @ -271,8 +222,8 @@ class EmbeddingMaintainer(threading.Thread): | ||||
|             event_id, camera, updated_db = ended | ||||
|             camera_config = self.config.cameras[camera] | ||||
| 
 | ||||
|             if event_id in self.detected_faces: | ||||
|                 self.detected_faces.pop(event_id) | ||||
|             for processor in self.processors: | ||||
|                 processor.expire_object(event_id) | ||||
| 
 | ||||
|             if event_id in self.detected_license_plates: | ||||
|                 self.detected_license_plates.pop(event_id) | ||||
| @ -399,150 +350,6 @@ class EmbeddingMaintainer(threading.Thread): | ||||
|         if event_id: | ||||
|             self.handle_regenerate_description(event_id, source) | ||||
| 
 | ||||
|     def _detect_face(self, input: np.ndarray) -> tuple[int, int, int, int]: | ||||
|         """Detect faces in input image.""" | ||||
|         faces = self.face_classifier.detect_faces(input) | ||||
| 
 | ||||
|         if faces is None or faces[1] is None: | ||||
|             return None | ||||
| 
 | ||||
|         face = None | ||||
| 
 | ||||
|         for _, potential_face in enumerate(faces[1]): | ||||
|             raw_bbox = potential_face[0:4].astype(np.uint16) | ||||
|             x: int = max(raw_bbox[0], 0) | ||||
|             y: int = max(raw_bbox[1], 0) | ||||
|             w: int = raw_bbox[2] | ||||
|             h: int = raw_bbox[3] | ||||
|             bbox = (x, y, x + w, y + h) | ||||
| 
 | ||||
|             if face is None or area(bbox) > area(face): | ||||
|                 face = bbox | ||||
| 
 | ||||
|         return face | ||||
| 
 | ||||
|     def _process_face(self, obj_data: dict[str, any], frame: np.ndarray) -> bool: | ||||
|         """Look for faces in image.""" | ||||
|         id = obj_data["id"] | ||||
| 
 | ||||
|         # don't run for non person objects | ||||
|         if obj_data.get("label") != "person": | ||||
|             logger.debug("Not a processing face for non person object.") | ||||
|             return False | ||||
| 
 | ||||
|         # don't overwrite sub label for objects that have a sub label | ||||
|         # that is not a face | ||||
|         if obj_data.get("sub_label") and id not in self.detected_faces: | ||||
|             logger.debug( | ||||
|                 f"Not processing face due to existing sub label: {obj_data.get('sub_label')}." | ||||
|             ) | ||||
|             return False | ||||
| 
 | ||||
|         face: Optional[dict[str, any]] = None | ||||
| 
 | ||||
|         if self.requires_face_detection: | ||||
|             logger.debug("Running manual face detection.") | ||||
|             person_box = obj_data.get("box") | ||||
| 
 | ||||
|             if not person_box: | ||||
|                 return False | ||||
| 
 | ||||
|             rgb = cv2.cvtColor(frame, cv2.COLOR_YUV2RGB_I420) | ||||
|             left, top, right, bottom = person_box | ||||
|             person = rgb[top:bottom, left:right] | ||||
|             face_box = self._detect_face(person) | ||||
| 
 | ||||
|             if not face_box: | ||||
|                 logger.debug("Detected no faces for person object.") | ||||
|                 return False | ||||
| 
 | ||||
|             face_frame = person[ | ||||
|                 max(0, face_box[1]) : min(frame.shape[0], face_box[3]), | ||||
|                 max(0, face_box[0]) : min(frame.shape[1], face_box[2]), | ||||
|             ] | ||||
|             face_frame = cv2.cvtColor(face_frame, cv2.COLOR_RGB2BGR) | ||||
|         else: | ||||
|             # don't run for object without attributes | ||||
|             if not obj_data.get("current_attributes"): | ||||
|                 logger.debug("No attributes to parse.") | ||||
|                 return False | ||||
| 
 | ||||
|             attributes: list[dict[str, any]] = obj_data.get("current_attributes", []) | ||||
|             for attr in attributes: | ||||
|                 if attr.get("label") != "face": | ||||
|                     continue | ||||
| 
 | ||||
|                 if face is None or attr.get("score", 0.0) > face.get("score", 0.0): | ||||
|                     face = attr | ||||
| 
 | ||||
|             # no faces detected in this frame | ||||
|             if not face: | ||||
|                 return False | ||||
| 
 | ||||
|             face_box = face.get("box") | ||||
| 
 | ||||
|             # check that face is valid | ||||
|             if not face_box or area(face_box) < self.config.face_recognition.min_area: | ||||
|                 logger.debug(f"Invalid face box {face}") | ||||
|                 return False | ||||
| 
 | ||||
|             face_frame = cv2.cvtColor(frame, cv2.COLOR_YUV2BGR_I420) | ||||
| 
 | ||||
|             face_frame = face_frame[ | ||||
|                 max(0, face_box[1]) : min(frame.shape[0], face_box[3]), | ||||
|                 max(0, face_box[0]) : min(frame.shape[1], face_box[2]), | ||||
|             ] | ||||
| 
 | ||||
|         res = self.face_classifier.classify_face(face_frame) | ||||
| 
 | ||||
|         if not res: | ||||
|             return False | ||||
| 
 | ||||
|         sub_label, score = res | ||||
| 
 | ||||
|         # calculate the overall face score as the probability * area of face | ||||
|         # this will help to reduce false positives from small side-angle faces | ||||
|         # since a large front-on face image may score slightly lower but | ||||
|         # is more likely to be accurate due to the larger face area | ||||
|         face_score = round(score * face_frame.shape[0] * face_frame.shape[1], 2) | ||||
| 
 | ||||
|         logger.debug( | ||||
|             f"Detected best face for person as: {sub_label} with probability {score} and overall face score {face_score}" | ||||
|         ) | ||||
| 
 | ||||
|         if self.config.face_recognition.save_attempts: | ||||
|             # write face to library | ||||
|             folder = os.path.join(FACE_DIR, "train") | ||||
|             file = os.path.join(folder, f"{id}-{sub_label}-{score}-{face_score}.webp") | ||||
|             os.makedirs(folder, exist_ok=True) | ||||
|             cv2.imwrite(file, face_frame) | ||||
| 
 | ||||
|         if score < self.config.face_recognition.threshold: | ||||
|             logger.debug( | ||||
|                 f"Recognized face distance {score} is less than threshold {self.config.face_recognition.threshold}" | ||||
|             ) | ||||
|             return True | ||||
| 
 | ||||
|         if id in self.detected_faces and face_score <= self.detected_faces[id]: | ||||
|             logger.debug( | ||||
|                 f"Recognized face distance {score} and overall score {face_score} is less than previous overall face score ({self.detected_faces.get(id)})." | ||||
|             ) | ||||
|             return True | ||||
| 
 | ||||
|         resp = requests.post( | ||||
|             f"{FRIGATE_LOCALHOST}/api/events/{id}/sub_label", | ||||
|             json={ | ||||
|                 "camera": obj_data.get("camera"), | ||||
|                 "subLabel": sub_label, | ||||
|                 "subLabelScore": score, | ||||
|             }, | ||||
|         ) | ||||
| 
 | ||||
|         if resp.status_code == 200: | ||||
|             self.detected_faces[id] = face_score | ||||
| 
 | ||||
|         return True | ||||
| 
 | ||||
|     def _detect_license_plate(self, input: np.ndarray) -> tuple[int, int, int, int]: | ||||
|         """Return the dimensions of the input image as [x, y, width, height].""" | ||||
|         height, width = input.shape[:2] | ||||
|  | ||||
							
								
								
									
frigate/postprocessing/face_processor.py (398 lines, new file)
							| @ -0,0 +1,398 @@ | ||||
| """Handle processing images for face detection and recognition.""" | ||||
| 
 | ||||
| import base64 | ||||
| import datetime | ||||
| import logging | ||||
| import os | ||||
| import random | ||||
| import string | ||||
| from typing import Optional | ||||
| 
 | ||||
| import cv2 | ||||
| import numpy as np | ||||
| import requests | ||||
| 
 | ||||
| from frigate.config import FrigateConfig | ||||
| from frigate.const import FACE_DIR, FRIGATE_LOCALHOST, MODEL_CACHE_DIR | ||||
| from frigate.util.image import area | ||||
| 
 | ||||
| from .processor_api import ProcessorApi | ||||
| from .types import PostProcessingMetrics | ||||
| 
 | ||||
| logger = logging.getLogger(__name__) | ||||
| 
 | ||||
| 
 | ||||
| MIN_MATCHING_FACES = 2 | ||||
| 
 | ||||
| 
 | ||||
| class FaceProcessor(ProcessorApi): | ||||
|     def __init__(self, config: FrigateConfig, metrics: PostProcessingMetrics): | ||||
|         super().__init__(config, metrics) | ||||
|         self.face_config = config.face_recognition | ||||
|         self.face_detector: cv2.FaceDetectorYN = None | ||||
|         self.landmark_detector: cv2.face.FacemarkLBF = None | ||||
|         self.face_recognizer: cv2.face.LBPHFaceRecognizer = None | ||||
|         self.requires_face_detection = "face" not in self.config.objects.all_objects | ||||
|         self.detected_faces: dict[str, float] = {} | ||||
| 
 | ||||
|         download_path = os.path.join(MODEL_CACHE_DIR, "facedet") | ||||
|         self.model_files = { | ||||
|             "facedet.onnx": "https://github.com/NickM-27/facenet-onnx/releases/download/v1.0/facedet.onnx", | ||||
|             "landmarkdet.yaml": "https://github.com/NickM-27/facenet-onnx/releases/download/v1.0/landmarkdet.yaml", | ||||
|         } | ||||
| 
 | ||||
|         if not all( | ||||
|             os.path.exists(os.path.join(download_path, n)) | ||||
|             for n in self.model_files.keys() | ||||
|         ): | ||||
|             # conditionally import ModelDownloader | ||||
|             from frigate.util.downloader import ModelDownloader | ||||
| 
 | ||||
|             self.downloader = ModelDownloader( | ||||
|                 model_name="facedet", | ||||
|                 download_path=download_path, | ||||
|                 file_names=self.model_files.keys(), | ||||
|                 download_func=self.__download_models, | ||||
|                 complete_func=self.__build_detector, | ||||
|             ) | ||||
|             self.downloader.ensure_model_files() | ||||
|         else: | ||||
|             self.__build_detector() | ||||
| 
 | ||||
|         self.label_map: dict[int, str] = {} | ||||
|         self.__build_classifier() | ||||
| 
 | ||||
|     def __download_models(self, path: str) -> None: | ||||
|         try: | ||||
|             file_name = os.path.basename(path) | ||||
|             # conditionally import ModelDownloader | ||||
|             from frigate.util.downloader import ModelDownloader | ||||
| 
 | ||||
|             ModelDownloader.download_from_url(self.model_files[file_name], path) | ||||
|         except Exception as e: | ||||
|             logger.error(f"Failed to download {path}: {e}") | ||||
| 
 | ||||
|     def __build_detector(self) -> None: | ||||
|         self.face_detector = cv2.FaceDetectorYN.create( | ||||
|             "/config/model_cache/facedet/facedet.onnx", | ||||
|             config="", | ||||
|             input_size=(320, 320), | ||||
|             score_threshold=0.8, | ||||
|             nms_threshold=0.3, | ||||
|         ) | ||||
|         self.landmark_detector = cv2.face.createFacemarkLBF() | ||||
|         self.landmark_detector.loadModel("/config/model_cache/facedet/landmarkdet.yaml") | ||||
| 
 | ||||
|     def __build_classifier(self) -> None: | ||||
|         if not self.landmark_detector: | ||||
|             return None | ||||
| 
 | ||||
|         labels = [] | ||||
|         faces = [] | ||||
| 
 | ||||
|         dir = "/media/frigate/clips/faces" | ||||
|         for idx, name in enumerate(os.listdir(dir)): | ||||
|             if name == "train": | ||||
|                 continue | ||||
| 
 | ||||
|             face_folder = os.path.join(dir, name) | ||||
| 
 | ||||
|             if not os.path.isdir(face_folder): | ||||
|                 continue | ||||
| 
 | ||||
|             self.label_map[idx] = name | ||||
|             for image in os.listdir(face_folder): | ||||
|                 img = cv2.imread(os.path.join(face_folder, image)) | ||||
| 
 | ||||
|                 if img is None: | ||||
|                     continue | ||||
| 
 | ||||
|                 img = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY) | ||||
|                 img = self.__align_face(img, img.shape[1], img.shape[0]) | ||||
|                 faces.append(img) | ||||
|                 labels.append(idx) | ||||
| 
 | ||||
|         self.recognizer: cv2.face.LBPHFaceRecognizer = ( | ||||
|             cv2.face.LBPHFaceRecognizer_create( | ||||
|                 radius=2, threshold=(1 - self.face_config.min_score) * 1000 | ||||
|             ) | ||||
|         ) | ||||
|         self.recognizer.train(faces, np.array(labels)) | ||||
| 
 | ||||
|     def __align_face( | ||||
|         self, | ||||
|         image: np.ndarray, | ||||
|         output_width: int, | ||||
|         output_height: int, | ||||
|     ) -> np.ndarray: | ||||
|         _, lands = self.landmark_detector.fit( | ||||
|             image, np.array([(0, 0, image.shape[1], image.shape[0])]) | ||||
|         ) | ||||
|         landmarks: np.ndarray = lands[0][0] | ||||
| 
 | ||||
|         # get landmarks for eyes | ||||
|         leftEyePts = landmarks[42:48] | ||||
|         rightEyePts = landmarks[36:42] | ||||
| 
 | ||||
|         # compute the center of mass for each eye | ||||
|         leftEyeCenter = leftEyePts.mean(axis=0).astype("int") | ||||
|         rightEyeCenter = rightEyePts.mean(axis=0).astype("int") | ||||
| 
 | ||||
|         # compute the angle between the eye centroids | ||||
|         dY = rightEyeCenter[1] - leftEyeCenter[1] | ||||
|         dX = rightEyeCenter[0] - leftEyeCenter[0] | ||||
|         angle = np.degrees(np.arctan2(dY, dX)) - 180 | ||||
| 
 | ||||
|         # compute the desired right eye x-coordinate based on the | ||||
|         # desired x-coordinate of the left eye | ||||
|         desiredRightEyeX = 1.0 - 0.35 | ||||
| 
 | ||||
|         # determine the scale of the new resulting image by taking | ||||
|         # the ratio of the distance between eyes in the *current* | ||||
|         # image to the ratio of distance between eyes in the | ||||
|         # *desired* image | ||||
|         dist = np.sqrt((dX**2) + (dY**2)) | ||||
|         desiredDist = desiredRightEyeX - 0.35 | ||||
|         desiredDist *= output_width | ||||
|         scale = desiredDist / dist | ||||
| 
 | ||||
|         # compute center (x, y)-coordinates (i.e., the median point) | ||||
|         # between the two eyes in the input image | ||||
|         # grab the rotation matrix for rotating and scaling the face | ||||
|         eyesCenter = ( | ||||
|             int((leftEyeCenter[0] + rightEyeCenter[0]) // 2), | ||||
|             int((leftEyeCenter[1] + rightEyeCenter[1]) // 2), | ||||
|         ) | ||||
|         M = cv2.getRotationMatrix2D(eyesCenter, angle, scale) | ||||
| 
 | ||||
|         # update the translation component of the matrix | ||||
|         tX = output_width * 0.5 | ||||
|         tY = output_height * 0.35 | ||||
|         M[0, 2] += tX - eyesCenter[0] | ||||
|         M[1, 2] += tY - eyesCenter[1] | ||||
| 
 | ||||
|         # apply the affine transformation | ||||
|         return cv2.warpAffine( | ||||
|             image, M, (output_width, output_height), flags=cv2.INTER_CUBIC | ||||
|         ) | ||||
| 
 | ||||
|     def __clear_classifier(self) -> None: | ||||
|         self.face_recognizer = None | ||||
|         self.label_map = {} | ||||
| 
 | ||||
|     def __detect_face(self, input: np.ndarray) -> tuple[int, int, int, int]: | ||||
|         """Detect faces in input image.""" | ||||
|         if not self.face_detector: | ||||
|             return None | ||||
| 
 | ||||
|         self.face_detector.setInputSize((input.shape[1], input.shape[0])) | ||||
|         faces = self.face_detector.detect(input) | ||||
| 
 | ||||
|         if faces is None or faces[1] is None: | ||||
|             return None | ||||
| 
 | ||||
|         face = None | ||||
| 
 | ||||
|         for _, potential_face in enumerate(faces[1]): | ||||
|             raw_bbox = potential_face[0:4].astype(np.uint16) | ||||
|             x: int = max(raw_bbox[0], 0) | ||||
|             y: int = max(raw_bbox[1], 0) | ||||
|             w: int = raw_bbox[2] | ||||
|             h: int = raw_bbox[3] | ||||
|             bbox = (x, y, x + w, y + h) | ||||
| 
 | ||||
|             if face is None or area(bbox) > area(face): | ||||
|                 face = bbox | ||||
| 
 | ||||
|         return face | ||||
| 
 | ||||
|     def __classify_face(self, face_image: np.ndarray) -> tuple[str, float] | None: | ||||
|         if not self.landmark_detector: | ||||
|             return None | ||||
| 
 | ||||
|         if not self.label_map: | ||||
|             self.__build_classifier() | ||||
| 
 | ||||
|         img = cv2.cvtColor(face_image, cv2.COLOR_BGR2GRAY) | ||||
|         img = self.__align_face(img, img.shape[1], img.shape[0]) | ||||
|         index, distance = self.recognizer.predict(img) | ||||
| 
 | ||||
|         if index == -1: | ||||
|             return None | ||||
| 
 | ||||
|         score = 1.0 - (distance / 1000) | ||||
|         return self.label_map[index], round(score, 2) | ||||
| 
 | ||||
|     def __update_metrics(self, duration: float) -> None: | ||||
|         self.metrics.face_rec_fps.value = ( | ||||
|             self.metrics.face_rec_fps.value * 9 + duration | ||||
|         ) / 10 | ||||
| 
 | ||||
|     def process_frame(self, obj_data: dict[str, any], frame: np.ndarray): | ||||
|         """Look for faces in image.""" | ||||
|         start = datetime.datetime.now().timestamp() | ||||
|         id = obj_data["id"] | ||||
| 
 | ||||
|         # don't run for non person objects | ||||
|         if obj_data.get("label") != "person": | ||||
|             logger.debug("Not a processing face for non person object.") | ||||
|             return | ||||
| 
 | ||||
|         # don't overwrite sub label for objects that have a sub label | ||||
|         # that is not a face | ||||
|         if obj_data.get("sub_label") and id not in self.detected_faces: | ||||
|             logger.debug( | ||||
|                 f"Not processing face due to existing sub label: {obj_data.get('sub_label')}." | ||||
|             ) | ||||
|             return | ||||
| 
 | ||||
|         face: Optional[dict[str, any]] = None | ||||
| 
 | ||||
|         if self.requires_face_detection: | ||||
|             logger.debug("Running manual face detection.") | ||||
|             person_box = obj_data.get("box") | ||||
| 
 | ||||
|             if not person_box: | ||||
|                 return | ||||
| 
 | ||||
|             rgb = cv2.cvtColor(frame, cv2.COLOR_YUV2RGB_I420) | ||||
|             left, top, right, bottom = person_box | ||||
|             person = rgb[top:bottom, left:right] | ||||
|             face_box = self.__detect_face(person) | ||||
| 
 | ||||
|             if not face_box: | ||||
|                 logger.debug("Detected no faces for person object.") | ||||
|                 return | ||||
| 
 | ||||
|             face_frame = person[ | ||||
|                 max(0, face_box[1]) : min(frame.shape[0], face_box[3]), | ||||
|                 max(0, face_box[0]) : min(frame.shape[1], face_box[2]), | ||||
|             ] | ||||
|             face_frame = cv2.cvtColor(face_frame, cv2.COLOR_RGB2BGR) | ||||
|         else: | ||||
|             # don't run for object without attributes | ||||
|             if not obj_data.get("current_attributes"): | ||||
|                 logger.debug("No attributes to parse.") | ||||
|                 return | ||||
| 
 | ||||
|             attributes: list[dict[str, any]] = obj_data.get("current_attributes", []) | ||||
|             for attr in attributes: | ||||
|                 if attr.get("label") != "face": | ||||
|                     continue | ||||
| 
 | ||||
|                 if face is None or attr.get("score", 0.0) > face.get("score", 0.0): | ||||
|                     face = attr | ||||
| 
 | ||||
|             # no faces detected in this frame | ||||
|             if not face: | ||||
|                 return | ||||
| 
 | ||||
|             face_box = face.get("box") | ||||
| 
 | ||||
|             # check that face is valid | ||||
|             if not face_box or area(face_box) < self.config.face_recognition.min_area: | ||||
|                 logger.debug(f"Invalid face box {face}") | ||||
|                 return | ||||
| 
 | ||||
|             face_frame = cv2.cvtColor(frame, cv2.COLOR_YUV2BGR_I420) | ||||
| 
 | ||||
|             face_frame = face_frame[ | ||||
|                 max(0, face_box[1]) : min(frame.shape[0], face_box[3]), | ||||
|                 max(0, face_box[0]) : min(frame.shape[1], face_box[2]), | ||||
|             ] | ||||
| 
 | ||||
|         res = self.__classify_face(face_frame) | ||||
| 
 | ||||
|         if not res: | ||||
|             return | ||||
| 
 | ||||
|         sub_label, score = res | ||||
| 
 | ||||
|         # calculate the overall face score as the probability * area of face | ||||
|         # this will help to reduce false positives from small side-angle faces | ||||
|         # since a large front-on face image may score slightly lower but | ||||
|         # is more likely to be accurate due to the larger face area | ||||
|         face_score = round(score * face_frame.shape[0] * face_frame.shape[1], 2) | ||||
| 
 | ||||
|         logger.debug( | ||||
|             f"Detected best face for person as: {sub_label} with probability {score} and overall face score {face_score}" | ||||
|         ) | ||||
| 
 | ||||
|         if self.config.face_recognition.save_attempts: | ||||
|             # write face to library | ||||
|             folder = os.path.join(FACE_DIR, "train") | ||||
|             file = os.path.join(folder, f"{id}-{sub_label}-{score}-{face_score}.webp") | ||||
|             os.makedirs(folder, exist_ok=True) | ||||
|             cv2.imwrite(file, face_frame) | ||||
| 
 | ||||
|         if score < self.config.face_recognition.threshold: | ||||
|             logger.debug( | ||||
|                 f"Recognized face distance {score} is less than threshold {self.config.face_recognition.threshold}" | ||||
|             ) | ||||
|             self.__update_metrics(datetime.datetime.now().timestamp() - start) | ||||
|             return | ||||
| 
 | ||||
|         if id in self.detected_faces and face_score <= self.detected_faces[id]: | ||||
|             logger.debug( | ||||
|                 f"Recognized face distance {score} and overall score {face_score} is less than previous overall face score ({self.detected_faces.get(id)})." | ||||
|             ) | ||||
|             self.__update_metrics(datetime.datetime.now().timestamp() - start) | ||||
|             return | ||||
| 
 | ||||
|         resp = requests.post( | ||||
|             f"{FRIGATE_LOCALHOST}/api/events/{id}/sub_label", | ||||
|             json={ | ||||
|                 "camera": obj_data.get("camera"), | ||||
|                 "subLabel": sub_label, | ||||
|                 "subLabelScore": score, | ||||
|             }, | ||||
|         ) | ||||
| 
 | ||||
|         if resp.status_code == 200: | ||||
|             self.detected_faces[id] = face_score | ||||
| 
 | ||||
|         self.__update_metrics(datetime.datetime.now().timestamp() - start) | ||||
| 
 | ||||
|     def handle_request(self, request_data) -> dict[str, any] | None: | ||||
|         rand_id = "".join(random.choices(string.ascii_lowercase + string.digits, k=6)) | ||||
|         label = request_data["face_name"] | ||||
|         id = f"{label}-{rand_id}" | ||||
| 
 | ||||
|         if request_data.get("cropped"): | ||||
|             thumbnail = request_data["image"] | ||||
|         else: | ||||
|             img = cv2.imdecode( | ||||
|                 np.frombuffer(base64.b64decode(request_data["image"]), dtype=np.uint8), | ||||
|                 cv2.IMREAD_COLOR, | ||||
|             ) | ||||
|             face_box = self.__detect_face(img) | ||||
| 
 | ||||
|             if not face_box: | ||||
|                 return { | ||||
|                     "message": "No face was detected.", | ||||
|                     "success": False, | ||||
|                 } | ||||
| 
 | ||||
|             face = img[face_box[1] : face_box[3], face_box[0] : face_box[2]] | ||||
|             ret, thumbnail = cv2.imencode( | ||||
|                 ".webp", face, [int(cv2.IMWRITE_WEBP_QUALITY), 100] | ||||
|             ) | ||||
| 
 | ||||
|         # write face to library | ||||
|         folder = os.path.join(FACE_DIR, label) | ||||
|         file = os.path.join(folder, f"{id}.webp") | ||||
|         os.makedirs(folder, exist_ok=True) | ||||
| 
 | ||||
|         # save face image | ||||
|         with open(file, "wb") as output: | ||||
|             output.write(thumbnail.tobytes()) | ||||
| 
 | ||||
|         self.__clear_classifier() | ||||
|         return { | ||||
|             "message": "Successfully registered face.", | ||||
|             "success": True, | ||||
|         } | ||||
| 
 | ||||
|     def expire_object(self, object_id: str): | ||||
|         if object_id in self.detected_faces: | ||||
|             self.detected_faces.pop(object_id) | ||||
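Two numbers in FaceProcessor above deserve a closer look: __classify_face maps the LBPH distance back into a score, process_frame weights that score by the crop area so large front-on faces outrank small side-angle ones, and __update_metrics keeps an exponential moving average of per-frame duration. A tiny worked example with illustrative values:

    # Worked example of the FaceProcessor formulas (illustrative numbers).
    distance = 150.0
    score = 1.0 - distance / 1000          # LBPH distance -> score: 0.85

    h, w = 120, 100                        # face crop size in pixels
    face_score = round(score * h * w, 2)   # 10200.0; area-weighted score

    # face_rec_fps EMA with alpha = 0.1, as in __update_metrics:
    prev, duration = 0.05, 0.08            # seconds
    print((prev * 9 + duration) / 10)      # 0.053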
							
								
								
									
frigate/postprocessing/processor_api.py (52 lines, new file)
							| @ -0,0 +1,52 @@ | ||||
| import logging | ||||
| from abc import ABC, abstractmethod | ||||
| 
 | ||||
| import numpy as np | ||||
| 
 | ||||
| from frigate.config import FrigateConfig | ||||
| 
 | ||||
| from .types import PostProcessingMetrics | ||||
| 
 | ||||
| logger = logging.getLogger(__name__) | ||||
| 
 | ||||
| 
 | ||||
| class ProcessorApi(ABC): | ||||
|     @abstractmethod | ||||
|     def __init__(self, config: FrigateConfig, metrics: PostProcessingMetrics) -> None: | ||||
|         self.config = config | ||||
|         self.metrics = metrics | ||||
|         pass | ||||
| 
 | ||||
|     @abstractmethod | ||||
|     def process_frame(self, obj_data: dict[str, any], frame: np.ndarray) -> None: | ||||
|         """Processes the frame with object data. | ||||
|         Args: | ||||
|             obj_data (dict): containing data about focused object in frame. | ||||
|             frame (ndarray): full yuv frame. | ||||
| 
 | ||||
|         Returns: | ||||
|             None. | ||||
|         """ | ||||
|         pass | ||||
| 
 | ||||
|     @abstractmethod | ||||
|     def handle_request(self, request_data: dict[str, any]) -> dict[str, any] | None: | ||||
|         """Handle metadata requests. | ||||
|         Args: | ||||
|             request_data (dict): containing data about requested change to process. | ||||
| 
 | ||||
|         Returns: | ||||
|             None if request was not handled, otherwise return response. | ||||
|         """ | ||||
|         pass | ||||
| 
 | ||||
|     @abstractmethod | ||||
|     def expire_object(self, object_id: str) -> None: | ||||
|         """Handle objects that are no longer detected. | ||||
|         Args: | ||||
|             object_id (str): id of object that is no longer detected. | ||||
| 
 | ||||
|         Returns: | ||||
|             None. | ||||
|         """ | ||||
|         pass | ||||
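The interface is small enough that adding a new post-processor is mostly boilerplate. A sketch of what a third-party implementation might look like against this API; the class name and behavior are invented for illustration:

    # Invented example processor against the new interface.
    import numpy as np

    from frigate.config import FrigateConfig
    from frigate.postprocessing.processor_api import ProcessorApi
    from frigate.postprocessing.types import PostProcessingMetrics


    class NoopProcessor(ProcessorApi):
        def __init__(self, config: FrigateConfig, metrics: PostProcessingMetrics) -> None:
            super().__init__(config, metrics)
            self.seen: set[str] = set()

        def process_frame(self, obj_data: dict[str, any], frame: np.ndarray) -> None:
            # a real processor would analyze the yuv frame here
            self.seen.add(obj_data["id"])

        def handle_request(self, request_data: dict[str, any]) -> dict[str, any] | None:
            return None  # None = not handled; the maintainer tries the next processor

        def expire_object(self, object_id: str) -> None:
            self.seen.discard(object_id)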
| @ -4,7 +4,7 @@ import multiprocessing as mp | ||||
| from multiprocessing.sharedctypes import Synchronized | ||||
| 
 | ||||
| 
 | ||||
| class EmbeddingsMetrics: | ||||
| class PostProcessingMetrics: | ||||
|     image_embeddings_fps: Synchronized | ||||
|     text_embeddings_sps: Synchronized | ||||
|     face_rec_fps: Synchronized | ||||
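Only the field declarations appear in this hunk; the constructor lives elsewhere in the file. A plausible initialization (an assumption, not shown in this diff) uses multiprocessing.Value so the stats process can read what the post-processing process writes:

    # Assumed initialization pattern for a Synchronized metric field.
    import multiprocessing as mp

    face_rec_fps = mp.Value("d", 0.0)  # shared double across processes

    def update(duration: float) -> None:
        # same EMA update as FaceProcessor.__update_metrics
        face_rec_fps.value = (face_rec_fps.value * 9 + duration) / 10

    update(0.08)
    print(round(face_rec_fps.value, 4))  # 0.008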
| @ -14,8 +14,8 @@ from requests.exceptions import RequestException | ||||
| from frigate.camera import CameraMetrics | ||||
| from frigate.config import FrigateConfig | ||||
| from frigate.const import CACHE_DIR, CLIPS_DIR, RECORD_DIR | ||||
| from frigate.embeddings.types import EmbeddingsMetrics | ||||
| from frigate.object_detection import ObjectDetectProcess | ||||
| from frigate.postprocessing.types import PostProcessingMetrics | ||||
| from frigate.types import StatsTrackingTypes | ||||
| from frigate.util.services import ( | ||||
|     get_amd_gpu_stats, | ||||
| @ -52,7 +52,7 @@ def get_latest_version(config: FrigateConfig) -> str: | ||||
| def stats_init( | ||||
|     config: FrigateConfig, | ||||
|     camera_metrics: dict[str, CameraMetrics], | ||||
|     embeddings_metrics: EmbeddingsMetrics | None, | ||||
|     embeddings_metrics: PostProcessingMetrics | None, | ||||
|     detectors: dict[str, ObjectDetectProcess], | ||||
|     processes: dict[str, int], | ||||
| ) -> StatsTrackingTypes: | ||||
|  | ||||
| @ -2,13 +2,13 @@ from enum import Enum | ||||
| from typing import TypedDict | ||||
| 
 | ||||
| from frigate.camera import CameraMetrics | ||||
| from frigate.embeddings.types import EmbeddingsMetrics | ||||
| from frigate.object_detection import ObjectDetectProcess | ||||
| from frigate.postprocessing.types import PostProcessingMetrics | ||||
| 
 | ||||
| 
 | ||||
| class StatsTrackingTypes(TypedDict): | ||||
|     camera_metrics: dict[str, CameraMetrics] | ||||
|     embeddings_metrics: EmbeddingsMetrics | None | ||||
|     embeddings_metrics: PostProcessingMetrics | None | ||||
|     detectors: dict[str, ObjectDetectProcess] | ||||
|     started: int | ||||
|     latest_frigate_version: str | ||||
|  | ||||
| @ -4,13 +4,7 @@ import logging | ||||
| import os | ||||
| from typing import Any | ||||
| 
 | ||||
| import cv2 | ||||
| import numpy as np | ||||
| import onnxruntime as ort | ||||
| from playhouse.sqliteq import SqliteQueueDatabase | ||||
| 
 | ||||
| from frigate.config.semantic_search import FaceRecognitionConfig | ||||
| from frigate.const import MODEL_CACHE_DIR | ||||
| 
 | ||||
| try: | ||||
|     import openvino as ov | ||||
| @ -21,9 +15,6 @@ except ImportError: | ||||
| logger = logging.getLogger(__name__) | ||||
| 
 | ||||
| 
 | ||||
| MIN_MATCHING_FACES = 2 | ||||
| 
 | ||||
| 
 | ||||
| def get_ort_providers( | ||||
|     force_cpu: bool = False, device: str = "AUTO", requires_fp16: bool = False | ||||
| ) -> tuple[list[str], list[dict[str, any]]]: | ||||
| @ -157,181 +148,3 @@ class ONNXModelRunner: | ||||
|             return [infer_request.get_output_tensor().data] | ||||
|         elif self.type == "ort": | ||||
|             return self.ort.run(None, input) | ||||
| 
 | ||||
| 
 | ||||
| class FaceClassificationModel: | ||||
|     def __init__(self, config: FaceRecognitionConfig, db: SqliteQueueDatabase): | ||||
|         self.config = config | ||||
|         self.db = db | ||||
|         self.face_detector: cv2.FaceDetectorYN = None | ||||
|         self.landmark_detector: cv2.face.FacemarkLBF = None | ||||
|         self.face_recognizer: cv2.face.LBPHFaceRecognizer = None | ||||
| 
 | ||||
|         download_path = os.path.join(MODEL_CACHE_DIR, "facedet") | ||||
|         self.model_files = { | ||||
|             "facedet.onnx": "https://github.com/NickM-27/facenet-onnx/releases/download/v1.0/facedet.onnx", | ||||
|             "landmarkdet.yaml": "https://github.com/NickM-27/facenet-onnx/releases/download/v1.0/landmarkdet.yaml", | ||||
|         } | ||||
| 
 | ||||
|         if not all( | ||||
|             os.path.exists(os.path.join(download_path, n)) | ||||
|             for n in self.model_files.keys() | ||||
|         ): | ||||
|             # conditionally import ModelDownloader | ||||
|             from frigate.util.downloader import ModelDownloader | ||||
| 
 | ||||
|             self.downloader = ModelDownloader( | ||||
|                 model_name="facedet", | ||||
|                 download_path=download_path, | ||||
|                 file_names=self.model_files.keys(), | ||||
|                 download_func=self.__download_models, | ||||
|                 complete_func=self.__build_detector, | ||||
|             ) | ||||
|             self.downloader.ensure_model_files() | ||||
|         else: | ||||
|             self.__build_detector() | ||||
| 
 | ||||
|         self.label_map: dict[int, str] = {} | ||||
|         self.__build_classifier() | ||||
| 
 | ||||
|     def __download_models(self, path: str) -> None: | ||||
|         try: | ||||
|             file_name = os.path.basename(path) | ||||
|             # conditionally import ModelDownloader | ||||
|             from frigate.util.downloader import ModelDownloader | ||||
| 
 | ||||
|             ModelDownloader.download_from_url(self.model_files[file_name], path) | ||||
|         except Exception as e: | ||||
|             logger.error(f"Failed to download {path}: {e}") | ||||
| 
 | ||||
|     def __build_detector(self) -> None: | ||||
|         self.face_detector = cv2.FaceDetectorYN.create( | ||||
|             "/config/model_cache/facedet/facedet.onnx", | ||||
|             config="", | ||||
|             input_size=(320, 320), | ||||
|             score_threshold=0.8, | ||||
|             nms_threshold=0.3, | ||||
|         ) | ||||
|         self.landmark_detector = cv2.face.createFacemarkLBF() | ||||
|         self.landmark_detector.loadModel("/config/model_cache/facedet/landmarkdet.yaml") | ||||
| 
 | ||||
|     def __build_classifier(self) -> None: | ||||
|         if not self.landmark_detector: | ||||
|             return None | ||||
| 
 | ||||
|         labels = [] | ||||
|         faces = [] | ||||
| 
 | ||||
|         dir = "/media/frigate/clips/faces" | ||||
|         for idx, name in enumerate(os.listdir(dir)): | ||||
|             if name == "train": | ||||
|                 continue | ||||
| 
 | ||||
|             face_folder = os.path.join(dir, name) | ||||
| 
 | ||||
|             if not os.path.isdir(face_folder): | ||||
|                 continue | ||||
| 
 | ||||
|             self.label_map[idx] = name | ||||
|             for image in os.listdir(face_folder): | ||||
|                 img = cv2.imread(os.path.join(face_folder, image)) | ||||
| 
 | ||||
|                 if img is None: | ||||
|                     continue | ||||
| 
 | ||||
|                 img = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY) | ||||
|                 img = self.__align_face(img, img.shape[1], img.shape[0]) | ||||
|                 faces.append(img) | ||||
|                 labels.append(idx) | ||||
| 
 | ||||
|         self.recognizer: cv2.face.LBPHFaceRecognizer = ( | ||||
|             cv2.face.LBPHFaceRecognizer_create( | ||||
|                 radius=2, threshold=(1 - self.config.min_score) * 1000 | ||||
|             ) | ||||
|         ) | ||||
|         self.recognizer.train(faces, np.array(labels)) | ||||
| 
 | ||||
|     def __align_face( | ||||
|         self, | ||||
|         image: np.ndarray, | ||||
|         output_width: int, | ||||
|         output_height: int, | ||||
|     ) -> np.ndarray: | ||||
|         _, lands = self.landmark_detector.fit( | ||||
|             image, np.array([(0, 0, image.shape[1], image.shape[0])]) | ||||
|         ) | ||||
|         landmarks = lands[0][0] | ||||
| 
 | ||||
|         # get landmarks for eyes | ||||
|         leftEyePts = landmarks[42:48] | ||||
|         rightEyePts = landmarks[36:42] | ||||
| 
 | ||||
|         # compute the center of mass for each eye | ||||
|         leftEyeCenter = leftEyePts.mean(axis=0).astype("int") | ||||
|         rightEyeCenter = rightEyePts.mean(axis=0).astype("int") | ||||
| 
 | ||||
|         # compute the angle between the eye centroids | ||||
|         dY = rightEyeCenter[1] - leftEyeCenter[1] | ||||
|         dX = rightEyeCenter[0] - leftEyeCenter[0] | ||||
|         angle = np.degrees(np.arctan2(dY, dX)) - 180 | ||||
| 
 | ||||
|         # compute the desired right eye x-coordinate based on the | ||||
|         # desired x-coordinate of the left eye | ||||
|         desiredRightEyeX = 1.0 - 0.35 | ||||
| 
 | ||||
|         # determine the scale of the new resulting image by taking | ||||
|         # the ratio of the distance between eyes in the *current* | ||||
|         # image to the ratio of distance between eyes in the | ||||
|         # *desired* image | ||||
|         dist = np.sqrt((dX**2) + (dY**2)) | ||||
|         desiredDist = desiredRightEyeX - 0.35 | ||||
|         desiredDist *= output_width | ||||
|         scale = desiredDist / dist | ||||
| 
 | ||||
|         # compute center (x, y)-coordinates (i.e., the median point) | ||||
|         # between the two eyes in the input image | ||||
|         # grab the rotation matrix for rotating and scaling the face | ||||
|         eyesCenter = ( | ||||
|             int((leftEyeCenter[0] + rightEyeCenter[0]) // 2), | ||||
|             int((leftEyeCenter[1] + rightEyeCenter[1]) // 2), | ||||
|         ) | ||||
|         M = cv2.getRotationMatrix2D(eyesCenter, angle, scale) | ||||
| 
 | ||||
|         # update the translation component of the matrix | ||||
|         tX = output_width * 0.5 | ||||
|         tY = output_height * 0.35 | ||||
|         M[0, 2] += tX - eyesCenter[0] | ||||
|         M[1, 2] += tY - eyesCenter[1] | ||||
| 
 | ||||
|         # apply the affine transformation | ||||
|         return cv2.warpAffine( | ||||
|             image, M, (output_width, output_height), flags=cv2.INTER_CUBIC | ||||
|         ) | ||||
| 
 | ||||
|     def clear_classifier(self) -> None: | ||||
|         self.face_recognizer = None | ||||
|         self.label_map = {} | ||||
| 
 | ||||
|     def detect_faces(self, input: np.ndarray) -> tuple[int, cv2.typing.MatLike] | None: | ||||
|         if not self.face_detector: | ||||
|             return None | ||||
| 
 | ||||
|         self.face_detector.setInputSize((input.shape[1], input.shape[0])) | ||||
|         return self.face_detector.detect(input) | ||||
| 
 | ||||
|     def classify_face(self, face_image: np.ndarray) -> tuple[str, float] | None: | ||||
|         if not self.landmark_detector: | ||||
|             return None | ||||
| 
 | ||||
|         if not self.label_map: | ||||
|             self.__build_classifier() | ||||
| 
 | ||||
|         img = cv2.cvtColor(face_image, cv2.COLOR_BGR2GRAY) | ||||
|         img = self.__align_face(img, img.shape[1], img.shape[0]) | ||||
|         index, distance = self.recognizer.predict(img) | ||||
| 
 | ||||
|         if index == -1: | ||||
|             return None | ||||
| 
 | ||||
|         score = 1.0 - (distance / 1000) | ||||
|         return self.label_map[index], round(score, 2) | ||||
|  | ||||