blakeblackshear.frigate/frigate/data_processing/post/review_descriptions.py
Nicolas Mowen 2cf8dd693c Review Item GenAI metadata (#19442)
* Rename existing function

* Keep track of thumbnail updates

* Tinkering with genai prompt

* Adjust input format

* Create model for review description output

* testing prompt changes

* Prompt improvements and image saving

* Add config for review items genai

* Use genai review config

* Actual config usage

* Adjust debug image saving

* Fix

* Fix review creation

* Adjust prompt

* Prompt adjustment

* Run genai in thread

* Fix detections block

* Adjust prompt

* Prompt changes

* Save genai response to metadata model

* Handle metadata

* Send review update to dispatcher

* Save review metadata to DB

* Send review notification updates

* Quick fix

* Fix name

* Fix update type

* Correctly dump model

* Add card

* Add card

* Remove message

* Cleanup typing and UI

* Adjust prompt

* Formatting

* Add log

* Formatting

* Add inference speed and keep alive
2025-08-16 10:20:33 -05:00

163 lines
5.2 KiB
Python

"""Post processor for review items to get descriptions."""
import copy
import datetime
import logging
import os
import shutil
import threading
from pathlib import Path
import cv2
from frigate.comms.inter_process import InterProcessRequestor
from frigate.config import FrigateConfig
from frigate.const import CLIPS_DIR, UPDATE_REVIEW_DESCRIPTION
from frigate.data_processing.types import PostProcessDataEnum
from frigate.genai import GenAIClient
from frigate.util.builtin import EventsPerSecond, InferenceSpeed
from ..post.api import PostProcessorApi
from ..types import DataProcessorMetrics
logger = logging.getLogger(__name__)
class ReviewDescriptionProcessor(PostProcessorApi):
    """Collect review item thumbnails and request a GenAI description for them.

    While a review item is reported with type "new" or "update", each distinct
    thumbnail is re-encoded as JPEG and kept in memory. On any other update
    type (presumably the end of the review item) the collected thumbnails are
    handed to ``run_analysis`` on a separate thread, provided GenAI is enabled
    for the item's severity on that camera.
    """

    def __init__(
        self,
        config: FrigateConfig,
        requestor: InterProcessRequestor,
        metrics: DataProcessorMetrics,
        client: GenAIClient,
    ):
        """Set up thumbnail tracking and the speed / rate metrics.

        Args:
            config: Frigate configuration, forwarded to the base class.
            requestor: Passed through to ``run_analysis`` to publish results.
            metrics: Shared metrics; ``review_desc_speed`` and
                ``review_desc_dps`` are used by this processor.
            client: GenAI client used to generate review descriptions.
        """
        super().__init__(config, metrics, None)
        self.requestor = requestor
        self.metrics = metrics
        # review id -> [(thumb_time, jpeg bytes), ...] in arrival order
        self.tracked_review_items: dict[str, list[tuple[int, bytes]]] = {}
        self.genai_client = client
        self.review_desc_speed = InferenceSpeed(self.metrics.review_desc_speed)
        self.review_descs_dps = EventsPerSecond()
        self.review_descs_dps.start()

    def process_data(self, data, data_type):
        """Track thumbnails for ongoing review items and start analysis otherwise.

        Args:
            data: Review update; this method reads ``data["type"]`` and
                ``data["after"]`` (``id``, ``camera``, ``severity``,
                ``thumb_path`` and ``data["thumb_time"]``).
            data_type: Only ``PostProcessDataEnum.review`` is handled.
        """
        self.metrics.review_desc_dps.value = self.review_descs_dps.eps()

        if data_type != PostProcessDataEnum.review:
            return

        review_id = data["after"]["id"]

        if data["type"] in ("new", "update"):
            self._track_thumbnail(review_id, data)
            return

        if review_id not in self.tracked_review_items:
            return

        final_data = data["after"]
        camera = final_data["camera"]
        severity = final_data["severity"]
        genai_config = self.config.cameras[camera].review.genai

        # the item is finished either way, so always stop tracking it
        tracked = self.tracked_review_items.pop(review_id)

        if severity == "alert" and not genai_config.alerts:
            return

        if severity == "detection" and not genai_config.detections:
            return

        # kickoff analysis
        self.review_descs_dps.update()
        threading.Thread(
            target=run_analysis,
            args=(
                self.requestor,
                self.genai_client,
                self.review_desc_speed,
                camera,
                final_data,
                [thumb for _, thumb in tracked],
            ),
        ).start()

    def _track_thumbnail(self, review_id, data) -> None:
        """Store the thumbnail referenced by a new/update message once per thumb_time."""
        tracked = self.tracked_review_items.setdefault(review_id, [])
        thumb_time = data["after"]["data"]["thumb_time"]
        thumb_path = data["after"]["thumb_path"]

        if not thumb_time or not thumb_path:
            return

        # entries are (thumb_time, jpeg bytes) tuples, so compare against the
        # stored timestamps rather than against the tuples themselves
        if any(seen_time == thumb_time for seen_time, _ in tracked):
            # we have already processed this thumbnail
            return

        thumb_data = cv2.imread(thumb_path)

        if thumb_data is None:
            # cv2.imread signals a missing / unreadable file by returning None,
            # which cv2.imencode would reject with an exception
            logger.warning(
                "Unable to read thumbnail %s for review item %s",
                thumb_path,
                review_id,
            )
            return

        ret, jpg = cv2.imencode(
            ".jpg", thumb_data, [int(cv2.IMWRITE_JPEG_QUALITY), 100]
        )

        if not ret:
            return

        tracked.append((thumb_time, jpg.tobytes()))

        camera_config = self.config.cameras[data["after"]["camera"]]

        if camera_config.review.genai.debug_save_thumbnails:
            # keep a copy of every thumbnail that will be sent to GenAI
            debug_dir = Path(CLIPS_DIR) / "genai-requests" / str(review_id)
            debug_dir.mkdir(parents=True, exist_ok=True)
            shutil.copy(thumb_path, debug_dir / f"{thumb_time}.webp")

    def handle_request(self, request_data):
        """No requests are handled by this processor."""
        pass
def run_analysis(
    requestor: InterProcessRequestor,
    genai_client: GenAIClient,
    review_inference_speed: InferenceSpeed,
    camera: str,
    final_data: dict,
    thumbs: list[bytes],
) -> None:
    """Generate a review description and publish it as a review update.

    Intended to be used as a thread target. This is a plain module-level
    function: it is referenced by bare name and needs no ``@staticmethod``.

    Args:
        requestor: Used to send the ``UPDATE_REVIEW_DESCRIPTION`` message.
        genai_client: Client whose ``generate_review_description`` is called.
        review_inference_speed: Updated with the elapsed time of the GenAI call.
        camera: Name of the camera the review item belongs to.
        final_data: Review item state; ``end_time`` and the nested ``data``
            dict (``objects``, ``sub_labels``, ``zones``) are read, and
            ``data["metadata"]`` is written in place when a result is returned.
        thumbs: Image bytes passed to the GenAI client.
    """
    review_data = final_data["data"]
    start = datetime.datetime.now().timestamp()
    metadata = genai_client.generate_review_description(
        {
            "camera": camera,
            "objects": review_data["objects"],
            "recognized_objects": review_data["sub_labels"],
            "zones": review_data["zones"],
            "timestamp": datetime.datetime.fromtimestamp(final_data["end_time"]),
        },
        thumbs,
    )
    review_inference_speed.update(datetime.datetime.now().timestamp() - start)

    if not metadata:
        return

    # snapshot the state before the metadata is attached so the update
    # carries both the previous and the new version of the review item
    prev_data = copy.deepcopy(final_data)
    review_data["metadata"] = metadata.model_dump()
    requestor.send_data(
        UPDATE_REVIEW_DESCRIPTION,
        {
            "type": "genai",
            "before": prev_data,
            "after": dict(final_data),
        },
    )