From c169e766a55ff77d754ccc81ad241ad51d2b6fcb Mon Sep 17 00:00:00 2001 From: Nicolas Mowen Date: Fri, 1 Aug 2025 12:35:32 -0600 Subject: [PATCH] Add review item subscriber --- frigate/comms/review_updater.py | 2 +- frigate/embeddings/maintainer.py | 13 +++++++++++++ frigate/review/maintainer.py | 8 ++++---- 3 files changed, 18 insertions(+), 5 deletions(-) diff --git a/frigate/comms/review_updater.py b/frigate/comms/review_updater.py index 6d3e283ce..13f4286d1 100644 --- a/frigate/comms/review_updater.py +++ b/frigate/comms/review_updater.py @@ -7,7 +7,7 @@ from .zmq_proxy import Publisher, Subscriber logger = logging.getLogger(__name__) -class ReviewDataPublisher(Publisher[tuple[str, float]]): +class ReviewDataPublisher(Publisher): # update when typing improvement is added Publisher[tuple[str, float]] """Publishes review item data.""" topic_base = "review/" diff --git a/frigate/embeddings/maintainer.py b/frigate/embeddings/maintainer.py index 1f6558221..7b1c29006 100644 --- a/frigate/embeddings/maintainer.py +++ b/frigate/embeddings/maintainer.py @@ -29,6 +29,7 @@ from frigate.comms.recordings_updater import ( RecordingsDataSubscriber, RecordingsDataTypeEnum, ) +from frigate.comms.review_updater import ReviewDataSubscriber from frigate.config import FrigateConfig from frigate.config.camera.camera import CameraTypeEnum from frigate.config.camera.updater import ( @@ -143,6 +144,7 @@ class EmbeddingMaintainer(threading.Thread): self.recordings_subscriber = RecordingsDataSubscriber( RecordingsDataTypeEnum.recordings_available_through ) + self.review_subscriber = ReviewDataSubscriber("") self.detection_subscriber = DetectionSubscriber(DetectionTypeEnum.video) self.embeddings_responder = EmbeddingsResponder() self.frame_manager = SharedMemoryFrameManager() @@ -249,6 +251,7 @@ class EmbeddingMaintainer(threading.Thread): self._process_requests() self._process_updates() self._process_recordings_updates() + self._process_review_updates() self._process_frame_updates() self._expire_dedicated_lpr() self._process_finalized() @@ -523,6 +526,16 @@ class EmbeddingMaintainer(threading.Thread): f"{camera} now has recordings available through {recordings_available_through_timestamp}" ) + def _process_review_updates(self) -> None: + """Process review updates.""" + while True: + review_updates = self.review_subscriber.check_for_update() + + if review_updates == None: + break + + logger.info(f"revieved review update {review_updates}") + def _process_event_metadata(self): # Check for regenerate description requests (topic, payload) = self.event_metadata_subscriber.check_for_update() diff --git a/frigate/review/maintainer.py b/frigate/review/maintainer.py index 021022e98..b1452be34 100644 --- a/frigate/review/maintainer.py +++ b/frigate/review/maintainer.py @@ -166,7 +166,7 @@ class ReviewSegmentMaintainer(threading.Thread): ], ) self.detection_subscriber = DetectionSubscriber(DetectionTypeEnum.all) - self.review_publisher = ReviewDataPublisher() + self.review_publisher = ReviewDataPublisher("") # manual events self.indefinite_events: dict[str, dict[str, Any]] = {} @@ -196,7 +196,7 @@ class ReviewSegmentMaintainer(threading.Thread): "reviews", json.dumps(review_update), ) - self.review_publisher.publish(review_update) + self.review_publisher.publish(review_update, segment.camera) self.requestor.send_data( f"{segment.camera}/review_status", segment.severity.value.upper() ) @@ -224,7 +224,7 @@ class ReviewSegmentMaintainer(threading.Thread): "reviews", json.dumps(review_update), ) - self.review_publisher.publish(review_update) + self.review_publisher.publish(review_update, segment.camera) self.requestor.send_data( f"{segment.camera}/review_status", segment.severity.value.upper() ) @@ -246,7 +246,7 @@ class ReviewSegmentMaintainer(threading.Thread): "reviews", json.dumps(review_update), ) - self.review_publisher.publish(review_update) + self.review_publisher.publish(review_update, segment.camera) self.requestor.send_data(f"{segment.camera}/review_status", "NONE") self.active_review_segments[segment.camera] = None