From 0cc5d66e5b85e2f67cdd61dfdb27d1ab1ce93953 Mon Sep 17 00:00:00 2001 From: Nicolas Mowen Date: Mon, 10 Mar 2025 16:29:29 -0600 Subject: [PATCH] Refactor sub label api (#17079) * Use event metadata updater to handle sub label operations * Use event metadata publisher for sub label setting * Formatting * fix tests * Cleanup --- frigate/api/event.py | 50 +++++---------- frigate/app.py | 9 +-- frigate/comms/event_metadata_updater.py | 28 ++++----- .../common/license_plate/mixin.py | 26 +++----- frigate/data_processing/post/license_plate.py | 3 + frigate/data_processing/real_time/bird.py | 28 +++++---- frigate/data_processing/real_time/face.py | 29 ++++----- .../real_time/license_plate.py | 3 + frigate/embeddings/maintainer.py | 39 +++++++++--- frigate/object_processing.py | 62 +++++++++++++++++++ frigate/test/test_http.py | 22 ++++++- 11 files changed, 184 insertions(+), 115 deletions(-) diff --git a/frigate/api/event.py b/frigate/api/event.py index 100bdfd9e..b47fe23c5 100644 --- a/frigate/api/event.py +++ b/frigate/api/event.py @@ -40,6 +40,7 @@ from frigate.api.defs.response.event_response import ( ) from frigate.api.defs.response.generic_response import GenericResponse from frigate.api.defs.tags import Tags +from frigate.comms.event_metadata_updater import EventMetadataTypeEnum from frigate.const import CLIPS_DIR from frigate.embeddings import EmbeddingsContext from frigate.events.external import ExternalEventProcessor @@ -969,27 +970,16 @@ def set_sub_label( try: event: Event = Event.get(Event.id == event_id) except DoesNotExist: - if not body.camera: - return JSONResponse( - content=( - { - "success": False, - "message": "Event " - + event_id - + " not found and camera is not provided.", - } - ), - status_code=404, - ) - event = None if request.app.detected_frames_processor: - tracked_obj: TrackedObject = ( - request.app.detected_frames_processor.camera_states[ - event.camera if event else body.camera - ].tracked_objects.get(event_id) - ) + tracked_obj: TrackedObject = None + + for state in request.app.detected_frames_processor.camera_states.values(): + tracked_obj = state.tracked_objects.get(event_id) + + if tracked_obj is not None: + break else: tracked_obj = None @@ -1008,23 +998,9 @@ def set_sub_label( new_sub_label = None new_score = None - if tracked_obj: - tracked_obj.obj_data["sub_label"] = (new_sub_label, new_score) - - # update timeline items - Timeline.update( - data=Timeline.data.update({"sub_label": (new_sub_label, new_score)}) - ).where(Timeline.source_id == event_id).execute() - - if event: - event.sub_label = new_sub_label - data = event.data - if new_sub_label is None: - data["sub_label_score"] = None - elif new_score is not None: - data["sub_label_score"] = new_score - event.data = data - event.save() + request.app.event_metadata_updater.publish( + EventMetadataTypeEnum.sub_label, (event_id, new_sub_label, new_score) + ) return JSONResponse( content={ @@ -1105,7 +1081,9 @@ def regenerate_description( camera_config = request.app.frigate_config.cameras[event.camera] if camera_config.genai.enabled: - request.app.event_metadata_updater.publish((event.id, params.source)) + request.app.event_metadata_updater.publish( + EventMetadataTypeEnum.regenerate_description, (event.id, params.source) + ) return JSONResponse( content=( diff --git a/frigate/app.py b/frigate/app.py index a77533619..af675eaaf 100644 --- a/frigate/app.py +++ b/frigate/app.py @@ -20,10 +20,7 @@ from frigate.camera import CameraMetrics, PTZMetrics from frigate.comms.base_communicator import Communicator from frigate.comms.config_updater import ConfigPublisher from frigate.comms.dispatcher import Dispatcher -from frigate.comms.event_metadata_updater import ( - EventMetadataPublisher, - EventMetadataTypeEnum, -) +from frigate.comms.event_metadata_updater import EventMetadataPublisher from frigate.comms.inter_process import InterProcessCommunicator from frigate.comms.mqtt import MqttClient from frigate.comms.webpush import WebPushClient @@ -327,9 +324,7 @@ class FrigateApp: def init_inter_process_communicator(self) -> None: self.inter_process_communicator = InterProcessCommunicator() self.inter_config_updater = ConfigPublisher() - self.event_metadata_updater = EventMetadataPublisher( - EventMetadataTypeEnum.regenerate_description - ) + self.event_metadata_updater = EventMetadataPublisher() self.inter_zmq_proxy = ZmqProxy() def init_onvif(self) -> None: diff --git a/frigate/comms/event_metadata_updater.py b/frigate/comms/event_metadata_updater.py index 87e1889ce..f3301aef4 100644 --- a/frigate/comms/event_metadata_updater.py +++ b/frigate/comms/event_metadata_updater.py @@ -2,9 +2,6 @@ import logging from enum import Enum -from typing import Optional - -from frigate.events.types import RegenerateDescriptionEnum from .zmq_proxy import Publisher, Subscriber @@ -14,6 +11,7 @@ logger = logging.getLogger(__name__) class EventMetadataTypeEnum(str, Enum): all = "" regenerate_description = "regenerate_description" + sub_label = "sub_label" class EventMetadataPublisher(Publisher): @@ -21,12 +19,11 @@ class EventMetadataPublisher(Publisher): topic_base = "event_metadata/" - def __init__(self, topic: EventMetadataTypeEnum) -> None: - topic = topic.value - super().__init__(topic) + def __init__(self) -> None: + super().__init__() - def publish(self, payload: tuple[str, RegenerateDescriptionEnum]) -> None: - super().publish(payload) + def publish(self, topic: EventMetadataTypeEnum, payload: any) -> None: + super().publish(payload, topic.value) class EventMetadataSubscriber(Subscriber): @@ -35,17 +32,14 @@ class EventMetadataSubscriber(Subscriber): topic_base = "event_metadata/" def __init__(self, topic: EventMetadataTypeEnum) -> None: - topic = topic.value - super().__init__(topic) + super().__init__(topic.value) - def check_for_update( - self, timeout: float = 1 - ) -> Optional[tuple[EventMetadataTypeEnum, str, RegenerateDescriptionEnum]]: + def check_for_update(self, timeout: float = 1) -> tuple | None: return super().check_for_update(timeout) - def _return_object(self, topic: str, payload: any) -> any: + def _return_object(self, topic: str, payload: tuple) -> tuple: if payload is None: - return (None, None, None) + return (None, None) + topic = EventMetadataTypeEnum[topic[len(self.topic_base) :]] - event_id, source = payload - return (topic, event_id, RegenerateDescriptionEnum(source)) + return (topic, payload) diff --git a/frigate/data_processing/common/license_plate/mixin.py b/frigate/data_processing/common/license_plate/mixin.py index 012924a1f..c74949d9c 100644 --- a/frigate/data_processing/common/license_plate/mixin.py +++ b/frigate/data_processing/common/license_plate/mixin.py @@ -8,12 +8,11 @@ from typing import List, Optional, Tuple import cv2 import numpy as np -import requests from Levenshtein import distance from pyclipper import ET_CLOSEDPOLYGON, JT_ROUND, PyclipperOffset from shapely.geometry import Polygon -from frigate.const import FRIGATE_LOCALHOST +from frigate.comms.event_metadata_updater import EventMetadataTypeEnum from frigate.util.image import area logger = logging.getLogger(__name__) @@ -1059,22 +1058,15 @@ class LicensePlateProcessingMixin: ) # Send the result to the API - resp = requests.post( - f"{FRIGATE_LOCALHOST}/api/events/{id}/sub_label", - json={ - "camera": obj_data.get("camera"), - "subLabel": sub_label, - "subLabelScore": avg_confidence, - }, + self.sub_label_publisher.publish( + EventMetadataTypeEnum.sub_label, (id, sub_label, avg_confidence) ) - - if resp.status_code == 200: - self.detected_license_plates[id] = { - "plate": top_plate, - "char_confidences": top_char_confidences, - "area": top_area, - "obj_data": obj_data, - } + self.detected_license_plates[id] = { + "plate": top_plate, + "char_confidences": top_char_confidences, + "area": top_area, + "obj_data": obj_data, + } def handle_request(self, topic, request_data) -> dict[str, any] | None: return diff --git a/frigate/data_processing/post/license_plate.py b/frigate/data_processing/post/license_plate.py index 2c80418c7..e5c8a29a8 100644 --- a/frigate/data_processing/post/license_plate.py +++ b/frigate/data_processing/post/license_plate.py @@ -8,6 +8,7 @@ import numpy as np from peewee import DoesNotExist from frigate.comms.embeddings_updater import EmbeddingsRequestEnum +from frigate.comms.event_metadata_updater import EventMetadataPublisher from frigate.config import FrigateConfig from frigate.data_processing.common.license_plate.mixin import ( WRITE_DEBUG_IMAGES, @@ -30,6 +31,7 @@ class LicensePlatePostProcessor(LicensePlateProcessingMixin, PostProcessorApi): def __init__( self, config: FrigateConfig, + sub_label_publisher: EventMetadataPublisher, metrics: DataProcessorMetrics, model_runner: LicensePlateModelRunner, detected_license_plates: dict[str, dict[str, any]], @@ -38,6 +40,7 @@ class LicensePlatePostProcessor(LicensePlateProcessingMixin, PostProcessorApi): self.model_runner = model_runner self.lpr_config = config.lpr self.config = config + self.sub_label_publisher = sub_label_publisher super().__init__(config, metrics, model_runner) def process_data( diff --git a/frigate/data_processing/real_time/bird.py b/frigate/data_processing/real_time/bird.py index 01490d895..d942edf6f 100644 --- a/frigate/data_processing/real_time/bird.py +++ b/frigate/data_processing/real_time/bird.py @@ -5,10 +5,13 @@ import os import cv2 import numpy as np -import requests +from frigate.comms.event_metadata_updater import ( + EventMetadataPublisher, + EventMetadataTypeEnum, +) from frigate.config import FrigateConfig -from frigate.const import FRIGATE_LOCALHOST, MODEL_CACHE_DIR +from frigate.const import MODEL_CACHE_DIR from frigate.util.object import calculate_region from ..types import DataProcessorMetrics @@ -23,9 +26,15 @@ logger = logging.getLogger(__name__) class BirdRealTimeProcessor(RealTimeProcessorApi): - def __init__(self, config: FrigateConfig, metrics: DataProcessorMetrics): + def __init__( + self, + config: FrigateConfig, + sub_label_publisher: EventMetadataPublisher, + metrics: DataProcessorMetrics, + ): super().__init__(config, metrics) self.interpreter: Interpreter = None + self.sub_label_publisher = sub_label_publisher self.tensor_input_details: dict[str, any] = None self.tensor_output_details: dict[str, any] = None self.detected_birds: dict[str, float] = {} @@ -134,17 +143,10 @@ class BirdRealTimeProcessor(RealTimeProcessorApi): logger.debug(f"Score {score} is worse than previous score {previous_score}") return - resp = requests.post( - f"{FRIGATE_LOCALHOST}/api/events/{obj_data['id']}/sub_label", - json={ - "camera": obj_data.get("camera"), - "subLabel": self.labelmap[best_id], - "subLabelScore": score, - }, + self.sub_label_publisher.publish( + EventMetadataTypeEnum.sub_label, (id, self.labelmap[best_id], score) ) - - if resp.status_code == 200: - self.detected_birds[obj_data["id"]] = score + self.detected_birds[obj_data["id"]] = score def handle_request(self, topic, request_data): return None diff --git a/frigate/data_processing/real_time/face.py b/frigate/data_processing/real_time/face.py index e7cf622e9..c88228651 100644 --- a/frigate/data_processing/real_time/face.py +++ b/frigate/data_processing/real_time/face.py @@ -11,11 +11,14 @@ from typing import Optional import cv2 import numpy as np -import requests from frigate.comms.embeddings_updater import EmbeddingsRequestEnum +from frigate.comms.event_metadata_updater import ( + EventMetadataPublisher, + EventMetadataTypeEnum, +) from frigate.config import FrigateConfig -from frigate.const import FACE_DIR, FRIGATE_LOCALHOST, MODEL_CACHE_DIR +from frigate.const import FACE_DIR, MODEL_CACHE_DIR from frigate.util.image import area from ..types import DataProcessorMetrics @@ -28,9 +31,15 @@ MIN_MATCHING_FACES = 2 class FaceRealTimeProcessor(RealTimeProcessorApi): - def __init__(self, config: FrigateConfig, metrics: DataProcessorMetrics): + def __init__( + self, + config: FrigateConfig, + sub_label_publisher: EventMetadataPublisher, + metrics: DataProcessorMetrics, + ): super().__init__(config, metrics) self.face_config = config.face_recognition + self.sub_label_publisher = sub_label_publisher self.face_detector: cv2.FaceDetectorYN = None self.landmark_detector: cv2.face.FacemarkLBF = None self.recognizer: cv2.face.LBPHFaceRecognizer = None @@ -349,18 +358,10 @@ class FaceRealTimeProcessor(RealTimeProcessorApi): self.__update_metrics(datetime.datetime.now().timestamp() - start) return - resp = requests.post( - f"{FRIGATE_LOCALHOST}/api/events/{id}/sub_label", - json={ - "camera": obj_data.get("camera"), - "subLabel": sub_label, - "subLabelScore": score, - }, + self.sub_label_publisher.publish( + EventMetadataTypeEnum.sub_label, (id, sub_label, score) ) - - if resp.status_code == 200: - self.detected_faces[id] = face_score - + self.detected_faces[id] = face_score self.__update_metrics(datetime.datetime.now().timestamp() - start) def handle_request(self, topic, request_data) -> dict[str, any] | None: diff --git a/frigate/data_processing/real_time/license_plate.py b/frigate/data_processing/real_time/license_plate.py index c8f0efa11..d2cb9f2a5 100644 --- a/frigate/data_processing/real_time/license_plate.py +++ b/frigate/data_processing/real_time/license_plate.py @@ -4,6 +4,7 @@ import logging import numpy as np +from frigate.comms.event_metadata_updater import EventMetadataPublisher from frigate.config import FrigateConfig from frigate.data_processing.common.license_plate.mixin import ( LicensePlateProcessingMixin, @@ -22,6 +23,7 @@ class LicensePlateRealTimeProcessor(LicensePlateProcessingMixin, RealTimeProcess def __init__( self, config: FrigateConfig, + sub_label_publisher: EventMetadataPublisher, metrics: DataProcessorMetrics, model_runner: LicensePlateModelRunner, detected_license_plates: dict[str, dict[str, any]], @@ -30,6 +32,7 @@ class LicensePlateRealTimeProcessor(LicensePlateProcessingMixin, RealTimeProcess self.model_runner = model_runner self.lpr_config = config.lpr self.config = config + self.sub_label_publisher = sub_label_publisher super().__init__(config, metrics) def process_frame(self, obj_data: dict[str, any], frame: np.ndarray): diff --git a/frigate/embeddings/maintainer.py b/frigate/embeddings/maintainer.py index b3bd6c204..2fa3eeb2c 100644 --- a/frigate/embeddings/maintainer.py +++ b/frigate/embeddings/maintainer.py @@ -15,6 +15,7 @@ from playhouse.sqliteq import SqliteQueueDatabase from frigate.comms.embeddings_updater import EmbeddingsRequestEnum, EmbeddingsResponder from frigate.comms.event_metadata_updater import ( + EventMetadataPublisher, EventMetadataSubscriber, EventMetadataTypeEnum, ) @@ -43,7 +44,7 @@ from frigate.data_processing.real_time.license_plate import ( LicensePlateRealTimeProcessor, ) from frigate.data_processing.types import DataProcessorMetrics, PostProcessDataEnum -from frigate.events.types import EventTypeEnum +from frigate.events.types import EventTypeEnum, RegenerateDescriptionEnum from frigate.genai import get_genai_client from frigate.models import Event from frigate.types import TrackedObjectUpdateTypesEnum @@ -89,6 +90,7 @@ class EmbeddingMaintainer(threading.Thread): self.event_subscriber = EventUpdateSubscriber() self.event_end_subscriber = EventEndSubscriber() + self.event_metadata_publisher = EventMetadataPublisher() self.event_metadata_subscriber = EventMetadataSubscriber( EventMetadataTypeEnum.regenerate_description ) @@ -108,15 +110,27 @@ class EmbeddingMaintainer(threading.Thread): self.realtime_processors: list[RealTimeProcessorApi] = [] if self.config.face_recognition.enabled: - self.realtime_processors.append(FaceRealTimeProcessor(self.config, metrics)) + self.realtime_processors.append( + FaceRealTimeProcessor( + self.config, self.event_metadata_publisher, metrics + ) + ) if self.config.classification.bird.enabled: - self.realtime_processors.append(BirdRealTimeProcessor(self.config, metrics)) + self.realtime_processors.append( + BirdRealTimeProcessor( + self.config, self.event_metadata_publisher, metrics + ) + ) if self.config.lpr.enabled: self.realtime_processors.append( LicensePlateRealTimeProcessor( - self.config, metrics, lpr_model_runner, self.detected_license_plates + self.config, + self.event_metadata_publisher, + metrics, + lpr_model_runner, + self.detected_license_plates, ) ) @@ -126,7 +140,11 @@ class EmbeddingMaintainer(threading.Thread): if self.config.lpr.enabled: self.post_processors.append( LicensePlatePostProcessor( - self.config, metrics, lpr_model_runner, self.detected_license_plates + self.config, + self.event_metadata_publisher, + metrics, + lpr_model_runner, + self.detected_license_plates, ) ) @@ -150,6 +168,7 @@ class EmbeddingMaintainer(threading.Thread): self.event_subscriber.stop() self.event_end_subscriber.stop() self.recordings_subscriber.stop() + self.event_metadata_publisher.stop() self.event_metadata_subscriber.stop() self.embeddings_responder.stop() self.requestor.stop() @@ -375,15 +394,17 @@ class EmbeddingMaintainer(threading.Thread): def _process_event_metadata(self): # Check for regenerate description requests - (topic, event_id, source) = self.event_metadata_subscriber.check_for_update( - timeout=0.01 - ) + (topic, payload) = self.event_metadata_subscriber.check_for_update(timeout=0.01) if topic is None: return + event_id, source = payload + if event_id: - self.handle_regenerate_description(event_id, source) + self.handle_regenerate_description( + event_id, RegenerateDescriptionEnum(source) + ) def _create_thumbnail(self, yuv_frame, box, height=500) -> Optional[bytes]: """Return jpg thumbnail of a region of the frame.""" diff --git a/frigate/object_processing.py b/frigate/object_processing.py index 8faf91cb5..d31ca83e1 100644 --- a/frigate/object_processing.py +++ b/frigate/object_processing.py @@ -9,10 +9,15 @@ from typing import Callable, Optional import cv2 import numpy as np +from peewee import DoesNotExist from frigate.comms.config_updater import ConfigSubscriber from frigate.comms.detections_updater import DetectionPublisher, DetectionTypeEnum from frigate.comms.dispatcher import Dispatcher +from frigate.comms.event_metadata_updater import ( + EventMetadataSubscriber, + EventMetadataTypeEnum, +) from frigate.comms.events_updater import EventEndSubscriber, EventUpdatePublisher from frigate.comms.inter_process import InterProcessRequestor from frigate.config import ( @@ -24,6 +29,7 @@ from frigate.config import ( ) from frigate.const import UPDATE_CAMERA_ACTIVITY from frigate.events.types import EventStateEnum, EventTypeEnum +from frigate.models import Event, Timeline from frigate.ptz.autotrack import PtzAutoTrackerThread from frigate.track.tracked_object import TrackedObject from frigate.util.image import ( @@ -446,6 +452,9 @@ class TrackedObjectProcessor(threading.Thread): self.detection_publisher = DetectionPublisher(DetectionTypeEnum.video) self.event_sender = EventUpdatePublisher() self.event_end_subscriber = EventEndSubscriber() + self.sub_label_subscriber = EventMetadataSubscriber( + EventMetadataTypeEnum.sub_label + ) self.camera_activity: dict[str, dict[str, any]] = {} @@ -684,6 +693,46 @@ class TrackedObjectProcessor(threading.Thread): """Returns the latest frame time for a given camera.""" return self.camera_states[camera].current_frame_time + def set_sub_label( + self, event_id: str, sub_label: str | None, score: float | None + ) -> None: + """Update sub label for given event id.""" + tracked_obj: TrackedObject = None + + for state in self.camera_states.values(): + tracked_obj = state.tracked_objects.get(event_id) + + if tracked_obj is not None: + break + + try: + event: Event = Event.get(Event.id == event_id) + except DoesNotExist: + event = None + + if not tracked_obj and not event: + return + + if tracked_obj: + tracked_obj.obj_data["sub_label"] = (sub_label, score) + + if event: + event.sub_label = sub_label + data = event.data + if sub_label is None: + data["sub_label_score"] = None + elif score is not None: + data["sub_label_score"] = score + event.data = data + event.save() + + # update timeline items + Timeline.update( + data=Timeline.data.update({"sub_label": (sub_label, score)}) + ).where(Timeline.source_id == event_id).execute() + + return True + def force_end_all_events(self, camera: str, camera_state: CameraState): """Ends all active events on camera when disabling.""" last_frame_name = camera_state.previous_frame_id @@ -741,6 +790,18 @@ class TrackedObjectProcessor(threading.Thread): if not current_enabled: continue + # check for sub label updates + while True: + (topic, payload) = self.sub_label_subscriber.check_for_update( + timeout=0.1 + ) + + if not topic: + break + + (event_id, sub_label, score) = payload + self.set_sub_label(event_id, sub_label, score) + try: ( camera, @@ -799,6 +860,7 @@ class TrackedObjectProcessor(threading.Thread): self.detection_publisher.stop() self.event_sender.stop() self.event_end_subscriber.stop() + self.sub_label_subscriber.stop() self.config_enabled_subscriber.stop() logger.info("Exiting object processor...") diff --git a/frigate/test/test_http.py b/frigate/test/test_http.py index d6ff91a83..d23727672 100644 --- a/frigate/test/test_http.py +++ b/frigate/test/test_http.py @@ -2,6 +2,7 @@ import datetime import logging import os import unittest +from unittest.mock import Mock from fastapi.testclient import TestClient from peewee_migrate import Router @@ -10,6 +11,7 @@ from playhouse.sqlite_ext import SqliteExtDatabase from playhouse.sqliteq import SqliteQueueDatabase from frigate.api.fastapi_app import create_fastapi_app +from frigate.comms.event_metadata_updater import EventMetadataPublisher from frigate.config import FrigateConfig from frigate.const import BASE_DIR, CACHE_DIR from frigate.models import Event, Recordings, Timeline @@ -243,6 +245,7 @@ class TestHttp(unittest.TestCase): assert len(events) == 1 def test_set_delete_sub_label(self): + mock_event_updater = Mock(spec=EventMetadataPublisher) app = create_fastapi_app( FrigateConfig(**self.minimal_config), self.db, @@ -252,11 +255,18 @@ class TestHttp(unittest.TestCase): None, None, None, - None, + mock_event_updater, ) id = "123456.random" sub_label = "sub" + def update_event(topic, payload): + event = Event.get(id=id) + event.sub_label = payload[1] + event.save() + + mock_event_updater.publish.side_effect = update_event + with TestClient(app) as client: _insert_mock_event(id) new_sub_label_response = client.post( @@ -281,6 +291,7 @@ class TestHttp(unittest.TestCase): assert event["sub_label"] == None def test_sub_label_list(self): + mock_event_updater = Mock(spec=EventMetadataPublisher) app = create_fastapi_app( FrigateConfig(**self.minimal_config), self.db, @@ -290,11 +301,18 @@ class TestHttp(unittest.TestCase): None, None, None, - None, + mock_event_updater, ) id = "123456.random" sub_label = "sub" + def update_event(topic, payload): + event = Event.get(id=id) + event.sub_label = payload[1] + event.save() + + mock_event_updater.publish.side_effect = update_event + with TestClient(app) as client: _insert_mock_event(id) client.post(