blakeblackshear.frigate/frigate/data_processing/post/review_descriptions.py
Nicolas Mowen cc18d7f786 Add config for users to define additional concerns that GenAI should make note of in review summary (#19463)
* Don't default to openai

* Improve UI

* Allow configuring additional concerns that users may want the AI to note

* Formatting

* Add preferred language config

* Remove unused
2025-08-16 10:20:33 -05:00

196 lines
5.8 KiB
Python

"""Post processor for review items to get descriptions."""
import copy
import datetime
import logging
import os
import shutil
import threading
from pathlib import Path
import cv2
from frigate.comms.inter_process import InterProcessRequestor
from frigate.config import FrigateConfig
from frigate.const import CACHE_DIR, CLIPS_DIR, UPDATE_REVIEW_DESCRIPTION
from frigate.data_processing.types import PostProcessDataEnum
from frigate.genai import GenAIClient
from frigate.util.builtin import EventsPerSecond, InferenceSpeed
from ..post.api import PostProcessorApi
from ..types import DataProcessorMetrics
logger = logging.getLogger(__name__)
class ReviewDescriptionProcessor(PostProcessorApi):
def __init__(
self,
config: FrigateConfig,
requestor: InterProcessRequestor,
metrics: DataProcessorMetrics,
client: GenAIClient,
):
super().__init__(config, metrics, None)
self.requestor = requestor
self.metrics = metrics
self.genai_client = client
self.review_desc_speed = InferenceSpeed(self.metrics.review_desc_speed)
self.review_descs_dps = EventsPerSecond()
self.review_descs_dps.start()
def process_data(self, data, data_type):
self.metrics.review_desc_dps.value = self.review_descs_dps.eps()
if data_type != PostProcessDataEnum.review:
return
camera = data["after"]["camera"]
camera_config = self.config.cameras[camera]
if not camera_config.review.genai.enabled:
return
id = data["after"]["id"]
if data["type"] == "new" or data["type"] == "update":
return
else:
final_data = data["after"]
if (
final_data["severity"] == "alert"
and not camera_config.review.genai.alerts
):
return
elif (
final_data["severity"] == "detection"
and not camera_config.review.genai.detections
):
return
frames = self.get_cache_frames(
camera, final_data["start_time"], final_data["end_time"]
)
if not frames:
frames = [final_data["thumb_path"]]
thumbs = []
for idx, thumb_path in enumerate(frames):
thumb_data = cv2.imread(thumb_path)
ret, jpg = cv2.imencode(
".jpg", thumb_data, [int(cv2.IMWRITE_JPEG_QUALITY), 100]
)
if ret:
thumbs.append(jpg.tobytes())
if camera_config.review.genai.debug_save_thumbnails:
id = data["after"]["id"]
Path(os.path.join(CLIPS_DIR, f"genai-requests/{id}")).mkdir(
parents=True, exist_ok=True
)
shutil.copy(
thumb_path,
os.path.join(
CLIPS_DIR,
f"genai-requests/{id}/{idx}.webp",
),
)
# kickoff analysis
self.review_descs_dps.update()
threading.Thread(
target=run_analysis,
args=(
self.requestor,
self.genai_client,
self.review_desc_speed,
camera,
final_data,
thumbs,
camera_config.review.genai.additional_concerns,
camera_config.review.genai.preferred_language,
),
).start()
def handle_request(self, request_data):
pass
def get_cache_frames(
self, camera: str, start_time: float, end_time: float
) -> list[str]:
preview_dir = os.path.join(CACHE_DIR, "preview_frames")
file_start = f"preview_{camera}"
start_file = f"{file_start}-{start_time}.webp"
end_file = f"{file_start}-{end_time}.webp"
all_frames = []
for file in sorted(os.listdir(preview_dir)):
if not file.startswith(file_start):
continue
if file < start_file:
continue
if file > end_file:
break
all_frames.append(os.path.join(preview_dir, file))
frame_count = len(all_frames)
if frame_count <= 10:
return all_frames
selected_frames = []
step_size = (frame_count - 1) / 9
for i in range(10):
index = round(i * step_size)
selected_frames.append(all_frames[index])
return selected_frames
@staticmethod
def run_analysis(
requestor: InterProcessRequestor,
genai_client: GenAIClient,
review_inference_speed: InferenceSpeed,
camera: str,
final_data: dict[str, str],
thumbs: list[bytes],
concerns: list[str],
preferred_language: str | None,
) -> None:
start = datetime.datetime.now().timestamp()
metadata = genai_client.generate_review_description(
{
"camera": camera,
"objects": final_data["data"]["objects"],
"recognized_objects": final_data["data"]["sub_labels"],
"zones": final_data["data"]["zones"],
"timestamp": datetime.datetime.fromtimestamp(final_data["end_time"]),
},
thumbs,
concerns,
preferred_language,
)
review_inference_speed.update(datetime.datetime.now().timestamp() - start)
if not metadata:
return None
prev_data = copy.deepcopy(final_data)
final_data["data"]["metadata"] = metadata.model_dump()
requestor.send_data(
UPDATE_REVIEW_DESCRIPTION,
{
"type": "genai",
"before": {k: v for k, v in prev_data.items()},
"after": {k: v for k, v in final_data.items()},
},
)