From 8a1da3a89fc16a4ea8d75422068de162b97bbbd1 Mon Sep 17 00:00:00 2001
From: Nicolas Mowen
Date: Fri, 23 May 2025 08:46:53 -0600
Subject: [PATCH] Initial custom classification model config support (#18362)

* Add basic config for defining a teachable machine model

* Add model type

* Add basic config for teachable machine models

* Adjust config for state and object

* Use config to process

* Correctly check for objects

* Remove debug

* Rename to not be teachable machine specific

* Cleanup

---
 frigate/config/classification.py              |  27 +++
 .../real_time/custom_classification.py        | 178 ++++++++++++++++++
 frigate/embeddings/maintainer.py              |  29 ++-
 3 files changed, 229 insertions(+), 5 deletions(-)
 create mode 100644 frigate/data_processing/real_time/custom_classification.py

diff --git a/frigate/config/classification.py b/frigate/config/classification.py
index 4c34f8ab3..284136076 100644
--- a/frigate/config/classification.py
+++ b/frigate/config/classification.py
@@ -34,10 +34,37 @@ class BirdClassificationConfig(FrigateBaseModel):
     )
 
 
+class CustomClassificationStateCameraConfig(FrigateBaseModel):
+    crop: list[int, int, int, int] = Field(
+        title="Crop of image frame on this camera to run classification on."
+    )
+
+
+class CustomClassificationStateConfig(FrigateBaseModel):
+    cameras: Dict[str, CustomClassificationStateCameraConfig] = Field(
+        title="Cameras to run classification on."
+    )
+
+
+class CustomClassificationObjectConfig(FrigateBaseModel):
+    objects: list[str] = Field(title="Object types to classify.")
+
+
+class CustomClassificationConfig(FrigateBaseModel):
+    enabled: bool = Field(default=True, title="Enable running the model.")
+    model_path: str = Field(title="Path to custom classification tflite model.")
+    labelmap_path: str = Field(title="Path to custom classification model labelmap.")
+    object_config: CustomClassificationObjectConfig | None = Field(default=None)
+    state_config: CustomClassificationStateConfig | None = Field(default=None)
+
+
 class ClassificationConfig(FrigateBaseModel):
     bird: BirdClassificationConfig = Field(
         default_factory=BirdClassificationConfig, title="Bird classification config."
     )
+    custom: Dict[str, CustomClassificationConfig] = Field(
+        default={}, title="Custom Classification Model Configs."
+    )
 
 
 class SemanticSearchConfig(FrigateBaseModel):
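A note on the schema above: `state_config` selects fixed-crop, whole-frame classification per camera, while `object_config` classifies tracked objects; the processors added below pick one path per model. As a minimal sketch of how this config validates, assuming Frigate's pydantic v2 models — the `gate` model name, file paths, camera name, and crop box are all hypothetical, and in a Frigate YAML config this would live under `classification: custom:`:

```python
# Hypothetical example only: the "gate" model name, paths, camera name, and
# crop box are made up.
from frigate.config.classification import ClassificationConfig

config = ClassificationConfig.model_validate(
    {
        "custom": {
            "gate": {
                "model_path": "/config/custom_models/gate.tflite",
                "labelmap_path": "/config/custom_models/gate_labels.txt",
                "state_config": {
                    "cameras": {
                        # crop is [x, y, x2, y2] in frame pixel coordinates
                        "front": {"crop": [0, 180, 220, 400]}
                    }
                },
            }
        }
    }
)

assert config.custom["gate"].state_config is not None  # registered as a state model
```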
diff --git a/frigate/data_processing/real_time/custom_classification.py b/frigate/data_processing/real_time/custom_classification.py
new file mode 100644
index 000000000..1848968bb
--- /dev/null
+++ b/frigate/data_processing/real_time/custom_classification.py
@@ -0,0 +1,178 @@
+"""Real time processor that works with classification tflite models."""
+
+import logging
+from typing import Any
+
+import cv2
+import numpy as np
+
+from frigate.comms.event_metadata_updater import (
+    EventMetadataPublisher,
+    EventMetadataTypeEnum,
+)
+from frigate.config import FrigateConfig
+from frigate.config.classification import CustomClassificationConfig
+from frigate.util.builtin import load_labels
+from frigate.util.object import calculate_region
+
+from ..types import DataProcessorMetrics
+from .api import RealTimeProcessorApi
+
+try:
+    from tflite_runtime.interpreter import Interpreter
+except ModuleNotFoundError:
+    from tensorflow.lite.python.interpreter import Interpreter
+
+logger = logging.getLogger(__name__)
+
+
+class CustomStateClassificationProcessor(RealTimeProcessorApi):
+    def __init__(
+        self,
+        config: FrigateConfig,
+        model_config: CustomClassificationConfig,
+        metrics: DataProcessorMetrics,
+    ):
+        super().__init__(config, metrics)
+        self.model_config = model_config
+        self.interpreter: Interpreter = None
+        self.tensor_input_details: dict[str, Any] = None
+        self.tensor_output_details: dict[str, Any] = None
+        self.labelmap: dict[int, str] = {}
+        self.__build_detector()
+
+    def __build_detector(self) -> None:
+        self.interpreter = Interpreter(
+            model_path=self.model_config.model_path,
+            num_threads=2,
+        )
+        self.interpreter.allocate_tensors()
+        self.tensor_input_details = self.interpreter.get_input_details()
+        self.tensor_output_details = self.interpreter.get_output_details()
+        self.labelmap = load_labels(self.model_config.labelmap_path, prefill=0)
+
+    def process_frame(self, frame_data: dict[str, Any], frame: np.ndarray):
+        camera = frame_data.get("camera")
+        if camera not in self.model_config.state_config.cameras:
+            return
+
+        camera_config = self.model_config.state_config.cameras[camera]
+        x, y, x2, y2 = calculate_region(
+            frame.shape,
+            camera_config.crop[0],
+            camera_config.crop[1],
+            camera_config.crop[2],
+            camera_config.crop[3],
+            224,
+            1.0,
+        )
+
+        rgb = cv2.cvtColor(frame, cv2.COLOR_YUV2RGB_I420)
+        input = rgb[
+            y:y2,
+            x:x2,
+        ]
+
+        if input.shape[:2] != (224, 224):
+            input = cv2.resize(input, (224, 224))
+
+        input = np.expand_dims(input, axis=0)
+        self.interpreter.set_tensor(self.tensor_input_details[0]["index"], input)
+        self.interpreter.invoke()
+        res: np.ndarray = self.interpreter.get_tensor(
+            self.tensor_output_details[0]["index"]
+        )[0]
+        logger.debug(f"raw classification result: {res}")
+        probs = res / res.sum(axis=0)
+        best_id = np.argmax(probs)
+        score = round(probs[best_id], 2)
+
+        logger.debug(f"got {self.labelmap[best_id]} with score {score}")
+
+    def handle_request(self, topic, request_data):
+        return None
+
+    def expire_object(self, object_id, camera):
+        pass
+
+
+class CustomObjectClassificationProcessor(RealTimeProcessorApi):
+    def __init__(
+        self,
+        config: FrigateConfig,
+        model_config: CustomClassificationConfig,
+        sub_label_publisher: EventMetadataPublisher,
+        metrics: DataProcessorMetrics,
+    ):
+        super().__init__(config, metrics)
+        self.model_config = model_config
+        self.interpreter: Interpreter = None
+        self.sub_label_publisher = sub_label_publisher
+        self.tensor_input_details: dict[str, Any] = None
+        self.tensor_output_details: dict[str, Any] = None
+        self.detected_objects: dict[str, float] = {}
+        self.labelmap: dict[int, str] = {}
+        self.__build_detector()
+
+    def __build_detector(self) -> None:
+        self.interpreter = Interpreter(
+            model_path=self.model_config.model_path,
+            num_threads=2,
+        )
+        self.interpreter.allocate_tensors()
+        self.tensor_input_details = self.interpreter.get_input_details()
+        self.tensor_output_details = self.interpreter.get_output_details()
+        self.labelmap = load_labels(self.model_config.labelmap_path, prefill=0)
+
+    def process_frame(self, obj_data, frame):
+        if obj_data["label"] not in self.model_config.object_config.objects:
+            return
+
+        x, y, x2, y2 = calculate_region(
+            frame.shape,
+            obj_data["box"][0],
+            obj_data["box"][1],
+            obj_data["box"][2],
+            obj_data["box"][3],
+            224,
+            1.0,
+        )
+
+        rgb = cv2.cvtColor(frame, cv2.COLOR_YUV2RGB_I420)
+        input = rgb[
+            y:y2,
+            x:x2,
+        ]
+
+        if input.shape[:2] != (224, 224):
+            input = cv2.resize(input, (224, 224))
+
+        input = np.expand_dims(input, axis=0)
+        self.interpreter.set_tensor(self.tensor_input_details[0]["index"], input)
+        self.interpreter.invoke()
+        res: np.ndarray = self.interpreter.get_tensor(
+            self.tensor_output_details[0]["index"]
+        )[0]
+        probs = res / res.sum(axis=0)
+        best_id = np.argmax(probs)
+
+        score = round(probs[best_id], 2)
+
+        previous_score = self.detected_objects.get(obj_data["id"], 0.0)
+
+        if score <= previous_score:
+            logger.debug(f"Score {score} is worse than previous score {previous_score}")
+            return
+
+        self.sub_label_publisher.publish(
+            EventMetadataTypeEnum.sub_label,
+            (obj_data["id"], self.labelmap[best_id], score),
+        )
+        self.detected_objects[obj_data["id"]] = score
+
+    def handle_request(self, topic, request_data):
+        return None
+
+    def expire_object(self, object_id, camera):
+        if object_id in self.detected_objects:
+            self.detected_objects.pop(object_id)
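One caveat on the `res / res.sum(axis=0)` normalization both processors use: dividing by the sum only yields a valid probability distribution when the model's outputs are non-negative (e.g. an output layer that is already softmaxed, as Teachable Machine style classification exports typically are). A hedged, illustrative comparison with raw logits:

```python
# Illustrative only: sum-normalization vs. softmax on raw logits. If a
# custom model's output layer already applies softmax, the simple division
# used by the processors above is fine.
import numpy as np

def softmax(scores: np.ndarray) -> np.ndarray:
    exp = np.exp(scores - scores.max())  # shift for numerical stability
    return exp / exp.sum()

logits = np.array([2.0, -1.0, 0.5])
print(logits / logits.sum())  # can produce values outside [0, 1]
print(softmax(logits))        # always a valid probability distribution
```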
diff --git a/frigate/embeddings/maintainer.py b/frigate/embeddings/maintainer.py
index 86bc75737..9838f4a21 100644
--- a/frigate/embeddings/maintainer.py
+++ b/frigate/embeddings/maintainer.py
@@ -42,6 +42,10 @@ from frigate.data_processing.post.license_plate import (
 )
 from frigate.data_processing.real_time.api import RealTimeProcessorApi
 from frigate.data_processing.real_time.bird import BirdRealTimeProcessor
+from frigate.data_processing.real_time.custom_classification import (
+    CustomObjectClassificationProcessor,
+    CustomStateClassificationProcessor,
+)
 from frigate.data_processing.real_time.face import FaceRealTimeProcessor
 from frigate.data_processing.real_time.license_plate import (
     LicensePlateRealTimeProcessor,
@@ -143,6 +147,18 @@ class EmbeddingMaintainer(threading.Thread):
             )
         )
 
+        for model in self.config.classification.custom.values():
+            self.realtime_processors.append(
+                CustomStateClassificationProcessor(self.config, model, self.metrics)
+                if model.state_config is not None
+                else CustomObjectClassificationProcessor(
+                    self.config,
+                    model,
+                    self.event_metadata_publisher,
+                    self.metrics,
+                )
+            )
+
         # post processors
         self.post_processors: list[PostProcessorApi] = []
 
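The conditional expression in the registration loop above is compact but dense; it is equivalent to the explicit form below (illustrative only, same classes as above). Note that a model defining both `state_config` and `object_config` is registered as a state model, since `state_config` is checked first, and only object models receive the sub label publisher so they can attach results to tracked objects:

```python
# Equivalent, more explicit form of the registration above (illustrative).
for model in self.config.classification.custom.values():
    if model.state_config is not None:
        processor = CustomStateClassificationProcessor(
            self.config, model, self.metrics
        )
    else:
        processor = CustomObjectClassificationProcessor(
            self.config, model, self.event_metadata_publisher, self.metrics
        )
    self.realtime_processors.append(processor)
```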
"""Process event updates""" (topic, data) = self.detection_subscriber.check_for_update() @@ -458,7 +474,7 @@ class EmbeddingMaintainer(threading.Thread): camera, frame_name, _, _, motion_boxes, _ = data - if not camera or not self.config.lpr.enabled or len(motion_boxes) == 0: + if not camera or len(motion_boxes) == 0: return camera_config = self.config.cameras[camera] @@ -466,8 +482,8 @@ class EmbeddingMaintainer(threading.Thread): if ( camera_config.type != CameraTypeEnum.lpr or "license_plate" in camera_config.objects.track - ): - # we're not a dedicated lpr camera or we are one but we're using frigate+ + ) and len(self.config.classification.custom) == 0: + # no active features that use this data return try: @@ -487,6 +503,9 @@ class EmbeddingMaintainer(threading.Thread): if isinstance(processor, LicensePlateRealTimeProcessor): processor.process_frame(camera, yuv_frame, True) + if isinstance(processor, CustomStateClassificationProcessor): + processor.process_frame({"camera": camera}, yuv_frame) + self.frame_manager.close(frame_name) def _create_thumbnail(self, yuv_frame, box, height=500) -> Optional[bytes]: