mirror of
https://github.com/blakeblackshear/frigate.git
synced 2025-07-30 13:48:07 +02:00
* semantic trigger test * database and model * config * embeddings maintainer and trigger post-processor * api to create, edit, delete triggers * frontend and i18n keys * use thumbnail and description for trigger types * image picker tweaks * initial sync * thumbnail file management * clean up logs and use saved thumbnail on frontend * publish mqtt messages * webpush changes to enable trigger notifications * add enabled switch * add triggers from explore * renaming and deletion fixes * fix typing * UI updates and add last triggering event time and link * log exception instead of return in endpoint * highlight entry in UI when triggered * save and delete thumbnails directly * remove alert action for now and add descriptions * tweaks * clean up * fix types * docs * docs tweaks * docs * reuse enum
630 lines
23 KiB
Python
630 lines
23 KiB
Python
"""SQLite-vec embeddings database."""
|
|
|
|
import datetime
|
|
import logging
|
|
import os
|
|
import threading
|
|
import time
|
|
|
|
import numpy as np
|
|
from peewee import DoesNotExist, IntegrityError
|
|
from playhouse.shortcuts import model_to_dict
|
|
|
|
from frigate.comms.embeddings_updater import (
|
|
EmbeddingsRequestEnum,
|
|
)
|
|
from frigate.comms.inter_process import InterProcessRequestor
|
|
from frigate.config import FrigateConfig
|
|
from frigate.config.classification import SemanticSearchModelEnum
|
|
from frigate.const import (
|
|
CONFIG_DIR,
|
|
TRIGGER_DIR,
|
|
UPDATE_EMBEDDINGS_REINDEX_PROGRESS,
|
|
UPDATE_MODEL_STATE,
|
|
)
|
|
from frigate.data_processing.types import DataProcessorMetrics
|
|
from frigate.db.sqlitevecq import SqliteVecQueueDatabase
|
|
from frigate.models import Event, Trigger
|
|
from frigate.types import ModelStatusTypesEnum
|
|
from frigate.util.builtin import EventsPerSecond, InferenceSpeed, serialize
|
|
from frigate.util.path import get_event_thumbnail_bytes
|
|
|
|
from .onnx.jina_v1_embedding import JinaV1ImageEmbedding, JinaV1TextEmbedding
|
|
from .onnx.jina_v2_embedding import JinaV2Embedding
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def get_metadata(event: Event) -> dict:
    """Build a flat metadata mapping for an event.

    Scalar (str/int/float/bool) fields of the event, except ``thumbnail``,
    and scalar entries of its ``data`` dict, except ``description``, are
    copied as-is; on a key clash the ``data`` entry wins. Each string item
    of a list-valued event field adds a ``<field>_<item>: True`` flag.
    """
    scalar_types = (str, int, float, bool)
    fields = model_to_dict(event)

    metadata = {}

    # Top-level scalar columns (None is not an instance of any scalar type)
    for key, value in fields.items():
        if key != "thumbnail" and isinstance(value, scalar_types):
            metadata[key] = value

    # Scalar entries of the nested data dict override top-level columns
    for key, value in fields["data"].items():
        if key != "description" and isinstance(value, scalar_types):
            metadata[key] = value

    # Metadata search doesn't support $contains and an event can have
    # multiple zones, so every list entry gets its own boolean key
    for key, value in fields.items():
        if not isinstance(value, list):
            continue

        for item in value:
            if isinstance(item, str):
                metadata[f"{key}_{item}"] = True

    return metadata
|
|
|
|
|
|
class Embeddings:
|
|
"""SQLite-vec embeddings database."""
|
|
|
|
def __init__(
    self,
    config: FrigateConfig,
    db: SqliteVecQueueDatabase,
    metrics: DataProcessorMetrics,
) -> None:
    """Set up metrics trackers, embeddings tables and the embedding models.

    @param: config Frigate configuration; semantic_search selects model and size
    @param: db database on which the embeddings tables are created
    @param: metrics shared metrics object the speed/eps values are written to
    """
    self.config = config
    self.db = db
    self.metrics = metrics
    self.requestor = InterProcessRequestor()

    # Latency / throughput trackers for image embeddings
    self.image_inference_speed = InferenceSpeed(self.metrics.image_embeddings_speed)
    self.image_eps = EventsPerSecond()
    self.image_eps.start()

    # Latency / throughput trackers for text embeddings
    self.text_inference_speed = InferenceSpeed(self.metrics.text_embeddings_speed)
    self.text_eps = EventsPerSecond()
    self.text_eps.start()

    # State guarding the background reindex thread
    self.reindex_lock = threading.Lock()
    self.reindex_thread = None
    self.reindex_running = False

    # Create tables if they don't exist
    self.db.create_embeddings_tables()

    # Announce every required model as not downloaded yet
    for model_name in self.get_model_definitions():
        self.requestor.send_data(
            UPDATE_MODEL_STATE,
            {
                "model": model_name,
                "state": ModelStatusTypesEnum.not_downloaded,
            },
        )

    search_config = self.config.semantic_search
    # Only the "large" model size is placed on the GPU
    accelerated_device = "GPU" if search_config.model_size == "large" else "CPU"

    if search_config.model == SemanticSearchModelEnum.jinav2:
        # Single JinaV2Embedding instance for both text and vision
        self.embedding = JinaV2Embedding(
            model_size=search_config.model_size,
            requestor=self.requestor,
            device=accelerated_device,
        )
        self.text_embedding = lambda input_data: self.embedding(
            input_data, embedding_type="text"
        )
        self.vision_embedding = lambda input_data: self.embedding(
            input_data, embedding_type="vision"
        )
    else:  # Default to jinav1
        self.text_embedding = JinaV1TextEmbedding(
            model_size=search_config.model_size,
            requestor=self.requestor,
            device="CPU",
        )
        self.vision_embedding = JinaV1ImageEmbedding(
            model_size=search_config.model_size,
            requestor=self.requestor,
            device=accelerated_device,
        )
|
|
|
|
def update_stats(self) -> None:
    """Copy the current image and text embeddings-per-second into the metrics."""
    metrics = self.metrics
    metrics.image_embeddings_eps.value = self.image_eps.eps()
    metrics.text_embeddings_eps.value = self.text_eps.eps()
|
|
|
|
def get_model_definitions(self):
    """Return the list of model identifiers needed for the configured setup.

    The list starts with the files of the configured semantic search model
    (jinav2, otherwise jinav1) and ends with the models common to both.
    """
    search_config = self.config.semantic_search
    is_v2 = search_config.model == SemanticSearchModelEnum.jinav2
    is_large = search_config.model_size == "large"

    # Version-specific models
    if is_v2:
        version_models = [
            "jinaai/jina-clip-v2-tokenizer",
            "jinaai/jina-clip-v2-model_fp16.onnx"
            if is_large
            else "jinaai/jina-clip-v2-model_quantized.onnx",
            "jinaai/jina-clip-v2-preprocessor_config.json",
        ]
    else:  # Default to jinav1
        version_models = [
            "jinaai/jina-clip-v1-text_model_fp16.onnx",
            "jinaai/jina-clip-v1-tokenizer",
            "jinaai/jina-clip-v1-vision_model_fp16.onnx"
            if is_large
            else "jinaai/jina-clip-v1-vision_model_quantized.onnx",
            "jinaai/jina-clip-v1-preprocessor_config.json",
        ]

    # Models needed regardless of the semantic search version
    common_models = [
        "facenet-facenet.onnx",
        "paddleocr-onnx-detection.onnx",
        "paddleocr-onnx-classification.onnx",
        "paddleocr-onnx-recognition.onnx",
    ]

    return version_models + common_models
|
|
|
|
def embed_thumbnail(
    self, event_id: str, thumbnail: bytes, upsert: bool = True
) -> np.ndarray:
    """Embed a single thumbnail and optionally store it in the vec DB.

    @param: event_id in Events DB
    @param: thumbnail bytes in jpg format
    @param: upsert If embedding should be upserted into vec DB
    @return: the embedding produced by the vision model
    """
    started_at = datetime.datetime.now().timestamp()

    # The vision model works on batches; send a batch of one
    vectors = self.vision_embedding([thumbnail])
    embedding = vectors[0]

    if upsert:
        row = (event_id, serialize(embedding))
        self.db.execute_sql(
            """
            INSERT OR REPLACE INTO vec_thumbnails(id, thumbnail_embedding)
            VALUES(?, ?)
            """,
            row,
        )

    # Record latency (including the optional DB write) and throughput
    elapsed = datetime.datetime.now().timestamp() - started_at
    self.image_inference_speed.update(elapsed)
    self.image_eps.update()

    return embedding
|
|
|
|
def batch_embed_thumbnail(
    self, event_thumbs: dict[str, bytes], upsert: bool = True
) -> list[np.ndarray]:
    """Embed thumbnails and optionally insert into DB.

    @param: event_thumbs Map of Event IDs in DB to thumbnail bytes in jpg format
    @param: upsert If embedding should be upserted into vec DB
    @return: one embedding per entry of event_thumbs, in the same order;
             an empty list when event_thumbs is empty
    """
    if not event_thumbs:
        # Nothing to embed: skip the model call, the (otherwise empty and
        # therefore invalid) VALUES clause and the per-item division below
        return []

    start = datetime.datetime.now().timestamp()
    ids = list(event_thumbs)
    embeddings = self.vision_embedding(list(event_thumbs.values()))

    if upsert:
        # Flattened [id, blob, id, blob, ...] matching the "(?, ?)" groups
        items = []

        for index, event_id in enumerate(ids):
            items.append(event_id)
            items.append(serialize(embeddings[index]))
            self.image_eps.update()

        self.db.execute_sql(
            """
            INSERT OR REPLACE INTO vec_thumbnails(id, thumbnail_embedding)
            VALUES {}
            """.format(", ".join(["(?, ?)"] * len(ids))),
            items,
        )

    # Thumbnails are image embeddings, so the per-item latency belongs to
    # the image tracker (it was previously recorded on the text tracker)
    duration = datetime.datetime.now().timestamp() - start
    self.image_inference_speed.update(duration / len(ids))

    return embeddings
|
|
|
|
def embed_description(
    self, event_id: str, description: str, upsert: bool = True
) -> np.ndarray:
    """Embed a single description and optionally store it in the vec DB.

    @param: event_id in Events DB
    @param: description text to embed
    @param: upsert If embedding should be upserted into vec DB
    @return: the embedding produced by the text model
    """
    started_at = datetime.datetime.now().timestamp()

    # The text model works on batches; send a batch of one
    vectors = self.text_embedding([description])
    embedding = vectors[0]

    if upsert:
        row = (event_id, serialize(embedding))
        self.db.execute_sql(
            """
            INSERT OR REPLACE INTO vec_descriptions(id, description_embedding)
            VALUES(?, ?)
            """,
            row,
        )

    # Record latency (including the optional DB write) and throughput
    elapsed = datetime.datetime.now().timestamp() - started_at
    self.text_inference_speed.update(elapsed)
    self.text_eps.update()

    return embedding
|
|
|
|
def batch_embed_description(
    self, event_descriptions: dict[str, str], upsert: bool = True
) -> list[np.ndarray]:
    """Embed descriptions and optionally insert into DB.

    @param: event_descriptions Map of Event IDs in DB to description text
    @param: upsert If embeddings should be upserted into vec DB
    @return: one embedding per entry of event_descriptions, in the same
             order; an empty list when event_descriptions is empty
    """
    if not event_descriptions:
        # Nothing to embed: an empty VALUES clause would be invalid SQL
        return []

    start = datetime.datetime.now().timestamp()

    # embed descriptions one by one to avoid the token limit
    embeddings = [
        self.text_embedding([desc])[0] for desc in event_descriptions.values()
    ]

    if upsert:
        # Flattened [id, blob, id, blob, ...] matching the "(?, ?)" groups
        items = []

        for event_id, embedding in zip(event_descriptions, embeddings):
            items.append(event_id)
            items.append(serialize(embedding))
            self.text_eps.update()

        self.db.execute_sql(
            """
            INSERT OR REPLACE INTO vec_descriptions(id, description_embedding)
            VALUES {}
            """.format(", ".join(["(?, ?)"] * len(embeddings))),
            items,
        )

    # Record per-item latency, consistent with batch_embed_thumbnail
    duration = datetime.datetime.now().timestamp() - start
    self.text_inference_speed.update(duration / len(embeddings))

    return embeddings
|
|
|
|
def reindex(self) -> None:
    """Rebuild the embeddings tables from all events.

    Drops and recreates the embeddings tables, removes the saved search
    stats file, then walks all events (newest first) page by page, embedding
    each event's thumbnail and, when present, its description. Progress is
    published on UPDATE_EMBEDDINGS_REINDEX_PROGRESS after every page and
    once more with status "completed" at the end.

    Events without a thumbnail are skipped and are not counted as processed.
    """
    logger.info("Indexing tracked object embeddings...")

    self.db.drop_embeddings_tables()
    logger.debug("Dropped embeddings tables.")
    self.db.create_embeddings_tables()
    logger.debug("Created embeddings tables.")

    # Delete the saved stats file
    stats_path = os.path.join(CONFIG_DIR, ".search_stats.json")
    if os.path.exists(stats_path):
        os.remove(stats_path)

    st = time.time()

    # Get total count of events to process
    total_events = Event.select().count()

    batch_size = (
        4
        if self.config.semantic_search.model == SemanticSearchModelEnum.jinav2
        else 32
    )
    current_page = 1

    totals = {
        "thumbnails": 0,
        "descriptions": 0,
        "processed_objects": total_events - 1 if total_events < batch_size else 0,
        "total_objects": total_events,
        "time_remaining": 0 if total_events < batch_size else -1,
        "status": "indexing",
    }

    self.requestor.send_data(UPDATE_EMBEDDINGS_REINDEX_PROGRESS, totals)

    events = (
        Event.select()
        .order_by(Event.start_time.desc())
        .paginate(current_page, batch_size)
    )

    while len(events) > 0:
        batch_thumbs = {}
        batch_descs = {}

        for event in events:
            thumbnail = get_event_thumbnail_bytes(event)

            if thumbnail is None:
                continue

            batch_thumbs[event.id] = thumbnail
            totals["thumbnails"] += 1

            # "or" guards against a description key that is present but None
            description = (event.data.get("description") or "").strip()
            if description:
                batch_descs[event.id] = description
                totals["descriptions"] += 1

            totals["processed_objects"] += 1

        # run batch embedding; a page in which no event had a thumbnail
        # yields empty batches, which must not reach the SQL insert
        if batch_thumbs:
            self.batch_embed_thumbnail(batch_thumbs)

        if batch_descs:
            self.batch_embed_description(batch_descs)

        # report progress every batch so we don't spam the logs
        processed = totals["processed_objects"]
        progress = (processed / total_events) * 100 if total_events else 0.0
        logger.debug(
            "Processed %d/%d events (%.2f%% complete) | Thumbnails: %d, Descriptions: %d",
            processed,
            total_events,
            progress,
            totals["thumbnails"],
            totals["descriptions"],
        )

        # Calculate time remaining; without any processed event there is no
        # average to extrapolate from, so the previous estimate is kept
        if processed > 0:
            elapsed_time = time.time() - st
            avg_time_per_event = elapsed_time / processed
            remaining_events = max(total_events - processed, 0)
            totals["time_remaining"] = int(avg_time_per_event * remaining_events)

        self.requestor.send_data(UPDATE_EMBEDDINGS_REINDEX_PROGRESS, totals)

        # Move to the next page
        current_page += 1
        events = (
            Event.select()
            .order_by(Event.start_time.desc())
            .paginate(current_page, batch_size)
        )

    logger.info(
        "Embedded %d thumbnails and %d descriptions in %s seconds",
        totals["thumbnails"],
        totals["descriptions"],
        round(time.time() - st, 1),
    )
    totals["status"] = "completed"

    self.requestor.send_data(UPDATE_EMBEDDINGS_REINDEX_PROGRESS, totals)
|
|
|
|
def start_reindex(self) -> bool:
    """Launch reindexing on a daemon thread unless one is already running.

    @return: True when a new reindex thread was started, False otherwise
    """
    with self.reindex_lock:
        if not self.reindex_running:
            # Mark as running before the thread exists so a concurrent
            # caller cannot start a second one
            self.reindex_running = True
            worker = threading.Thread(target=self._reindex_wrapper, daemon=True)
            self.reindex_thread = worker
            worker.start()
            return True

        logger.warning("Reindex embeddings is already running.")
        return False
|
|
|
|
def _reindex_wrapper(self) -> None:
|
|
"""Wrapper to run reindex and reset running flag when done."""
|
|
try:
|
|
self.reindex()
|
|
finally:
|
|
with self.reindex_lock:
|
|
self.reindex_running = False
|
|
self.reindex_thread = None
|
|
|
|
def sync_triggers(self) -> None:
    """Bring the Trigger table and saved trigger thumbnails in line with the config.

    For every configured camera:

    * triggers present in both the config and the DB are updated when their
      type, data or threshold changed, or when their embedding or (for
      thumbnail triggers) their saved thumbnail file is missing;
    * triggers present only in the config are created;
    * triggers present only in the DB are deleted, together with the saved
      thumbnail of deleted thumbnail triggers.

    Thumbnail triggers reference an event ID in ``trigger.data``; they are
    skipped with a warning when the event does not exist, is not a tracked
    object, or has no retrievable thumbnail. Description triggers carry free
    text in ``trigger.data`` and need no event lookup.
    """
    for camera in self.config.cameras.values():
        configured_triggers = camera.semantic_search.triggers or {}

        # Get all existing triggers for this camera
        existing_triggers = {
            trigger.name: trigger
            for trigger in Trigger.select().where(Trigger.camera == camera.name)
        }

        # Create or update triggers from config
        for trigger_name, trigger in configured_triggers.items():
            if trigger_name in existing_triggers:
                existing_trigger = existing_triggers[trigger_name]
                needs_embedding_update = False
                thumbnail_missing = False

                # Check if data has changed or thumbnail is missing for thumbnail type
                if trigger.type == "thumbnail":
                    thumbnail_path = os.path.join(
                        TRIGGER_DIR, camera.name, f"{trigger.data}.webp"
                    )
                    try:
                        event = Event.get(Event.id == trigger.data)
                    except DoesNotExist:
                        logger.warning(
                            f"Event ID {trigger.data} for trigger {trigger_name} does not exist."
                        )
                        continue

                    if event.data.get("type") != "object":
                        logger.warning(
                            f"Event {trigger.data} is not a tracked object for {trigger.type} trigger"
                        )
                        continue  # Skip if not an object

                    # Check if thumbnail needs to be updated (data changed or missing)
                    if existing_trigger.data != trigger.data or not os.path.exists(
                        thumbnail_path
                    ):
                        thumbnail = get_event_thumbnail_bytes(event)
                        if not thumbnail:
                            logger.warning(
                                f"Unable to retrieve thumbnail for event ID {trigger.data} for {trigger_name}."
                            )
                            continue
                        self.write_trigger_thumbnail(
                            camera.name, trigger.data, thumbnail
                        )
                        # A rewritten thumbnail forces a fresh embedding below
                        thumbnail_missing = True

                # Update existing trigger if data has changed
                if (
                    existing_trigger.type != trigger.type
                    or existing_trigger.data != trigger.data
                    or existing_trigger.threshold != trigger.threshold
                ):
                    existing_trigger.type = trigger.type
                    existing_trigger.data = trigger.data
                    existing_trigger.threshold = trigger.threshold
                    needs_embedding_update = True

                # Check if embedding is missing or needs update
                if (
                    not existing_trigger.embedding
                    or needs_embedding_update
                    or thumbnail_missing
                ):
                    existing_trigger.embedding = self._calculate_trigger_embedding(
                        trigger
                    )
                    needs_embedding_update = True

                if needs_embedding_update:
                    existing_trigger.save()
            else:
                # Create new trigger
                try:
                    # Only thumbnail triggers reference an event; description
                    # triggers hold free text, so looking their data up as an
                    # event ID would always fail and skip their creation
                    if trigger.type == "thumbnail":
                        try:
                            event = Event.get(Event.id == trigger.data)
                        except DoesNotExist:
                            logger.warning(
                                f"Event ID {trigger.data} for trigger {trigger_name} does not exist."
                            )
                            continue

                        # Skip the event if not an object
                        if event.data.get("type") != "object":
                            logger.warning(
                                f"Event ID {trigger.data} for trigger {trigger_name} is not a tracked object."
                            )
                            continue

                        thumbnail = get_event_thumbnail_bytes(event)

                        if not thumbnail:
                            logger.warning(
                                f"Unable to retrieve thumbnail for event ID {trigger.data} for {trigger_name}."
                            )
                            continue

                        self.write_trigger_thumbnail(
                            camera.name, trigger.data, thumbnail
                        )

                    # Calculate embedding for new trigger
                    embedding = self._calculate_trigger_embedding(trigger)

                    Trigger.create(
                        camera=camera.name,
                        name=trigger_name,
                        type=trigger.type,
                        data=trigger.data,
                        threshold=trigger.threshold,
                        model=self.config.semantic_search.model,
                        embedding=embedding,
                        triggering_event_id="",
                        last_triggered=None,
                    )
                except IntegrityError:
                    # Duplicate creation attempt: the row already exists
                    logger.debug(
                        f"Trigger {trigger_name} for {camera.name} already exists, skipping creation."
                    )

        # Remove triggers that are no longer in config
        triggers_to_remove = set(existing_triggers) - set(configured_triggers)
        if triggers_to_remove:
            Trigger.delete().where(
                Trigger.camera == camera.name, Trigger.name.in_(triggers_to_remove)
            ).execute()

            # Saved thumbnails are named after the event ID held in the
            # trigger's data (see write_trigger_thumbnail), not after the
            # trigger name; keep files still used by a configured trigger
            thumbnails_in_use = {
                trigger.data
                for trigger in configured_triggers.values()
                if trigger.type == "thumbnail"
            }
            for trigger_name in triggers_to_remove:
                removed_trigger = existing_triggers[trigger_name]
                if (
                    removed_trigger.type == "thumbnail"
                    and removed_trigger.data not in thumbnails_in_use
                ):
                    self.remove_trigger_thumbnail(camera.name, removed_trigger.data)
|
|
|
|
def write_trigger_thumbnail(
    self, camera: str, event_id: str, thumbnail: bytes
) -> None:
    """Save thumbnail bytes as ``<TRIGGER_DIR>/<camera>/<event_id>.webp``.

    The camera directory is created when needed. Any failure is logged as
    an error and not raised.
    """
    camera_dir = os.path.join(TRIGGER_DIR, camera)
    target_path = os.path.join(TRIGGER_DIR, camera, f"{event_id}.webp")

    try:
        os.makedirs(camera_dir, exist_ok=True)
        with open(target_path, "wb") as thumbnail_file:
            thumbnail_file.write(thumbnail)
        logger.debug(
            f"Writing thumbnail for trigger with data {event_id} in {camera}."
        )
    except Exception as e:
        logger.error(
            f"Failed to write thumbnail for trigger with data {event_id} in {camera}: {e}"
        )
|
|
|
|
def remove_trigger_thumbnail(self, camera: str, event_id: str) -> None:
    """Delete ``<TRIGGER_DIR>/<camera>/<event_id>.webp`` from the trigger directory.

    Any failure, including a file that does not exist, is logged as an
    error and not raised.
    """
    target_path = os.path.join(TRIGGER_DIR, camera, f"{event_id}.webp")

    try:
        os.remove(target_path)
    except Exception as e:
        logger.error(
            f"Failed to delete thumbnail for trigger with data {event_id} in {camera}: {e}"
        )
    else:
        logger.debug(
            f"Deleted thumbnail for trigger with data {event_id} in {camera}."
        )
|
|
|
|
def _calculate_trigger_embedding(self, trigger) -> bytes:
    """Calculate embedding for a trigger based on its type and data.

    * ``description`` triggers: ``trigger.data`` is embedded as text.
    * ``thumbnail`` triggers: the embedding already stored in
      ``vec_thumbnails`` for the ID in ``trigger.data`` is reused when
      present; otherwise the saved trigger thumbnail
      ``<TRIGGER_DIR>/<trigger.camera>/<trigger.data>.webp`` is embedded.

    Embeddings are computed in-process with this class's own models and are
    never upserted into the vec tables.

    NOTE(review): this reads ``trigger.name`` and ``trigger.camera``;
    sync_triggers passes the config trigger object, so confirm that object
    exposes both attributes.

    @param: trigger object exposing type, data, name and camera
    @return: float32 embedding bytes, or b"" when the saved thumbnail cannot
             be read or the trigger type is unknown
    """
    if trigger.type == "description":
        logger.debug(f"Generating embedding for trigger description {trigger.name}")
        embedding = self.embed_description(None, trigger.data, upsert=False)
        return embedding.astype(np.float32).tobytes()

    if trigger.type == "thumbnail":
        # For image triggers, trigger.data should be an image ID
        # Try to get embedding from vec_thumbnails table first
        cursor = self.db.execute_sql(
            "SELECT thumbnail_embedding FROM vec_thumbnails WHERE id = ?",
            [trigger.data],
        )
        row = cursor.fetchone() if cursor else None
        if row:
            return row[0]  # Already in bytes format

        logger.debug(
            f"No thumbnail embedding found for image ID: {trigger.data}, generating from saved trigger thumbnail"
        )

        try:
            with open(
                os.path.join(TRIGGER_DIR, trigger.camera, f"{trigger.data}.webp"),
                "rb",
            ) as f:
                thumbnail = f.read()
        except Exception as e:
            logger.error(
                f"Failed to read thumbnail for trigger {trigger.name} with ID {trigger.data}: {e}"
            )
            return b""

        logger.debug(
            f"Generating embedding for trigger thumbnail {trigger.name} with ID {trigger.data}"
        )
        # Pass the raw image bytes straight to the vision model; the earlier
        # request payload used str(thumbnail), which is the textual repr of
        # the bytes object rather than the image data
        embedding = self.embed_thumbnail(str(trigger.data), thumbnail, upsert=False)
        return embedding.astype(np.float32).tobytes()

    logger.warning(f"Unknown trigger type: {trigger.type}")
    return b""
|