From e23c0461360828b41351f89eb8829d5f0143c805 Mon Sep 17 00:00:00 2001 From: Nicolas Mowen Date: Fri, 10 Jan 2025 12:44:30 -0700 Subject: [PATCH] Processing refactor (#15935) * Refactor post processor to be real time processor * Build out generic API for post processing * Cleanup * Fix --- frigate/app.py | 6 +-- frigate/data_processing/post/api.py | 43 +++++++++++++++++++ .../real_time/api.py} | 8 ++-- .../real_time}/face_processor.py | 8 ++-- .../types.py | 9 +++- frigate/embeddings/__init__.py | 4 +- frigate/embeddings/embeddings.py | 4 +- frigate/embeddings/maintainer.py | 10 ++--- frigate/stats/util.py | 4 +- frigate/types.py | 4 +- 10 files changed, 76 insertions(+), 24 deletions(-) create mode 100644 frigate/data_processing/post/api.py rename frigate/{postprocessing/processor_api.py => data_processing/real_time/api.py} (84%) rename frigate/{postprocessing => data_processing/real_time}/face_processor.py (98%) rename frigate/{postprocessing => data_processing}/types.py (72%) diff --git a/frigate/app.py b/frigate/app.py index e3f2f9d7f..1aecce2c3 100644 --- a/frigate/app.py +++ b/frigate/app.py @@ -39,6 +39,7 @@ from frigate.const import ( RECORD_DIR, SHM_FRAMES_VAR, ) +from frigate.data_processing.types import DataProcessorMetrics from frigate.db.sqlitevecq import SqliteVecQueueDatabase from frigate.embeddings import EmbeddingsContext, manage_embeddings from frigate.events.audio import AudioProcessor @@ -59,7 +60,6 @@ from frigate.models import ( from frigate.object_detection import ObjectDetectProcess from frigate.object_processing import TrackedObjectProcessor from frigate.output.output import output_frames -from frigate.postprocessing.types import PostProcessingMetrics from frigate.ptz.autotrack import PtzAutoTrackerThread from frigate.ptz.onvif import OnvifController from frigate.record.cleanup import RecordingCleanup @@ -90,8 +90,8 @@ class FrigateApp: self.detection_shms: list[mp.shared_memory.SharedMemory] = [] self.log_queue: Queue = mp.Queue() self.camera_metrics: dict[str, CameraMetrics] = {} - self.embeddings_metrics: PostProcessingMetrics | None = ( - PostProcessingMetrics() if config.semantic_search.enabled else None + self.embeddings_metrics: DataProcessorMetrics | None = ( + DataProcessorMetrics() if config.semantic_search.enabled else None ) self.ptz_metrics: dict[str, PTZMetrics] = {} self.processes: dict[str, int] = {} diff --git a/frigate/data_processing/post/api.py b/frigate/data_processing/post/api.py new file mode 100644 index 000000000..5c88221c2 --- /dev/null +++ b/frigate/data_processing/post/api.py @@ -0,0 +1,43 @@ +"""Local or remote processors to handle post processing.""" + +import logging +from abc import ABC, abstractmethod + +from frigate.config import FrigateConfig + +from ..types import DataProcessorMetrics, PostProcessDataEnum + +logger = logging.getLogger(__name__) + + +class PostProcessorApi(ABC): + @abstractmethod + def __init__(self, config: FrigateConfig, metrics: DataProcessorMetrics) -> None: + self.config = config + self.metrics = metrics + pass + + @abstractmethod + def process_data( + self, data: dict[str, any], data_type: PostProcessDataEnum + ) -> None: + """Processes the data of data type. + Args: + data (dict): containing data about the input. + data_type (enum): Describing the data that is being processed. + + Returns: + None. + """ + pass + + @abstractmethod + def handle_request(self, request_data: dict[str, any]) -> dict[str, any] | None: + """Handle metadata requests. + Args: + request_data (dict): containing data about requested change to process. + + Returns: + None if request was not handled, otherwise return response. + """ + pass diff --git a/frigate/postprocessing/processor_api.py b/frigate/data_processing/real_time/api.py similarity index 84% rename from frigate/postprocessing/processor_api.py rename to frigate/data_processing/real_time/api.py index 974b6f1ee..7f80b5287 100644 --- a/frigate/postprocessing/processor_api.py +++ b/frigate/data_processing/real_time/api.py @@ -1,3 +1,5 @@ +"""Local only processors for handling real time object processing.""" + import logging from abc import ABC, abstractmethod @@ -5,14 +7,14 @@ import numpy as np from frigate.config import FrigateConfig -from .types import PostProcessingMetrics +from ..types import DataProcessorMetrics logger = logging.getLogger(__name__) -class ProcessorApi(ABC): +class RealTimeProcessorApi(ABC): @abstractmethod - def __init__(self, config: FrigateConfig, metrics: PostProcessingMetrics) -> None: + def __init__(self, config: FrigateConfig, metrics: DataProcessorMetrics) -> None: self.config = config self.metrics = metrics pass diff --git a/frigate/postprocessing/face_processor.py b/frigate/data_processing/real_time/face_processor.py similarity index 98% rename from frigate/postprocessing/face_processor.py rename to frigate/data_processing/real_time/face_processor.py index a75158eb2..2b12e9994 100644 --- a/frigate/postprocessing/face_processor.py +++ b/frigate/data_processing/real_time/face_processor.py @@ -16,8 +16,8 @@ from frigate.config import FrigateConfig from frigate.const import FACE_DIR, FRIGATE_LOCALHOST, MODEL_CACHE_DIR from frigate.util.image import area -from .processor_api import ProcessorApi -from .types import PostProcessingMetrics +from ..types import DataProcessorMetrics +from .api import RealTimeProcessorApi logger = logging.getLogger(__name__) @@ -25,8 +25,8 @@ logger = logging.getLogger(__name__) MIN_MATCHING_FACES = 2 -class FaceProcessor(ProcessorApi): - def __init__(self, config: FrigateConfig, metrics: PostProcessingMetrics): +class FaceProcessor(RealTimeProcessorApi): + def __init__(self, config: FrigateConfig, metrics: DataProcessorMetrics): super().__init__(config, metrics) self.face_config = config.face_recognition self.face_detector: cv2.FaceDetectorYN = None diff --git a/frigate/postprocessing/types.py b/frigate/data_processing/types.py similarity index 72% rename from frigate/postprocessing/types.py rename to frigate/data_processing/types.py index 464658219..39f355667 100644 --- a/frigate/postprocessing/types.py +++ b/frigate/data_processing/types.py @@ -1,10 +1,11 @@ """Embeddings types.""" import multiprocessing as mp +from enum import Enum from multiprocessing.sharedctypes import Synchronized -class PostProcessingMetrics: +class DataProcessorMetrics: image_embeddings_fps: Synchronized text_embeddings_sps: Synchronized face_rec_fps: Synchronized @@ -15,3 +16,9 @@ class PostProcessingMetrics: self.text_embeddings_sps = mp.Value("d", 0.01) self.face_rec_fps = mp.Value("d", 0.01) self.alpr_pps = mp.Value("d", 0.01) + + +class PostProcessDataEnum(str, Enum): + recording = "recording" + review = "review" + tracked_object = "tracked_object" diff --git a/frigate/embeddings/__init__.py b/frigate/embeddings/__init__.py index d75d88500..dd05fb0ca 100644 --- a/frigate/embeddings/__init__.py +++ b/frigate/embeddings/__init__.py @@ -15,19 +15,19 @@ from setproctitle import setproctitle from frigate.comms.embeddings_updater import EmbeddingsRequestEnum, EmbeddingsRequestor from frigate.config import FrigateConfig from frigate.const import CONFIG_DIR, FACE_DIR +from frigate.data_processing.types import DataProcessorMetrics from frigate.db.sqlitevecq import SqliteVecQueueDatabase from frigate.models import Event from frigate.util.builtin import serialize from frigate.util.services import listen -from ..postprocessing.types import PostProcessingMetrics from .maintainer import EmbeddingMaintainer from .util import ZScoreNormalization logger = logging.getLogger(__name__) -def manage_embeddings(config: FrigateConfig, metrics: PostProcessingMetrics) -> None: +def manage_embeddings(config: FrigateConfig, metrics: DataProcessorMetrics) -> None: # Only initialize embeddings if semantic search is enabled if not config.semantic_search.enabled: return diff --git a/frigate/embeddings/embeddings.py b/frigate/embeddings/embeddings.py index 3f046d0c3..852806a8d 100644 --- a/frigate/embeddings/embeddings.py +++ b/frigate/embeddings/embeddings.py @@ -16,12 +16,12 @@ from frigate.const import ( UPDATE_EMBEDDINGS_REINDEX_PROGRESS, UPDATE_MODEL_STATE, ) +from frigate.data_processing.types import DataProcessorMetrics from frigate.db.sqlitevecq import SqliteVecQueueDatabase from frigate.models import Event from frigate.types import ModelStatusTypesEnum from frigate.util.builtin import serialize -from ..postprocessing.types import PostProcessingMetrics from .functions.onnx import GenericONNXEmbedding, ModelTypeEnum logger = logging.getLogger(__name__) @@ -65,7 +65,7 @@ class Embeddings: self, config: FrigateConfig, db: SqliteVecQueueDatabase, - metrics: PostProcessingMetrics, + metrics: DataProcessorMetrics, ) -> None: self.config = config self.db = db diff --git a/frigate/embeddings/maintainer.py b/frigate/embeddings/maintainer.py index 5eb06358d..a7e25469b 100644 --- a/frigate/embeddings/maintainer.py +++ b/frigate/embeddings/maintainer.py @@ -29,17 +29,17 @@ from frigate.const import ( FRIGATE_LOCALHOST, UPDATE_EVENT_DESCRIPTION, ) +from frigate.data_processing.real_time.api import RealTimeProcessorApi +from frigate.data_processing.real_time.face_processor import FaceProcessor +from frigate.data_processing.types import DataProcessorMetrics from frigate.embeddings.lpr.lpr import LicensePlateRecognition from frigate.events.types import EventTypeEnum from frigate.genai import get_genai_client from frigate.models import Event -from frigate.postprocessing.face_processor import FaceProcessor -from frigate.postprocessing.processor_api import ProcessorApi from frigate.types import TrackedObjectUpdateTypesEnum from frigate.util.builtin import serialize from frigate.util.image import SharedMemoryFrameManager, area, calculate_region -from ..postprocessing.types import PostProcessingMetrics from .embeddings import Embeddings logger = logging.getLogger(__name__) @@ -54,7 +54,7 @@ class EmbeddingMaintainer(threading.Thread): self, db: SqliteQueueDatabase, config: FrigateConfig, - metrics: PostProcessingMetrics, + metrics: DataProcessorMetrics, stop_event: MpEvent, ) -> None: super().__init__(name="embeddings_maintainer") @@ -73,7 +73,7 @@ class EmbeddingMaintainer(threading.Thread): ) self.embeddings_responder = EmbeddingsResponder() self.frame_manager = SharedMemoryFrameManager() - self.processors: list[ProcessorApi] = [] + self.processors: list[RealTimeProcessorApi] = [] if self.config.face_recognition.enabled: self.processors.append(FaceProcessor(self.config, metrics)) diff --git a/frigate/stats/util.py b/frigate/stats/util.py index ec1bc0683..262cec3d2 100644 --- a/frigate/stats/util.py +++ b/frigate/stats/util.py @@ -14,8 +14,8 @@ from requests.exceptions import RequestException from frigate.camera import CameraMetrics from frigate.config import FrigateConfig from frigate.const import CACHE_DIR, CLIPS_DIR, RECORD_DIR +from frigate.data_processing.types import DataProcessorMetrics from frigate.object_detection import ObjectDetectProcess -from frigate.postprocessing.types import PostProcessingMetrics from frigate.types import StatsTrackingTypes from frigate.util.services import ( get_amd_gpu_stats, @@ -52,7 +52,7 @@ def get_latest_version(config: FrigateConfig) -> str: def stats_init( config: FrigateConfig, camera_metrics: dict[str, CameraMetrics], - embeddings_metrics: PostProcessingMetrics | None, + embeddings_metrics: DataProcessorMetrics | None, detectors: dict[str, ObjectDetectProcess], processes: dict[str, int], ) -> StatsTrackingTypes: diff --git a/frigate/types.py b/frigate/types.py index f375430e2..4d3fe96b3 100644 --- a/frigate/types.py +++ b/frigate/types.py @@ -2,13 +2,13 @@ from enum import Enum from typing import TypedDict from frigate.camera import CameraMetrics +from frigate.data_processing.types import DataProcessorMetrics from frigate.object_detection import ObjectDetectProcess -from frigate.postprocessing.types import PostProcessingMetrics class StatsTrackingTypes(TypedDict): camera_metrics: dict[str, CameraMetrics] - embeddings_metrics: PostProcessingMetrics | None + embeddings_metrics: DataProcessorMetrics | None detectors: dict[str, ObjectDetectProcess] started: int latest_frigate_version: str