blakeblackshear.frigate/frigate/embeddings/embeddings.py
Josh Hawkins 57bb0cc397
Semantic Search Triggers (#18969)
* semantic trigger test

* database and model

* config

* embeddings maintainer and trigger post-processor

* api to create, edit, delete triggers

* frontend and i18n keys

* use thumbnail and description for trigger types

* image picker tweaks

* initial sync

* thumbnail file management

* clean up logs and use saved thumbnail on frontend

* publish mqtt messages

* webpush changes to enable trigger notifications

* add enabled switch

* add triggers from explore

* renaming and deletion fixes

* fix typing

* UI updates and add last triggering event time and link

* log exception instead of return in endpoint

* highlight entry in UI when triggered

* save and delete thumbnails directly

* remove alert action for now and add descriptions

* tweaks

* clean up

* fix types

* docs

* docs tweaks

* docs

* reuse enum
2025-07-07 09:03:57 -05:00

630 lines
23 KiB
Python

"""SQLite-vec embeddings database."""
import datetime
import logging
import os
import threading
import time
import numpy as np
from peewee import DoesNotExist, IntegrityError
from playhouse.shortcuts import model_to_dict
from frigate.comms.embeddings_updater import (
EmbeddingsRequestEnum,
)
from frigate.comms.inter_process import InterProcessRequestor
from frigate.config import FrigateConfig
from frigate.config.classification import SemanticSearchModelEnum
from frigate.const import (
CONFIG_DIR,
TRIGGER_DIR,
UPDATE_EMBEDDINGS_REINDEX_PROGRESS,
UPDATE_MODEL_STATE,
)
from frigate.data_processing.types import DataProcessorMetrics
from frigate.db.sqlitevecq import SqliteVecQueueDatabase
from frigate.models import Event, Trigger
from frigate.types import ModelStatusTypesEnum
from frigate.util.builtin import EventsPerSecond, InferenceSpeed, serialize
from frigate.util.path import get_event_thumbnail_bytes
from .onnx.jina_v1_embedding import JinaV1ImageEmbedding, JinaV1TextEmbedding
from .onnx.jina_v2_embedding import JinaV2Embedding
logger = logging.getLogger(__name__)
def get_metadata(event: Event) -> dict:
    """Flatten an event into a metadata mapping of scalar values.

    The result is assembled in three passes over ``model_to_dict(event)``;
    later passes override earlier ones on a key clash:

    1. every top-level field except ``thumbnail`` whose value is a
       ``str``/``int``/``float``/``bool``;
    2. every entry of the nested ``data`` dict except ``description`` whose
       value is one of those scalar types;
    3. for every non-empty list field, one ``"<field>_<member>": True`` flag
       per string member of the list.
    """
    scalar_types = (str, int, float, bool)
    fields = model_to_dict(event)
    metadata = {}

    # Pass 1: plain scalar columns of the event itself.
    for key, value in fields.items():
        if key in ["thumbnail"]:
            continue
        if value is not None and isinstance(value, scalar_types):
            metadata[key] = value

    # Pass 2: scalar entries stored inside the event's data dict.
    for key, value in fields["data"].items():
        if key in ["description"]:
            continue
        if value is not None and isinstance(value, scalar_types):
            metadata[key] = value

    # Pass 3: metadata search doesn't support $contains and an event can
    # have multiple zones, so each string member of a list field gets its
    # own boolean key.
    for key, value in fields.items():
        if not isinstance(value, list) or len(value) == 0:
            continue
        for member in value:
            if isinstance(member, str):
                metadata[f"{key}_{member}"] = True

    return metadata
class Embeddings:
    """SQLite-vec embeddings database.

    Wraps the text and vision embedding models selected by
    ``config.semantic_search``, stores their output in the ``vec_thumbnails``
    and ``vec_descriptions`` tables of ``db``, can rebuild those tables from
    all events (``reindex``), and keeps the ``Trigger`` table in sync with the
    per-camera trigger configuration (``sync_triggers``).
    """

    def __init__(
        self,
        config: FrigateConfig,
        db: SqliteVecQueueDatabase,
        metrics: DataProcessorMetrics,
    ) -> None:
        """Create the embeddings tables and instantiate the embedding models.

        @param: config Frigate configuration (reads ``semantic_search`` and ``cameras``)
        @param: db database used for the embeddings tables
        @param: metrics shared metrics updated with inference speed and events/sec
        """
        self.config = config
        self.db = db
        self.metrics = metrics
        self.requestor = InterProcessRequestor()

        self.image_inference_speed = InferenceSpeed(self.metrics.image_embeddings_speed)
        self.image_eps = EventsPerSecond()
        self.image_eps.start()

        self.text_inference_speed = InferenceSpeed(self.metrics.text_embeddings_speed)
        self.text_eps = EventsPerSecond()
        self.text_eps.start()

        # State guarding the background reindex thread (see start_reindex)
        self.reindex_lock = threading.Lock()
        self.reindex_thread = None
        self.reindex_running = False

        # Create tables if they don't exist
        self.db.create_embeddings_tables()

        # Publish every model as not downloaded before the models are created
        for model in self.get_model_definitions():
            self.requestor.send_data(
                UPDATE_MODEL_STATE,
                {
                    "model": model,
                    "state": ModelStatusTypesEnum.not_downloaded,
                },
            )

        if self.config.semantic_search.model == SemanticSearchModelEnum.jinav2:
            # Single JinaV2Embedding instance for both text and vision
            self.embedding = JinaV2Embedding(
                model_size=self.config.semantic_search.model_size,
                requestor=self.requestor,
                device="GPU"
                if self.config.semantic_search.model_size == "large"
                else "CPU",
            )
            self.text_embedding = lambda input_data: self.embedding(
                input_data, embedding_type="text"
            )
            self.vision_embedding = lambda input_data: self.embedding(
                input_data, embedding_type="vision"
            )
        else:  # Default to jinav1
            self.text_embedding = JinaV1TextEmbedding(
                model_size=config.semantic_search.model_size,
                requestor=self.requestor,
                device="CPU",
            )
            self.vision_embedding = JinaV1ImageEmbedding(
                model_size=config.semantic_search.model_size,
                requestor=self.requestor,
                device="GPU" if config.semantic_search.model_size == "large" else "CPU",
            )

    def update_stats(self) -> None:
        """Copy the current image/text events-per-second values into the metrics."""
        self.metrics.image_embeddings_eps.value = self.image_eps.eps()
        self.metrics.text_embeddings_eps.value = self.text_eps.eps()

    def get_model_definitions(self) -> list[str]:
        """Return the model keys whose state is published at start-up.

        The list holds the files of the configured semantic search model
        (jinav2, otherwise jinav1; fp16 vs. quantized depends on
        ``model_size``) followed by the face and OCR models that are always
        included.
        """
        # Version-specific models
        if self.config.semantic_search.model == SemanticSearchModelEnum.jinav2:
            models = [
                "jinaai/jina-clip-v2-tokenizer",
                "jinaai/jina-clip-v2-model_fp16.onnx"
                if self.config.semantic_search.model_size == "large"
                else "jinaai/jina-clip-v2-model_quantized.onnx",
                "jinaai/jina-clip-v2-preprocessor_config.json",
            ]
        else:  # Default to jinav1
            models = [
                "jinaai/jina-clip-v1-text_model_fp16.onnx",
                "jinaai/jina-clip-v1-tokenizer",
                "jinaai/jina-clip-v1-vision_model_fp16.onnx"
                if self.config.semantic_search.model_size == "large"
                else "jinaai/jina-clip-v1-vision_model_quantized.onnx",
                "jinaai/jina-clip-v1-preprocessor_config.json",
            ]

        # Add common models
        models.extend(
            [
                "facenet-facenet.onnx",
                "paddleocr-onnx-detection.onnx",
                "paddleocr-onnx-classification.onnx",
                "paddleocr-onnx-recognition.onnx",
            ]
        )

        return models

    def embed_thumbnail(
        self, event_id: str, thumbnail: bytes, upsert: bool = True
    ) -> np.ndarray:
        """Embed thumbnail and optionally insert into DB.

        @param: event_id in Events DB
        @param: thumbnail bytes in jpg format
        @param: upsert If embedding should be upserted into vec DB
        @return: the embedding produced by the vision model
        """
        start = datetime.datetime.now().timestamp()

        # The vision model is called with a batch; embed a batch of one
        embedding = self.vision_embedding([thumbnail])[0]

        if upsert:
            self.db.execute_sql(
                """
                INSERT OR REPLACE INTO vec_thumbnails(id, thumbnail_embedding)
                VALUES(?, ?)
                """,
                (event_id, serialize(embedding)),
            )

        self.image_inference_speed.update(datetime.datetime.now().timestamp() - start)
        self.image_eps.update()

        return embedding

    def batch_embed_thumbnail(
        self, event_thumbs: dict[str, bytes], upsert: bool = True
    ) -> list[np.ndarray]:
        """Embed thumbnails and optionally insert into DB.

        @param: event_thumbs Map of Event IDs in DB to thumbnail bytes in jpg format
        @param: upsert If embedding should be upserted into vec DB
        @return: embeddings in the iteration order of event_thumbs; an empty
                 list (and no model or DB call) when event_thumbs is empty
        """
        # Nothing to embed: avoids an empty "VALUES" clause and a division by zero
        if not event_thumbs:
            return []

        start = datetime.datetime.now().timestamp()
        ids = list(event_thumbs.keys())
        embeddings = self.vision_embedding(list(event_thumbs.values()))

        if upsert:
            items = []

            for i, event_id in enumerate(ids):
                items.append(event_id)
                items.append(serialize(embeddings[i]))
                self.image_eps.update()

            self.db.execute_sql(
                """
                INSERT OR REPLACE INTO vec_thumbnails(id, thumbnail_embedding)
                VALUES {}
                """.format(", ".join(["(?, ?)"] * len(ids))),
                items,
            )

        # Record the per-thumbnail average on the IMAGE speed metric
        duration = datetime.datetime.now().timestamp() - start
        self.image_inference_speed.update(duration / len(ids))

        return embeddings

    def embed_description(
        self, event_id: str, description: str, upsert: bool = True
    ) -> np.ndarray:
        """Embed description and optionally insert into DB.

        @param: event_id in Events DB
        @param: description text to embed
        @param: upsert If embedding should be upserted into vec DB
        @return: the embedding produced by the text model
        """
        start = datetime.datetime.now().timestamp()
        embedding = self.text_embedding([description])[0]

        if upsert:
            self.db.execute_sql(
                """
                INSERT OR REPLACE INTO vec_descriptions(id, description_embedding)
                VALUES(?, ?)
                """,
                (event_id, serialize(embedding)),
            )

        self.text_inference_speed.update(datetime.datetime.now().timestamp() - start)
        self.text_eps.update()

        return embedding

    def batch_embed_description(
        self, event_descriptions: dict[str, str], upsert: bool = True
    ) -> list[np.ndarray]:
        """Embed descriptions and optionally insert into DB.

        @param: event_descriptions Map of Event IDs in DB to description text
        @param: upsert If embeddings should be upserted into vec DB
        @return: embeddings in the iteration order of event_descriptions; an
                 empty list (and no model or DB call) when it is empty
        """
        # Nothing to embed: avoids an empty "VALUES" clause and a division by zero
        if not event_descriptions:
            return []

        start = datetime.datetime.now().timestamp()

        # embed descriptions one by one to avoid token limit
        embeddings = [
            self.text_embedding([desc])[0] for desc in event_descriptions.values()
        ]

        if upsert:
            ids = list(event_descriptions.keys())
            items = []

            for i, event_id in enumerate(ids):
                items.append(event_id)
                items.append(serialize(embeddings[i]))
                self.text_eps.update()

            self.db.execute_sql(
                """
                INSERT OR REPLACE INTO vec_descriptions(id, description_embedding)
                VALUES {}
                """.format(", ".join(["(?, ?)"] * len(ids))),
                items,
            )

        # Per-description average, matching what batch_embed_thumbnail records
        duration = datetime.datetime.now().timestamp() - start
        self.text_inference_speed.update(duration / len(embeddings))

        return embeddings

    def reindex(self) -> None:
        """Drop and rebuild the embeddings tables from all events.

        Events are read newest first in pages; every event with a thumbnail
        has its thumbnail embedded and, when present, its description too.
        Progress is published on UPDATE_EMBEDDINGS_REINDEX_PROGRESS after
        every page and once more with status "completed" at the end.
        """
        logger.info("Indexing tracked object embeddings...")

        self.db.drop_embeddings_tables()
        logger.debug("Dropped embeddings tables.")
        self.db.create_embeddings_tables()
        logger.debug("Created embeddings tables.")

        # Delete the saved stats file
        stats_path = os.path.join(CONFIG_DIR, ".search_stats.json")
        if os.path.exists(stats_path):
            os.remove(stats_path)

        st = time.time()

        # Get total count of events to process
        total_events = Event.select().count()

        batch_size = (
            4
            if self.config.semantic_search.model == SemanticSearchModelEnum.jinav2
            else 32
        )
        current_page = 1

        # NOTE(review): when everything fits in one page the counter starts at
        # total_events - 1 with time_remaining 0 instead of 0 / -1; presumably
        # a hint for the progress UI - confirm with the frontend before changing.
        totals = {
            "thumbnails": 0,
            "descriptions": 0,
            "processed_objects": total_events - 1 if total_events < batch_size else 0,
            "total_objects": total_events,
            "time_remaining": 0 if total_events < batch_size else -1,
            "status": "indexing",
        }

        self.requestor.send_data(UPDATE_EMBEDDINGS_REINDEX_PROGRESS, totals)

        events = (
            Event.select()
            .order_by(Event.start_time.desc())
            .paginate(current_page, batch_size)
        )

        while len(events) > 0:
            batch_thumbs = {}
            batch_descs = {}

            for event in events:
                thumbnail = get_event_thumbnail_bytes(event)

                # Events without a thumbnail are skipped entirely (not counted)
                if thumbnail is None:
                    continue

                batch_thumbs[event.id] = thumbnail
                totals["thumbnails"] += 1

                # "or" guards against a description stored as None
                description = (event.data.get("description") or "").strip()
                if description:
                    batch_descs[event.id] = description
                    totals["descriptions"] += 1

                totals["processed_objects"] += 1

            # run batch embedding; a page may contain no usable thumbnails
            if batch_thumbs:
                self.batch_embed_thumbnail(batch_thumbs)

            if batch_descs:
                self.batch_embed_description(batch_descs)

            # report progress every batch so we don't spam the logs
            processed = totals["processed_objects"]
            progress = (processed / total_events) * 100 if total_events else 0.0
            logger.debug(
                "Processed %d/%d events (%.2f%% complete) | Thumbnails: %d, Descriptions: %d",
                processed,
                total_events,
                progress,
                totals["thumbnails"],
                totals["descriptions"],
            )

            # Calculate time remaining; keep the previous estimate while
            # nothing has been processed yet (avoids dividing by zero)
            if processed > 0:
                elapsed_time = time.time() - st
                avg_time_per_event = elapsed_time / processed
                remaining_events = max(total_events - processed, 0)
                totals["time_remaining"] = int(avg_time_per_event * remaining_events)

            self.requestor.send_data(UPDATE_EMBEDDINGS_REINDEX_PROGRESS, totals)

            # Move to the next page
            current_page += 1
            events = (
                Event.select()
                .order_by(Event.start_time.desc())
                .paginate(current_page, batch_size)
            )

        logger.info(
            "Embedded %d thumbnails and %d descriptions in %s seconds",
            totals["thumbnails"],
            totals["descriptions"],
            round(time.time() - st, 1),
        )
        totals["status"] = "completed"

        self.requestor.send_data(UPDATE_EMBEDDINGS_REINDEX_PROGRESS, totals)

    def start_reindex(self) -> bool:
        """Start reindexing in a separate thread if not already running.

        @return: True when a new reindex thread was started, False when one
                 is already running
        """
        with self.reindex_lock:
            if self.reindex_running:
                logger.warning("Reindex embeddings is already running.")
                return False

            # Mark as running and start the thread
            self.reindex_running = True
            self.reindex_thread = threading.Thread(
                target=self._reindex_wrapper, daemon=True
            )
            self.reindex_thread.start()
            return True

    def _reindex_wrapper(self) -> None:
        """Wrapper to run reindex and reset running flag when done."""
        try:
            self.reindex()
        finally:
            # Reset even when reindex raised so a later start_reindex can run
            with self.reindex_lock:
                self.reindex_running = False
                self.reindex_thread = None

    def sync_triggers(self) -> None:
        """Bring the Trigger table in line with each camera's configured triggers.

        For every camera: configured triggers that already have a row are
        updated, configured triggers without a row are created, and rows whose
        name is no longer configured are deleted together with their saved
        thumbnail.
        """
        for camera in self.config.cameras.values():
            configured_triggers = camera.semantic_search.triggers or {}

            # Get all existing triggers for this camera
            existing_triggers = {
                trigger.name: trigger
                for trigger in Trigger.select().where(Trigger.camera == camera.name)
            }

            # Create or update triggers from config
            for trigger_name, trigger in configured_triggers.items():
                existing_trigger = existing_triggers.get(trigger_name)

                if existing_trigger is not None:
                    self._update_existing_trigger(
                        camera.name, trigger_name, trigger, existing_trigger
                    )
                else:
                    self._create_trigger(camera.name, trigger_name, trigger)

            # Remove triggers that are no longer in config
            triggers_to_remove = set(existing_triggers.keys()) - set(
                configured_triggers
            )

            if not triggers_to_remove:
                continue

            Trigger.delete().where(
                Trigger.camera == camera.name, Trigger.name.in_(triggers_to_remove)
            ).execute()

            # Thumbnails are stored as "<event id>.webp" (the trigger's data),
            # not under the trigger name. Keep a file that a still-configured
            # thumbnail trigger of this camera points at.
            still_referenced = {
                trigger.data
                for trigger in configured_triggers.values()
                if trigger.type == "thumbnail"
            }
            stale_thumbnails = {
                existing_triggers[name].data
                for name in triggers_to_remove
                if existing_triggers[name].type == "thumbnail"
            } - still_referenced

            for event_id in stale_thumbnails:
                self.remove_trigger_thumbnail(camera.name, event_id)

    def _update_existing_trigger(
        self, camera_name: str, trigger_name: str, trigger, existing_trigger
    ) -> None:
        """Refresh the thumbnail, fields and embedding of an existing Trigger row."""
        needs_embedding_update = False
        thumbnail_rewritten = False

        # Check if data has changed or thumbnail is missing for thumbnail type
        if trigger.type == "thumbnail":
            thumbnail_path = os.path.join(
                TRIGGER_DIR, camera_name, f"{trigger.data}.webp"
            )

            try:
                event = Event.get(Event.id == trigger.data)
            except DoesNotExist:
                logger.warning(
                    f"Event ID {trigger.data} for trigger {trigger_name} does not exist."
                )
                return

            if event.data.get("type") != "object":
                logger.warning(
                    f"Event {trigger.data} is not a tracked object for {trigger.type} trigger"
                )
                return  # Skip if not an object

            # Check if thumbnail needs to be updated (data changed or missing)
            if existing_trigger.data != trigger.data or not os.path.exists(
                thumbnail_path
            ):
                thumbnail = get_event_thumbnail_bytes(event)

                if not thumbnail:
                    logger.warning(
                        f"Unable to retrieve thumbnail for event ID {trigger.data} for {trigger_name}."
                    )
                    return

                self.write_trigger_thumbnail(camera_name, trigger.data, thumbnail)
                thumbnail_rewritten = True

        # Update existing trigger if data has changed
        if (
            existing_trigger.type != trigger.type
            or existing_trigger.data != trigger.data
            or existing_trigger.threshold != trigger.threshold
        ):
            existing_trigger.type = trigger.type
            existing_trigger.data = trigger.data
            existing_trigger.threshold = trigger.threshold
            needs_embedding_update = True

        # Check if embedding is missing or needs update
        if (
            not existing_trigger.embedding
            or needs_embedding_update
            or thumbnail_rewritten
        ):
            existing_trigger.embedding = self._calculate_trigger_embedding(
                trigger, trigger_name, camera_name
            )
            needs_embedding_update = True

        if needs_embedding_update:
            existing_trigger.save()

    def _create_trigger(self, camera_name: str, trigger_name: str, trigger) -> None:
        """Create the Trigger row (and saved thumbnail) for a newly configured trigger."""
        # Only thumbnail triggers reference an event; for description triggers
        # trigger.data is the description text itself, so there is nothing to
        # look up or save.
        if trigger.type == "thumbnail":
            try:
                event: Event = Event.get(Event.id == trigger.data)
            except DoesNotExist:
                logger.warning(
                    f"Event ID {trigger.data} for trigger {trigger_name} does not exist."
                )
                return

            # Skip the event if not an object
            if event.data.get("type") != "object":
                logger.warning(
                    f"Event ID {trigger.data} for trigger {trigger_name} is not a tracked object."
                )
                return

            thumbnail = get_event_thumbnail_bytes(event)

            if not thumbnail:
                logger.warning(
                    f"Unable to retrieve thumbnail for event ID {trigger.data} for {trigger_name}."
                )
                return

            self.write_trigger_thumbnail(camera_name, trigger.data, thumbnail)

        # Calculate embedding for new trigger
        embedding = self._calculate_trigger_embedding(
            trigger, trigger_name, camera_name
        )

        try:
            Trigger.create(
                camera=camera_name,
                name=trigger_name,
                type=trigger.type,
                data=trigger.data,
                threshold=trigger.threshold,
                model=self.config.semantic_search.model,
                embedding=embedding,
                triggering_event_id="",
                last_triggered=None,
            )
        except IntegrityError:
            # Handle duplicate creation attempts
            logger.debug(
                f"Trigger {trigger_name} for {camera_name} already exists, skipping creation."
            )

    def write_trigger_thumbnail(
        self, camera: str, event_id: str, thumbnail: bytes
    ) -> None:
        """Write the thumbnail to the trigger directory.

        The file is stored as ``TRIGGER_DIR/<camera>/<event_id>.webp``.
        Failures are logged, not raised.
        """
        try:
            os.makedirs(os.path.join(TRIGGER_DIR, camera), exist_ok=True)
            with open(os.path.join(TRIGGER_DIR, camera, f"{event_id}.webp"), "wb") as f:
                f.write(thumbnail)
            logger.debug(
                f"Writing thumbnail for trigger with data {event_id} in {camera}."
            )
        except Exception as e:
            logger.error(
                f"Failed to write thumbnail for trigger with data {event_id} in {camera}: {e}"
            )

    def remove_trigger_thumbnail(self, camera: str, event_id: str) -> None:
        """Delete the thumbnail from the trigger directory.

        Removes ``TRIGGER_DIR/<camera>/<event_id>.webp``. Failures (including
        a missing file) are logged, not raised.
        """
        try:
            os.remove(os.path.join(TRIGGER_DIR, camera, f"{event_id}.webp"))
            logger.debug(
                f"Deleted thumbnail for trigger with data {event_id} in {camera}."
            )
        except Exception as e:
            logger.error(
                f"Failed to delete thumbnail for trigger with data {event_id} in {camera}: {e}"
            )

    def _calculate_trigger_embedding(
        self, trigger, trigger_name=None, camera_name=None
    ) -> bytes:
        """Calculate embedding for a trigger based on its type and data.

        @param: trigger object exposing ``type`` and ``data``
        @param: trigger_name name used in log messages; falls back to
                ``trigger.name`` when omitted
        @param: camera_name camera whose trigger directory holds the saved
                thumbnail; falls back to ``trigger.camera`` when omitted
        @return: float32 embedding bytes, or ``b""`` when the type is unknown
                 or the saved thumbnail cannot be read
        """
        if trigger_name is None:
            trigger_name = getattr(trigger, "name", None)
        if camera_name is None:
            camera_name = getattr(trigger, "camera", None)

        if trigger.type == "description":
            logger.debug(f"Generating embedding for trigger description {trigger_name}")
            # Embed with this instance's text model; upsert=False keeps the
            # trigger text out of vec_descriptions.
            embedding = self.embed_description(None, trigger.data, upsert=False)
            return embedding.astype(np.float32).tobytes()

        if trigger.type == "thumbnail":
            # For image triggers, trigger.data should be an image ID
            # Try to get embedding from vec_thumbnails table first
            cursor = self.db.execute_sql(
                "SELECT thumbnail_embedding FROM vec_thumbnails WHERE id = ?",
                [trigger.data],
            )
            row = cursor.fetchone() if cursor else None

            if row:
                return row[0]  # Already in bytes format

            logger.debug(
                f"No thumbnail embedding found for image ID: {trigger.data}, generating from saved trigger thumbnail"
            )

            try:
                with open(
                    os.path.join(TRIGGER_DIR, camera_name, f"{trigger.data}.webp"),
                    "rb",
                ) as f:
                    thumbnail = f.read()
            except Exception as e:
                logger.error(
                    f"Failed to read thumbnail for trigger {trigger_name} with ID {trigger.data}: {e}"
                )
                return b""

            logger.debug(
                f"Generating embedding for trigger thumbnail {trigger_name} with ID {trigger.data}"
            )
            # Pass the raw bytes to the vision model: str(thumbnail) would
            # yield the "b'...'" repr, not image data. upsert=False keeps the
            # result out of vec_thumbnails.
            # NOTE(review): the saved file is written with a .webp name while
            # embed_thumbnail documents jpg bytes - presumably the model
            # decodes by content; confirm.
            embedding = self.embed_thumbnail(
                str(trigger.data), thumbnail, upsert=False
            )
            return embedding.astype(np.float32).tobytes()

        logger.warning(f"Unknown trigger type: {trigger.type}")
        return b""