processing in maintainer

This commit is contained in:
Josh Hawkins 2025-03-20 09:17:54 -05:00
parent e9fae3fc58
commit 8bf1c216bb

View File

@ -1,6 +1,7 @@
"""Maintain embeddings in SQLite-vec."""
import base64
import datetime
import logging
import os
import threading
@ -13,6 +14,7 @@ import numpy as np
from peewee import DoesNotExist
from playhouse.sqliteq import SqliteQueueDatabase
from frigate.comms.detections_updater import DetectionSubscriber, DetectionTypeEnum
from frigate.comms.embeddings_updater import EmbeddingsRequestEnum, EmbeddingsResponder
from frigate.comms.event_metadata_updater import (
EventMetadataPublisher,
@ -26,6 +28,7 @@ from frigate.comms.recordings_updater import (
RecordingsDataTypeEnum,
)
from frigate.config import FrigateConfig
from frigate.config.classification import CameraTypeEnum
from frigate.const import (
CLIPS_DIR,
UPDATE_EVENT_DESCRIPTION,
@ -97,6 +100,7 @@ class EmbeddingMaintainer(threading.Thread):
self.recordings_subscriber = RecordingsDataSubscriber(
RecordingsDataTypeEnum.recordings_available_through
)
self.detection_subscriber = DetectionSubscriber(DetectionTypeEnum.video)
self.embeddings_responder = EmbeddingsResponder()
self.frame_manager = SharedMemoryFrameManager()
@ -162,12 +166,15 @@ class EmbeddingMaintainer(threading.Thread):
self._process_requests()
self._process_updates()
self._process_recordings_updates()
self._process_dedicated_lpr()
self._expire_dedicated_lpr()
self._process_finalized()
self._process_event_metadata()
self.event_subscriber.stop()
self.event_end_subscriber.stop()
self.recordings_subscriber.stop()
self.detection_subscriber.stop()
self.event_metadata_publisher.stop()
self.event_metadata_subscriber.stop()
self.embeddings_responder.stop()
@ -374,6 +381,27 @@ class EmbeddingMaintainer(threading.Thread):
if event_id in self.tracked_events:
del self.tracked_events[event_id]
def _expire_dedicated_lpr(self) -> None:
"""Remove plates not seen for longer than expiration timeout for dedicated lpr cameras."""
now = datetime.datetime.now().timestamp()
to_remove = []
for id, data in self.detected_license_plates.items():
last_seen = data.get("last_seen", 0)
if not last_seen:
continue
if now - last_seen > self.config.cameras[data["camera"]].lpr.expire_time:
to_remove.append(id)
for id in to_remove:
print(f"Expiring plate event {id}")
self.event_metadata_publisher.publish(
EventMetadataTypeEnum.manual_event_end,
(id, now),
)
self.detected_license_plates.pop(id)
def _process_recordings_updates(self) -> None:
"""Process recordings updates."""
while True:
@ -406,6 +434,42 @@ class EmbeddingMaintainer(threading.Thread):
event_id, RegenerateDescriptionEnum(source)
)
def _process_dedicated_lpr(self) -> None:
"""Process event updates"""
(topic, data) = self.detection_subscriber.check_for_update(timeout=0.01)
if topic is None:
return
camera, frame_name, _, _, motion_boxes, _ = data
if not camera or not self.config.lpr.enabled or len(motion_boxes) == 0:
return
camera_config = self.config.cameras[camera]
if not camera_config.lpr.camera_type == CameraTypeEnum.lpr:
return
try:
yuv_frame = self.frame_manager.get(
frame_name, camera_config.frame_shape_yuv
)
except FileNotFoundError:
pass
if yuv_frame is None:
logger.debug(
"Unable to process dedicated LPR update because frame is unavailable."
)
return
for processor in self.realtime_processors:
if isinstance(processor, LicensePlateRealTimeProcessor):
processor.process_frame(camera, yuv_frame, True)
self.frame_manager.close(frame_name)
def _create_thumbnail(self, yuv_frame, box, height=500) -> Optional[bytes]:
"""Return jpg thumbnail of a region of the frame."""
frame = cv2.cvtColor(yuv_frame, cv2.COLOR_YUV2BGR_I420)