diff --git a/frigate/comms/review_updater.py b/frigate/comms/review_updater.py new file mode 100644 index 000000000..2b3a5b3aa --- /dev/null +++ b/frigate/comms/review_updater.py @@ -0,0 +1,30 @@ +"""Facilitates communication between processes.""" + +import logging + +from .zmq_proxy import Publisher, Subscriber + +logger = logging.getLogger(__name__) + + +class ReviewDataPublisher( + Publisher +): # update when typing improvement is added Publisher[tuple[str, float]] + """Publishes review item data.""" + + topic_base = "review/" + + def __init__(self, topic: str) -> None: + super().__init__(topic) + + def publish(self, payload: tuple[str, float], sub_topic: str = "") -> None: + super().publish(payload, sub_topic) + + +class ReviewDataSubscriber(Subscriber): + """Receives review item data.""" + + topic_base = "review/" + + def __init__(self, topic: str) -> None: + super().__init__(topic) diff --git a/frigate/data_processing/post/review_descriptions.py b/frigate/data_processing/post/review_descriptions.py new file mode 100644 index 000000000..923f96497 --- /dev/null +++ b/frigate/data_processing/post/review_descriptions.py @@ -0,0 +1,25 @@ +"""Post processor for review items to get descriptions.""" + +import logging +from typing import Any + +from frigate.data_processing.types import PostProcessDataEnum + +from ..post.api import PostProcessorApi + +logger = logging.getLogger(__name__) + + +class ReviewDescriptionProcessor(PostProcessorApi): + def __init__(self, config, metrics): + super().__init__(config, metrics, None) + self.tracked_review_items: dict[str, list[Any]] = {} + + def process_data(self, data, data_type): + if data_type != PostProcessDataEnum.review: + return + + logger.info(f"processor is looking at {data}") + + def handle_request(self, request_data): + pass diff --git a/frigate/embeddings/maintainer.py b/frigate/embeddings/maintainer.py index 1f6558221..a803d3b9d 100644 --- a/frigate/embeddings/maintainer.py +++ b/frigate/embeddings/maintainer.py @@ -29,6 +29,7 @@ from frigate.comms.recordings_updater import ( RecordingsDataSubscriber, RecordingsDataTypeEnum, ) +from frigate.comms.review_updater import ReviewDataSubscriber from frigate.config import FrigateConfig from frigate.config.camera.camera import CameraTypeEnum from frigate.config.camera.updater import ( @@ -49,6 +50,7 @@ from frigate.data_processing.post.audio_transcription import ( from frigate.data_processing.post.license_plate import ( LicensePlatePostProcessor, ) +from frigate.data_processing.post.review_descriptions import ReviewDescriptionProcessor from frigate.data_processing.post.semantic_trigger import SemanticTriggerProcessor from frigate.data_processing.real_time.api import RealTimeProcessorApi from frigate.data_processing.real_time.bird import BirdRealTimeProcessor @@ -143,6 +145,7 @@ class EmbeddingMaintainer(threading.Thread): self.recordings_subscriber = RecordingsDataSubscriber( RecordingsDataTypeEnum.recordings_available_through ) + self.review_subscriber = ReviewDataSubscriber("") self.detection_subscriber = DetectionSubscriber(DetectionTypeEnum.video) self.embeddings_responder = EmbeddingsResponder() self.frame_manager = SharedMemoryFrameManager() @@ -249,6 +252,7 @@ class EmbeddingMaintainer(threading.Thread): self._process_requests() self._process_updates() self._process_recordings_updates() + self._process_review_updates() self._process_frame_updates() self._expire_dedicated_lpr() self._process_finalized() @@ -523,6 +527,18 @@ class EmbeddingMaintainer(threading.Thread): f"{camera} now has recordings available through {recordings_available_through_timestamp}" ) + def _process_review_updates(self) -> None: + """Process review updates.""" + while True: + review_updates = self.review_subscriber.check_for_update() + + if review_updates == None: + break + + for processor in self.post_processors: + if isinstance(processor, ReviewDescriptionProcessor): + processor.process_data(review_updates, PostProcessDataEnum.review) + def _process_event_metadata(self): # Check for regenerate description requests (topic, payload) = self.event_metadata_subscriber.check_for_update() diff --git a/frigate/review/maintainer.py b/frigate/review/maintainer.py index 778717db3..6035f83a5 100644 --- a/frigate/review/maintainer.py +++ b/frigate/review/maintainer.py @@ -1,6 +1,7 @@ """Maintain review segments in db.""" import copy +import datetime import json import logging import os @@ -17,6 +18,7 @@ import numpy as np from frigate.comms.detections_updater import DetectionSubscriber, DetectionTypeEnum from frigate.comms.inter_process import InterProcessRequestor +from frigate.comms.review_updater import ReviewDataPublisher from frigate.config import CameraConfig, FrigateConfig from frigate.config.camera.updater import ( CameraConfigUpdateEnum, @@ -63,6 +65,7 @@ class PendingReviewSegment: self.zones = zones self.audio = audio self.last_update = frame_time + self.thumb_time: float | None = None # thumbnail self._frame = np.zeros((THUMB_HEIGHT * 3 // 2, THUMB_WIDTH), np.uint8) @@ -104,6 +107,7 @@ class PendingReviewSegment: ) if self._frame is not None: + self.thumb_time = datetime.datetime.now().timestamp() self.has_frame = True cv2.imwrite( self.frame_path, self._frame, [int(cv2.IMWRITE_WEBP_QUALITY), 60] @@ -137,6 +141,7 @@ class PendingReviewSegment: "sub_labels": list(self.sub_labels.values()), "zones": self.zones, "audio": list(self.audio), + "thumb_time": self.thumb_time, }, } ) @@ -165,6 +170,7 @@ class ReviewSegmentMaintainer(threading.Thread): ], ) self.detection_subscriber = DetectionSubscriber(DetectionTypeEnum.all) + self.review_publisher = ReviewDataPublisher("") # manual events self.indefinite_events: dict[str, dict[str, Any]] = {} @@ -185,16 +191,16 @@ class ReviewSegmentMaintainer(threading.Thread): new_data = segment.get_data(ended=False) self.requestor.send_data(UPSERT_REVIEW_SEGMENT, new_data) start_data = {k: v for k, v in new_data.items()} + review_update = { + "type": "new", + "before": start_data, + "after": start_data, + } self.requestor.send_data( "reviews", - json.dumps( - { - "type": "new", - "before": start_data, - "after": start_data, - } - ), + json.dumps(review_update), ) + self.review_publisher.publish(review_update, segment.camera) self.requestor.send_data( f"{segment.camera}/review_status", segment.severity.value.upper() ) @@ -213,16 +219,16 @@ class ReviewSegmentMaintainer(threading.Thread): new_data = segment.get_data(ended=False) self.requestor.send_data(UPSERT_REVIEW_SEGMENT, new_data) + review_update = { + "type": "update", + "before": {k: v for k, v in prev_data.items()}, + "after": {k: v for k, v in new_data.items()}, + } self.requestor.send_data( "reviews", - json.dumps( - { - "type": "update", - "before": {k: v for k, v in prev_data.items()}, - "after": {k: v for k, v in new_data.items()}, - } - ), + json.dumps(review_update), ) + self.review_publisher.publish(review_update, segment.camera) self.requestor.send_data( f"{segment.camera}/review_status", segment.severity.value.upper() ) @@ -235,16 +241,16 @@ class ReviewSegmentMaintainer(threading.Thread): """End segment.""" final_data = segment.get_data(ended=True) self.requestor.send_data(UPSERT_REVIEW_SEGMENT, final_data) + review_update = { + "type": "end", + "before": {k: v for k, v in prev_data.items()}, + "after": {k: v for k, v in final_data.items()}, + } self.requestor.send_data( "reviews", - json.dumps( - { - "type": "end", - "before": {k: v for k, v in prev_data.items()}, - "after": {k: v for k, v in final_data.items()}, - } - ), + json.dumps(review_update), ) + self.review_publisher.publish(review_update, segment.camera) self.requestor.send_data(f"{segment.camera}/review_status", "NONE") self.active_review_segments[segment.camera] = None