blakeblackshear.frigate/frigate/embeddings/embeddings.py
Josh Hawkins ef406ba647 Semantic Search Triggers (#18969)
* semantic trigger test

* database and model

* config

* embeddings maintainer and trigger post-processor

* api to create, edit, delete triggers

* frontend and i18n keys

* use thumbnail and description for trigger types

* image picker tweaks

* initial sync

* thumbnail file management

* clean up logs and use saved thumbnail on frontend

* publish mqtt messages

* webpush changes to enable trigger notifications

* add enabled switch

* add triggers from explore

* renaming and deletion fixes

* fix typing

* UI updates and add last triggering event time and link

* log exception instead of return in endpoint

* highlight entry in UI when triggered

* save and delete thumbnails directly

* remove alert action for now and add descriptions

* tweaks

* clean up

* fix types

* docs

* docs tweaks

* docs

* reuse enum
2025-08-07 20:57:35 -06:00

646 lines
24 KiB
Python

"""SQLite-vec embeddings database."""
import datetime
import io
import logging
import os
import threading
import time
import numpy as np
from peewee import DoesNotExist, IntegrityError
from PIL import Image
from playhouse.shortcuts import model_to_dict
from frigate.comms.embeddings_updater import (
EmbeddingsRequestEnum,
)
from frigate.comms.inter_process import InterProcessRequestor
from frigate.config import FrigateConfig
from frigate.config.classification import SemanticSearchModelEnum
from frigate.const import (
CONFIG_DIR,
TRIGGER_DIR,
UPDATE_EMBEDDINGS_REINDEX_PROGRESS,
UPDATE_MODEL_STATE,
)
from frigate.data_processing.types import DataProcessorMetrics
from frigate.db.sqlitevecq import SqliteVecQueueDatabase
from frigate.models import Event, Trigger
from frigate.types import ModelStatusTypesEnum
from frigate.util.builtin import EventsPerSecond, InferenceSpeed, serialize
from frigate.util.path import get_event_thumbnail_bytes
from .onnx.jina_v1_embedding import JinaV1ImageEmbedding, JinaV1TextEmbedding
from .onnx.jina_v2_embedding import JinaV2Embedding
logger = logging.getLogger(__name__)
def get_metadata(event: Event) -> dict:
"""Extract valid event metadata."""
event_dict = model_to_dict(event)
return (
{
k: v
for k, v in event_dict.items()
if k not in ["thumbnail"]
and v is not None
and isinstance(v, (str, int, float, bool))
}
| {
k: v
for k, v in event_dict["data"].items()
if k not in ["description"]
and v is not None
and isinstance(v, (str, int, float, bool))
}
| {
# Metadata search doesn't support $contains
# and an event can have multiple zones, so
# we need to create a key for each zone
f"{k}_{x}": True
for k, v in event_dict.items()
if isinstance(v, list) and len(v) > 0
for x in v
if isinstance(x, str)
}
)
class Embeddings:
"""SQLite-vec embeddings database."""
def __init__(
self,
config: FrigateConfig,
db: SqliteVecQueueDatabase,
metrics: DataProcessorMetrics,
) -> None:
self.config = config
self.db = db
self.metrics = metrics
self.requestor = InterProcessRequestor()
self.image_inference_speed = InferenceSpeed(self.metrics.image_embeddings_speed)
self.image_eps = EventsPerSecond()
self.image_eps.start()
self.text_inference_speed = InferenceSpeed(self.metrics.text_embeddings_speed)
self.text_eps = EventsPerSecond()
self.text_eps.start()
self.reindex_lock = threading.Lock()
self.reindex_thread = None
self.reindex_running = False
# Create tables if they don't exist
self.db.create_embeddings_tables()
models = self.get_model_definitions()
for model in models:
self.requestor.send_data(
UPDATE_MODEL_STATE,
{
"model": model,
"state": ModelStatusTypesEnum.not_downloaded,
},
)
if self.config.semantic_search.model == SemanticSearchModelEnum.jinav2:
# Single JinaV2Embedding instance for both text and vision
self.embedding = JinaV2Embedding(
model_size=self.config.semantic_search.model_size,
requestor=self.requestor,
device="GPU"
if self.config.semantic_search.model_size == "large"
else "CPU",
)
self.text_embedding = lambda input_data: self.embedding(
input_data, embedding_type="text"
)
self.vision_embedding = lambda input_data: self.embedding(
input_data, embedding_type="vision"
)
else: # Default to jinav1
self.text_embedding = JinaV1TextEmbedding(
model_size=config.semantic_search.model_size,
requestor=self.requestor,
device="CPU",
)
self.vision_embedding = JinaV1ImageEmbedding(
model_size=config.semantic_search.model_size,
requestor=self.requestor,
device="GPU" if config.semantic_search.model_size == "large" else "CPU",
)
def update_stats(self) -> None:
self.metrics.image_embeddings_eps.value = self.image_eps.eps()
self.metrics.text_embeddings_eps.value = self.text_eps.eps()
def get_model_definitions(self):
# Version-specific models
if self.config.semantic_search.model == SemanticSearchModelEnum.jinav2:
models = [
"jinaai/jina-clip-v2-tokenizer",
"jinaai/jina-clip-v2-model_fp16.onnx"
if self.config.semantic_search.model_size == "large"
else "jinaai/jina-clip-v2-model_quantized.onnx",
"jinaai/jina-clip-v2-preprocessor_config.json",
]
else: # Default to jinav1
models = [
"jinaai/jina-clip-v1-text_model_fp16.onnx",
"jinaai/jina-clip-v1-tokenizer",
"jinaai/jina-clip-v1-vision_model_fp16.onnx"
if self.config.semantic_search.model_size == "large"
else "jinaai/jina-clip-v1-vision_model_quantized.onnx",
"jinaai/jina-clip-v1-preprocessor_config.json",
]
# Add common models
models.extend(
[
"facenet-facenet.onnx",
"paddleocr-onnx-detection.onnx",
"paddleocr-onnx-classification.onnx",
"paddleocr-onnx-recognition.onnx",
]
)
return models
def embed_thumbnail(
self, event_id: str, thumbnail: bytes, upsert: bool = True
) -> np.ndarray:
"""Embed thumbnail and optionally insert into DB.
@param: event_id in Events DB
@param: thumbnail bytes in jpg format
@param: upsert If embedding should be upserted into vec DB
"""
start = datetime.datetime.now().timestamp()
# Convert thumbnail bytes to PIL Image
embedding = self.vision_embedding([thumbnail])[0]
if upsert:
self.db.execute_sql(
"""
INSERT OR REPLACE INTO vec_thumbnails(id, thumbnail_embedding)
VALUES(?, ?)
""",
(event_id, serialize(embedding)),
)
self.image_inference_speed.update(datetime.datetime.now().timestamp() - start)
self.image_eps.update()
return embedding
def batch_embed_thumbnail(
self, event_thumbs: dict[str, bytes], upsert: bool = True
) -> list[np.ndarray]:
"""Embed thumbnails and optionally insert into DB.
@param: event_thumbs Map of Event IDs in DB to thumbnail bytes in jpg format
@param: upsert If embedding should be upserted into vec DB
"""
start = datetime.datetime.now().timestamp()
valid_ids = []
valid_thumbs = []
for eid, thumb in event_thumbs.items():
try:
img = Image.open(io.BytesIO(thumb))
img.verify() # Will raise if corrupt
valid_ids.append(eid)
valid_thumbs.append(thumb)
except Exception as e:
logger.warning(
f"Embeddings reindexing: Skipping corrupt thumbnail for event {eid}: {e}"
)
if not valid_thumbs:
logger.warning(
"Embeddings reindexing: No valid thumbnails to embed in this batch."
)
return []
embeddings = self.vision_embedding(valid_thumbs)
if upsert:
items = []
for i in range(len(valid_ids)):
items.append(valid_ids[i])
items.append(serialize(embeddings[i]))
self.image_eps.update()
self.db.execute_sql(
"""
INSERT OR REPLACE INTO vec_thumbnails(id, thumbnail_embedding)
VALUES {}
""".format(", ".join(["(?, ?)"] * len(valid_ids))),
items,
)
duration = datetime.datetime.now().timestamp() - start
self.text_inference_speed.update(duration / len(valid_ids))
return embeddings
def embed_description(
self, event_id: str, description: str, upsert: bool = True
) -> np.ndarray:
start = datetime.datetime.now().timestamp()
embedding = self.text_embedding([description])[0]
if upsert:
self.db.execute_sql(
"""
INSERT OR REPLACE INTO vec_descriptions(id, description_embedding)
VALUES(?, ?)
""",
(event_id, serialize(embedding)),
)
self.text_inference_speed.update(datetime.datetime.now().timestamp() - start)
self.text_eps.update()
return embedding
def batch_embed_description(
self, event_descriptions: dict[str, str], upsert: bool = True
) -> np.ndarray:
start = datetime.datetime.now().timestamp()
# upsert embeddings one by one to avoid token limit
embeddings = []
for desc in event_descriptions.values():
embeddings.append(self.text_embedding([desc])[0])
if upsert:
ids = list(event_descriptions.keys())
items = []
for i in range(len(ids)):
items.append(ids[i])
items.append(serialize(embeddings[i]))
self.text_eps.update()
self.db.execute_sql(
"""
INSERT OR REPLACE INTO vec_descriptions(id, description_embedding)
VALUES {}
""".format(", ".join(["(?, ?)"] * len(ids))),
items,
)
self.text_inference_speed.update(datetime.datetime.now().timestamp() - start)
return embeddings
def reindex(self) -> None:
logger.info("Indexing tracked object embeddings...")
self.db.drop_embeddings_tables()
logger.debug("Dropped embeddings tables.")
self.db.create_embeddings_tables()
logger.debug("Created embeddings tables.")
# Delete the saved stats file
if os.path.exists(os.path.join(CONFIG_DIR, ".search_stats.json")):
os.remove(os.path.join(CONFIG_DIR, ".search_stats.json"))
st = time.time()
# Get total count of events to process
total_events = Event.select().count()
batch_size = (
4
if self.config.semantic_search.model == SemanticSearchModelEnum.jinav2
else 32
)
current_page = 1
totals = {
"thumbnails": 0,
"descriptions": 0,
"processed_objects": total_events - 1 if total_events < batch_size else 0,
"total_objects": total_events,
"time_remaining": 0 if total_events < batch_size else -1,
"status": "indexing",
}
self.requestor.send_data(UPDATE_EMBEDDINGS_REINDEX_PROGRESS, totals)
events = (
Event.select()
.order_by(Event.start_time.desc())
.paginate(current_page, batch_size)
)
while events:
event: Event
batch_thumbs = {}
batch_descs = {}
for event in events:
totals["processed_objects"] += 1
if description := event.data.get("description", "").strip():
batch_descs[event.id] = description
totals["descriptions"] += 1
if thumbnail := get_event_thumbnail_bytes(event):
batch_thumbs[event.id] = thumbnail
totals["thumbnails"] += 1
# run batch embedding
if batch_thumbs:
self.batch_embed_thumbnail(batch_thumbs)
if batch_descs:
self.batch_embed_description(batch_descs)
# report progress every batch so we don't spam the logs
progress = (totals["processed_objects"] / total_events) * 100
logger.debug(
"Processed %d/%d events (%.2f%% complete) | Thumbnails: %d, Descriptions: %d",
totals["processed_objects"],
total_events,
progress,
totals["thumbnails"],
totals["descriptions"],
)
# Calculate time remaining
elapsed_time = time.time() - st
avg_time_per_event = elapsed_time / totals["processed_objects"]
remaining_events = total_events - totals["processed_objects"]
time_remaining = avg_time_per_event * remaining_events
totals["time_remaining"] = int(time_remaining)
self.requestor.send_data(UPDATE_EMBEDDINGS_REINDEX_PROGRESS, totals)
# Move to the next page
current_page += 1
events = (
Event.select()
.order_by(Event.start_time.desc())
.paginate(current_page, batch_size)
)
logger.info(
"Embedded %d thumbnails and %d descriptions in %s seconds",
totals["thumbnails"],
totals["descriptions"],
round(time.time() - st, 1),
)
totals["status"] = "completed"
self.requestor.send_data(UPDATE_EMBEDDINGS_REINDEX_PROGRESS, totals)
def start_reindex(self) -> bool:
"""Start reindexing in a separate thread if not already running."""
with self.reindex_lock:
if self.reindex_running:
logger.warning("Reindex embeddings is already running.")
return False
# Mark as running and start the thread
self.reindex_running = True
self.reindex_thread = threading.Thread(
target=self._reindex_wrapper, daemon=True
)
self.reindex_thread.start()
return True
def _reindex_wrapper(self) -> None:
"""Wrapper to run reindex and reset running flag when done."""
try:
self.reindex()
finally:
with self.reindex_lock:
self.reindex_running = False
self.reindex_thread = None
def sync_triggers(self) -> None:
for camera in self.config.cameras.values():
# Get all existing triggers for this camera
existing_triggers = {
trigger.name: trigger
for trigger in Trigger.select().where(Trigger.camera == camera.name)
}
# Get all configured trigger names
configured_trigger_names = set(camera.semantic_search.triggers or {})
# Create or update triggers from config
for trigger_name, trigger in (
camera.semantic_search.triggers or {}
).items():
if trigger_name in existing_triggers:
existing_trigger = existing_triggers[trigger_name]
needs_embedding_update = False
thumbnail_missing = False
# Check if data has changed or thumbnail is missing for thumbnail type
if trigger.type == "thumbnail":
thumbnail_path = os.path.join(
TRIGGER_DIR, camera.name, f"{trigger.data}.webp"
)
try:
event = Event.get(Event.id == trigger.data)
if event.data.get("type") != "object":
logger.warning(
f"Event {trigger.data} is not a tracked object for {trigger.type} trigger"
)
continue # Skip if not an object
# Check if thumbnail needs to be updated (data changed or missing)
if (
existing_trigger.data != trigger.data
or not os.path.exists(thumbnail_path)
):
thumbnail = get_event_thumbnail_bytes(event)
if not thumbnail:
logger.warning(
f"Unable to retrieve thumbnail for event ID {trigger.data} for {trigger_name}."
)
continue
self.write_trigger_thumbnail(
camera.name, trigger.data, thumbnail
)
thumbnail_missing = True
except DoesNotExist:
logger.warning(
f"Event ID {trigger.data} for trigger {trigger_name} does not exist."
)
continue
# Update existing trigger if data has changed
if (
existing_trigger.type != trigger.type
or existing_trigger.data != trigger.data
or existing_trigger.threshold != trigger.threshold
):
existing_trigger.type = trigger.type
existing_trigger.data = trigger.data
existing_trigger.threshold = trigger.threshold
needs_embedding_update = True
# Check if embedding is missing or needs update
if (
not existing_trigger.embedding
or needs_embedding_update
or thumbnail_missing
):
existing_trigger.embedding = self._calculate_trigger_embedding(
trigger
)
needs_embedding_update = True
if needs_embedding_update:
existing_trigger.save()
else:
# Create new trigger
try:
try:
event: Event = Event.get(Event.id == trigger.data)
except DoesNotExist:
logger.warning(
f"Event ID {trigger.data} for trigger {trigger_name} does not exist."
)
continue
# Skip the event if not an object
if event.data.get("type") != "object":
logger.warning(
f"Event ID {trigger.data} for trigger {trigger_name} is not a tracked object."
)
continue
thumbnail = get_event_thumbnail_bytes(event)
if not thumbnail:
logger.warning(
f"Unable to retrieve thumbnail for event ID {trigger.data} for {trigger_name}."
)
continue
self.write_trigger_thumbnail(
camera.name, trigger.data, thumbnail
)
# Calculate embedding for new trigger
embedding = self._calculate_trigger_embedding(trigger)
Trigger.create(
camera=camera.name,
name=trigger_name,
type=trigger.type,
data=trigger.data,
threshold=trigger.threshold,
model=self.config.semantic_search.model,
embedding=embedding,
triggering_event_id="",
last_triggered=None,
)
except IntegrityError:
pass # Handle duplicate creation attempts
# Remove triggers that are no longer in config
triggers_to_remove = (
set(existing_triggers.keys()) - configured_trigger_names
)
if triggers_to_remove:
Trigger.delete().where(
Trigger.camera == camera.name, Trigger.name.in_(triggers_to_remove)
).execute()
for trigger_name in triggers_to_remove:
self.remove_trigger_thumbnail(camera.name, trigger_name)
def write_trigger_thumbnail(
self, camera: str, event_id: str, thumbnail: bytes
) -> None:
"""Write the thumbnail to the trigger directory."""
try:
os.makedirs(os.path.join(TRIGGER_DIR, camera), exist_ok=True)
with open(os.path.join(TRIGGER_DIR, camera, f"{event_id}.webp"), "wb") as f:
f.write(thumbnail)
logger.debug(
f"Writing thumbnail for trigger with data {event_id} in {camera}."
)
except Exception as e:
logger.error(
f"Failed to write thumbnail for trigger with data {event_id} in {camera}: {e}"
)
def remove_trigger_thumbnail(self, camera: str, event_id: str) -> None:
"""Write the thumbnail to the trigger directory."""
try:
os.remove(os.path.join(TRIGGER_DIR, camera, f"{event_id}.webp"))
logger.debug(
f"Deleted thumbnail for trigger with data {event_id} in {camera}."
)
except Exception as e:
logger.error(
f"Failed to delete thumbnail for trigger with data {event_id} in {camera}: {e}"
)
def _calculate_trigger_embedding(self, trigger) -> bytes:
"""Calculate embedding for a trigger based on its type and data."""
if trigger.type == "description":
logger.debug(f"Generating embedding for trigger description {trigger.name}")
embedding = self.requestor.send_data(
EmbeddingsRequestEnum.embed_description.value,
{"id": None, "description": trigger.data, "upsert": False},
)
return embedding.astype(np.float32).tobytes()
elif trigger.type == "thumbnail":
# For image triggers, trigger.data should be an image ID
# Try to get embedding from vec_thumbnails table first
cursor = self.db.execute_sql(
"SELECT thumbnail_embedding FROM vec_thumbnails WHERE id = ?",
[trigger.data],
)
row = cursor.fetchone() if cursor else None
if row:
return row[0] # Already in bytes format
else:
logger.debug(
f"No thumbnail embedding found for image ID: {trigger.data}, generating from saved trigger thumbnail"
)
try:
with open(
os.path.join(
TRIGGER_DIR, trigger.camera, f"{trigger.data}.webp"
),
"rb",
) as f:
thumbnail = f.read()
except Exception as e:
logger.error(
f"Failed to read thumbnail for trigger {trigger.name} with ID {trigger.data}: {e}"
)
return b""
logger.debug(
f"Generating embedding for trigger thumbnail {trigger.name} with ID {trigger.data}"
)
embedding = self.requestor.send_data(
EmbeddingsRequestEnum.embed_thumbnail.value,
{
"id": str(trigger.data),
"thumbnail": str(thumbnail),
"upsert": False,
},
)
return embedding.astype(np.float32).tobytes()
else:
logger.warning(f"Unknown trigger type: {trigger.type}")
return b""