mirror of
https://github.com/blakeblackshear/frigate.git
synced 2025-08-27 13:47:50 +02:00
Initial custom classification model config support (#18362)
* Add basic config for defining a teachable machine model * Add model type * Add basic config for teachable machine models * Adjust config for state and object * Use config to process * Correctly check for objects * Remove debug * Rename to not be teachable machine specific * Cleanup
This commit is contained in:
parent
f709042481
commit
492ef4eb10
@ -34,10 +34,37 @@ class BirdClassificationConfig(FrigateBaseModel):
|
|||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
class CustomClassificationStateCameraConfig(FrigateBaseModel):
|
||||||
|
crop: list[int, int, int, int] = Field(
|
||||||
|
title="Crop of image frame on this camera to run classification on."
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
class CustomClassificationStateConfig(FrigateBaseModel):
|
||||||
|
cameras: Dict[str, CustomClassificationStateCameraConfig] = Field(
|
||||||
|
title="Cameras to run classification on."
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
class CustomClassificationObjectConfig(FrigateBaseModel):
|
||||||
|
objects: list[str] = Field(title="Object types to classify.")
|
||||||
|
|
||||||
|
|
||||||
|
class CustomClassificationConfig(FrigateBaseModel):
|
||||||
|
enabled: bool = Field(default=True, title="Enable running the model.")
|
||||||
|
model_path: str = Field(title="Path to custom classification tflite model.")
|
||||||
|
labelmap_path: str = Field(title="Path to custom classification model labelmap.")
|
||||||
|
object_config: CustomClassificationObjectConfig | None = Field(default=None)
|
||||||
|
state_config: CustomClassificationStateConfig | None = Field(default=None)
|
||||||
|
|
||||||
|
|
||||||
class ClassificationConfig(FrigateBaseModel):
|
class ClassificationConfig(FrigateBaseModel):
|
||||||
bird: BirdClassificationConfig = Field(
|
bird: BirdClassificationConfig = Field(
|
||||||
default_factory=BirdClassificationConfig, title="Bird classification config."
|
default_factory=BirdClassificationConfig, title="Bird classification config."
|
||||||
)
|
)
|
||||||
|
custom: Dict[str, CustomClassificationConfig] = Field(
|
||||||
|
default={}, title="Custom Classification Model Configs."
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
class SemanticSearchConfig(FrigateBaseModel):
|
class SemanticSearchConfig(FrigateBaseModel):
|
||||||
|
178
frigate/data_processing/real_time/custom_classification.py
Normal file
178
frigate/data_processing/real_time/custom_classification.py
Normal file
@ -0,0 +1,178 @@
|
|||||||
|
"""Real time processor that works with classification tflite models."""
|
||||||
|
|
||||||
|
import logging
|
||||||
|
from typing import Any
|
||||||
|
|
||||||
|
import cv2
|
||||||
|
import numpy as np
|
||||||
|
|
||||||
|
from frigate.comms.event_metadata_updater import (
|
||||||
|
EventMetadataPublisher,
|
||||||
|
EventMetadataTypeEnum,
|
||||||
|
)
|
||||||
|
from frigate.config import FrigateConfig
|
||||||
|
from frigate.config.classification import CustomClassificationConfig
|
||||||
|
from frigate.util.builtin import load_labels
|
||||||
|
from frigate.util.object import calculate_region
|
||||||
|
|
||||||
|
from ..types import DataProcessorMetrics
|
||||||
|
from .api import RealTimeProcessorApi
|
||||||
|
|
||||||
|
try:
|
||||||
|
from tflite_runtime.interpreter import Interpreter
|
||||||
|
except ModuleNotFoundError:
|
||||||
|
from tensorflow.lite.python.interpreter import Interpreter
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
class CustomStateClassificationProcessor(RealTimeProcessorApi):
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
config: FrigateConfig,
|
||||||
|
model_config: CustomClassificationConfig,
|
||||||
|
metrics: DataProcessorMetrics,
|
||||||
|
):
|
||||||
|
super().__init__(config, metrics)
|
||||||
|
self.model_config = model_config
|
||||||
|
self.interpreter: Interpreter = None
|
||||||
|
self.tensor_input_details: dict[str, Any] = None
|
||||||
|
self.tensor_output_details: dict[str, Any] = None
|
||||||
|
self.labelmap: dict[int, str] = {}
|
||||||
|
self.__build_detector()
|
||||||
|
|
||||||
|
def __build_detector(self) -> None:
|
||||||
|
self.interpreter = Interpreter(
|
||||||
|
model_path=self.model_config.model_path,
|
||||||
|
num_threads=2,
|
||||||
|
)
|
||||||
|
self.interpreter.allocate_tensors()
|
||||||
|
self.tensor_input_details = self.interpreter.get_input_details()
|
||||||
|
self.tensor_output_details = self.interpreter.get_output_details()
|
||||||
|
self.labelmap = load_labels(self.model_config.labelmap_path, prefill=0)
|
||||||
|
|
||||||
|
def process_frame(self, frame_data: dict[str, Any], frame: np.ndarray):
|
||||||
|
camera = frame_data.get("camera")
|
||||||
|
if camera not in self.model_config.state_config.cameras:
|
||||||
|
return
|
||||||
|
|
||||||
|
camera_config = self.model_config.state_config.cameras[camera]
|
||||||
|
x, y, x2, y2 = calculate_region(
|
||||||
|
frame.shape,
|
||||||
|
camera_config.crop[0],
|
||||||
|
camera_config.crop[1],
|
||||||
|
camera_config.crop[2],
|
||||||
|
camera_config.crop[3],
|
||||||
|
224,
|
||||||
|
1.0,
|
||||||
|
)
|
||||||
|
|
||||||
|
rgb = cv2.cvtColor(frame, cv2.COLOR_YUV2RGB_I420)
|
||||||
|
input = rgb[
|
||||||
|
y:y2,
|
||||||
|
x:x2,
|
||||||
|
]
|
||||||
|
|
||||||
|
if input.shape != (224, 224):
|
||||||
|
input = cv2.resize(input, (224, 224))
|
||||||
|
|
||||||
|
input = np.expand_dims(input, axis=0)
|
||||||
|
self.interpreter.set_tensor(self.tensor_input_details[0]["index"], input)
|
||||||
|
self.interpreter.invoke()
|
||||||
|
res: np.ndarray = self.interpreter.get_tensor(
|
||||||
|
self.tensor_output_details[0]["index"]
|
||||||
|
)[0]
|
||||||
|
print(f"the gate res is {res}")
|
||||||
|
probs = res / res.sum(axis=0)
|
||||||
|
best_id = np.argmax(probs)
|
||||||
|
score = round(probs[best_id], 2)
|
||||||
|
|
||||||
|
print(f"got {self.labelmap[best_id]} with score {score}")
|
||||||
|
|
||||||
|
def handle_request(self, topic, request_data):
|
||||||
|
return None
|
||||||
|
|
||||||
|
def expire_object(self, object_id, camera):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class CustomObjectClassificationProcessor(RealTimeProcessorApi):
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
config: FrigateConfig,
|
||||||
|
model_config: CustomClassificationConfig,
|
||||||
|
sub_label_publisher: EventMetadataPublisher,
|
||||||
|
metrics: DataProcessorMetrics,
|
||||||
|
):
|
||||||
|
super().__init__(config, metrics)
|
||||||
|
self.model_config = model_config
|
||||||
|
self.interpreter: Interpreter = None
|
||||||
|
self.sub_label_publisher = sub_label_publisher
|
||||||
|
self.tensor_input_details: dict[str, Any] = None
|
||||||
|
self.tensor_output_details: dict[str, Any] = None
|
||||||
|
self.detected_objects: dict[str, float] = {}
|
||||||
|
self.labelmap: dict[int, str] = {}
|
||||||
|
self.__build_detector()
|
||||||
|
|
||||||
|
def __build_detector(self) -> None:
|
||||||
|
self.interpreter = Interpreter(
|
||||||
|
model_path=self.model_config.model_path,
|
||||||
|
num_threads=2,
|
||||||
|
)
|
||||||
|
self.interpreter.allocate_tensors()
|
||||||
|
self.tensor_input_details = self.interpreter.get_input_details()
|
||||||
|
self.tensor_output_details = self.interpreter.get_output_details()
|
||||||
|
self.labelmap = load_labels(self.model_config.labelmap_path, prefill=0)
|
||||||
|
|
||||||
|
def process_frame(self, obj_data, frame):
|
||||||
|
if obj_data["label"] not in self.model_config.object_config.objects:
|
||||||
|
return
|
||||||
|
|
||||||
|
x, y, x2, y2 = calculate_region(
|
||||||
|
frame.shape,
|
||||||
|
obj_data["box"][0],
|
||||||
|
obj_data["box"][1],
|
||||||
|
obj_data["box"][2],
|
||||||
|
obj_data["box"][3],
|
||||||
|
224,
|
||||||
|
1.0,
|
||||||
|
)
|
||||||
|
|
||||||
|
rgb = cv2.cvtColor(frame, cv2.COLOR_YUV2RGB_I420)
|
||||||
|
input = rgb[
|
||||||
|
y:y2,
|
||||||
|
x:x2,
|
||||||
|
]
|
||||||
|
|
||||||
|
if input.shape != (224, 224):
|
||||||
|
input = cv2.resize(input, (224, 224))
|
||||||
|
|
||||||
|
input = np.expand_dims(input, axis=0)
|
||||||
|
self.interpreter.set_tensor(self.tensor_input_details[0]["index"], input)
|
||||||
|
self.interpreter.invoke()
|
||||||
|
res: np.ndarray = self.interpreter.get_tensor(
|
||||||
|
self.tensor_output_details[0]["index"]
|
||||||
|
)[0]
|
||||||
|
probs = res / res.sum(axis=0)
|
||||||
|
best_id = np.argmax(probs)
|
||||||
|
|
||||||
|
score = round(probs[best_id], 2)
|
||||||
|
|
||||||
|
previous_score = self.detected_objects.get(obj_data["id"], 0.0)
|
||||||
|
|
||||||
|
if score <= previous_score:
|
||||||
|
logger.debug(f"Score {score} is worse than previous score {previous_score}")
|
||||||
|
return
|
||||||
|
|
||||||
|
self.sub_label_publisher.publish(
|
||||||
|
EventMetadataTypeEnum.sub_label,
|
||||||
|
(obj_data["id"], self.labelmap[best_id], score),
|
||||||
|
)
|
||||||
|
self.detected_objects[obj_data["id"]] = score
|
||||||
|
|
||||||
|
def handle_request(self, topic, request_data):
|
||||||
|
return None
|
||||||
|
|
||||||
|
def expire_object(self, object_id, camera):
|
||||||
|
if object_id in self.detected_objects:
|
||||||
|
self.detected_objects.pop(object_id)
|
@ -42,6 +42,10 @@ from frigate.data_processing.post.license_plate import (
|
|||||||
)
|
)
|
||||||
from frigate.data_processing.real_time.api import RealTimeProcessorApi
|
from frigate.data_processing.real_time.api import RealTimeProcessorApi
|
||||||
from frigate.data_processing.real_time.bird import BirdRealTimeProcessor
|
from frigate.data_processing.real_time.bird import BirdRealTimeProcessor
|
||||||
|
from frigate.data_processing.real_time.custom_classification import (
|
||||||
|
CustomObjectClassificationProcessor,
|
||||||
|
CustomStateClassificationProcessor,
|
||||||
|
)
|
||||||
from frigate.data_processing.real_time.face import FaceRealTimeProcessor
|
from frigate.data_processing.real_time.face import FaceRealTimeProcessor
|
||||||
from frigate.data_processing.real_time.license_plate import (
|
from frigate.data_processing.real_time.license_plate import (
|
||||||
LicensePlateRealTimeProcessor,
|
LicensePlateRealTimeProcessor,
|
||||||
@ -143,6 +147,18 @@ class EmbeddingMaintainer(threading.Thread):
|
|||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
|
||||||
|
for model in self.config.classification.custom.values():
|
||||||
|
self.realtime_processors.append(
|
||||||
|
CustomStateClassificationProcessor(self.config, model, self.metrics)
|
||||||
|
if model.state_config != None
|
||||||
|
else CustomObjectClassificationProcessor(
|
||||||
|
self.config,
|
||||||
|
model,
|
||||||
|
self.event_metadata_publisher,
|
||||||
|
self.metrics,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
# post processors
|
# post processors
|
||||||
self.post_processors: list[PostProcessorApi] = []
|
self.post_processors: list[PostProcessorApi] = []
|
||||||
|
|
||||||
@ -172,7 +188,7 @@ class EmbeddingMaintainer(threading.Thread):
|
|||||||
self._process_requests()
|
self._process_requests()
|
||||||
self._process_updates()
|
self._process_updates()
|
||||||
self._process_recordings_updates()
|
self._process_recordings_updates()
|
||||||
self._process_dedicated_lpr()
|
self._process_frame_updates()
|
||||||
self._expire_dedicated_lpr()
|
self._expire_dedicated_lpr()
|
||||||
self._process_finalized()
|
self._process_finalized()
|
||||||
self._process_event_metadata()
|
self._process_event_metadata()
|
||||||
@ -449,7 +465,7 @@ class EmbeddingMaintainer(threading.Thread):
|
|||||||
event_id, RegenerateDescriptionEnum(source)
|
event_id, RegenerateDescriptionEnum(source)
|
||||||
)
|
)
|
||||||
|
|
||||||
def _process_dedicated_lpr(self) -> None:
|
def _process_frame_updates(self) -> None:
|
||||||
"""Process event updates"""
|
"""Process event updates"""
|
||||||
(topic, data) = self.detection_subscriber.check_for_update()
|
(topic, data) = self.detection_subscriber.check_for_update()
|
||||||
|
|
||||||
@ -458,7 +474,7 @@ class EmbeddingMaintainer(threading.Thread):
|
|||||||
|
|
||||||
camera, frame_name, _, _, motion_boxes, _ = data
|
camera, frame_name, _, _, motion_boxes, _ = data
|
||||||
|
|
||||||
if not camera or not self.config.lpr.enabled or len(motion_boxes) == 0:
|
if not camera or len(motion_boxes) == 0:
|
||||||
return
|
return
|
||||||
|
|
||||||
camera_config = self.config.cameras[camera]
|
camera_config = self.config.cameras[camera]
|
||||||
@ -466,8 +482,8 @@ class EmbeddingMaintainer(threading.Thread):
|
|||||||
if (
|
if (
|
||||||
camera_config.type != CameraTypeEnum.lpr
|
camera_config.type != CameraTypeEnum.lpr
|
||||||
or "license_plate" in camera_config.objects.track
|
or "license_plate" in camera_config.objects.track
|
||||||
):
|
) and len(self.config.classification.custom) == 0:
|
||||||
# we're not a dedicated lpr camera or we are one but we're using frigate+
|
# no active features that use this data
|
||||||
return
|
return
|
||||||
|
|
||||||
try:
|
try:
|
||||||
@ -487,6 +503,9 @@ class EmbeddingMaintainer(threading.Thread):
|
|||||||
if isinstance(processor, LicensePlateRealTimeProcessor):
|
if isinstance(processor, LicensePlateRealTimeProcessor):
|
||||||
processor.process_frame(camera, yuv_frame, True)
|
processor.process_frame(camera, yuv_frame, True)
|
||||||
|
|
||||||
|
if isinstance(processor, CustomStateClassificationProcessor):
|
||||||
|
processor.process_frame({"camera": camera}, yuv_frame)
|
||||||
|
|
||||||
self.frame_manager.close(frame_name)
|
self.frame_manager.close(frame_name)
|
||||||
|
|
||||||
def _create_thumbnail(self, yuv_frame, box, height=500) -> Optional[bytes]:
|
def _create_thumbnail(self, yuv_frame, box, height=500) -> Optional[bytes]:
|
||||||
|
Loading…
Reference in New Issue
Block a user