Files
blakeblackshear.frigate/frigate/embeddings/maintainer.py
Nicolas Mowen d150b44d36 Cleanup
2026-02-18 10:52:32 -07:00

686 lines
26 KiB
Python

"""Maintain embeddings in SQLite-vec."""
import base64
import datetime
import logging
import threading
from multiprocessing.synchronize import Event as MpEvent
from typing import Any
from peewee import DoesNotExist
from frigate.comms.config_updater import ConfigSubscriber
from frigate.comms.detections_updater import DetectionSubscriber, DetectionTypeEnum
from frigate.comms.embeddings_updater import (
EmbeddingsRequestEnum,
EmbeddingsResponder,
)
from frigate.comms.event_metadata_updater import (
EventMetadataPublisher,
EventMetadataSubscriber,
EventMetadataTypeEnum,
)
from frigate.comms.events_updater import EventEndSubscriber, EventUpdateSubscriber
from frigate.comms.inter_process import InterProcessRequestor
from frigate.comms.recordings_updater import (
RecordingsDataSubscriber,
RecordingsDataTypeEnum,
)
from frigate.comms.review_updater import ReviewDataSubscriber
from frigate.config import FrigateConfig
from frigate.config.camera.camera import CameraTypeEnum
from frigate.config.camera.updater import (
CameraConfigUpdateEnum,
CameraConfigUpdateSubscriber,
)
from frigate.data_processing.common.license_plate.model import (
LicensePlateModelRunner,
)
from frigate.data_processing.post.api import PostProcessorApi
from frigate.data_processing.post.audio_transcription import (
AudioTranscriptionPostProcessor,
)
from frigate.data_processing.post.license_plate import (
LicensePlatePostProcessor,
)
from frigate.data_processing.post.object_descriptions import ObjectDescriptionProcessor
from frigate.data_processing.post.review_descriptions import ReviewDescriptionProcessor
from frigate.data_processing.post.semantic_trigger import SemanticTriggerProcessor
from frigate.data_processing.real_time.api import RealTimeProcessorApi
from frigate.data_processing.real_time.bird import BirdRealTimeProcessor
from frigate.data_processing.real_time.custom_classification import (
CustomObjectClassificationProcessor,
CustomStateClassificationProcessor,
)
from frigate.data_processing.real_time.face import FaceRealTimeProcessor
from frigate.data_processing.real_time.license_plate import (
LicensePlateRealTimeProcessor,
)
from frigate.data_processing.types import DataProcessorMetrics, PostProcessDataEnum
from frigate.db.sqlitevecq import SqliteVecQueueDatabase
from frigate.events.types import EventTypeEnum, RegenerateDescriptionEnum
from frigate.genai import GenAIClientManager
from frigate.models import Event, Recordings, ReviewSegment, Trigger
from frigate.util.builtin import serialize
from frigate.util.file import get_event_thumbnail_bytes
from frigate.util.image import SharedMemoryFrameManager
from .embeddings import Embeddings
logger = logging.getLogger(__name__)
MAX_THUMBNAILS = 10
class EmbeddingMaintainer(threading.Thread):
"""Handle embedding queue and post event updates."""
def __init__(
self,
config: FrigateConfig,
metrics: DataProcessorMetrics | None,
stop_event: MpEvent,
) -> None:
super().__init__(name="embeddings_maintainer")
self.config = config
self.metrics = metrics
self.embeddings = None
self.config_updater = CameraConfigUpdateSubscriber(
self.config,
self.config.cameras,
[
CameraConfigUpdateEnum.add,
CameraConfigUpdateEnum.remove,
CameraConfigUpdateEnum.object_genai,
CameraConfigUpdateEnum.review_genai,
CameraConfigUpdateEnum.semantic_search,
],
)
self.classification_config_subscriber = ConfigSubscriber(
"config/classification/custom/"
)
# Configure Frigate DB
db = SqliteVecQueueDatabase(
config.database.path,
pragmas={
"auto_vacuum": "FULL", # Does not defragment database
"cache_size": -512 * 1000, # 512MB of cache
"synchronous": "NORMAL", # Safe when using WAL https://www.sqlite.org/pragma.html#pragma_synchronous
},
timeout=max(
60, 10 * len([c for c in config.cameras.values() if c.enabled])
),
load_vec_extension=True,
)
models = [Event, Recordings, ReviewSegment, Trigger]
db.bind(models)
if config.semantic_search.enabled:
self.embeddings = Embeddings(config, db, metrics)
# Check if we need to re-index events
if config.semantic_search.reindex:
self.embeddings.reindex()
# Sync semantic search triggers in db with config
self.embeddings.sync_triggers()
# create communication for updating event descriptions
self.requestor = InterProcessRequestor()
self.event_subscriber = EventUpdateSubscriber()
self.event_end_subscriber = EventEndSubscriber()
self.event_metadata_publisher = EventMetadataPublisher()
self.event_metadata_subscriber = EventMetadataSubscriber(
EventMetadataTypeEnum.regenerate_description
)
self.recordings_subscriber = RecordingsDataSubscriber(
RecordingsDataTypeEnum.saved
)
self.review_subscriber = ReviewDataSubscriber("")
self.detection_subscriber = DetectionSubscriber(DetectionTypeEnum.video.value)
self.embeddings_responder = EmbeddingsResponder()
self.frame_manager = SharedMemoryFrameManager()
self.detected_license_plates: dict[str, dict[str, Any]] = {}
self.genai_manager = GenAIClientManager(config)
# model runners to share between realtime and post processors
if self.config.lpr.enabled:
lpr_model_runner = LicensePlateModelRunner(
self.requestor,
device=self.config.lpr.device,
model_size=self.config.lpr.model_size,
)
# realtime processors
self.realtime_processors: list[RealTimeProcessorApi] = []
if self.config.face_recognition.enabled:
logger.debug("Face recognition enabled, initializing FaceRealTimeProcessor")
self.realtime_processors.append(
FaceRealTimeProcessor(
self.config, self.requestor, self.event_metadata_publisher, metrics
)
)
logger.debug("FaceRealTimeProcessor initialized successfully")
if self.config.classification.bird.enabled:
self.realtime_processors.append(
BirdRealTimeProcessor(
self.config, self.event_metadata_publisher, metrics
)
)
if self.config.lpr.enabled:
self.realtime_processors.append(
LicensePlateRealTimeProcessor(
self.config,
self.requestor,
self.event_metadata_publisher,
metrics,
lpr_model_runner,
self.detected_license_plates,
)
)
for model_config in self.config.classification.custom.values():
self.realtime_processors.append(
CustomStateClassificationProcessor(
self.config, model_config, self.requestor, self.metrics
)
if model_config.state_config != None
else CustomObjectClassificationProcessor(
self.config,
model_config,
self.event_metadata_publisher,
self.requestor,
self.metrics,
)
)
# post processors
self.post_processors: list[PostProcessorApi] = []
if self.genai_manager.vision_client is not None and any(
c.review.genai.enabled_in_config for c in self.config.cameras.values()
):
self.post_processors.append(
ReviewDescriptionProcessor(
self.config,
self.requestor,
self.metrics,
self.genai_manager.vision_client,
)
)
if self.config.lpr.enabled:
self.post_processors.append(
LicensePlatePostProcessor(
self.config,
self.requestor,
self.event_metadata_publisher,
metrics,
lpr_model_runner,
self.detected_license_plates,
)
)
if self.config.audio_transcription.enabled and any(
c.enabled_in_config and c.audio_transcription.enabled
for c in self.config.cameras.values()
):
self.post_processors.append(
AudioTranscriptionPostProcessor(
self.config, self.requestor, self.embeddings, metrics
)
)
semantic_trigger_processor: SemanticTriggerProcessor | None = None
if self.config.semantic_search.enabled:
semantic_trigger_processor = SemanticTriggerProcessor(
db,
self.config,
self.requestor,
self.event_metadata_publisher,
metrics,
self.embeddings,
)
self.post_processors.append(semantic_trigger_processor)
if self.genai_manager.vision_client is not None and any(
c.objects.genai.enabled_in_config for c in self.config.cameras.values()
):
self.post_processors.append(
ObjectDescriptionProcessor(
self.config,
self.embeddings,
self.requestor,
self.metrics,
self.genai_manager.vision_client,
semantic_trigger_processor,
)
)
self.stop_event = stop_event
# recordings data
self.recordings_available_through: dict[str, float] = {}
def run(self) -> None:
"""Maintain a SQLite-vec database for semantic search."""
while not self.stop_event.is_set():
self.config_updater.check_for_updates()
self._check_classification_config_updates()
self._process_requests()
self._process_updates()
self._process_recordings_updates()
self._process_review_updates()
self._process_frame_updates()
self._expire_dedicated_lpr()
self._process_finalized()
self._process_event_metadata()
self.config_updater.stop()
self.classification_config_subscriber.stop()
self.event_subscriber.stop()
self.event_end_subscriber.stop()
self.recordings_subscriber.stop()
self.detection_subscriber.stop()
self.event_metadata_publisher.stop()
self.event_metadata_subscriber.stop()
self.embeddings_responder.stop()
self.requestor.stop()
logger.info("Exiting embeddings maintenance...")
def _check_classification_config_updates(self) -> None:
"""Check for classification config updates and add/remove processors."""
topic, model_config = self.classification_config_subscriber.check_for_update()
if topic:
model_name = topic.split("/")[-1]
if model_config is None:
self.realtime_processors = [
processor
for processor in self.realtime_processors
if not (
isinstance(
processor,
(
CustomStateClassificationProcessor,
CustomObjectClassificationProcessor,
),
)
and processor.model_config.name == model_name
)
]
logger.info(
f"Successfully removed classification processor for model: {model_name}"
)
else:
self.config.classification.custom[model_name] = model_config
# Check if processor already exists
for processor in self.realtime_processors:
if isinstance(
processor,
(
CustomStateClassificationProcessor,
CustomObjectClassificationProcessor,
),
):
if processor.model_config.name == model_name:
logger.debug(
f"Classification processor for model {model_name} already exists, skipping"
)
return
if model_config.state_config is not None:
processor = CustomStateClassificationProcessor(
self.config, model_config, self.requestor, self.metrics
)
else:
processor = CustomObjectClassificationProcessor(
self.config,
model_config,
self.event_metadata_publisher,
self.requestor,
self.metrics,
)
self.realtime_processors.append(processor)
logger.info(
f"Added classification processor for model: {model_name} (type: {type(processor).__name__})"
)
def _process_requests(self) -> None:
"""Process embeddings requests"""
def _handle_request(topic: str, data: dict[str, Any]) -> str:
try:
# First handle the embedding-specific topics when semantic search is enabled
if self.config.semantic_search.enabled:
if topic == EmbeddingsRequestEnum.embed_description.value:
return serialize(
self.embeddings.embed_description(
data["id"], data["description"]
),
pack=False,
)
elif topic == EmbeddingsRequestEnum.embed_thumbnail.value:
thumbnail = base64.b64decode(data["thumbnail"])
return serialize(
self.embeddings.embed_thumbnail(data["id"], thumbnail),
pack=False,
)
elif topic == EmbeddingsRequestEnum.generate_search.value:
return serialize(
self.embeddings.embed_description("", data, upsert=False),
pack=False,
)
elif topic == EmbeddingsRequestEnum.reindex.value:
response = self.embeddings.start_reindex()
return "started" if response else "in_progress"
processors = [self.realtime_processors, self.post_processors]
for processor_list in processors:
for processor in processor_list:
resp = processor.handle_request(topic, data)
if resp is not None:
return resp
logger.error(f"No processor handled the topic {topic}")
return None
except Exception as e:
logger.error(f"Unable to handle embeddings request {e}", exc_info=True)
self.embeddings_responder.check_for_request(_handle_request)
def _process_updates(self) -> None:
"""Process event updates"""
update = self.event_subscriber.check_for_update()
if update is None:
return
source_type, _, camera, frame_name, data = update
logger.debug(
f"Received update - source_type: {source_type}, camera: {camera}, data label: {data.get('label') if data else 'None'}"
)
if not camera or source_type != EventTypeEnum.tracked_object:
logger.debug(
f"Skipping update - camera: {camera}, source_type: {source_type}"
)
return
if self.config.semantic_search.enabled:
self.embeddings.update_stats()
camera_config = self.config.cameras[camera]
# no need to process updated objects if no processors are active
if len(self.realtime_processors) == 0 and len(self.post_processors) == 0:
logger.debug(
f"No processors active - realtime: {len(self.realtime_processors)}, post: {len(self.post_processors)}"
)
return
# Create our own thumbnail based on the bounding box and the frame time
try:
yuv_frame = self.frame_manager.get(
frame_name, camera_config.frame_shape_yuv
)
except FileNotFoundError:
logger.debug(f"Frame {frame_name} not found for camera {camera}")
pass
if yuv_frame is None:
logger.debug(
"Unable to process object update because frame is unavailable."
)
return
logger.debug(
f"Processing {len(self.realtime_processors)} realtime processors for object {data.get('id')} (label: {data.get('label')})"
)
for processor in self.realtime_processors:
logger.debug(f"Calling process_frame on {processor.__class__.__name__}")
processor.process_frame(data, yuv_frame)
for processor in self.post_processors:
if isinstance(processor, ObjectDescriptionProcessor):
processor.process_data(
{
"camera": camera,
"data": data,
"state": "update",
"yuv_frame": yuv_frame,
},
PostProcessDataEnum.tracked_object,
)
self.frame_manager.close(frame_name)
def _process_finalized(self) -> None:
"""Process the end of an event."""
while True:
ended = self.event_end_subscriber.check_for_update()
if ended == None:
break
event_id, camera, updated_db = ended
# expire in realtime processors
for processor in self.realtime_processors:
processor.expire_object(event_id, camera)
thumbnail: bytes | None = None
if updated_db:
try:
event: Event = Event.get(Event.id == event_id)
except DoesNotExist:
continue
# Skip the event if not an object
if event.data.get("type") != "object":
continue
# Extract valid thumbnail
thumbnail = get_event_thumbnail_bytes(event)
# Embed the thumbnail
self._embed_thumbnail(event_id, thumbnail)
# call any defined post processors
for processor in self.post_processors:
if isinstance(processor, LicensePlatePostProcessor):
recordings_available = self.recordings_available_through.get(camera)
if (
recordings_available is not None
and event_id in self.detected_license_plates
and self.config.cameras[camera].type != "lpr"
):
processor.process_data(
{
"event_id": event_id,
"camera": camera,
"recordings_available": self.recordings_available_through[
camera
],
"obj_data": self.detected_license_plates[event_id][
"obj_data"
],
},
PostProcessDataEnum.recording,
)
elif isinstance(processor, AudioTranscriptionPostProcessor):
continue
elif isinstance(processor, SemanticTriggerProcessor):
processor.process_data(
{"event_id": event_id, "camera": camera, "type": "image"},
PostProcessDataEnum.tracked_object,
)
elif isinstance(processor, ObjectDescriptionProcessor):
if not updated_db:
# Still need to cleanup tracked events even if not processing
processor.cleanup_event(event_id)
continue
processor.process_data(
{
"event": event,
"camera": camera,
"state": "finalize",
"thumbnail": thumbnail,
},
PostProcessDataEnum.tracked_object,
)
else:
processor.process_data(
{"event_id": event_id, "camera": camera},
PostProcessDataEnum.tracked_object,
)
def _expire_dedicated_lpr(self) -> None:
"""Remove plates not seen for longer than expiration timeout for dedicated lpr cameras."""
now = datetime.datetime.now().timestamp()
to_remove = []
for id, data in self.detected_license_plates.items():
last_seen = data.get("last_seen", 0)
if not last_seen:
continue
if now - last_seen > self.config.cameras[data["camera"]].lpr.expire_time:
to_remove.append(id)
for id in to_remove:
self.event_metadata_publisher.publish(
(id, now),
EventMetadataTypeEnum.manual_event_end.value,
)
self.detected_license_plates.pop(id)
def _process_recordings_updates(self) -> None:
"""Process recordings updates."""
while True:
update = self.recordings_subscriber.check_for_update()
if not update:
break
(raw_topic, payload) = update
if not raw_topic or not payload:
break
topic = str(raw_topic)
if topic.endswith(RecordingsDataTypeEnum.saved.value):
camera, recordings_available_through_timestamp, _ = payload
self.recordings_available_through[camera] = (
recordings_available_through_timestamp
)
logger.debug(
f"{camera} now has recordings available through {recordings_available_through_timestamp}"
)
def _process_review_updates(self) -> None:
"""Process review updates."""
while True:
review_updates = self.review_subscriber.check_for_update()
if review_updates == None:
break
for processor in self.post_processors:
if isinstance(processor, ReviewDescriptionProcessor):
processor.process_data(review_updates, PostProcessDataEnum.review)
def _process_event_metadata(self):
# Check for regenerate description requests
(topic, payload) = self.event_metadata_subscriber.check_for_update()
if topic is None:
return
event_id, source, force = payload
if event_id:
for processor in self.post_processors:
if isinstance(processor, ObjectDescriptionProcessor):
processor.handle_request(
"regenerate_description",
{
"event_id": event_id,
"source": RegenerateDescriptionEnum(source),
"force": force,
},
)
def _process_frame_updates(self) -> None:
"""Process event updates"""
(topic, data) = self.detection_subscriber.check_for_update()
if topic is None:
return
camera, frame_name, _, _, motion_boxes, _ = data
if not camera or camera not in self.config.cameras:
return
camera_config = self.config.cameras[camera]
dedicated_lpr_enabled = (
camera_config.type == CameraTypeEnum.lpr
and "license_plate" not in camera_config.objects.track
)
if not dedicated_lpr_enabled and len(self.config.classification.custom) == 0:
# no active features that use this data
return
try:
yuv_frame = self.frame_manager.get(
frame_name, camera_config.frame_shape_yuv
)
except FileNotFoundError:
pass
if yuv_frame is None:
logger.debug(
"Unable to process dedicated LPR update because frame is unavailable."
)
return
for processor in self.realtime_processors:
if (
dedicated_lpr_enabled
and len(motion_boxes) > 0
and isinstance(processor, LicensePlateRealTimeProcessor)
):
processor.process_frame(camera, yuv_frame, True)
if isinstance(processor, CustomStateClassificationProcessor):
processor.process_frame(
{"camera": camera, "motion": motion_boxes}, yuv_frame
)
self.frame_manager.close(frame_name)
def _embed_thumbnail(self, event_id: str, thumbnail: bytes) -> None:
"""Embed the thumbnail for an event."""
if not self.config.semantic_search.enabled:
return
self.embeddings.embed_thumbnail(event_id, thumbnail)