mirror of
https://github.com/blakeblackshear/frigate.git
synced 2025-08-08 13:51:01 +02:00
* Fix the `Any` typing hint treewide There has been confusion between the Any type[1] and the any function[2] in typing hints. [1] https://docs.python.org/3/library/typing.html#typing.Any [2] https://docs.python.org/3/library/functions.html#any * Fix typing for various frame_shape members Frame shapes are most likely defined by height and width, so a single int cannot express that. * Wrap gpu stats functions in Optional[] These can return `None`, so they need to be `Type | None`, which is what `Optional` expresses very nicely. * Fix return type in get_latest_segment_datetime Returns a datetime object, not an integer. * Make the return type of FrameManager.write optional This is necessary since the SharedMemoryFrameManager.write function can return None. * Fix total_seconds() return type in get_tz_modifiers The function returns a float, not an int. https://docs.python.org/3/library/datetime.html#datetime.timedelta.total_seconds * Account for floating point results in to_relative_box Because the function uses division the return types may either be int or float. * Resolve ruff deprecation warning The config has been split into formatter and linter, and the global options are deprecated.
679 lines
25 KiB
Python
679 lines
25 KiB
Python
"""Maintain embeddings in SQLite-vec."""
|
|
|
|
import base64
|
|
import datetime
|
|
import logging
|
|
import os
|
|
import threading
|
|
from multiprocessing.synchronize import Event as MpEvent
|
|
from pathlib import Path
|
|
from typing import Any, Optional
|
|
|
|
import cv2
|
|
import numpy as np
|
|
from peewee import DoesNotExist
|
|
from playhouse.sqliteq import SqliteQueueDatabase
|
|
|
|
from frigate.comms.detections_updater import DetectionSubscriber, DetectionTypeEnum
|
|
from frigate.comms.embeddings_updater import EmbeddingsRequestEnum, EmbeddingsResponder
|
|
from frigate.comms.event_metadata_updater import (
|
|
EventMetadataPublisher,
|
|
EventMetadataSubscriber,
|
|
EventMetadataTypeEnum,
|
|
)
|
|
from frigate.comms.events_updater import EventEndSubscriber, EventUpdateSubscriber
|
|
from frigate.comms.inter_process import InterProcessRequestor
|
|
from frigate.comms.recordings_updater import (
|
|
RecordingsDataSubscriber,
|
|
RecordingsDataTypeEnum,
|
|
)
|
|
from frigate.config import FrigateConfig
|
|
from frigate.config.camera.camera import CameraTypeEnum
|
|
from frigate.const import (
|
|
CLIPS_DIR,
|
|
UPDATE_EVENT_DESCRIPTION,
|
|
)
|
|
from frigate.data_processing.common.license_plate.model import (
|
|
LicensePlateModelRunner,
|
|
)
|
|
from frigate.data_processing.post.api import PostProcessorApi
|
|
from frigate.data_processing.post.license_plate import (
|
|
LicensePlatePostProcessor,
|
|
)
|
|
from frigate.data_processing.real_time.api import RealTimeProcessorApi
|
|
from frigate.data_processing.real_time.bird import BirdRealTimeProcessor
|
|
from frigate.data_processing.real_time.face import FaceRealTimeProcessor
|
|
from frigate.data_processing.real_time.license_plate import (
|
|
LicensePlateRealTimeProcessor,
|
|
)
|
|
from frigate.data_processing.types import DataProcessorMetrics, PostProcessDataEnum
|
|
from frigate.events.types import EventTypeEnum, RegenerateDescriptionEnum
|
|
from frigate.genai import get_genai_client
|
|
from frigate.models import Event
|
|
from frigate.types import TrackedObjectUpdateTypesEnum
|
|
from frigate.util.builtin import serialize
|
|
from frigate.util.image import (
|
|
SharedMemoryFrameManager,
|
|
calculate_region,
|
|
ensure_jpeg_bytes,
|
|
)
|
|
from frigate.util.path import get_event_thumbnail_bytes
|
|
|
|
from .embeddings import Embeddings
|
|
|
|
logger = logging.getLogger(__name__)

# Cap on the number of per-update thumbnails buffered for each tracked event
# (collected in _process_updates and later handed to GenAI); when the cap is
# reached the oldest non-first entry is dropped.
MAX_THUMBNAILS = 10
|
|
|
|
|
|
class EmbeddingMaintainer(threading.Thread):
|
|
"""Handle embedding queue and post event updates."""
|
|
|
|
def __init__(
    self,
    db: SqliteQueueDatabase,
    config: FrigateConfig,
    metrics: DataProcessorMetrics,
    stop_event: MpEvent,
) -> None:
    """Set up embeddings, IPC subscribers/publishers, and data processors.

    Args:
        db: queue-backed SQLite handle used by the embeddings store.
        config: full Frigate configuration.
        metrics: shared metrics object passed through to processors.
        stop_event: multiprocessing event that terminates the run() loop.
    """
    super().__init__(name="embeddings_maintainer")
    self.config = config
    self.metrics = metrics
    # Remains None unless semantic search is enabled; all later uses are
    # guarded by config.semantic_search.enabled.
    self.embeddings = None

    if config.semantic_search.enabled:
        self.embeddings = Embeddings(config, db, metrics)

        # Check if we need to re-index events
        if config.semantic_search.reindex:
            self.embeddings.reindex()

    # create communication for updating event descriptions
    self.requestor = InterProcessRequestor()

    self.event_subscriber = EventUpdateSubscriber()
    self.event_end_subscriber = EventEndSubscriber()
    self.event_metadata_publisher = EventMetadataPublisher()
    self.event_metadata_subscriber = EventMetadataSubscriber(
        EventMetadataTypeEnum.regenerate_description
    )
    self.recordings_subscriber = RecordingsDataSubscriber(
        RecordingsDataTypeEnum.recordings_available_through
    )
    self.detection_subscriber = DetectionSubscriber(DetectionTypeEnum.video)
    self.embeddings_responder = EmbeddingsResponder()
    self.frame_manager = SharedMemoryFrameManager()

    # Shared between the realtime LPR processor (writer) and the post
    # processor / expiry logic (readers); keyed by event id.
    self.detected_license_plates: dict[str, dict[str, Any]] = {}

    # model runners to share between realtime and post processors
    if self.config.lpr.enabled:
        lpr_model_runner = LicensePlateModelRunner(
            self.requestor,
            device=self.config.lpr.device,
            model_size=self.config.lpr.model_size,
        )

    # realtime processors
    self.realtime_processors: list[RealTimeProcessorApi] = []

    if self.config.face_recognition.enabled:
        self.realtime_processors.append(
            FaceRealTimeProcessor(
                self.config, self.requestor, self.event_metadata_publisher, metrics
            )
        )

    if self.config.classification.bird.enabled:
        self.realtime_processors.append(
            BirdRealTimeProcessor(
                self.config, self.event_metadata_publisher, metrics
            )
        )

    if self.config.lpr.enabled:
        self.realtime_processors.append(
            LicensePlateRealTimeProcessor(
                self.config,
                self.requestor,
                self.event_metadata_publisher,
                metrics,
                lpr_model_runner,
                self.detected_license_plates,
            )
        )

    # post processors
    self.post_processors: list[PostProcessorApi] = []

    if self.config.lpr.enabled:
        self.post_processors.append(
            LicensePlatePostProcessor(
                self.config,
                self.requestor,
                self.event_metadata_publisher,
                metrics,
                lpr_model_runner,
                self.detected_license_plates,
            )
        )

    self.stop_event = stop_event
    # Per-event buffers of update payloads (incl. thumbnails) for GenAI.
    self.tracked_events: dict[str, list[Any]] = {}
    # Event ids for which an early GenAI request has already been sent.
    self.early_request_sent: dict[str, bool] = {}
    # None when GenAI is not configured; checked before every GenAI call.
    self.genai_client = get_genai_client(config)

    # recordings data
    self.recordings_available_through: dict[str, float] = {}
|
|
|
|
def run(self) -> None:
    """Drive the maintenance loop until the stop event fires, then shut down."""
    # One pass drains each input channel in a fixed order.
    steps = (
        self._process_requests,
        self._process_updates,
        self._process_recordings_updates,
        self._process_dedicated_lpr,
        self._expire_dedicated_lpr,
        self._process_finalized,
        self._process_event_metadata,
    )

    while not self.stop_event.is_set():
        for step in steps:
            step()

    # Tear down every communication channel before the thread exits.
    for channel in (
        self.event_subscriber,
        self.event_end_subscriber,
        self.recordings_subscriber,
        self.detection_subscriber,
        self.event_metadata_publisher,
        self.event_metadata_subscriber,
        self.embeddings_responder,
        self.requestor,
    ):
        channel.stop()

    logger.info("Exiting embeddings maintenance...")
|
|
|
|
def _process_requests(self) -> None:
    """Process embeddings requests"""

    def _handle_request(topic: str, data: dict[str, Any]) -> Optional[str]:
        """Route one request to the embeddings engine or a processor.

        Returns the serialized response, or None when no handler produced
        one (also on error, which is logged rather than propagated).
        The annotation was previously `-> str`, but both the fall-through
        and the exception path return None.
        """
        try:
            # First handle the embedding-specific topics when semantic search is enabled
            if self.config.semantic_search.enabled:
                if topic == EmbeddingsRequestEnum.embed_description.value:
                    return serialize(
                        self.embeddings.embed_description(
                            data["id"], data["description"]
                        ),
                        pack=False,
                    )
                elif topic == EmbeddingsRequestEnum.embed_thumbnail.value:
                    thumbnail = base64.b64decode(data["thumbnail"])
                    return serialize(
                        self.embeddings.embed_thumbnail(data["id"], thumbnail),
                        pack=False,
                    )
                elif topic == EmbeddingsRequestEnum.generate_search.value:
                    return serialize(
                        self.embeddings.embed_description("", data, upsert=False),
                        pack=False,
                    )
                elif topic == EmbeddingsRequestEnum.reindex.value:
                    response = self.embeddings.start_reindex()
                    return "started" if response else "in_progress"

            # Fall through to realtime/post processors; first non-None
            # response wins.
            processors = [self.realtime_processors, self.post_processors]
            for processor_list in processors:
                for processor in processor_list:
                    resp = processor.handle_request(topic, data)
                    if resp is not None:
                        return resp

            return None
        except Exception as e:
            logger.error(f"Unable to handle embeddings request {e}", exc_info=True)
            # explicit: errors yield no response (was an implicit None)
            return None

    self.embeddings_responder.check_for_request(_handle_request)
|
|
|
|
def _process_updates(self) -> None:
    """Process one tracked-object update: run realtime processors and
    buffer thumbnails for GenAI."""
    update = self.event_subscriber.check_for_update()

    if update is None:
        return

    source_type, _, camera, frame_name, data = update

    if not camera or source_type != EventTypeEnum.tracked_object:
        return

    if self.config.semantic_search.enabled:
        self.embeddings.update_stats()

    camera_config = self.config.cameras[camera]

    # no need to process updated objects if face recognition, lpr, genai are disabled
    if not camera_config.genai.enabled and len(self.realtime_processors) == 0:
        return

    # Create our own thumbnail based on the bounding box and the frame time
    try:
        yuv_frame = self.frame_manager.get(
            frame_name, camera_config.frame_shape_yuv
        )
    except FileNotFoundError:
        # BUGFIX: this branch previously did `pass`, leaving yuv_frame
        # unbound and raising NameError on the check below.
        yuv_frame = None

    if yuv_frame is None:
        logger.debug(
            "Unable to process object update because frame is unavailable."
        )
        return

    for processor in self.realtime_processors:
        processor.process_frame(data, yuv_frame)

    # no need to save our own thumbnails if genai is not enabled
    # or if the object has become stationary
    if self.genai_client is not None and not data["stationary"]:
        if data["id"] not in self.tracked_events:
            self.tracked_events[data["id"]] = []

        data["thumbnail"] = self._create_thumbnail(yuv_frame, data["box"])

        # Limit the number of thumbnails saved
        if len(self.tracked_events[data["id"]]) >= MAX_THUMBNAILS:
            # Always keep the first thumbnail for the event
            self.tracked_events[data["id"]].pop(1)

        self.tracked_events[data["id"]].append(data)

    # check if we're configured to send an early request after a minimum number of updates received
    if (
        self.genai_client is not None
        and camera_config.genai.send_triggers.after_significant_updates
    ):
        if (
            len(self.tracked_events.get(data["id"], []))
            >= camera_config.genai.send_triggers.after_significant_updates
            and data["id"] not in self.early_request_sent
        ):
            if data["has_clip"] and data["has_snapshot"]:
                event: Event = Event.get(Event.id == data["id"])

                if (
                    not camera_config.genai.objects
                    or event.label in camera_config.genai.objects
                ) and (
                    not camera_config.genai.required_zones
                    or set(data["entered_zones"])
                    & set(camera_config.genai.required_zones)
                ):
                    logger.debug(f"{camera} sending early request to GenAI")

                    self.early_request_sent[data["id"]] = True
                    # network-bound call happens off-thread
                    threading.Thread(
                        target=self._genai_embed_description,
                        name=f"_genai_embed_description_{event.id}",
                        daemon=True,
                        args=(
                            event,
                            [
                                # loop var renamed so it no longer shadows
                                # the outer `data` payload
                                tracked["thumbnail"]
                                for tracked in self.tracked_events[data["id"]]
                            ],
                        ),
                    ).start()

    self.frame_manager.close(frame_name)
|
|
|
|
def _process_finalized(self) -> None:
    """Process the end of an event: run post processors, expire realtime
    state, embed the thumbnail, and optionally run GenAI."""
    while True:
        ended = self.event_end_subscriber.check_for_update()

        # drained all pending end-of-event notifications
        # (was `== None`; identity comparison per PEP 8)
        if ended is None:
            break

        event_id, camera, updated_db = ended
        camera_config = self.config.cameras[camera]

        # call any defined post processors
        for processor in self.post_processors:
            if isinstance(processor, LicensePlatePostProcessor):
                recordings_available = self.recordings_available_through.get(camera)
                if (
                    recordings_available is not None
                    and event_id in self.detected_license_plates
                    and self.config.cameras[camera].type != "lpr"
                ):
                    processor.process_data(
                        {
                            "event_id": event_id,
                            "camera": camera,
                            "recordings_available": self.recordings_available_through[
                                camera
                            ],
                            "obj_data": self.detected_license_plates[event_id][
                                "obj_data"
                            ],
                        },
                        PostProcessDataEnum.recording,
                    )
            else:
                processor.process_data(event_id, PostProcessDataEnum.event_id)

        # expire in realtime processors
        for processor in self.realtime_processors:
            processor.expire_object(event_id, camera)

        if updated_db:
            try:
                event: Event = Event.get(Event.id == event_id)
            except DoesNotExist:
                continue

            # Skip the event if not an object
            if event.data.get("type") != "object":
                continue

            # Extract valid thumbnail
            thumbnail = get_event_thumbnail_bytes(event)

            # Embed the thumbnail
            self._embed_thumbnail(event_id, thumbnail)

            # Run GenAI only when enabled, triggered on object end, and the
            # event matches the configured object labels / required zones.
            if (
                camera_config.genai.enabled
                and camera_config.genai.send_triggers.tracked_object_end
                and self.genai_client is not None
                and (
                    not camera_config.genai.objects
                    or event.label in camera_config.genai.objects
                )
                and (
                    not camera_config.genai.required_zones
                    or set(event.zones) & set(camera_config.genai.required_zones)
                )
            ):
                self._process_genai_description(event, camera_config, thumbnail)

        # Delete tracked events based on the event_id
        if event_id in self.tracked_events:
            del self.tracked_events[event_id]
|
|
|
|
def _expire_dedicated_lpr(self) -> None:
    """Remove plates not seen for longer than expiration timeout for dedicated lpr cameras."""
    now = datetime.datetime.now().timestamp()

    # Collect expired ids first so we don't mutate the dict while iterating.
    expired_ids = [
        plate_id
        for plate_id, plate_data in self.detected_license_plates.items()
        if plate_data.get("last_seen", 0)
        and now - plate_data["last_seen"]
        > self.config.cameras[plate_data["camera"]].lpr.expire_time
    ]

    for plate_id in expired_ids:
        # End the manual event before dropping our tracking state.
        self.event_metadata_publisher.publish(
            EventMetadataTypeEnum.manual_event_end,
            (plate_id, now),
        )
        self.detected_license_plates.pop(plate_id)
|
|
|
|
def _process_recordings_updates(self) -> None:
    """Process recordings updates."""
    while True:
        recordings_data = self.recordings_subscriber.check_for_update()

        # queue drained (was `== None`; identity comparison per PEP 8)
        if recordings_data is None:
            break

        camera, recordings_available_through_timestamp = recordings_data

        # latest timestamp through which recordings exist for this camera
        self.recordings_available_through[camera] = (
            recordings_available_through_timestamp
        )

        logger.debug(
            f"{camera} now has recordings available through {recordings_available_through_timestamp}"
        )
|
|
|
|
def _process_event_metadata(self) -> None:
    """Process regenerate-description requests for events.

    Return annotation added for consistency with the other loop steps.
    """
    # Check for regenerate description requests
    (topic, payload) = self.event_metadata_subscriber.check_for_update()

    if topic is None:
        return

    event_id, source = payload

    if event_id:
        self.handle_regenerate_description(
            event_id, RegenerateDescriptionEnum(source)
        )
|
|
|
|
def _process_dedicated_lpr(self) -> None:
    """Process frames from dedicated LPR cameras.

    (Docstring previously said "Process event updates" — a copy/paste
    from _process_updates.)
    """
    (topic, data) = self.detection_subscriber.check_for_update()

    if topic is None:
        return

    camera, frame_name, _, _, motion_boxes, _ = data

    if not camera or not self.config.lpr.enabled or len(motion_boxes) == 0:
        return

    camera_config = self.config.cameras[camera]

    if (
        camera_config.type != CameraTypeEnum.lpr
        or "license_plate" in camera_config.objects.track
    ):
        # we're not a dedicated lpr camera or we are one but we're using frigate+
        return

    try:
        yuv_frame = self.frame_manager.get(
            frame_name, camera_config.frame_shape_yuv
        )
    except FileNotFoundError:
        # BUGFIX: this branch previously did `pass`, leaving yuv_frame
        # unbound and raising NameError on the check below.
        yuv_frame = None

    if yuv_frame is None:
        logger.debug(
            "Unable to process dedicated LPR update because frame is unavailable."
        )
        return

    for processor in self.realtime_processors:
        if isinstance(processor, LicensePlateRealTimeProcessor):
            processor.process_frame(camera, yuv_frame, True)

    self.frame_manager.close(frame_name)
|
|
|
|
def _create_thumbnail(
    self,
    yuv_frame: np.ndarray,
    box: tuple[int, int, int, int],
    height: int = 500,
) -> Optional[bytes]:
    """Return jpg thumbnail of a region of the frame.

    Args:
        yuv_frame: camera frame in YUV I420 layout.
        box: bounding box (x_min, y_min, x_max, y_max) in frame pixels —
            assumed from the indexing below; TODO confirm against callers.
        height: target thumbnail height in pixels; width follows the
            cropped region's aspect ratio.

    Returns:
        JPEG bytes, or None when encoding fails.
    """
    frame = cv2.cvtColor(yuv_frame, cv2.COLOR_YUV2BGR_I420)
    # Expand the box into a padded region clamped to the frame bounds.
    region = calculate_region(
        frame.shape, box[0], box[1], box[2], box[3], height, multiplier=1.4
    )
    frame = frame[region[1] : region[3], region[0] : region[2]]
    # Scale to the target height, preserving aspect ratio.
    width = int(height * frame.shape[1] / frame.shape[0])
    frame = cv2.resize(frame, dsize=(width, height), interpolation=cv2.INTER_AREA)
    ret, jpg = cv2.imencode(".jpg", frame, [int(cv2.IMWRITE_JPEG_QUALITY), 100])

    if ret:
        return jpg.tobytes()

    return None
|
|
|
|
def _embed_thumbnail(self, event_id: str, thumbnail: bytes) -> None:
|
|
"""Embed the thumbnail for an event."""
|
|
if not self.config.semantic_search.enabled:
|
|
return
|
|
|
|
self.embeddings.embed_thumbnail(event_id, thumbnail)
|
|
|
|
def _process_genai_description(
    self, event: Event, camera_config, thumbnail: bytes
) -> None:
    """Kick off GenAI description generation for a finished event.

    Selects the imagery to send (cropped snapshot, buffered tracking
    thumbnails, or the single stored thumbnail), optionally saves debug
    copies, then spawns a daemon thread for the network-bound GenAI call.
    """
    if event.has_snapshot and camera_config.genai.use_snapshot:
        snapshot_image = self._read_and_crop_snapshot(event, camera_config)
        if not snapshot_image:
            # snapshot missing or unreadable; nothing useful to send
            return

    num_thumbnails = len(self.tracked_events.get(event.id, []))

    # ensure we have a jpeg to pass to the model
    thumbnail = ensure_jpeg_bytes(thumbnail)

    # Preference order: cropped snapshot > buffered tracking thumbnails
    # > the event's single stored thumbnail.
    embed_image = (
        [snapshot_image]
        if event.has_snapshot and camera_config.genai.use_snapshot
        else (
            [data["thumbnail"] for data in self.tracked_events[event.id]]
            if num_thumbnails > 0
            else [thumbnail]
        )
    )

    if camera_config.genai.debug_save_thumbnails and num_thumbnails > 0:
        logger.debug(f"Saving {num_thumbnails} thumbnails for event {event.id}")

        Path(os.path.join(CLIPS_DIR, f"genai-requests/{event.id}")).mkdir(
            parents=True, exist_ok=True
        )

        # 1-based index so saved files read naturally (1.jpg, 2.jpg, ...)
        for idx, data in enumerate(self.tracked_events[event.id], 1):
            jpg_bytes: bytes = data["thumbnail"]

            if jpg_bytes is None:
                logger.warning(f"Unable to save thumbnail {idx} for {event.id}.")
            else:
                with open(
                    os.path.join(
                        CLIPS_DIR,
                        f"genai-requests/{event.id}/{idx}.jpg",
                    ),
                    "wb",
                ) as j:
                    j.write(jpg_bytes)

    # Generate the description. Call happens in a thread since it is network bound.
    threading.Thread(
        target=self._genai_embed_description,
        name=f"_genai_embed_description_{event.id}",
        daemon=True,
        args=(
            event,
            embed_image,
        ),
    ).start()
|
|
|
|
def _genai_embed_description(self, event: Event, thumbnails: list[bytes]) -> None:
    """Ask GenAI for a description of `event`, publish it, and embed it."""
    cam_cfg = self.config.cameras[event.camera]
    description = self.genai_client.generate_description(cam_cfg, thumbnails, event)

    if not description:
        logger.debug("Failed to generate description for %s", event.id)
        return

    # fire and forget description update
    payload = {
        "type": TrackedObjectUpdateTypesEnum.description,
        "id": event.id,
        "description": description,
        "camera": event.camera,
    }
    self.requestor.send_data(UPDATE_EVENT_DESCRIPTION, payload)

    # Embed the description
    if self.config.semantic_search.enabled:
        self.embeddings.embed_description(event.id, description)

    logger.debug(
        "Generated description for %s (%d images): %s",
        event.id,
        len(thumbnails),
        description,
    )
|
|
|
|
def _read_and_crop_snapshot(self, event: Event, camera_config) -> bytes | None:
    """Read, decode, and crop the snapshot image.

    Crops to the event's relative `region` (falls back to the full image
    for manual events without one) and re-encodes as JPEG.

    Returns:
        JPEG bytes of the cropped snapshot, or None on any failure.
    """
    snapshot_file = os.path.join(CLIPS_DIR, f"{event.camera}-{event.id}.jpg")

    if not os.path.isfile(snapshot_file):
        logger.error(
            f"Cannot load snapshot for {event.id}, file not found: {snapshot_file}"
        )
        return None

    try:
        with open(snapshot_file, "rb") as image_file:
            snapshot_image = image_file.read()

        # JPEG bytes are unsigned; np.uint8 is the element type
        # cv2.imdecode expects (previously np.int8, which misdeclared
        # the buffer's dtype).
        img = cv2.imdecode(
            np.frombuffer(snapshot_image, dtype=np.uint8),
            cv2.IMREAD_COLOR,
        )

        # Crop snapshot based on region
        # provide full image if region doesn't exist (manual events)
        height, width = img.shape[:2]
        x1_rel, y1_rel, width_rel, height_rel = event.data.get(
            "region", [0, 0, 1, 1]
        )
        x1, y1 = int(x1_rel * width), int(y1_rel * height)

        cropped_image = img[
            y1 : y1 + int(height_rel * height),
            x1 : x1 + int(width_rel * width),
        ]

        _, buffer = cv2.imencode(".jpg", cropped_image)

        return buffer.tobytes()
    except Exception:
        # Best-effort by design: a corrupt or undecodable snapshot yields
        # None, but log so the failure isn't silently invisible.
        logger.exception(f"Unable to read and crop snapshot for {event.id}")
        return None
|
|
|
|
def handle_regenerate_description(self, event_id: str, source: str) -> None:
    """Regenerate the GenAI description for an existing event.

    `source` chooses the imagery: "snapshot" uses the cropped snapshot
    when one exists; otherwise buffered/stored thumbnails are used.
    NOTE(review): _process_event_metadata passes a
    RegenerateDescriptionEnum here; the string comparisons below rely on
    it comparing equal to its value — confirm the enum is str-based.
    """
    try:
        event: Event = Event.get(Event.id == event_id)
    except DoesNotExist:
        logger.error(f"Event {event_id} not found for description regeneration")
        return

    camera_config = self.config.cameras[event.camera]
    if not camera_config.genai.enabled or self.genai_client is None:
        logger.error(f"GenAI not enabled for camera {event.camera}")
        return

    thumbnail = get_event_thumbnail_bytes(event)

    # ensure we have a jpeg to pass to the model
    thumbnail = ensure_jpeg_bytes(thumbnail)

    logger.debug(
        f"Trying {source} regeneration for {event}, has_snapshot: {event.has_snapshot}"
    )

    if event.has_snapshot and source == "snapshot":
        snapshot_image = self._read_and_crop_snapshot(event, camera_config)
        if not snapshot_image:
            # snapshot unreadable; abort rather than regenerate from nothing
            return

    # Preference order: cropped snapshot > buffered tracking thumbnails
    # > the event's single stored thumbnail.
    embed_image = (
        [snapshot_image]
        if event.has_snapshot and source == "snapshot"
        else (
            [data["thumbnail"] for data in self.tracked_events[event_id]]
            if len(self.tracked_events.get(event_id, [])) > 0
            else [thumbnail]
        )
    )

    self._genai_embed_description(event, embed_image)
|