Implement start for review item description processor (#19352)

* Add review item data transmission

* Publish review updates

* Add review item subscriber

* Basic implementation for testing review processor

* Formatting
This commit is contained in:
Nicolas Mowen 2025-08-03 07:33:09 -06:00 committed by GitHub
parent 0a02c665fc
commit 468def6698
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 98 additions and 21 deletions

View File

@ -0,0 +1,30 @@
"""Facilitates communication between processes."""
import logging
from .zmq_proxy import Publisher, Subscriber
logger = logging.getLogger(__name__)
class ReviewDataPublisher(
Publisher
): # update when typing improvement is added Publisher[tuple[str, float]]
"""Publishes review item data."""
topic_base = "review/"
def __init__(self, topic: str) -> None:
super().__init__(topic)
def publish(self, payload: tuple[str, float], sub_topic: str = "") -> None:
super().publish(payload, sub_topic)
class ReviewDataSubscriber(Subscriber):
"""Receives review item data."""
topic_base = "review/"
def __init__(self, topic: str) -> None:
super().__init__(topic)

View File

@ -0,0 +1,25 @@
"""Post processor for review items to get descriptions."""
import logging
from typing import Any
from frigate.data_processing.types import PostProcessDataEnum
from ..post.api import PostProcessorApi
logger = logging.getLogger(__name__)
class ReviewDescriptionProcessor(PostProcessorApi):
def __init__(self, config, metrics):
super().__init__(config, metrics, None)
self.tracked_review_items: dict[str, list[Any]] = {}
def process_data(self, data, data_type):
if data_type != PostProcessDataEnum.review:
return
logger.info(f"processor is looking at {data}")
def handle_request(self, request_data):
pass

View File

@ -29,6 +29,7 @@ from frigate.comms.recordings_updater import (
RecordingsDataSubscriber,
RecordingsDataTypeEnum,
)
from frigate.comms.review_updater import ReviewDataSubscriber
from frigate.config import FrigateConfig
from frigate.config.camera.camera import CameraTypeEnum
from frigate.config.camera.updater import (
@ -49,6 +50,7 @@ from frigate.data_processing.post.audio_transcription import (
from frigate.data_processing.post.license_plate import (
LicensePlatePostProcessor,
)
from frigate.data_processing.post.review_descriptions import ReviewDescriptionProcessor
from frigate.data_processing.post.semantic_trigger import SemanticTriggerProcessor
from frigate.data_processing.real_time.api import RealTimeProcessorApi
from frigate.data_processing.real_time.bird import BirdRealTimeProcessor
@ -143,6 +145,7 @@ class EmbeddingMaintainer(threading.Thread):
self.recordings_subscriber = RecordingsDataSubscriber(
RecordingsDataTypeEnum.recordings_available_through
)
self.review_subscriber = ReviewDataSubscriber("")
self.detection_subscriber = DetectionSubscriber(DetectionTypeEnum.video)
self.embeddings_responder = EmbeddingsResponder()
self.frame_manager = SharedMemoryFrameManager()
@ -249,6 +252,7 @@ class EmbeddingMaintainer(threading.Thread):
self._process_requests()
self._process_updates()
self._process_recordings_updates()
self._process_review_updates()
self._process_frame_updates()
self._expire_dedicated_lpr()
self._process_finalized()
@ -523,6 +527,18 @@ class EmbeddingMaintainer(threading.Thread):
f"{camera} now has recordings available through {recordings_available_through_timestamp}"
)
def _process_review_updates(self) -> None:
"""Process review updates."""
while True:
review_updates = self.review_subscriber.check_for_update()
if review_updates == None:
break
for processor in self.post_processors:
if isinstance(processor, ReviewDescriptionProcessor):
processor.process_data(review_updates, PostProcessDataEnum.review)
def _process_event_metadata(self):
# Check for regenerate description requests
(topic, payload) = self.event_metadata_subscriber.check_for_update()

View File

@ -1,6 +1,7 @@
"""Maintain review segments in db."""
import copy
import datetime
import json
import logging
import os
@ -17,6 +18,7 @@ import numpy as np
from frigate.comms.detections_updater import DetectionSubscriber, DetectionTypeEnum
from frigate.comms.inter_process import InterProcessRequestor
from frigate.comms.review_updater import ReviewDataPublisher
from frigate.config import CameraConfig, FrigateConfig
from frigate.config.camera.updater import (
CameraConfigUpdateEnum,
@ -63,6 +65,7 @@ class PendingReviewSegment:
self.zones = zones
self.audio = audio
self.last_update = frame_time
self.thumb_time: float | None = None
# thumbnail
self._frame = np.zeros((THUMB_HEIGHT * 3 // 2, THUMB_WIDTH), np.uint8)
@ -104,6 +107,7 @@ class PendingReviewSegment:
)
if self._frame is not None:
self.thumb_time = datetime.datetime.now().timestamp()
self.has_frame = True
cv2.imwrite(
self.frame_path, self._frame, [int(cv2.IMWRITE_WEBP_QUALITY), 60]
@ -137,6 +141,7 @@ class PendingReviewSegment:
"sub_labels": list(self.sub_labels.values()),
"zones": self.zones,
"audio": list(self.audio),
"thumb_time": self.thumb_time,
},
}
)
@ -165,6 +170,7 @@ class ReviewSegmentMaintainer(threading.Thread):
],
)
self.detection_subscriber = DetectionSubscriber(DetectionTypeEnum.all)
self.review_publisher = ReviewDataPublisher("")
# manual events
self.indefinite_events: dict[str, dict[str, Any]] = {}
@ -185,16 +191,16 @@ class ReviewSegmentMaintainer(threading.Thread):
new_data = segment.get_data(ended=False)
self.requestor.send_data(UPSERT_REVIEW_SEGMENT, new_data)
start_data = {k: v for k, v in new_data.items()}
review_update = {
"type": "new",
"before": start_data,
"after": start_data,
}
self.requestor.send_data(
"reviews",
json.dumps(
{
"type": "new",
"before": start_data,
"after": start_data,
}
),
json.dumps(review_update),
)
self.review_publisher.publish(review_update, segment.camera)
self.requestor.send_data(
f"{segment.camera}/review_status", segment.severity.value.upper()
)
@ -213,16 +219,16 @@ class ReviewSegmentMaintainer(threading.Thread):
new_data = segment.get_data(ended=False)
self.requestor.send_data(UPSERT_REVIEW_SEGMENT, new_data)
review_update = {
"type": "update",
"before": {k: v for k, v in prev_data.items()},
"after": {k: v for k, v in new_data.items()},
}
self.requestor.send_data(
"reviews",
json.dumps(
{
"type": "update",
"before": {k: v for k, v in prev_data.items()},
"after": {k: v for k, v in new_data.items()},
}
),
json.dumps(review_update),
)
self.review_publisher.publish(review_update, segment.camera)
self.requestor.send_data(
f"{segment.camera}/review_status", segment.severity.value.upper()
)
@ -235,16 +241,16 @@ class ReviewSegmentMaintainer(threading.Thread):
"""End segment."""
final_data = segment.get_data(ended=True)
self.requestor.send_data(UPSERT_REVIEW_SEGMENT, final_data)
review_update = {
"type": "end",
"before": {k: v for k, v in prev_data.items()},
"after": {k: v for k, v in final_data.items()},
}
self.requestor.send_data(
"reviews",
json.dumps(
{
"type": "end",
"before": {k: v for k, v in prev_data.items()},
"after": {k: v for k, v in final_data.items()},
}
),
json.dumps(review_update),
)
self.review_publisher.publish(review_update, segment.camera)
self.requestor.send_data(f"{segment.camera}/review_status", "NONE")
self.active_review_segments[segment.camera] = None