Enable event snapshot API to honour query params after event ends (#22375)

* Enable event snapshot API to honour query params

* fix unused imports

* Fixes

* Run ruff check --fix

* Web changes

* Further config and web fixes

* Further docs tweak

* Fix missing quality default in MediaEventsSnapshotQueryParams

* Manual events: don't save annotated jpeg; store frame time

* Remove unnecessary grayscale helper

* Add caveat to docs on snapshot_frame_time pre-0.18

* JPG snapshot should not be treated as clean

* Ensure tracked details uses uncropped, bbox'd snapshot

* Ensure all UI pages / menu actions use uncropped, bbox'd

* web lint

* Add missed config helper text

* Expect  SnapshotsConfig not Any

* docs: Remove pre-0.18 note

* Specify timestamp=0 in the UI

* Move tests out of http media

* Correct missed settings.json wording

Co-authored-by: Josh Hawkins <32435876+hawkeye217@users.noreply.github.com>

* Revert to default None for quality

* Correct camera snapshot config wording

Co-authored-by: Josh Hawkins <32435876+hawkeye217@users.noreply.github.com>

* Fix quality=0 handling

Co-authored-by: Josh Hawkins <32435876+hawkeye217@users.noreply.github.com>

* Fix quality=0 handling #2

Co-authored-by: Josh Hawkins <32435876+hawkeye217@users.noreply.github.com>

* ReRun generate_config_translations

---------

Co-authored-by: leccelecce <example@example.com>
Co-authored-by: Josh Hawkins <32435876+hawkeye217@users.noreply.github.com>
This commit is contained in:
leccelecce
2026-03-22 19:33:04 +00:00
committed by GitHub
parent b6c03c99de
commit ec7040bed5
32 changed files with 797 additions and 475 deletions

View File

@@ -35,7 +35,7 @@ class MediaEventsSnapshotQueryParams(BaseModel):
bbox: Optional[int] = None
crop: Optional[int] = None
height: Optional[int] = None
quality: Optional[int] = 70
quality: Optional[int] = None
class MediaMjpegFeedQueryParams(BaseModel):

View File

@@ -13,7 +13,6 @@ from pathlib import Path
from typing import List
from urllib.parse import unquote
import cv2
import numpy as np
from fastapi import APIRouter, Request
from fastapi.params import Depends
@@ -62,7 +61,7 @@ from frigate.const import CLIPS_DIR, TRIGGER_DIR
from frigate.embeddings import EmbeddingsContext
from frigate.models import Event, ReviewSegment, Timeline, Trigger
from frigate.track.object_processing import TrackedObject
from frigate.util.file import get_event_thumbnail_bytes
from frigate.util.file import get_event_thumbnail_bytes, load_event_snapshot_image
from frigate.util.time import get_dst_transitions, get_tz_modifiers
logger = logging.getLogger(__name__)
@@ -1082,30 +1081,8 @@ async def send_to_plus(request: Request, event_id: str, body: SubmitPlusBody = N
content=({"success": False, "message": message}), status_code=400
)
# load clean.webp or clean.png (legacy)
try:
filename_webp = f"{event.camera}-{event.id}-clean.webp"
filename_png = f"{event.camera}-{event.id}-clean.png"
image_path = None
if os.path.exists(os.path.join(CLIPS_DIR, filename_webp)):
image_path = os.path.join(CLIPS_DIR, filename_webp)
elif os.path.exists(os.path.join(CLIPS_DIR, filename_png)):
image_path = os.path.join(CLIPS_DIR, filename_png)
if image_path is None:
logger.error(f"Unable to find clean snapshot for event: {event.id}")
return JSONResponse(
content=(
{
"success": False,
"message": "Unable to find clean snapshot for event",
}
),
status_code=400,
)
image = cv2.imread(image_path)
image, is_clean_snapshot = load_event_snapshot_image(event, clean_only=True)
except Exception:
logger.error(f"Unable to load clean snapshot for event: {event.id}")
return JSONResponse(
@@ -1115,11 +1092,14 @@ async def send_to_plus(request: Request, event_id: str, body: SubmitPlusBody = N
status_code=400,
)
if image is None or image.size == 0:
logger.error(f"Unable to load clean snapshot for event: {event.id}")
if not is_clean_snapshot or image is None or image.size == 0:
logger.error(f"Unable to find clean snapshot for event: {event.id}")
return JSONResponse(
content=(
{"success": False, "message": "Unable to load clean snapshot for event"}
{
"success": False,
"message": "Unable to find clean snapshot for event",
}
),
status_code=400,
)

View File

@@ -35,9 +35,9 @@ from frigate.api.defs.query.media_query_parameters import (
from frigate.api.defs.tags import Tags
from frigate.camera.state import CameraState
from frigate.config import FrigateConfig
from frigate.config.camera.snapshots import SnapshotsConfig
from frigate.const import (
CACHE_DIR,
CLIPS_DIR,
INSTALL_DIR,
MAX_SEGMENT_DURATION,
PREVIEW_FRAME_TYPE,
@@ -45,8 +45,13 @@ from frigate.const import (
from frigate.models import Event, Previews, Recordings, Regions, ReviewSegment
from frigate.output.preview import get_most_recent_preview_frame
from frigate.track.object_processing import TrackedObjectProcessor
from frigate.util.file import get_event_thumbnail_bytes
from frigate.util.image import get_image_from_recording
from frigate.util.file import (
get_event_snapshot_bytes,
get_event_snapshot_path,
get_event_thumbnail_bytes,
load_event_snapshot_image,
)
from frigate.util.image import get_image_from_recording, get_image_quality_params
logger = logging.getLogger(__name__)
@@ -110,6 +115,24 @@ def imagestream(
)
def _resolve_snapshot_settings(
snapshot_config: SnapshotsConfig, params: MediaEventsSnapshotQueryParams
) -> dict[str, Any]:
return {
"timestamp": snapshot_config.timestamp
if params.timestamp is None
else bool(params.timestamp),
"bounding_box": snapshot_config.bounding_box
if params.bbox is None
else bool(params.bbox),
"crop": snapshot_config.crop if params.crop is None else bool(params.crop),
"height": snapshot_config.height if params.height is None else params.height,
"quality": snapshot_config.quality
if params.quality is None
else params.quality,
}
@router.get("/{camera_name}/ptz/info", dependencies=[Depends(require_camera_access)])
async def camera_ptz_info(request: Request, camera_name: str):
if camera_name in request.app.frigate_config.cameras:
@@ -147,14 +170,7 @@ async def latest_frame(
"paths": params.paths,
"regions": params.regions,
}
quality = params.quality
if extension == Extension.png:
quality_params = None
elif extension == Extension.webp:
quality_params = [int(cv2.IMWRITE_WEBP_QUALITY), quality]
else: # jpg or jpeg
quality_params = [int(cv2.IMWRITE_JPEG_QUALITY), quality]
quality_params = get_image_quality_params(extension.value, params.quality)
if camera_name in request.app.frigate_config.cameras:
frame = frame_processor.get_current_frame(camera_name, draw_options)
@@ -729,7 +745,7 @@ async def vod_clip(
@router.get(
"/events/{event_id}/snapshot.jpg",
description="Returns a snapshot image for the specified object id. NOTE: The query params only take affect while the event is in-progress. Once the event has ended the snapshot configuration is used.",
description="Returns a snapshot image for the specified object id.",
)
async def event_snapshot(
request: Request,
@@ -748,11 +764,22 @@ async def event_snapshot(
content={"success": False, "message": "Snapshot not available"},
status_code=404,
)
# read snapshot from disk
with open(
os.path.join(CLIPS_DIR, f"{event.camera}-{event.id}.jpg"), "rb"
) as image_file:
jpg_bytes = image_file.read()
snapshot_settings = _resolve_snapshot_settings(
request.app.frigate_config.cameras[event.camera].snapshots, params
)
jpg_bytes, frame_time = get_event_snapshot_bytes(
event,
ext="jpg",
timestamp=snapshot_settings["timestamp"],
bounding_box=snapshot_settings["bounding_box"],
crop=snapshot_settings["crop"],
height=snapshot_settings["height"],
quality=snapshot_settings["quality"],
timestamp_style=request.app.frigate_config.cameras[
event.camera
].timestamp_style,
colormap=request.app.frigate_config.model.colormap,
)
except DoesNotExist:
# see if the object is currently being tracked
try:
@@ -763,13 +790,16 @@ async def event_snapshot(
if event_id in camera_state.tracked_objects:
tracked_obj = camera_state.tracked_objects.get(event_id)
if tracked_obj is not None:
snapshot_settings = _resolve_snapshot_settings(
camera_state.camera_config.snapshots, params
)
jpg_bytes, frame_time = tracked_obj.get_img_bytes(
ext="jpg",
timestamp=params.timestamp,
bounding_box=params.bbox,
crop=params.crop,
height=params.height,
quality=params.quality,
timestamp=snapshot_settings["timestamp"],
bounding_box=snapshot_settings["bounding_box"],
crop=snapshot_settings["crop"],
height=snapshot_settings["height"],
quality=snapshot_settings["quality"],
)
await require_camera_access(camera_state.name, request=request)
except Exception:
@@ -865,13 +895,11 @@ async def event_thumbnail(
(0, 0, 0),
)
quality_params = None
if extension in (Extension.jpg, Extension.jpeg):
quality_params = [int(cv2.IMWRITE_JPEG_QUALITY), 70]
elif extension == Extension.webp:
quality_params = [int(cv2.IMWRITE_WEBP_QUALITY), 60]
_, img = cv2.imencode(f".{extension.value}", thumbnail, quality_params)
_, img = cv2.imencode(
f".{extension.value}",
thumbnail,
get_image_quality_params(extension.value, None),
)
thumbnail_bytes = img.tobytes()
return Response(
@@ -1029,14 +1057,16 @@ def clear_region_grid(request: Request, camera_name: str):
)
def event_snapshot_clean(request: Request, event_id: str, download: bool = False):
webp_bytes = None
event_complete = False
try:
event = Event.get(Event.id == event_id)
event_complete = event.end_time is not None
snapshot_config = request.app.frigate_config.cameras[event.camera].snapshots
if not (snapshot_config.enabled and event.has_snapshot):
return JSONResponse(
content={
"success": False,
"message": "Snapshots and clean_copy must be enabled in the config",
"message": "Snapshots must be enabled in the config",
},
status_code=404,
)
@@ -1068,54 +1098,10 @@ def event_snapshot_clean(request: Request, event_id: str, download: bool = False
)
if webp_bytes is None:
try:
# webp
clean_snapshot_path_webp = os.path.join(
CLIPS_DIR, f"{event.camera}-{event.id}-clean.webp"
image_path, is_clean_snapshot = get_event_snapshot_path(
event, clean_only=True
)
# png (legacy)
clean_snapshot_path_png = os.path.join(
CLIPS_DIR, f"{event.camera}-{event.id}-clean.png"
)
if os.path.exists(clean_snapshot_path_webp):
with open(clean_snapshot_path_webp, "rb") as image_file:
webp_bytes = image_file.read()
elif os.path.exists(clean_snapshot_path_png):
# convert png to webp and save for future use
png_image = cv2.imread(clean_snapshot_path_png, cv2.IMREAD_UNCHANGED)
if png_image is None:
return JSONResponse(
content={
"success": False,
"message": "Invalid png snapshot",
},
status_code=400,
)
ret, webp_data = cv2.imencode(
".webp", png_image, [int(cv2.IMWRITE_WEBP_QUALITY), 60]
)
if not ret:
return JSONResponse(
content={
"success": False,
"message": "Unable to convert png to webp",
},
status_code=400,
)
webp_bytes = webp_data.tobytes()
# save the converted webp for future requests
try:
with open(clean_snapshot_path_webp, "wb") as f:
f.write(webp_bytes)
except Exception as e:
logger.warning(
f"Failed to save converted webp for event {event.id}: {e}"
)
# continue since we now have the data to return
else:
if not is_clean_snapshot or image_path is None:
return JSONResponse(
content={
"success": False,
@@ -1123,6 +1109,34 @@ def event_snapshot_clean(request: Request, event_id: str, download: bool = False
},
status_code=404,
)
if image_path.endswith(".webp"):
with open(image_path, "rb") as image_file:
webp_bytes = image_file.read()
else:
image = load_event_snapshot_image(event, clean_only=True)[0]
if image is None:
return JSONResponse(
content={
"success": False,
"message": "Unable to load clean snapshot for event",
},
status_code=400,
)
ret, webp_data = cv2.imencode(
".webp", image, get_image_quality_params("webp", None)
)
if not ret:
return JSONResponse(
content={
"success": False,
"message": "Unable to convert snapshot to webp",
},
status_code=400,
)
webp_bytes = webp_data.tobytes()
except Exception:
logger.error(f"Unable to load clean snapshot for event: {event.id}")
return JSONResponse(
@@ -1135,7 +1149,7 @@ def event_snapshot_clean(request: Request, event_id: str, download: bool = False
headers = {
"Content-Type": "image/webp",
"Cache-Control": "private, max-age=31536000",
"Cache-Control": "private, max-age=31536000" if event_complete else "no-cache",
}
if download: