mirror of
				https://github.com/blakeblackshear/frigate.git
				synced 2025-10-27 10:52:11 +01:00 
			
		
		
		
	Generalize postprocessing (#15931)
* Actually send result to face registration * Define postprocessing api and move face processing to fit * Standardize request handling * Standardize handling of processors * Rename processing metrics * Cleanup * Standardize object end * Update to newer formatting * One more * One more
This commit is contained in:
		
							parent
							
								
									3f1d85e189
								
							
						
					
					
						commit
						88686c44fe
					
				@ -39,16 +39,28 @@ def get_faces():
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
@router.post("/faces/{name}")
 | 
					@router.post("/faces/{name}")
 | 
				
			||||||
async def register_face(request: Request, name: str, file: UploadFile):
 | 
					async def register_face(request: Request, name: str, file: UploadFile):
 | 
				
			||||||
    context: EmbeddingsContext = request.app.embeddings
 | 
					    if not request.app.frigate_config.face_recognition.enabled:
 | 
				
			||||||
    context.register_face(name, await file.read())
 | 
					 | 
				
			||||||
        return JSONResponse(
 | 
					        return JSONResponse(
 | 
				
			||||||
        status_code=200,
 | 
					            status_code=400,
 | 
				
			||||||
        content={"success": True, "message": "Successfully registered face."},
 | 
					            content={"message": "Face recognition is not enabled.", "success": False},
 | 
				
			||||||
 | 
					        )
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    context: EmbeddingsContext = request.app.embeddings
 | 
				
			||||||
 | 
					    result = context.register_face(name, await file.read())
 | 
				
			||||||
 | 
					    return JSONResponse(
 | 
				
			||||||
 | 
					        status_code=200 if result.get("success", True) else 400,
 | 
				
			||||||
 | 
					        content=result,
 | 
				
			||||||
    )
 | 
					    )
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@router.post("/faces/train/{name}/classify")
 | 
					@router.post("/faces/train/{name}/classify")
 | 
				
			||||||
def train_face(name: str, body: dict = None):
 | 
					def train_face(request: Request, name: str, body: dict = None):
 | 
				
			||||||
 | 
					    if not request.app.frigate_config.face_recognition.enabled:
 | 
				
			||||||
 | 
					        return JSONResponse(
 | 
				
			||||||
 | 
					            status_code=400,
 | 
				
			||||||
 | 
					            content={"message": "Face recognition is not enabled.", "success": False},
 | 
				
			||||||
 | 
					        )
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    json: dict[str, any] = body or {}
 | 
					    json: dict[str, any] = body or {}
 | 
				
			||||||
    training_file = os.path.join(
 | 
					    training_file = os.path.join(
 | 
				
			||||||
        FACE_DIR, f"train/{sanitize_filename(json.get('training_file', ''))}"
 | 
					        FACE_DIR, f"train/{sanitize_filename(json.get('training_file', ''))}"
 | 
				
			||||||
@ -82,6 +94,12 @@ def train_face(name: str, body: dict = None):
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
@router.post("/faces/{name}/delete")
 | 
					@router.post("/faces/{name}/delete")
 | 
				
			||||||
def deregister_faces(request: Request, name: str, body: dict = None):
 | 
					def deregister_faces(request: Request, name: str, body: dict = None):
 | 
				
			||||||
 | 
					    if not request.app.frigate_config.face_recognition.enabled:
 | 
				
			||||||
 | 
					        return JSONResponse(
 | 
				
			||||||
 | 
					            status_code=400,
 | 
				
			||||||
 | 
					            content={"message": "Face recognition is not enabled.", "success": False},
 | 
				
			||||||
 | 
					        )
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    json: dict[str, any] = body or {}
 | 
					    json: dict[str, any] = body or {}
 | 
				
			||||||
    list_of_ids = json.get("ids", "")
 | 
					    list_of_ids = json.get("ids", "")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
				
			|||||||
@ -41,7 +41,6 @@ from frigate.const import (
 | 
				
			|||||||
)
 | 
					)
 | 
				
			||||||
from frigate.db.sqlitevecq import SqliteVecQueueDatabase
 | 
					from frigate.db.sqlitevecq import SqliteVecQueueDatabase
 | 
				
			||||||
from frigate.embeddings import EmbeddingsContext, manage_embeddings
 | 
					from frigate.embeddings import EmbeddingsContext, manage_embeddings
 | 
				
			||||||
from frigate.embeddings.types import EmbeddingsMetrics
 | 
					 | 
				
			||||||
from frigate.events.audio import AudioProcessor
 | 
					from frigate.events.audio import AudioProcessor
 | 
				
			||||||
from frigate.events.cleanup import EventCleanup
 | 
					from frigate.events.cleanup import EventCleanup
 | 
				
			||||||
from frigate.events.external import ExternalEventProcessor
 | 
					from frigate.events.external import ExternalEventProcessor
 | 
				
			||||||
@ -60,6 +59,7 @@ from frigate.models import (
 | 
				
			|||||||
from frigate.object_detection import ObjectDetectProcess
 | 
					from frigate.object_detection import ObjectDetectProcess
 | 
				
			||||||
from frigate.object_processing import TrackedObjectProcessor
 | 
					from frigate.object_processing import TrackedObjectProcessor
 | 
				
			||||||
from frigate.output.output import output_frames
 | 
					from frigate.output.output import output_frames
 | 
				
			||||||
 | 
					from frigate.postprocessing.types import PostProcessingMetrics
 | 
				
			||||||
from frigate.ptz.autotrack import PtzAutoTrackerThread
 | 
					from frigate.ptz.autotrack import PtzAutoTrackerThread
 | 
				
			||||||
from frigate.ptz.onvif import OnvifController
 | 
					from frigate.ptz.onvif import OnvifController
 | 
				
			||||||
from frigate.record.cleanup import RecordingCleanup
 | 
					from frigate.record.cleanup import RecordingCleanup
 | 
				
			||||||
@ -90,8 +90,8 @@ class FrigateApp:
 | 
				
			|||||||
        self.detection_shms: list[mp.shared_memory.SharedMemory] = []
 | 
					        self.detection_shms: list[mp.shared_memory.SharedMemory] = []
 | 
				
			||||||
        self.log_queue: Queue = mp.Queue()
 | 
					        self.log_queue: Queue = mp.Queue()
 | 
				
			||||||
        self.camera_metrics: dict[str, CameraMetrics] = {}
 | 
					        self.camera_metrics: dict[str, CameraMetrics] = {}
 | 
				
			||||||
        self.embeddings_metrics: EmbeddingsMetrics | None = (
 | 
					        self.embeddings_metrics: PostProcessingMetrics | None = (
 | 
				
			||||||
            EmbeddingsMetrics() if config.semantic_search.enabled else None
 | 
					            PostProcessingMetrics() if config.semantic_search.enabled else None
 | 
				
			||||||
        )
 | 
					        )
 | 
				
			||||||
        self.ptz_metrics: dict[str, PTZMetrics] = {}
 | 
					        self.ptz_metrics: dict[str, PTZMetrics] = {}
 | 
				
			||||||
        self.processes: dict[str, int] = {}
 | 
					        self.processes: dict[str, int] = {}
 | 
				
			||||||
 | 
				
			|||||||
@ -20,14 +20,14 @@ from frigate.models import Event
 | 
				
			|||||||
from frigate.util.builtin import serialize
 | 
					from frigate.util.builtin import serialize
 | 
				
			||||||
from frigate.util.services import listen
 | 
					from frigate.util.services import listen
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					from ..postprocessing.types import PostProcessingMetrics
 | 
				
			||||||
from .maintainer import EmbeddingMaintainer
 | 
					from .maintainer import EmbeddingMaintainer
 | 
				
			||||||
from .types import EmbeddingsMetrics
 | 
					 | 
				
			||||||
from .util import ZScoreNormalization
 | 
					from .util import ZScoreNormalization
 | 
				
			||||||
 | 
					
 | 
				
			||||||
logger = logging.getLogger(__name__)
 | 
					logger = logging.getLogger(__name__)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
def manage_embeddings(config: FrigateConfig, metrics: EmbeddingsMetrics) -> None:
 | 
					def manage_embeddings(config: FrigateConfig, metrics: PostProcessingMetrics) -> None:
 | 
				
			||||||
    # Only initialize embeddings if semantic search is enabled
 | 
					    # Only initialize embeddings if semantic search is enabled
 | 
				
			||||||
    if not config.semantic_search.enabled:
 | 
					    if not config.semantic_search.enabled:
 | 
				
			||||||
        return
 | 
					        return
 | 
				
			||||||
@ -192,8 +192,8 @@ class EmbeddingsContext:
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
        return results
 | 
					        return results
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    def register_face(self, face_name: str, image_data: bytes) -> None:
 | 
					    def register_face(self, face_name: str, image_data: bytes) -> dict[str, any]:
 | 
				
			||||||
        self.requestor.send_data(
 | 
					        return self.requestor.send_data(
 | 
				
			||||||
            EmbeddingsRequestEnum.register_face.value,
 | 
					            EmbeddingsRequestEnum.register_face.value,
 | 
				
			||||||
            {
 | 
					            {
 | 
				
			||||||
                "face_name": face_name,
 | 
					                "face_name": face_name,
 | 
				
			||||||
 | 
				
			|||||||
@ -21,8 +21,8 @@ from frigate.models import Event
 | 
				
			|||||||
from frigate.types import ModelStatusTypesEnum
 | 
					from frigate.types import ModelStatusTypesEnum
 | 
				
			||||||
from frigate.util.builtin import serialize
 | 
					from frigate.util.builtin import serialize
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					from ..postprocessing.types import PostProcessingMetrics
 | 
				
			||||||
from .functions.onnx import GenericONNXEmbedding, ModelTypeEnum
 | 
					from .functions.onnx import GenericONNXEmbedding, ModelTypeEnum
 | 
				
			||||||
from .types import EmbeddingsMetrics
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
logger = logging.getLogger(__name__)
 | 
					logger = logging.getLogger(__name__)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@ -65,7 +65,7 @@ class Embeddings:
 | 
				
			|||||||
        self,
 | 
					        self,
 | 
				
			||||||
        config: FrigateConfig,
 | 
					        config: FrigateConfig,
 | 
				
			||||||
        db: SqliteVecQueueDatabase,
 | 
					        db: SqliteVecQueueDatabase,
 | 
				
			||||||
        metrics: EmbeddingsMetrics,
 | 
					        metrics: PostProcessingMetrics,
 | 
				
			||||||
    ) -> None:
 | 
					    ) -> None:
 | 
				
			||||||
        self.config = config
 | 
					        self.config = config
 | 
				
			||||||
        self.db = db
 | 
					        self.db = db
 | 
				
			||||||
 | 
				
			|||||||
@ -4,9 +4,7 @@ import base64
 | 
				
			|||||||
import datetime
 | 
					import datetime
 | 
				
			||||||
import logging
 | 
					import logging
 | 
				
			||||||
import os
 | 
					import os
 | 
				
			||||||
import random
 | 
					 | 
				
			||||||
import re
 | 
					import re
 | 
				
			||||||
import string
 | 
					 | 
				
			||||||
import threading
 | 
					import threading
 | 
				
			||||||
from multiprocessing.synchronize import Event as MpEvent
 | 
					from multiprocessing.synchronize import Event as MpEvent
 | 
				
			||||||
from pathlib import Path
 | 
					from pathlib import Path
 | 
				
			||||||
@ -28,7 +26,6 @@ from frigate.comms.inter_process import InterProcessRequestor
 | 
				
			|||||||
from frigate.config import FrigateConfig
 | 
					from frigate.config import FrigateConfig
 | 
				
			||||||
from frigate.const import (
 | 
					from frigate.const import (
 | 
				
			||||||
    CLIPS_DIR,
 | 
					    CLIPS_DIR,
 | 
				
			||||||
    FACE_DIR,
 | 
					 | 
				
			||||||
    FRIGATE_LOCALHOST,
 | 
					    FRIGATE_LOCALHOST,
 | 
				
			||||||
    UPDATE_EVENT_DESCRIPTION,
 | 
					    UPDATE_EVENT_DESCRIPTION,
 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
@ -36,13 +33,14 @@ from frigate.embeddings.lpr.lpr import LicensePlateRecognition
 | 
				
			|||||||
from frigate.events.types import EventTypeEnum
 | 
					from frigate.events.types import EventTypeEnum
 | 
				
			||||||
from frigate.genai import get_genai_client
 | 
					from frigate.genai import get_genai_client
 | 
				
			||||||
from frigate.models import Event
 | 
					from frigate.models import Event
 | 
				
			||||||
 | 
					from frigate.postprocessing.face_processor import FaceProcessor
 | 
				
			||||||
 | 
					from frigate.postprocessing.processor_api import ProcessorApi
 | 
				
			||||||
from frigate.types import TrackedObjectUpdateTypesEnum
 | 
					from frigate.types import TrackedObjectUpdateTypesEnum
 | 
				
			||||||
from frigate.util.builtin import serialize
 | 
					from frigate.util.builtin import serialize
 | 
				
			||||||
from frigate.util.image import SharedMemoryFrameManager, area, calculate_region
 | 
					from frigate.util.image import SharedMemoryFrameManager, area, calculate_region
 | 
				
			||||||
from frigate.util.model import FaceClassificationModel
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					from ..postprocessing.types import PostProcessingMetrics
 | 
				
			||||||
from .embeddings import Embeddings
 | 
					from .embeddings import Embeddings
 | 
				
			||||||
from .types import EmbeddingsMetrics
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
logger = logging.getLogger(__name__)
 | 
					logger = logging.getLogger(__name__)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@ -56,7 +54,7 @@ class EmbeddingMaintainer(threading.Thread):
 | 
				
			|||||||
        self,
 | 
					        self,
 | 
				
			||||||
        db: SqliteQueueDatabase,
 | 
					        db: SqliteQueueDatabase,
 | 
				
			||||||
        config: FrigateConfig,
 | 
					        config: FrigateConfig,
 | 
				
			||||||
        metrics: EmbeddingsMetrics,
 | 
					        metrics: PostProcessingMetrics,
 | 
				
			||||||
        stop_event: MpEvent,
 | 
					        stop_event: MpEvent,
 | 
				
			||||||
    ) -> None:
 | 
					    ) -> None:
 | 
				
			||||||
        super().__init__(name="embeddings_maintainer")
 | 
					        super().__init__(name="embeddings_maintainer")
 | 
				
			||||||
@ -75,16 +73,10 @@ class EmbeddingMaintainer(threading.Thread):
 | 
				
			|||||||
        )
 | 
					        )
 | 
				
			||||||
        self.embeddings_responder = EmbeddingsResponder()
 | 
					        self.embeddings_responder = EmbeddingsResponder()
 | 
				
			||||||
        self.frame_manager = SharedMemoryFrameManager()
 | 
					        self.frame_manager = SharedMemoryFrameManager()
 | 
				
			||||||
 | 
					        self.processors: list[ProcessorApi] = []
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        # set face recognition conditions
 | 
					        if self.config.face_recognition.enabled:
 | 
				
			||||||
        self.face_recognition_enabled = self.config.face_recognition.enabled
 | 
					            self.processors.append(FaceProcessor(self.config, metrics))
 | 
				
			||||||
        self.requires_face_detection = "face" not in self.config.objects.all_objects
 | 
					 | 
				
			||||||
        self.detected_faces: dict[str, float] = {}
 | 
					 | 
				
			||||||
        self.face_classifier = (
 | 
					 | 
				
			||||||
            FaceClassificationModel(self.config.face_recognition, db)
 | 
					 | 
				
			||||||
            if self.face_recognition_enabled
 | 
					 | 
				
			||||||
            else None
 | 
					 | 
				
			||||||
        )
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
        # create communication for updating event descriptions
 | 
					        # create communication for updating event descriptions
 | 
				
			||||||
        self.requestor = InterProcessRequestor()
 | 
					        self.requestor = InterProcessRequestor()
 | 
				
			||||||
@ -142,46 +134,12 @@ class EmbeddingMaintainer(threading.Thread):
 | 
				
			|||||||
                        self.embeddings.embed_description("", data, upsert=False),
 | 
					                        self.embeddings.embed_description("", data, upsert=False),
 | 
				
			||||||
                        pack=False,
 | 
					                        pack=False,
 | 
				
			||||||
                    )
 | 
					                    )
 | 
				
			||||||
                elif topic == EmbeddingsRequestEnum.register_face.value:
 | 
					 | 
				
			||||||
                    if not self.face_recognition_enabled:
 | 
					 | 
				
			||||||
                        return False
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
                    rand_id = "".join(
 | 
					 | 
				
			||||||
                        random.choices(string.ascii_lowercase + string.digits, k=6)
 | 
					 | 
				
			||||||
                    )
 | 
					 | 
				
			||||||
                    label = data["face_name"]
 | 
					 | 
				
			||||||
                    id = f"{label}-{rand_id}"
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
                    if data.get("cropped"):
 | 
					 | 
				
			||||||
                        pass
 | 
					 | 
				
			||||||
                else:
 | 
					                else:
 | 
				
			||||||
                        img = cv2.imdecode(
 | 
					                    for processor in self.processors:
 | 
				
			||||||
                            np.frombuffer(
 | 
					                        resp = processor.handle_request(data)
 | 
				
			||||||
                                base64.b64decode(data["image"]), dtype=np.uint8
 | 
					 | 
				
			||||||
                            ),
 | 
					 | 
				
			||||||
                            cv2.IMREAD_COLOR,
 | 
					 | 
				
			||||||
                        )
 | 
					 | 
				
			||||||
                        face_box = self._detect_face(img)
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
                        if not face_box:
 | 
					                        if resp is not None:
 | 
				
			||||||
                            return False
 | 
					                            return resp
 | 
				
			||||||
 | 
					 | 
				
			||||||
                        face = img[face_box[1] : face_box[3], face_box[0] : face_box[2]]
 | 
					 | 
				
			||||||
                        ret, thumbnail = cv2.imencode(
 | 
					 | 
				
			||||||
                            ".webp", face, [int(cv2.IMWRITE_WEBP_QUALITY), 100]
 | 
					 | 
				
			||||||
                        )
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
                    # write face to library
 | 
					 | 
				
			||||||
                    folder = os.path.join(FACE_DIR, label)
 | 
					 | 
				
			||||||
                    file = os.path.join(folder, f"{id}.webp")
 | 
					 | 
				
			||||||
                    os.makedirs(folder, exist_ok=True)
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
                    # save face image
 | 
					 | 
				
			||||||
                    with open(file, "wb") as output:
 | 
					 | 
				
			||||||
                        output.write(thumbnail.tobytes())
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
                self.face_classifier.clear_classifier()
 | 
					 | 
				
			||||||
                return True
 | 
					 | 
				
			||||||
            except Exception as e:
 | 
					            except Exception as e:
 | 
				
			||||||
                logger.error(f"Unable to handle embeddings request {e}")
 | 
					                logger.error(f"Unable to handle embeddings request {e}")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@ -204,8 +162,8 @@ class EmbeddingMaintainer(threading.Thread):
 | 
				
			|||||||
        # no need to process updated objects if face recognition, lpr, genai are disabled
 | 
					        # no need to process updated objects if face recognition, lpr, genai are disabled
 | 
				
			||||||
        if (
 | 
					        if (
 | 
				
			||||||
            not camera_config.genai.enabled
 | 
					            not camera_config.genai.enabled
 | 
				
			||||||
            and not self.face_recognition_enabled
 | 
					 | 
				
			||||||
            and not self.lpr_config.enabled
 | 
					            and not self.lpr_config.enabled
 | 
				
			||||||
 | 
					            and len(self.processors) == 0
 | 
				
			||||||
        ):
 | 
					        ):
 | 
				
			||||||
            return
 | 
					            return
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@ -223,15 +181,8 @@ class EmbeddingMaintainer(threading.Thread):
 | 
				
			|||||||
            )
 | 
					            )
 | 
				
			||||||
            return
 | 
					            return
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        if self.face_recognition_enabled:
 | 
					        for processor in self.processors:
 | 
				
			||||||
            start = datetime.datetime.now().timestamp()
 | 
					            processor.process_frame(data, yuv_frame)
 | 
				
			||||||
            processed = self._process_face(data, yuv_frame)
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
            if processed:
 | 
					 | 
				
			||||||
                duration = datetime.datetime.now().timestamp() - start
 | 
					 | 
				
			||||||
                self.metrics.face_rec_fps.value = (
 | 
					 | 
				
			||||||
                    self.metrics.face_rec_fps.value * 9 + duration
 | 
					 | 
				
			||||||
                ) / 10
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
        if self.lpr_config.enabled:
 | 
					        if self.lpr_config.enabled:
 | 
				
			||||||
            start = datetime.datetime.now().timestamp()
 | 
					            start = datetime.datetime.now().timestamp()
 | 
				
			||||||
@ -271,8 +222,8 @@ class EmbeddingMaintainer(threading.Thread):
 | 
				
			|||||||
            event_id, camera, updated_db = ended
 | 
					            event_id, camera, updated_db = ended
 | 
				
			||||||
            camera_config = self.config.cameras[camera]
 | 
					            camera_config = self.config.cameras[camera]
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            if event_id in self.detected_faces:
 | 
					            for processor in self.processors:
 | 
				
			||||||
                self.detected_faces.pop(event_id)
 | 
					                processor.expire_object(event_id)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            if event_id in self.detected_license_plates:
 | 
					            if event_id in self.detected_license_plates:
 | 
				
			||||||
                self.detected_license_plates.pop(event_id)
 | 
					                self.detected_license_plates.pop(event_id)
 | 
				
			||||||
@ -399,150 +350,6 @@ class EmbeddingMaintainer(threading.Thread):
 | 
				
			|||||||
        if event_id:
 | 
					        if event_id:
 | 
				
			||||||
            self.handle_regenerate_description(event_id, source)
 | 
					            self.handle_regenerate_description(event_id, source)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    def _detect_face(self, input: np.ndarray) -> tuple[int, int, int, int]:
 | 
					 | 
				
			||||||
        """Detect faces in input image."""
 | 
					 | 
				
			||||||
        faces = self.face_classifier.detect_faces(input)
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
        if faces is None or faces[1] is None:
 | 
					 | 
				
			||||||
            return None
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
        face = None
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
        for _, potential_face in enumerate(faces[1]):
 | 
					 | 
				
			||||||
            raw_bbox = potential_face[0:4].astype(np.uint16)
 | 
					 | 
				
			||||||
            x: int = max(raw_bbox[0], 0)
 | 
					 | 
				
			||||||
            y: int = max(raw_bbox[1], 0)
 | 
					 | 
				
			||||||
            w: int = raw_bbox[2]
 | 
					 | 
				
			||||||
            h: int = raw_bbox[3]
 | 
					 | 
				
			||||||
            bbox = (x, y, x + w, y + h)
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
            if face is None or area(bbox) > area(face):
 | 
					 | 
				
			||||||
                face = bbox
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
        return face
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    def _process_face(self, obj_data: dict[str, any], frame: np.ndarray) -> bool:
 | 
					 | 
				
			||||||
        """Look for faces in image."""
 | 
					 | 
				
			||||||
        id = obj_data["id"]
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
        # don't run for non person objects
 | 
					 | 
				
			||||||
        if obj_data.get("label") != "person":
 | 
					 | 
				
			||||||
            logger.debug("Not a processing face for non person object.")
 | 
					 | 
				
			||||||
            return False
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
        # don't overwrite sub label for objects that have a sub label
 | 
					 | 
				
			||||||
        # that is not a face
 | 
					 | 
				
			||||||
        if obj_data.get("sub_label") and id not in self.detected_faces:
 | 
					 | 
				
			||||||
            logger.debug(
 | 
					 | 
				
			||||||
                f"Not processing face due to existing sub label: {obj_data.get('sub_label')}."
 | 
					 | 
				
			||||||
            )
 | 
					 | 
				
			||||||
            return False
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
        face: Optional[dict[str, any]] = None
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
        if self.requires_face_detection:
 | 
					 | 
				
			||||||
            logger.debug("Running manual face detection.")
 | 
					 | 
				
			||||||
            person_box = obj_data.get("box")
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
            if not person_box:
 | 
					 | 
				
			||||||
                return False
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
            rgb = cv2.cvtColor(frame, cv2.COLOR_YUV2RGB_I420)
 | 
					 | 
				
			||||||
            left, top, right, bottom = person_box
 | 
					 | 
				
			||||||
            person = rgb[top:bottom, left:right]
 | 
					 | 
				
			||||||
            face_box = self._detect_face(person)
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
            if not face_box:
 | 
					 | 
				
			||||||
                logger.debug("Detected no faces for person object.")
 | 
					 | 
				
			||||||
                return False
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
            face_frame = person[
 | 
					 | 
				
			||||||
                max(0, face_box[1]) : min(frame.shape[0], face_box[3]),
 | 
					 | 
				
			||||||
                max(0, face_box[0]) : min(frame.shape[1], face_box[2]),
 | 
					 | 
				
			||||||
            ]
 | 
					 | 
				
			||||||
            face_frame = cv2.cvtColor(face_frame, cv2.COLOR_RGB2BGR)
 | 
					 | 
				
			||||||
        else:
 | 
					 | 
				
			||||||
            # don't run for object without attributes
 | 
					 | 
				
			||||||
            if not obj_data.get("current_attributes"):
 | 
					 | 
				
			||||||
                logger.debug("No attributes to parse.")
 | 
					 | 
				
			||||||
                return False
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
            attributes: list[dict[str, any]] = obj_data.get("current_attributes", [])
 | 
					 | 
				
			||||||
            for attr in attributes:
 | 
					 | 
				
			||||||
                if attr.get("label") != "face":
 | 
					 | 
				
			||||||
                    continue
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
                if face is None or attr.get("score", 0.0) > face.get("score", 0.0):
 | 
					 | 
				
			||||||
                    face = attr
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
            # no faces detected in this frame
 | 
					 | 
				
			||||||
            if not face:
 | 
					 | 
				
			||||||
                return False
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
            face_box = face.get("box")
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
            # check that face is valid
 | 
					 | 
				
			||||||
            if not face_box or area(face_box) < self.config.face_recognition.min_area:
 | 
					 | 
				
			||||||
                logger.debug(f"Invalid face box {face}")
 | 
					 | 
				
			||||||
                return False
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
            face_frame = cv2.cvtColor(frame, cv2.COLOR_YUV2BGR_I420)
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
            face_frame = face_frame[
 | 
					 | 
				
			||||||
                max(0, face_box[1]) : min(frame.shape[0], face_box[3]),
 | 
					 | 
				
			||||||
                max(0, face_box[0]) : min(frame.shape[1], face_box[2]),
 | 
					 | 
				
			||||||
            ]
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
        res = self.face_classifier.classify_face(face_frame)
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
        if not res:
 | 
					 | 
				
			||||||
            return False
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
        sub_label, score = res
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
        # calculate the overall face score as the probability * area of face
 | 
					 | 
				
			||||||
        # this will help to reduce false positives from small side-angle faces
 | 
					 | 
				
			||||||
        # if a large front-on face image may have scored slightly lower but
 | 
					 | 
				
			||||||
        # is more likely to be accurate due to the larger face area
 | 
					 | 
				
			||||||
        face_score = round(score * face_frame.shape[0] * face_frame.shape[1], 2)
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
        logger.debug(
 | 
					 | 
				
			||||||
            f"Detected best face for person as: {sub_label} with probability {score} and overall face score {face_score}"
 | 
					 | 
				
			||||||
        )
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
        if self.config.face_recognition.save_attempts:
 | 
					 | 
				
			||||||
            # write face to library
 | 
					 | 
				
			||||||
            folder = os.path.join(FACE_DIR, "train")
 | 
					 | 
				
			||||||
            file = os.path.join(folder, f"{id}-{sub_label}-{score}-{face_score}.webp")
 | 
					 | 
				
			||||||
            os.makedirs(folder, exist_ok=True)
 | 
					 | 
				
			||||||
            cv2.imwrite(file, face_frame)
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
        if score < self.config.face_recognition.threshold:
 | 
					 | 
				
			||||||
            logger.debug(
 | 
					 | 
				
			||||||
                f"Recognized face distance {score} is less than threshold {self.config.face_recognition.threshold}"
 | 
					 | 
				
			||||||
            )
 | 
					 | 
				
			||||||
            return True
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
        if id in self.detected_faces and face_score <= self.detected_faces[id]:
 | 
					 | 
				
			||||||
            logger.debug(
 | 
					 | 
				
			||||||
                f"Recognized face distance {score} and overall score {face_score} is less than previous overall face score ({self.detected_faces.get(id)})."
 | 
					 | 
				
			||||||
            )
 | 
					 | 
				
			||||||
            return True
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
        resp = requests.post(
 | 
					 | 
				
			||||||
            f"{FRIGATE_LOCALHOST}/api/events/{id}/sub_label",
 | 
					 | 
				
			||||||
            json={
 | 
					 | 
				
			||||||
                "camera": obj_data.get("camera"),
 | 
					 | 
				
			||||||
                "subLabel": sub_label,
 | 
					 | 
				
			||||||
                "subLabelScore": score,
 | 
					 | 
				
			||||||
            },
 | 
					 | 
				
			||||||
        )
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
        if resp.status_code == 200:
 | 
					 | 
				
			||||||
            self.detected_faces[id] = face_score
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
        return True
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    def _detect_license_plate(self, input: np.ndarray) -> tuple[int, int, int, int]:
 | 
					    def _detect_license_plate(self, input: np.ndarray) -> tuple[int, int, int, int]:
 | 
				
			||||||
        """Return the dimensions of the input image as [x, y, width, height]."""
 | 
					        """Return the dimensions of the input image as [x, y, width, height]."""
 | 
				
			||||||
        height, width = input.shape[:2]
 | 
					        height, width = input.shape[:2]
 | 
				
			||||||
 | 
				
			|||||||
							
								
								
									
										398
									
								
								frigate/postprocessing/face_processor.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										398
									
								
								frigate/postprocessing/face_processor.py
									
									
									
									
									
										Normal file
									
								
							@ -0,0 +1,398 @@
 | 
				
			|||||||
 | 
					"""Handle processing images for face detection and recognition."""
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					import base64
 | 
				
			||||||
 | 
					import datetime
 | 
				
			||||||
 | 
					import logging
 | 
				
			||||||
 | 
					import os
 | 
				
			||||||
 | 
					import random
 | 
				
			||||||
 | 
					import string
 | 
				
			||||||
 | 
					from typing import Optional
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					import cv2
 | 
				
			||||||
 | 
					import numpy as np
 | 
				
			||||||
 | 
					import requests
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					from frigate.config import FrigateConfig
 | 
				
			||||||
 | 
					from frigate.const import FACE_DIR, FRIGATE_LOCALHOST, MODEL_CACHE_DIR
 | 
				
			||||||
 | 
					from frigate.util.image import area
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					from .processor_api import ProcessorApi
 | 
				
			||||||
 | 
					from .types import PostProcessingMetrics
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					logger = logging.getLogger(__name__)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					MIN_MATCHING_FACES = 2
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					class FaceProcessor(ProcessorApi):
 | 
				
			||||||
 | 
					    def __init__(self, config: FrigateConfig, metrics: PostProcessingMetrics):
 | 
				
			||||||
 | 
					        super().__init__(config, metrics)
 | 
				
			||||||
 | 
					        self.face_config = config.face_recognition
 | 
				
			||||||
 | 
					        self.face_detector: cv2.FaceDetectorYN = None
 | 
				
			||||||
 | 
					        self.landmark_detector: cv2.face.FacemarkLBF = None
 | 
				
			||||||
 | 
					        self.face_recognizer: cv2.face.LBPHFaceRecognizer = None
 | 
				
			||||||
 | 
					        self.requires_face_detection = "face" not in self.config.objects.all_objects
 | 
				
			||||||
 | 
					        self.detected_faces: dict[str, float] = {}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        download_path = os.path.join(MODEL_CACHE_DIR, "facedet")
 | 
				
			||||||
 | 
					        self.model_files = {
 | 
				
			||||||
 | 
					            "facedet.onnx": "https://github.com/NickM-27/facenet-onnx/releases/download/v1.0/facedet.onnx",
 | 
				
			||||||
 | 
					            "landmarkdet.yaml": "https://github.com/NickM-27/facenet-onnx/releases/download/v1.0/landmarkdet.yaml",
 | 
				
			||||||
 | 
					        }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        if not all(
 | 
				
			||||||
 | 
					            os.path.exists(os.path.join(download_path, n))
 | 
				
			||||||
 | 
					            for n in self.model_files.keys()
 | 
				
			||||||
 | 
					        ):
 | 
				
			||||||
 | 
					            # conditionally import ModelDownloader
 | 
				
			||||||
 | 
					            from frigate.util.downloader import ModelDownloader
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					            self.downloader = ModelDownloader(
 | 
				
			||||||
 | 
					                model_name="facedet",
 | 
				
			||||||
 | 
					                download_path=download_path,
 | 
				
			||||||
 | 
					                file_names=self.model_files.keys(),
 | 
				
			||||||
 | 
					                download_func=self.__download_models,
 | 
				
			||||||
 | 
					                complete_func=self.__build_detector,
 | 
				
			||||||
 | 
					            )
 | 
				
			||||||
 | 
					            self.downloader.ensure_model_files()
 | 
				
			||||||
 | 
					        else:
 | 
				
			||||||
 | 
					            self.__build_detector()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        self.label_map: dict[int, str] = {}
 | 
				
			||||||
 | 
					        self.__build_classifier()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def __download_models(self, path: str) -> None:
 | 
				
			||||||
 | 
					        try:
 | 
				
			||||||
 | 
					            file_name = os.path.basename(path)
 | 
				
			||||||
 | 
					            # conditionally import ModelDownloader
 | 
				
			||||||
 | 
					            from frigate.util.downloader import ModelDownloader
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					            ModelDownloader.download_from_url(self.model_files[file_name], path)
 | 
				
			||||||
 | 
					        except Exception as e:
 | 
				
			||||||
 | 
					            logger.error(f"Failed to download {path}: {e}")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def __build_detector(self) -> None:
 | 
				
			||||||
 | 
					        self.face_detector = cv2.FaceDetectorYN.create(
 | 
				
			||||||
 | 
					            "/config/model_cache/facedet/facedet.onnx",
 | 
				
			||||||
 | 
					            config="",
 | 
				
			||||||
 | 
					            input_size=(320, 320),
 | 
				
			||||||
 | 
					            score_threshold=0.8,
 | 
				
			||||||
 | 
					            nms_threshold=0.3,
 | 
				
			||||||
 | 
					        )
 | 
				
			||||||
 | 
					        self.landmark_detector = cv2.face.createFacemarkLBF()
 | 
				
			||||||
 | 
					        self.landmark_detector.loadModel("/config/model_cache/facedet/landmarkdet.yaml")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def __build_classifier(self) -> None:
 | 
				
			||||||
 | 
					        if not self.landmark_detector:
 | 
				
			||||||
 | 
					            return None
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        labels = []
 | 
				
			||||||
 | 
					        faces = []
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        dir = "/media/frigate/clips/faces"
 | 
				
			||||||
 | 
					        for idx, name in enumerate(os.listdir(dir)):
 | 
				
			||||||
 | 
					            if name == "train":
 | 
				
			||||||
 | 
					                continue
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					            face_folder = os.path.join(dir, name)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					            if not os.path.isdir(face_folder):
 | 
				
			||||||
 | 
					                continue
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					            self.label_map[idx] = name
 | 
				
			||||||
 | 
					            for image in os.listdir(face_folder):
 | 
				
			||||||
 | 
					                img = cv2.imread(os.path.join(face_folder, image))
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					                if img is None:
 | 
				
			||||||
 | 
					                    continue
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					                img = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
 | 
				
			||||||
 | 
					                img = self.__align_face(img, img.shape[1], img.shape[0])
 | 
				
			||||||
 | 
					                faces.append(img)
 | 
				
			||||||
 | 
					                labels.append(idx)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        self.recognizer: cv2.face.LBPHFaceRecognizer = (
 | 
				
			||||||
 | 
					            cv2.face.LBPHFaceRecognizer_create(
 | 
				
			||||||
 | 
					                radius=2, threshold=(1 - self.face_config.min_score) * 1000
 | 
				
			||||||
 | 
					            )
 | 
				
			||||||
 | 
					        )
 | 
				
			||||||
 | 
					        self.recognizer.train(faces, np.array(labels))
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def __align_face(
 | 
				
			||||||
 | 
					        self,
 | 
				
			||||||
 | 
					        image: np.ndarray,
 | 
				
			||||||
 | 
					        output_width: int,
 | 
				
			||||||
 | 
					        output_height: int,
 | 
				
			||||||
 | 
					    ) -> np.ndarray:
 | 
				
			||||||
 | 
					        _, lands = self.landmark_detector.fit(
 | 
				
			||||||
 | 
					            image, np.array([(0, 0, image.shape[1], image.shape[0])])
 | 
				
			||||||
 | 
					        )
 | 
				
			||||||
 | 
					        landmarks: np.ndarray = lands[0][0]
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        # get landmarks for eyes
 | 
				
			||||||
 | 
					        leftEyePts = landmarks[42:48]
 | 
				
			||||||
 | 
					        rightEyePts = landmarks[36:42]
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        # compute the center of mass for each eye
 | 
				
			||||||
 | 
					        leftEyeCenter = leftEyePts.mean(axis=0).astype("int")
 | 
				
			||||||
 | 
					        rightEyeCenter = rightEyePts.mean(axis=0).astype("int")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        # compute the angle between the eye centroids
 | 
				
			||||||
 | 
					        dY = rightEyeCenter[1] - leftEyeCenter[1]
 | 
				
			||||||
 | 
					        dX = rightEyeCenter[0] - leftEyeCenter[0]
 | 
				
			||||||
 | 
					        angle = np.degrees(np.arctan2(dY, dX)) - 180
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        # compute the desired right eye x-coordinate based on the
 | 
				
			||||||
 | 
					        # desired x-coordinate of the left eye
 | 
				
			||||||
 | 
					        desiredRightEyeX = 1.0 - 0.35
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        # determine the scale of the new resulting image by taking
 | 
				
			||||||
 | 
					        # the ratio of the distance between eyes in the *current*
 | 
				
			||||||
 | 
					        # image to the ratio of distance between eyes in the
 | 
				
			||||||
 | 
					        # *desired* image
 | 
				
			||||||
 | 
					        dist = np.sqrt((dX**2) + (dY**2))
 | 
				
			||||||
 | 
					        desiredDist = desiredRightEyeX - 0.35
 | 
				
			||||||
 | 
					        desiredDist *= output_width
 | 
				
			||||||
 | 
					        scale = desiredDist / dist
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        # compute center (x, y)-coordinates (i.e., the median point)
 | 
				
			||||||
 | 
					        # between the two eyes in the input image
 | 
				
			||||||
 | 
					        # grab the rotation matrix for rotating and scaling the face
 | 
				
			||||||
 | 
					        eyesCenter = (
 | 
				
			||||||
 | 
					            int((leftEyeCenter[0] + rightEyeCenter[0]) // 2),
 | 
				
			||||||
 | 
					            int((leftEyeCenter[1] + rightEyeCenter[1]) // 2),
 | 
				
			||||||
 | 
					        )
 | 
				
			||||||
 | 
					        M = cv2.getRotationMatrix2D(eyesCenter, angle, scale)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        # update the translation component of the matrix
 | 
				
			||||||
 | 
					        tX = output_width * 0.5
 | 
				
			||||||
 | 
					        tY = output_height * 0.35
 | 
				
			||||||
 | 
					        M[0, 2] += tX - eyesCenter[0]
 | 
				
			||||||
 | 
					        M[1, 2] += tY - eyesCenter[1]
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        # apply the affine transformation
 | 
				
			||||||
 | 
					        return cv2.warpAffine(
 | 
				
			||||||
 | 
					            image, M, (output_width, output_height), flags=cv2.INTER_CUBIC
 | 
				
			||||||
 | 
					        )
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def __clear_classifier(self) -> None:
 | 
				
			||||||
 | 
					        self.face_recognizer = None
 | 
				
			||||||
 | 
					        self.label_map = {}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def __detect_face(self, input: np.ndarray) -> tuple[int, int, int, int]:
 | 
				
			||||||
 | 
					        """Detect faces in input image."""
 | 
				
			||||||
 | 
					        if not self.face_detector:
 | 
				
			||||||
 | 
					            return None
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        self.face_detector.setInputSize((input.shape[1], input.shape[0]))
 | 
				
			||||||
 | 
					        faces = self.face_detector.detect(input)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        if faces is None or faces[1] is None:
 | 
				
			||||||
 | 
					            return None
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        face = None
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        for _, potential_face in enumerate(faces[1]):
 | 
				
			||||||
 | 
					            raw_bbox = potential_face[0:4].astype(np.uint16)
 | 
				
			||||||
 | 
					            x: int = max(raw_bbox[0], 0)
 | 
				
			||||||
 | 
					            y: int = max(raw_bbox[1], 0)
 | 
				
			||||||
 | 
					            w: int = raw_bbox[2]
 | 
				
			||||||
 | 
					            h: int = raw_bbox[3]
 | 
				
			||||||
 | 
					            bbox = (x, y, x + w, y + h)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					            if face is None or area(bbox) > area(face):
 | 
				
			||||||
 | 
					                face = bbox
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        return face
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def __classify_face(self, face_image: np.ndarray) -> tuple[str, float] | None:
 | 
				
			||||||
 | 
					        if not self.landmark_detector:
 | 
				
			||||||
 | 
					            return None
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        if not self.label_map:
 | 
				
			||||||
 | 
					            self.__build_classifier()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        img = cv2.cvtColor(face_image, cv2.COLOR_BGR2GRAY)
 | 
				
			||||||
 | 
					        img = self.__align_face(img, img.shape[1], img.shape[0])
 | 
				
			||||||
 | 
					        index, distance = self.recognizer.predict(img)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        if index == -1:
 | 
				
			||||||
 | 
					            return None
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        score = 1.0 - (distance / 1000)
 | 
				
			||||||
 | 
					        return self.label_map[index], round(score, 2)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def __update_metrics(self, duration: float) -> None:
 | 
				
			||||||
 | 
					        self.metrics.face_rec_fps.value = (
 | 
				
			||||||
 | 
					            self.metrics.face_rec_fps.value * 9 + duration
 | 
				
			||||||
 | 
					        ) / 10
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def process_frame(self, obj_data: dict[str, any], frame: np.ndarray):
 | 
				
			||||||
 | 
					        """Look for faces in image."""
 | 
				
			||||||
 | 
					        start = datetime.datetime.now().timestamp()
 | 
				
			||||||
 | 
					        id = obj_data["id"]
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        # don't run for non person objects
 | 
				
			||||||
 | 
					        if obj_data.get("label") != "person":
 | 
				
			||||||
 | 
					            logger.debug("Not a processing face for non person object.")
 | 
				
			||||||
 | 
					            return
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        # don't overwrite sub label for objects that have a sub label
 | 
				
			||||||
 | 
					        # that is not a face
 | 
				
			||||||
 | 
					        if obj_data.get("sub_label") and id not in self.detected_faces:
 | 
				
			||||||
 | 
					            logger.debug(
 | 
				
			||||||
 | 
					                f"Not processing face due to existing sub label: {obj_data.get('sub_label')}."
 | 
				
			||||||
 | 
					            )
 | 
				
			||||||
 | 
					            return
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        face: Optional[dict[str, any]] = None
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        if self.requires_face_detection:
 | 
				
			||||||
 | 
					            logger.debug("Running manual face detection.")
 | 
				
			||||||
 | 
					            person_box = obj_data.get("box")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					            if not person_box:
 | 
				
			||||||
 | 
					                return
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					            rgb = cv2.cvtColor(frame, cv2.COLOR_YUV2RGB_I420)
 | 
				
			||||||
 | 
					            left, top, right, bottom = person_box
 | 
				
			||||||
 | 
					            person = rgb[top:bottom, left:right]
 | 
				
			||||||
 | 
					            face_box = self.__detect_face(person)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					            if not face_box:
 | 
				
			||||||
 | 
					                logger.debug("Detected no faces for person object.")
 | 
				
			||||||
 | 
					                return
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					            face_frame = person[
 | 
				
			||||||
 | 
					                max(0, face_box[1]) : min(frame.shape[0], face_box[3]),
 | 
				
			||||||
 | 
					                max(0, face_box[0]) : min(frame.shape[1], face_box[2]),
 | 
				
			||||||
 | 
					            ]
 | 
				
			||||||
 | 
					            face_frame = cv2.cvtColor(face_frame, cv2.COLOR_RGB2BGR)
 | 
				
			||||||
 | 
					        else:
 | 
				
			||||||
 | 
					            # don't run for object without attributes
 | 
				
			||||||
 | 
					            if not obj_data.get("current_attributes"):
 | 
				
			||||||
 | 
					                logger.debug("No attributes to parse.")
 | 
				
			||||||
 | 
					                return
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					            attributes: list[dict[str, any]] = obj_data.get("current_attributes", [])
 | 
				
			||||||
 | 
					            for attr in attributes:
 | 
				
			||||||
 | 
					                if attr.get("label") != "face":
 | 
				
			||||||
 | 
					                    continue
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					                if face is None or attr.get("score", 0.0) > face.get("score", 0.0):
 | 
				
			||||||
 | 
					                    face = attr
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					            # no faces detected in this frame
 | 
				
			||||||
 | 
					            if not face:
 | 
				
			||||||
 | 
					                return
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					            face_box = face.get("box")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					            # check that face is valid
 | 
				
			||||||
 | 
					            if not face_box or area(face_box) < self.config.face_recognition.min_area:
 | 
				
			||||||
 | 
					                logger.debug(f"Invalid face box {face}")
 | 
				
			||||||
 | 
					                return
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					            face_frame = cv2.cvtColor(frame, cv2.COLOR_YUV2BGR_I420)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					            face_frame = face_frame[
 | 
				
			||||||
 | 
					                max(0, face_box[1]) : min(frame.shape[0], face_box[3]),
 | 
				
			||||||
 | 
					                max(0, face_box[0]) : min(frame.shape[1], face_box[2]),
 | 
				
			||||||
 | 
					            ]
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        res = self.__classify_face(face_frame)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        if not res:
 | 
				
			||||||
 | 
					            return
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        sub_label, score = res
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        # calculate the overall face score as the probability * area of face
 | 
				
			||||||
 | 
					        # this will help to reduce false positives from small side-angle faces
 | 
				
			||||||
 | 
					        # if a large front-on face image may have scored slightly lower but
 | 
				
			||||||
 | 
					        # is more likely to be accurate due to the larger face area
 | 
				
			||||||
 | 
					        face_score = round(score * face_frame.shape[0] * face_frame.shape[1], 2)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        logger.debug(
 | 
				
			||||||
 | 
					            f"Detected best face for person as: {sub_label} with probability {score} and overall face score {face_score}"
 | 
				
			||||||
 | 
					        )
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
+        if self.config.face_recognition.save_attempts:
+            # write face to library
+            folder = os.path.join(FACE_DIR, "train")
+            file = os.path.join(folder, f"{id}-{sub_label}-{score}-{face_score}.webp")
+            os.makedirs(folder, exist_ok=True)
+            cv2.imwrite(file, face_frame)
+
+        if score < self.config.face_recognition.threshold:
+            logger.debug(
+                f"Recognized face distance {score} is less than threshold {self.config.face_recognition.threshold}"
+            )
+            self.__update_metrics(datetime.datetime.now().timestamp() - start)
+            return
+
+        if id in self.detected_faces and face_score <= self.detected_faces[id]:
+            logger.debug(
+                f"Recognized face distance {score} and overall score {face_score} is less than previous overall face score ({self.detected_faces.get(id)})."
+            )
+            self.__update_metrics(datetime.datetime.now().timestamp() - start)
+            return
+
+        resp = requests.post(
+            f"{FRIGATE_LOCALHOST}/api/events/{id}/sub_label",
+            json={
+                "camera": obj_data.get("camera"),
+                "subLabel": sub_label,
+                "subLabelScore": score,
+            },
+        )
+
+        if resp.status_code == 200:
+            self.detected_faces[id] = face_score
+
+        self.__update_metrics(datetime.datetime.now().timestamp() - start)
+
+    def handle_request(self, request_data) -> dict[str, any] | None:
+        rand_id = "".join(random.choices(string.ascii_lowercase + string.digits, k=6))
+        label = request_data["face_name"]
+        id = f"{label}-{rand_id}"
+
+        if request_data.get("cropped"):
+            thumbnail = request_data["image"]
+        else:
+            img = cv2.imdecode(
+                np.frombuffer(base64.b64decode(request_data["image"]), dtype=np.uint8),
+                cv2.IMREAD_COLOR,
+            )
+            face_box = self.__detect_face(img)
+
+            if not face_box:
+                return {
+                    "message": "No face was detected.",
+                    "success": False,
+                }
+
+            face = img[face_box[1] : face_box[3], face_box[0] : face_box[2]]
+            ret, thumbnail = cv2.imencode(
+                ".webp", face, [int(cv2.IMWRITE_WEBP_QUALITY), 100]
+            )
+
+        # write face to library
+        folder = os.path.join(FACE_DIR, label)
+        file = os.path.join(folder, f"{id}.webp")
+        os.makedirs(folder, exist_ok=True)
+
+        # save face image
+        with open(file, "wb") as output:
+            output.write(thumbnail.tobytes())
+
+        self.__clear_classifier()
+        return {
+            "message": "Successfully registered face.",
+            "success": True,
+        }
+
+    def expire_object(self, object_id: str):
+        if object_id in self.detected_faces:
+            self.detected_faces.pop(object_id)
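A note on the request path above: handle_request() expects a payload carrying face_name, a base64-encoded image, and an optional cropped flag. A minimal sketch of assembling that payload follows; the helper name and file path are illustrative, and only the key names come from the code above.

import base64

def build_register_request(face_name: str, image_path: str) -> dict:
    # Hypothetical helper: assemble the payload handle_request() expects
    # for the uncropped path, where the image is base64-decoded and run
    # through face detection before being saved to the library.
    with open(image_path, "rb") as f:
        encoded = base64.b64encode(f.read()).decode("ascii")
    return {
        "face_name": face_name,  # becomes the "<label>-<rand_id>" file stem
        "image": encoded,        # decoded via cv2.imdecode above
        "cropped": False,        # False: detect and crop the face server-side
    }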
							
								
								
									
52	frigate/postprocessing/processor_api.py	Normal file
@ -0,0 +1,52 @@
+import logging
+from abc import ABC, abstractmethod
+
+import numpy as np
+
+from frigate.config import FrigateConfig
+
+from .types import PostProcessingMetrics
+
+logger = logging.getLogger(__name__)
+
+
+class ProcessorApi(ABC):
+    @abstractmethod
+    def __init__(self, config: FrigateConfig, metrics: PostProcessingMetrics) -> None:
+        self.config = config
+        self.metrics = metrics
+        pass
+
+    @abstractmethod
+    def process_frame(self, obj_data: dict[str, any], frame: np.ndarray) -> None:
+        """Processes the frame with object data.
+        Args:
+            obj_data (dict): containing data about focused object in frame.
+            frame (ndarray): full yuv frame.
+
+        Returns:
+            None.
+        """
+        pass
+
+    @abstractmethod
+    def handle_request(self, request_data: dict[str, any]) -> dict[str, any] | None:
+        """Handle metadata requests.
+        Args:
+            request_data (dict): containing data about requested change to process.
+
+        Returns:
+            None if request was not handled, otherwise return response.
+        """
+        pass
+
+    @abstractmethod
+    def expire_object(self, object_id: str) -> None:
+        """Handle objects that are no longer detected.
+        Args:
+            object_id (str): id of object that is no longer detected.
+
+        Returns:
+            None.
+        """
+        pass
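To make the contract concrete, here is a minimal hypothetical subclass. The class name and the obj_data["id"] key are illustrative assumptions, not part of this commit; the overrides mirror the abstract methods above.

import numpy as np

from frigate.config import FrigateConfig
from frigate.postprocessing.processor_api import ProcessorApi
from frigate.postprocessing.types import PostProcessingMetrics


class NoopProcessor(ProcessorApi):
    """Do-nothing processor showing the minimum required overrides."""

    def __init__(self, config: FrigateConfig, metrics: PostProcessingMetrics) -> None:
        super().__init__(config, metrics)
        self.seen: set[str] = set()

    def process_frame(self, obj_data: dict[str, any], frame: np.ndarray) -> None:
        # Track which tracked objects this processor has observed.
        self.seen.add(obj_data["id"])

    def handle_request(self, request_data: dict[str, any]) -> dict[str, any] | None:
        # Returning None signals the request was not handled here, so a
        # dispatcher can offer it to other processors, per the docstring above.
        return None

    def expire_object(self, object_id: str) -> None:
        self.seen.discard(object_id)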
@ -4,7 +4,7 @@ import multiprocessing as mp
 from multiprocessing.sharedctypes import Synchronized
 
 
-class EmbeddingsMetrics:
+class PostProcessingMetrics:
     image_embeddings_fps: Synchronized
     text_embeddings_sps: Synchronized
     face_rec_fps: Synchronized
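Since every field is a multiprocessing Synchronized, the renamed container can be shared across processes. A construction sketch, assuming the class can be instantiated bare and that double-backed counters are appropriate; only the three fields visible in this hunk are shown:

import multiprocessing as mp

from frigate.postprocessing.types import PostProcessingMetrics


def make_metrics() -> PostProcessingMetrics:
    # Hypothetical helper: back each rate counter with a process-shared
    # double so producer and stats processes read the same value.
    metrics = PostProcessingMetrics()
    metrics.image_embeddings_fps = mp.Value("d", 0.0)
    metrics.text_embeddings_sps = mp.Value("d", 0.0)
    metrics.face_rec_fps = mp.Value("d", 0.0)
    return metrics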
@ -14,8 +14,8 @@ from requests.exceptions import RequestException
 from frigate.camera import CameraMetrics
 from frigate.config import FrigateConfig
 from frigate.const import CACHE_DIR, CLIPS_DIR, RECORD_DIR
-from frigate.embeddings.types import EmbeddingsMetrics
 from frigate.object_detection import ObjectDetectProcess
+from frigate.postprocessing.types import PostProcessingMetrics
 from frigate.types import StatsTrackingTypes
 from frigate.util.services import (
     get_amd_gpu_stats,
@ -52,7 +52,7 @@ def get_latest_version(config: FrigateConfig) -> str:
 def stats_init(
     config: FrigateConfig,
     camera_metrics: dict[str, CameraMetrics],
-    embeddings_metrics: EmbeddingsMetrics | None,
+    embeddings_metrics: PostProcessingMetrics | None,
     detectors: dict[str, ObjectDetectProcess],
     processes: dict[str, int],
 ) -> StatsTrackingTypes:
@ -2,13 +2,13 @@ from enum import Enum
 from typing import TypedDict
 
 from frigate.camera import CameraMetrics
-from frigate.embeddings.types import EmbeddingsMetrics
 from frigate.object_detection import ObjectDetectProcess
+from frigate.postprocessing.types import PostProcessingMetrics
 
 
 class StatsTrackingTypes(TypedDict):
     camera_metrics: dict[str, CameraMetrics]
-    embeddings_metrics: EmbeddingsMetrics | None
+    embeddings_metrics: PostProcessingMetrics | None
     detectors: dict[str, ObjectDetectProcess]
     started: int
     latest_frigate_version: str
@ -4,13 +4,7 @@ import logging
 import os
 from typing import Any
 
-import cv2
-import numpy as np
 import onnxruntime as ort
-from playhouse.sqliteq import SqliteQueueDatabase
-
-from frigate.config.semantic_search import FaceRecognitionConfig
-from frigate.const import MODEL_CACHE_DIR
 
 try:
     import openvino as ov
@ -21,9 +15,6 @@ except ImportError:
 logger = logging.getLogger(__name__)
 
 
-MIN_MATCHING_FACES = 2
-
-
 def get_ort_providers(
     force_cpu: bool = False, device: str = "AUTO", requires_fp16: bool = False
 ) -> tuple[list[str], list[dict[str, any]]]:
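The providers/options pair returned by get_ort_providers is shaped for onnxruntime's session constructor, where provider_options pairs one-to-one with providers. A usage sketch with an illustrative model path, calling the function from the same module:

import onnxruntime as ort

providers, options = get_ort_providers(force_cpu=False, device="AUTO")

# Build a session using the selected execution providers; the model
# path here is an illustrative stand-in.
session = ort.InferenceSession(
    "/config/model_cache/facedet/facedet.onnx",
    providers=providers,
    provider_options=options,
)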
@ -157,181 +148,3 @@ class ONNXModelRunner:
             return [infer_request.get_output_tensor().data]
         elif self.type == "ort":
             return self.ort.run(None, input)
-
-
-class FaceClassificationModel:
-    def __init__(self, config: FaceRecognitionConfig, db: SqliteQueueDatabase):
-        self.config = config
-        self.db = db
-        self.face_detector: cv2.FaceDetectorYN = None
-        self.landmark_detector: cv2.face.FacemarkLBF = None
-        self.face_recognizer: cv2.face.LBPHFaceRecognizer = None
-
-        download_path = os.path.join(MODEL_CACHE_DIR, "facedet")
-        self.model_files = {
-            "facedet.onnx": "https://github.com/NickM-27/facenet-onnx/releases/download/v1.0/facedet.onnx",
-            "landmarkdet.yaml": "https://github.com/NickM-27/facenet-onnx/releases/download/v1.0/landmarkdet.yaml",
-        }
-
-        if not all(
-            os.path.exists(os.path.join(download_path, n))
-            for n in self.model_files.keys()
-        ):
-            # conditionally import ModelDownloader
-            from frigate.util.downloader import ModelDownloader
-
-            self.downloader = ModelDownloader(
-                model_name="facedet",
-                download_path=download_path,
-                file_names=self.model_files.keys(),
-                download_func=self.__download_models,
-                complete_func=self.__build_detector,
-            )
-            self.downloader.ensure_model_files()
-        else:
-            self.__build_detector()
-
-        self.label_map: dict[int, str] = {}
-        self.__build_classifier()
-
-    def __download_models(self, path: str) -> None:
-        try:
-            file_name = os.path.basename(path)
-            # conditionally import ModelDownloader
-            from frigate.util.downloader import ModelDownloader
-
-            ModelDownloader.download_from_url(self.model_files[file_name], path)
-        except Exception as e:
-            logger.error(f"Failed to download {path}: {e}")
-
-    def __build_detector(self) -> None:
-        self.face_detector = cv2.FaceDetectorYN.create(
-            "/config/model_cache/facedet/facedet.onnx",
-            config="",
-            input_size=(320, 320),
-            score_threshold=0.8,
-            nms_threshold=0.3,
-        )
-        self.landmark_detector = cv2.face.createFacemarkLBF()
-        self.landmark_detector.loadModel("/config/model_cache/facedet/landmarkdet.yaml")
-
-    def __build_classifier(self) -> None:
-        if not self.landmark_detector:
-            return None
-
-        labels = []
-        faces = []
-
-        dir = "/media/frigate/clips/faces"
-        for idx, name in enumerate(os.listdir(dir)):
-            if name == "train":
-                continue
-
-            face_folder = os.path.join(dir, name)
-
-            if not os.path.isdir(face_folder):
-                continue
-
-            self.label_map[idx] = name
-            for image in os.listdir(face_folder):
-                img = cv2.imread(os.path.join(face_folder, image))
-
-                if img is None:
-                    continue
-
-                img = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
-                img = self.__align_face(img, img.shape[1], img.shape[0])
-                faces.append(img)
-                labels.append(idx)
-
-        self.recognizer: cv2.face.LBPHFaceRecognizer = (
-            cv2.face.LBPHFaceRecognizer_create(
-                radius=2, threshold=(1 - self.config.min_score) * 1000
-            )
-        )
-        self.recognizer.train(faces, np.array(labels))
-
-    def __align_face(
-        self,
-        image: np.ndarray,
-        output_width: int,
-        output_height: int,
-    ) -> np.ndarray:
-        _, lands = self.landmark_detector.fit(
-            image, np.array([(0, 0, image.shape[1], image.shape[0])])
-        )
-        landmarks = lands[0][0]
-
-        # get landmarks for eyes
-        leftEyePts = landmarks[42:48]
-        rightEyePts = landmarks[36:42]
-
-        # compute the center of mass for each eye
-        leftEyeCenter = leftEyePts.mean(axis=0).astype("int")
-        rightEyeCenter = rightEyePts.mean(axis=0).astype("int")
-
-        # compute the angle between the eye centroids
-        dY = rightEyeCenter[1] - leftEyeCenter[1]
-        dX = rightEyeCenter[0] - leftEyeCenter[0]
-        angle = np.degrees(np.arctan2(dY, dX)) - 180
-
-        # compute the desired right eye x-coordinate based on the
-        # desired x-coordinate of the left eye
-        desiredRightEyeX = 1.0 - 0.35
-
-        # determine the scale of the new resulting image by taking
-        # the ratio of the distance between eyes in the *current*
-        # image to the ratio of distance between eyes in the
-        # *desired* image
-        dist = np.sqrt((dX**2) + (dY**2))
-        desiredDist = desiredRightEyeX - 0.35
-        desiredDist *= output_width
-        scale = desiredDist / dist
-
-        # compute center (x, y)-coordinates (i.e., the median point)
-        # between the two eyes in the input image
-        # grab the rotation matrix for rotating and scaling the face
-        eyesCenter = (
-            int((leftEyeCenter[0] + rightEyeCenter[0]) // 2),
-            int((leftEyeCenter[1] + rightEyeCenter[1]) // 2),
-        )
-        M = cv2.getRotationMatrix2D(eyesCenter, angle, scale)
-
-        # update the translation component of the matrix
-        tX = output_width * 0.5
-        tY = output_height * 0.35
-        M[0, 2] += tX - eyesCenter[0]
-        M[1, 2] += tY - eyesCenter[1]
-
-        # apply the affine transformation
-        return cv2.warpAffine(
-            image, M, (output_width, output_height), flags=cv2.INTER_CUBIC
-        )
-
-    def clear_classifier(self) -> None:
-        self.face_recognizer = None
-        self.label_map = {}
-
-    def detect_faces(self, input: np.ndarray) -> tuple[int, cv2.typing.MatLike] | None:
-        if not self.face_detector:
-            return None
-
-        self.face_detector.setInputSize((input.shape[1], input.shape[0]))
-        return self.face_detector.detect(input)
-
-    def classify_face(self, face_image: np.ndarray) -> tuple[str, float] | None:
-        if not self.landmark_detector:
-            return None
-
-        if not self.label_map:
-            self.__build_classifier()
-
-        img = cv2.cvtColor(face_image, cv2.COLOR_BGR2GRAY)
-        img = self.__align_face(img, img.shape[1], img.shape[0])
-        index, distance = self.recognizer.predict(img)
-
-        if index == -1:
-            return None
-
-        score = 1.0 - (distance / 1000)
-        return self.label_map[index], round(score, 2)
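Per the commit message, this face classification logic moves into the new postprocessing structure rather than disappearing. For reference, a sketch of the LBPH train/predict cycle the removed class was built around, reduced to its core; the paths, labels, and the 0.8 min_score are illustrative stand-ins, and cv2.face requires opencv-contrib-python:

import cv2
import numpy as np

# Train an LBPH recognizer on grayscale face crops, one integer label
# per person, using the same distance-to-score mapping as above.
min_score = 0.8  # stand-in for config.min_score
recognizer = cv2.face.LBPHFaceRecognizer_create(
    radius=2, threshold=(1 - min_score) * 1000
)

faces = [
    cv2.cvtColor(cv2.imread(path), cv2.COLOR_BGR2GRAY)
    for path in ["faces/jane/a.webp", "faces/jane/b.webp"]  # illustrative paths
]
recognizer.train(faces, np.array([0, 0]))

# predict() returns the best label index and an LBPH distance, where
# lower distance means a closer match.
index, distance = recognizer.predict(faces[0])
score = 1.0 - (distance / 1000)
print(index, round(score, 2))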