Review Item GenAI metadata (#19442)

* Rename existing function

* Keep track of thumbnail updates

* Tinkering with genai prompt

* Adjust input format

* Create model for review description output

* testing prompt changes

* Prompt improvements and image saving

* Add config for review items genai

* Use genai review config

* Actual config usage

* Adjust debug image saving

* Fix

* Fix review creation

* Adjust prompt

* Prompt adjustment

* Run genai in thread

* Fix detections block

* Adjust prompt

* Prompt changes

* Save genai response to metadata model

* Handle metadata

* Send review update to dispatcher

* Save review metadata to DB

* Send review notification updates

* Quick fix

* Fix name

* Fix update type

* Correctly dump model

* Add card

* Add card

* Remove message

* Cleanup typing and UI

* Adjust prompt

* Formatting

* Add log

* Formatting

* Add inference speed and keep alive
Nicolas Mowen 2025-08-10 05:57:54 -06:00 committed by GitHub
parent 52295fcac4
commit f8ca91643e
15 changed files with 331 additions and 12 deletions

View File

@@ -26,6 +26,7 @@ from frigate.const import (
UPDATE_EMBEDDINGS_REINDEX_PROGRESS,
UPDATE_EVENT_DESCRIPTION,
UPDATE_MODEL_STATE,
UPDATE_REVIEW_DESCRIPTION,
UPSERT_REVIEW_SEGMENT,
)
from frigate.models import Event, Previews, Recordings, ReviewSegment
@@ -149,6 +150,14 @@ class Dispatcher:
),
)
def handle_update_review_description() -> None:
final_data = payload["after"]
ReviewSegment.insert(final_data).on_conflict(
conflict_target=[ReviewSegment.id],
update=final_data,
).execute()
self.publish("reviews", json.dumps(payload))
def handle_update_model_state() -> None:
if payload:
model = payload["model"]
@@ -232,6 +241,7 @@ class Dispatcher:
CLEAR_ONGOING_REVIEW_SEGMENTS: handle_clear_ongoing_review_segments,
UPDATE_CAMERA_ACTIVITY: handle_update_camera_activity,
UPDATE_EVENT_DESCRIPTION: handle_update_event_description,
UPDATE_REVIEW_DESCRIPTION: handle_update_review_description,
UPDATE_MODEL_STATE: handle_update_model_state,
UPDATE_EMBEDDINGS_REINDEX_PROGRESS: handle_update_embeddings_reindex_progress,
UPDATE_BIRDSEYE_LAYOUT: handle_update_birdseye_layout,
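
For context, a rough sketch of the payload shape the new handler expects (the field values below are assumptions, not taken from the diff): the `after` dict is upserted into `ReviewSegment` and the whole payload is re-published on the `reviews` topic.

# Illustrative payload for UPDATE_REVIEW_DESCRIPTION (values are assumptions).
payload = {
    "type": "genai",
    "before": {},  # review segment state prior to the GenAI update (omitted here)
    "after": {
        "id": "1723293825.123-abc123",  # hypothetical review segment id
        "camera": "front_door",
        "severity": "alert",
        "data": {
            "objects": ["person"],
            "metadata": {"scene": "A person approaches the door.", "confidence": 0.9},
        },
    },
}
# handle_update_review_description upserts payload["after"] keyed on ReviewSegment.id,
# then forwards the full payload to websocket clients via self.publish("reviews", ...).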

View File

@@ -369,12 +369,22 @@ class WebPushClient(Communicator):
sorted_objects.update(payload["after"]["data"]["sub_labels"])
title = f"{titlecase(', '.join(sorted_objects).replace('_', ' '))}{' was' if state == 'end' else ''} detected in {titlecase(', '.join(payload['after']['data']['zones']).replace('_', ' '))}"
message = f"Detected on {titlecase(camera.replace('_', ' '))}"
image = f"{payload['after']['thumb_path'].replace('/media/frigate', '')}"
ended = state == "end" or state == "genai"
if state == "genai" and payload["after"]["data"]["metadata"]:
message = payload["after"]["data"]["metadata"]["scene"]
else:
message = f"Detected on {titlecase(camera.replace('_', ' '))}"
if ended:
logger.debug(
f"Sending a notification with state {state} and message {message}"
)
# if event is ongoing open to live view otherwise open to recordings view
direct_url = f"/review?id={reviewId}" if state == "end" else f"/#{camera}"
ttl = 3600 if state == "end" else 0
direct_url = f"/review?id={reviewId}" if ended else f"/#{camera}"
ttl = 3600 if ended else 0
logger.debug(f"Sending push notification for {camera}, review ID {reviewId}")

View File

@@ -62,6 +62,18 @@ class DetectionsConfig(FrigateBaseModel):
return v
class GenAIReviewConfig(FrigateBaseModel):
alerts: bool = Field(default=False, title="Enable GenAI for alerts.")
detections: bool = Field(default=False, title="Enable GenAI for detections.")
debug_save_thumbnails: bool = Field(
default=False,
title="Save thumbnails sent to generative AI for debugging purposes.",
)
enabled_in_config: Optional[bool] = Field(
default=None, title="Keep track of original state of generative AI."
)
class ReviewConfig(FrigateBaseModel):
"""Configure reviews"""
@@ -71,3 +83,6 @@ class ReviewConfig(FrigateBaseModel):
detections: DetectionsConfig = Field(
default_factory=DetectionsConfig, title="Review detections config."
)
genai: GenAIReviewConfig = Field(
default_factory=GenAIReviewConfig, title="Review description genai config."
)
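
As a rough illustration of how these options fit together, assuming the two models above are in scope (field values are hypothetical):

# Hypothetical per-camera review GenAI settings built from the models above.
genai_review = GenAIReviewConfig(
    alerts=True,                 # describe alert review items
    detections=False,            # skip detection-severity review items
    debug_save_thumbnails=True,  # keep copies of thumbnails sent to the LLM
)
review = ReviewConfig(genai=genai_review)
# enabled_in_config is left as None here; FrigateConfig sets it to
# (alerts or detections) while post-processing the config (see the next file).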

View File

@@ -610,6 +610,10 @@ class FrigateConfig(FrigateBaseModel):
camera_config.objects.genai.enabled_in_config = (
camera_config.objects.genai.enabled
)
camera_config.review.genai.enabled_in_config = (
camera_config.review.genai.alerts
or camera_config.review.genai.detections
)
# Add default filters
object_keys = camera_config.objects.track

View File

@@ -111,6 +111,7 @@ UPSERT_REVIEW_SEGMENT = "upsert_review_segment"
CLEAR_ONGOING_REVIEW_SEGMENTS = "clear_ongoing_review_segments"
UPDATE_CAMERA_ACTIVITY = "update_camera_activity"
UPDATE_EVENT_DESCRIPTION = "update_event_description"
UPDATE_REVIEW_DESCRIPTION = "update_review_description"
UPDATE_MODEL_STATE = "update_model_state"
UPDATE_EMBEDDINGS_REINDEX_PROGRESS = "handle_embeddings_reindex_progress"
UPDATE_BIRDSEYE_LAYOUT = "update_birdseye_layout"

View File

@@ -1,25 +1,162 @@
"""Post processor for review items to get descriptions."""
import copy
import datetime
import logging
from typing import Any
import os
import shutil
import threading
from pathlib import Path
import cv2
from frigate.comms.inter_process import InterProcessRequestor
from frigate.config import FrigateConfig
from frigate.const import CLIPS_DIR, UPDATE_REVIEW_DESCRIPTION
from frigate.data_processing.types import PostProcessDataEnum
from frigate.genai import GenAIClient
from frigate.util.builtin import EventsPerSecond, InferenceSpeed
from ..post.api import PostProcessorApi
from ..types import DataProcessorMetrics
logger = logging.getLogger(__name__)
class ReviewDescriptionProcessor(PostProcessorApi):
def __init__(self, config, metrics):
def __init__(
self,
config: FrigateConfig,
requestor: InterProcessRequestor,
metrics: DataProcessorMetrics,
client: GenAIClient,
):
super().__init__(config, metrics, None)
self.tracked_review_items: dict[str, list[Any]] = {}
self.requestor = requestor
self.metrics = metrics
self.tracked_review_items: dict[str, list[tuple[int, bytes]]] = {}
self.genai_client = client
self.review_desc_speed = InferenceSpeed(self.metrics.review_desc_speed)
self.review_descs_dps = EventsPerSecond()
self.review_descs_dps.start()
def process_data(self, data, data_type):
self.metrics.review_desc_dps.value = self.review_descs_dps.eps()
if data_type != PostProcessDataEnum.review:
return
logger.info(f"processor is looking at {data}")
id = data["after"]["id"]
if data["type"] == "new" or data["type"] == "update":
if id not in self.tracked_review_items:
self.tracked_review_items[id] = []
thumb_time = data["after"]["data"]["thumb_time"]
thumb_path = data["after"]["thumb_path"]
if thumb_time and thumb_path:
if (
len(self.tracked_review_items[id]) > 0
and self.tracked_review_items[id][-1][0] == thumb_time
):
# we have already processed this thumbnail
return
thumb_data = cv2.imread(thumb_path)
ret, jpg = cv2.imencode(
".jpg", thumb_data, [int(cv2.IMWRITE_JPEG_QUALITY), 100]
)
if ret:
self.tracked_review_items[id].append((thumb_time, jpg.tobytes()))
if self.config.cameras[
data["after"]["camera"]
].review.genai.debug_save_thumbnails:
id = data["after"]["id"]
Path(os.path.join(CLIPS_DIR, f"genai-requests/{id}")).mkdir(
parents=True, exist_ok=True
)
shutil.copy(
thumb_path,
os.path.join(
CLIPS_DIR,
f"genai-requests/{id}/{thumb_time}.webp",
),
)
else:
if id not in self.tracked_review_items:
return
final_data = data["after"]
camera = final_data["camera"]
if (
final_data["severity"] == "alert"
and not self.config.cameras[camera].review.genai.alerts
):
self.tracked_review_items.pop(id)
return
elif (
final_data["severity"] == "detection"
and not self.config.cameras[camera].review.genai.detections
):
self.tracked_review_items.pop(id)
return
# kickoff analysis
self.review_descs_dps.update()
threading.Thread(
target=run_analysis,
args=(
self.requestor,
self.genai_client,
self.review_desc_speed,
camera,
final_data,
copy.copy([r[1] for r in self.tracked_review_items[id]]),
),
).start()
self.tracked_review_items.pop(id)
def handle_request(self, request_data):
pass
@staticmethod
def run_analysis(
requestor: InterProcessRequestor,
genai_client: GenAIClient,
review_inference_speed: InferenceSpeed,
camera: str,
final_data: dict[str, str],
thumbs: list[bytes],
) -> None:
start = datetime.datetime.now().timestamp()
metadata = genai_client.generate_review_description(
{
"camera": camera,
"objects": final_data["data"]["objects"],
"recognized_objects": final_data["data"]["sub_labels"],
"zones": final_data["data"]["zones"],
"timestamp": datetime.datetime.fromtimestamp(final_data["end_time"]),
},
thumbs,
)
review_inference_speed.update(datetime.datetime.now().timestamp() - start)
if not metadata:
return None
prev_data = copy.deepcopy(final_data)
final_data["data"]["metadata"] = metadata.model_dump()
requestor.send_data(
UPDATE_REVIEW_DESCRIPTION,
{
"type": "genai",
"before": {k: v for k, v in prev_data.items()},
"after": {k: v for k, v in final_data.items()},
},
)
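
To summarize the flow, the processor consumes review topic messages shaped roughly like the sketch below (ids, paths, and values are made up): thumbnails are cached on new/update messages, and the end message triggers run_analysis on a background thread when GenAI is enabled for the item's severity.

# Assumed shape of the review messages this processor handles.
update_msg = {  # "new"/"update": the current thumbnail is JPEG-encoded and cached per id
    "type": "update",
    "after": {
        "id": "rev1",
        "camera": "front_door",
        "thumb_path": "/media/frigate/clips/review/rev1.webp",
        "data": {"thumb_time": 1723293900},
    },
}
end_msg = {  # "end": cached thumbnails are handed to run_analysis on a thread
    "type": "end",
    "after": {
        "id": "rev1",
        "camera": "front_door",
        "severity": "alert",
        "end_time": 1723293960.0,
        "data": {"objects": ["person"], "sub_labels": [], "zones": ["porch"]},
    },
}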

View File

@@ -0,0 +1,16 @@
from pydantic import BaseModel, Field
class ReviewMetadata(BaseModel):
scene: str = Field(
description="A comprehensive description of the setting and entities, including relevant context and plausible inferences if supported by visual evidence."
)
confidence: float = Field(
description="A float between 0 and 1 representing your overall confidence in this analysis."
)
potential_threat_level: int | None = Field(
default=None,
ge=1,
le=3,
description="An integer representing the potential threat level (1-3). 1: Minor anomaly. 2: Moderate concern. 3: High threat. Only include this field if a clear security concern is observable; otherwise, omit it.",
)
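
For example, a provider response like the following made-up JSON would validate against the model defined above (potential_threat_level is omitted, so it defaults to None):

import json

sample = json.dumps(
    {
        "scene": "A person carrying a box walks up the driveway and leaves it at the front door.",
        "confidence": 0.82,
        # potential_threat_level omitted: no observable security concern
    }
)
metadata = ReviewMetadata.model_validate_json(sample)
assert metadata.potential_threat_level is None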

View File

@@ -20,6 +20,8 @@ class DataProcessorMetrics:
alpr_pps: Synchronized
yolov9_lpr_speed: Synchronized
yolov9_lpr_pps: Synchronized
review_desc_speed: Synchronized
review_desc_dps: Synchronized
classification_speeds: dict[str, Synchronized]
classification_cps: dict[str, Synchronized]
@@ -34,6 +36,8 @@
self.alpr_pps = manager.Value("d", 0.0)
self.yolov9_lpr_speed = manager.Value("d", 0.0)
self.yolov9_lpr_pps = manager.Value("d", 0.0)
self.review_desc_speed = manager.Value("d", 0.0)
self.review_desc_dps = manager.Value("d", 0.0)
self.classification_speeds = manager.dict()
self.classification_cps = manager.dict()

View File

@@ -151,6 +151,7 @@ class EmbeddingMaintainer(threading.Thread):
self.frame_manager = SharedMemoryFrameManager()
self.detected_license_plates: dict[str, dict[str, Any]] = {}
self.genai_client = get_genai_client(config)
# model runners to share between realtime and post processors
if self.config.lpr.enabled:
@@ -206,6 +207,13 @@
# post processors
self.post_processors: list[PostProcessorApi] = []
if any(c.review.genai.enabled_in_config for c in self.config.cameras.values()):
self.post_processors.append(
ReviewDescriptionProcessor(
self.config, self.requestor, self.metrics, self.genai_client
)
)
if self.config.lpr.enabled:
self.post_processors.append(
LicensePlatePostProcessor(
@@ -240,7 +248,6 @@ class EmbeddingMaintainer(threading.Thread):
self.stop_event = stop_event
self.tracked_events: dict[str, list[Any]] = {}
self.early_request_sent: dict[str, bool] = {}
self.genai_client = get_genai_client(config)
# recordings data
self.recordings_available_through: dict[str, float] = {}
@@ -688,7 +695,7 @@
"""Embed the description for an event."""
camera_config = self.config.cameras[event.camera]
description = self.genai_client.generate_description(
description = self.genai_client.generate_object_description(
camera_config, thumbnails, event
)

View File

@@ -3,11 +3,13 @@
import importlib
import logging
import os
from typing import Optional
import re
from typing import Any, Optional
from playhouse.shortcuts import model_to_dict
from frigate.config import CameraConfig, FrigateConfig, GenAIConfig, GenAIProviderEnum
from frigate.data_processing.post.types import ReviewMetadata
from frigate.models import Event
logger = logging.getLogger(__name__)
@@ -33,7 +35,60 @@ class GenAIClient:
self.timeout = timeout
self.provider = self._init_provider()
def generate_description(
def generate_review_description(
self, review_data: dict[str, Any], thumbnails: list[bytes]
) -> ReviewMetadata | None:
"""Generate a description for the review item activity."""
context_prompt = f"""
Please analyze the image(s), which are in chronological order, strictly from the perspective of the {review_data["camera"].replace("_", " ")} security camera.
Your task is to provide a **neutral, factual, and objective description** of the scene, while also:
- Clearly stating **what is happening** based on observable actions and movements.
- Including **reasonable, evidence-based inferences** about the likely activity or context, but only if directly supported by visible details.
When forming your description:
- **Facts first**: Describe the physical setting, people, and objects exactly as seen.
- **Then context**: Briefly note plausible purposes or activities (e.g., "appears to be delivering a package" if carrying a box to a door).
- Clearly separate certain facts ("A person is holding a ladder") from reasonable inferences ("likely performing maintenance").
- Do not speculate beyond what is visible, and do not imply hostility, criminal intent, or other strong judgments unless there is unambiguous visual evidence.
Here is information already known:
- Activity occurred at {review_data["timestamp"].strftime("%I:%M %p")}
- Detected objects: {review_data["objects"]}
- Recognized objects: {review_data["recognized_objects"]}
- Zones involved: {review_data["zones"]}
Your response **MUST** be a flat JSON object with:
- `scene` (string): A full description including setting, entities, actions, and any plausible supported inferences.
- `confidence` (float): A number from 0 to 1 for overall confidence in the analysis.
- `potential_threat_level` (integer, optional): Include only if there is a clear, observable security concern:
- 1 = Unusual but not overtly threatening
- 2 = Suspicious or potentially harmful
- 3 = Clear and immediate threat
Omit this field if no concern is evident.
**IMPORTANT:**
- Values must be plain strings, floats, or integers; no nested objects, no extra commentary.
"""
response = self._send(context_prompt, thumbnails)
if response:
clean_json = re.sub(
r"\n?```$", "", re.sub(r"^```[a-zA-Z0-9]*\n?", "", response)
)
try:
return ReviewMetadata.model_validate_json(clean_json)
except Exception as e:
# rarely LLMs can fail to follow directions on output format
logger.warning(
f"Failed to parse review description as the response did not match expected format. {e}"
)
return None
else:
return None
def generate_object_description(
self,
camera_config: CameraConfig,
thumbnails: list[bytes],
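
The nested re.sub calls in generate_review_description strip an optional markdown code fence before validation; a quick self-contained sketch with a made-up provider response:

import re

# Hypothetical LLM output wrapped in a fence, as some providers return.
response = '```json\n{"scene": "A car parks in the driveway.", "confidence": 0.7}\n```'
clean_json = re.sub(r"\n?```$", "", re.sub(r"^```[a-zA-Z0-9]*\n?", "", response))
print(clean_json)  # -> {"scene": "A car parks in the driveway.", "confidence": 0.7}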

View File

@@ -48,6 +48,7 @@ class OllamaClient(GenAIClient):
self.genai_config.model,
prompt,
images=images,
options={"keep_alive": "1h"},
)
return result["response"].strip()
except (TimeoutException, ResponseError) as e:

View File

@@ -142,6 +142,7 @@ class PendingReviewSegment:
"zones": self.zones,
"audio": list(self.audio),
"thumb_time": self.thumb_time,
"metadata": None,
},
}
)

View File

@@ -356,6 +356,14 @@ def stats_snapshot(
embeddings_metrics.yolov9_lpr_pps.value, 2
)
if embeddings_metrics.review_desc_dps.value > 0.0:
stats["embeddings"]["review_description_speed"] = round(
embeddings_metrics.review_desc_speed.value * 1000, 2
)
stats["embeddings"]["review_descriptions"] = round(
embeddings_metrics.review_desc_dps.value, 2
)
for key in embeddings_metrics.classification_speeds.keys():
stats["embeddings"][f"{key}_classification_speed"] = round(
embeddings_metrics.classification_speeds[key].value * 1000, 2
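
With the additions above, once at least one review description has been generated the stats payload gains two new keys, roughly like this illustrative snippet (values are examples only):

# Assumed shape of the new stats fields.
stats = {
    "embeddings": {
        "review_description_speed": 1843.21,  # mean GenAI call time in ms
        "review_descriptions": 0.02,          # review descriptions per second
    }
}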

View File

@@ -11,7 +11,11 @@ import { FrigateConfig } from "@/types/frigateConfig";
import { useFormattedTimestamp } from "@/hooks/use-date-utils";
import { getIconForLabel } from "@/utils/iconUtil";
import { useApiHost } from "@/api";
import { ReviewDetailPaneType, ReviewSegment } from "@/types/review";
import {
ReviewDetailPaneType,
ReviewSegment,
ThreatLevel,
} from "@/types/review";
import { Event } from "@/types/event";
import { useCallback, useEffect, useMemo, useRef, useState } from "react";
import { cn } from "@/lib/utils";
@@ -69,6 +73,25 @@ export default function ReviewDetailDialog({
review ? ["event_ids", { ids: review.data.detections.join(",") }] : null,
);
const aiAnalysis = useMemo(() => review?.data?.metadata, [review]);
const aiThreatLevel = useMemo(() => {
if (!aiAnalysis?.potential_threat_level) {
return "None";
}
switch (aiAnalysis.potential_threat_level) {
case ThreatLevel.UNUSUAL:
return "Unusual Activity";
case ThreatLevel.SUSPICIOUS:
return "Suspicious Activity";
case ThreatLevel.DANGER:
return "Danger";
}
return "Unknown";
}, [aiAnalysis]);
const hasMismatch = useMemo(() => {
if (!review || !events) {
return false;
@@ -232,6 +255,22 @@
)}
{pane == "overview" && (
<div className="flex flex-col gap-5 md:mt-3">
{aiAnalysis != undefined && (
<div
className={cn(
"m-2 flex h-full w-full flex-col gap-2 rounded-md bg-card p-2",
isDesktop && "w-[90%]",
)}
>
AI Analysis
<div className="text-sm text-primary/40">Description</div>
<div className="text-sm">{aiAnalysis.scene}</div>
<div className="text-sm text-primary/40">Score</div>
<div className="text-sm">{aiAnalysis.confidence * 100}%</div>
<div className="text-sm text-primary/40">Threat Level</div>
<div className="text-sm">{aiThreatLevel}</div>
</div>
)}
<div className="flex w-full flex-row">
<div className="flex w-full flex-col gap-3">
<div className="flex flex-col gap-1.5">

View File

@@ -18,6 +18,11 @@ export type ReviewData = {
sub_labels?: string[];
significant_motion_areas: number[];
zones: string[];
metadata?: {
scene: string;
confidence: number;
potential_threat_level?: number;
};
};
export type SegmentedReviewData =
@@ -73,3 +78,9 @@ export type ConsolidatedSegmentData = {
};
export type TimelineZoomDirection = "in" | "out" | null;
export enum ThreatLevel {
UNUSUAL = 1,
SUSPICIOUS = 2,
DANGER = 3,
}