diff --git a/frigate/embeddings/embeddings.py b/frigate/embeddings/embeddings.py
index 8d7bcd235..aaf5a5f2e 100644
--- a/frigate/embeddings/embeddings.py
+++ b/frigate/embeddings/embeddings.py
@@ -28,6 +28,7 @@
 from frigate.types import ModelStatusTypesEnum
 from frigate.util.builtin import EventsPerSecond, InferenceSpeed, serialize
 from frigate.util.file import get_event_thumbnail_bytes
 
+from .genai_embedding import GenAIEmbedding
 from .onnx.jina_v1_embedding import JinaV1ImageEmbedding, JinaV1TextEmbedding
 from .onnx.jina_v2_embedding import JinaV2Embedding
@@ -73,11 +74,13 @@ class Embeddings:
         config: FrigateConfig,
         db: SqliteVecQueueDatabase,
         metrics: DataProcessorMetrics,
+        genai_manager=None,
     ) -> None:
         self.config = config
         self.db = db
         self.metrics = metrics
         self.requestor = InterProcessRequestor()
+        self.genai_manager = genai_manager
 
         self.image_inference_speed = InferenceSpeed(self.metrics.image_embeddings_speed)
         self.image_eps = EventsPerSecond()
@@ -104,7 +107,27 @@ class Embeddings:
             },
         )
 
-        if self.config.semantic_search.model == SemanticSearchModelEnum.jinav2:
+        model_cfg = self.config.semantic_search.model
+        is_genai_model = isinstance(model_cfg, str)
+
+        if is_genai_model:
+            embeddings_client = (
+                genai_manager.embeddings_client if genai_manager else None
+            )
+            if not embeddings_client:
+                raise ValueError(
+                    f"semantic_search.model is '{model_cfg}' (GenAI provider) but "
+                    "no embeddings client is configured. Ensure the GenAI provider "
+                    "has 'embeddings' in its roles."
+                )
+            self.embedding = GenAIEmbedding(embeddings_client)
+            self.text_embedding = lambda input_data: self.embedding(
+                input_data, embedding_type="text"
+            )
+            self.vision_embedding = lambda input_data: self.embedding(
+                input_data, embedding_type="vision"
+            )
+        elif model_cfg == SemanticSearchModelEnum.jinav2:
             # Single JinaV2Embedding instance for both text and vision
             self.embedding = JinaV2Embedding(
                 model_size=self.config.semantic_search.model_size,
@@ -118,7 +141,8 @@ class Embeddings:
             self.vision_embedding = lambda input_data: self.embedding(
                 input_data, embedding_type="vision"
             )
-        else:  # Default to jinav1
+        else:
+            # Default to jinav1
             self.text_embedding = JinaV1TextEmbedding(
                 model_size=config.semantic_search.model_size,
                 requestor=self.requestor,
@@ -136,8 +160,11 @@ class Embeddings:
         self.metrics.text_embeddings_eps.value = self.text_eps.eps()
 
     def get_model_definitions(self):
-        # Version-specific models
-        if self.config.semantic_search.model == SemanticSearchModelEnum.jinav2:
+        model_cfg = self.config.semantic_search.model
+        if isinstance(model_cfg, str):
+            # GenAI provider: no ONNX models to download
+            models = []
+        elif model_cfg == SemanticSearchModelEnum.jinav2:
             models = [
                 "jinaai/jina-clip-v2-tokenizer",
                 "jinaai/jina-clip-v2-model_fp16.onnx"
@@ -224,6 +251,14 @@ class Embeddings:
 
         embeddings = self.vision_embedding(valid_thumbs)
 
+        if len(embeddings) != len(valid_ids):
+            logger.warning(
+                "Batch embed returned %d embeddings for %d thumbnails; skipping batch",
+                len(embeddings),
+                len(valid_ids),
+            )
+            return []
+
         if upsert:
             items = []
             for i in range(len(valid_ids)):
@@ -246,9 +281,15 @@ class Embeddings:
 
     def embed_description(
         self, event_id: str, description: str, upsert: bool = True
-    ) -> np.ndarray:
+    ) -> np.ndarray | None:
         start = datetime.datetime.now().timestamp()
-        embedding = self.text_embedding([description])[0]
+        embeddings = self.text_embedding([description])
+        if not embeddings:
+            logger.warning(
+                "Failed to generate description embedding for event %s", event_id
+            )
+            return None
+        embedding = embeddings[0]
 
         if upsert:
             self.db.execute_sql(
@@ -271,8 +312,32 @@ class Embeddings:
 
         # upsert embeddings one by one to avoid token limit
         embeddings = []
-        for desc in event_descriptions.values():
-            embeddings.append(self.text_embedding([desc])[0])
+        for eid, desc in event_descriptions.items():
+            result = self.text_embedding([desc])
+            if not result:
+                logger.warning(
+                    "Failed to generate description embedding for event %s", eid
+                )
+                continue
+            embeddings.append(result[0])
+
+        if not embeddings:
+            logger.warning("No description embeddings generated in batch")
+            return np.array([])
+
+        # Build ids list for only successful embeddings - we need to track which succeeded
+        ids = list(event_descriptions.keys())
+        if len(embeddings) != len(ids):
+            # Rebuild ids/embeddings for only successful ones (match by order)
+            ids = []
+            embeddings_filtered = []
+            for eid, desc in event_descriptions.items():
+                result = self.text_embedding([desc])
+                if result:
+                    ids.append(eid)
+                    embeddings_filtered.append(result[0])
+            ids = ids
+            embeddings = embeddings_filtered
 
         if upsert:
             ids = list(event_descriptions.keys())
@@ -314,7 +379,10 @@ class Embeddings:
 
         batch_size = (
             4
-            if self.config.semantic_search.model == SemanticSearchModelEnum.jinav2
+            if (
+                isinstance(self.config.semantic_search.model, str)
+                or self.config.semantic_search.model == SemanticSearchModelEnum.jinav2
+            )
             else 32
         )
         current_page = 1
@@ -601,6 +669,8 @@ class Embeddings:
         if trigger.type == "description":
             logger.debug(f"Generating embedding for trigger description {trigger_name}")
             embedding = self.embed_description(None, trigger.data, upsert=False)
+            if embedding is None:
+                return b""
             return embedding.astype(np.float32).tobytes()
 
         elif trigger.type == "thumbnail":
@@ -636,6 +706,8 @@ class Embeddings:
             embedding = self.embed_thumbnail(
                 str(trigger.data), thumbnail, upsert=False
             )
+            if embedding is None:
+                return b""
             return embedding.astype(np.float32).tobytes()
 
         else:
diff --git a/frigate/embeddings/maintainer.py b/frigate/embeddings/maintainer.py
index 54831942a..ad1257e3a 100644
--- a/frigate/embeddings/maintainer.py
+++ b/frigate/embeddings/maintainer.py
@@ -116,8 +116,10 @@ class EmbeddingMaintainer(threading.Thread):
         models = [Event, Recordings, ReviewSegment, Trigger]
         db.bind(models)
 
+        self.genai_manager = GenAIClientManager(config)
+
         if config.semantic_search.enabled:
-            self.embeddings = Embeddings(config, db, metrics)
+            self.embeddings = Embeddings(config, db, metrics, self.genai_manager)
 
             # Check if we need to re-index events
             if config.semantic_search.reindex:
@@ -144,7 +146,6 @@ class EmbeddingMaintainer(threading.Thread):
 
         self.frame_manager = SharedMemoryFrameManager()
         self.detected_license_plates: dict[str, dict[str, Any]] = {}
-        self.genai_manager = GenAIClientManager(config)
 
         # model runners to share between realtime and post processors
         if self.config.lpr.enabled: