Files
blakeblackshear.frigate/frigate/test/http_api/test_http_export.py
Josh Hawkins e7e6f87682 Export improvements (#22867)
* backend

* frontend + i18n

* tests + api spec

* tweak backend to use Job infrastructure for exports

* frontend tweaks and Job infrastructure

* tests

* tweaks

- add ability to remove from case
- change location of counts in case card

* add stale export reaper on startup

* fix toaster close button color

* improve add dialog

* formatting

* hide max_concurrent from camera config export settings

* remove border

* refactor batch endpoint for multiple review items

* frontend

* tests and fastapi spec

* fix deletion of in-progress exports in a case

* tweaks

- hide cases when filtering cameras that have no exports from those cameras
- remove description from case card
- use textarea instead of input for case description in add new case dialog

* add auth exceptions for exports

* add e2e test for deleting cases with exports

* refactor delete and case endpoints

allow bulk deleting and reassigning

* frontend

- bulk selection like Review
- gate admin-only actions
- consolidate dialogs
- spacing/padding tweaks

* i18n and tests

* update openapi spec

* tweaks

- add None to case selection list
- allow new case creation from single cam export dialog

* fix codeql

* fix i18n

* remove unused

* fix frontend tests
2026-04-14 08:19:50 -06:00

1434 lines
50 KiB
Python

import os
import tempfile
from unittest.mock import patch
from frigate.jobs.export import (
ExportJob,
get_export_job_manager,
reap_stale_exports,
start_export_job,
)
from frigate.models import Export, ExportCase, Previews, Recordings
from frigate.test.http_api.base_http_test import AuthTestClient, BaseTestHttp
class TestHttpExport(BaseTestHttp):
def setUp(self):
super().setUp([Export, ExportCase, Previews, Recordings])
self.minimal_config["cameras"]["backyard"] = {
"ffmpeg": {
"inputs": [{"path": "rtsp://10.0.0.2:554/video", "roles": ["detect"]}]
},
"detect": {
"height": 1080,
"width": 1920,
"fps": 5,
},
}
self.app = super().create_app()
def tearDown(self):
self.app.dependency_overrides.clear()
super().tearDown()
def _insert_recording(
self,
recording_id: str,
camera: str,
start_time: float,
end_time: float,
) -> None:
Recordings.create(
id=recording_id,
camera=camera,
path=f"/tmp/{recording_id}.mp4",
start_time=start_time,
end_time=end_time,
duration=end_time - start_time,
motion=0,
objects=0,
dBFS=0,
segment_size=1,
regions=0,
motion_heatmap=[],
)
def test_create_export_case_uses_wall_clock_time(self):
with patch("frigate.api.export.time.time", return_value=1234.5):
with AuthTestClient(self.app) as client:
response = client.post(
"/cases",
json={
"name": "Investigation",
"description": "A test case",
},
)
assert response.status_code == 200
response_json = response.json()
assert response_json["created_at"] == 1234.5
assert response_json["updated_at"] == 1234.5
case = ExportCase.get(ExportCase.id == response_json["id"])
assert case.created_at.timestamp() == 1234.5
assert case.updated_at.timestamp() == 1234.5
def test_update_export_case_refreshes_updated_at(self):
case = ExportCase.create(
id="case123",
name="Old name",
description="Old description",
created_at=10,
updated_at=10,
)
with patch("frigate.api.export.time.time", return_value=2222.0):
with AuthTestClient(self.app) as client:
response = client.patch(
f"/cases/{case.id}",
json={"name": "New name", "description": "Updated"},
)
assert response.status_code == 200
refreshed = ExportCase.get(ExportCase.id == case.id)
assert refreshed.name == "New name"
assert refreshed.description == "Updated"
assert refreshed.updated_at.timestamp() == 2222.0
def test_delete_export_case_delete_exports_cancels_queued_jobs(self):
case = ExportCase.create(
id="case_delete_me",
name="Delete me",
description="",
created_at=10,
updated_at=10,
)
other_case = ExportCase.create(
id="case_keep_me",
name="Keep me",
description="",
created_at=20,
updated_at=20,
)
with tempfile.TemporaryDirectory() as tmpdir:
video_path = os.path.join(tmpdir, "case_export.mp4")
thumb_path = os.path.join(tmpdir, "case_export.webp")
other_video_path = os.path.join(tmpdir, "other_export.mp4")
other_thumb_path = os.path.join(tmpdir, "other_export.webp")
with open(video_path, "wb") as handle:
handle.write(b"case")
with open(thumb_path, "wb") as handle:
handle.write(b"thumb")
with open(other_video_path, "wb") as handle:
handle.write(b"other")
with open(other_thumb_path, "wb") as handle:
handle.write(b"thumb")
Export.create(
id="export_in_case",
camera="front_door",
name="Case export",
date=100,
video_path=video_path,
thumb_path=thumb_path,
in_progress=False,
export_case=case,
)
Export.create(
id="export_other_case",
camera="front_door",
name="Other export",
date=110,
video_path=other_video_path,
thumb_path=other_thumb_path,
in_progress=False,
export_case=other_case,
)
with (
patch("frigate.jobs.export._job_manager", None),
patch(
"frigate.jobs.export.ExportJobManager.ensure_started",
autospec=True,
return_value=None,
),
):
start_export_job(
self.app.frigate_config,
ExportJob(
id="queued_case_job",
camera="front_door",
export_case_id=case.id,
request_start_time=100,
request_end_time=120,
),
)
start_export_job(
self.app.frigate_config,
ExportJob(
id="queued_other_job",
camera="front_door",
export_case_id=other_case.id,
request_start_time=130,
request_end_time=150,
),
)
manager = get_export_job_manager(self.app.frigate_config)
assert {job.id for job in manager.list_active_jobs()} == {
"queued_case_job",
"queued_other_job",
}
with AuthTestClient(self.app) as client:
response = client.delete(f"/cases/{case.id}?delete_exports=true")
assert response.status_code == 200
assert ExportCase.get_or_none(ExportCase.id == case.id) is None
assert ExportCase.get_or_none(ExportCase.id == other_case.id) is not None
assert Export.get_or_none(Export.id == "export_in_case") is None
assert Export.get_or_none(Export.id == "export_other_case") is not None
assert not os.path.exists(video_path)
assert not os.path.exists(thumb_path)
cancelled_job = manager.get_job("queued_case_job")
assert cancelled_job is not None
assert cancelled_job.status == "cancelled"
remaining_job = manager.get_job("queued_other_job")
assert remaining_job is not None
assert remaining_job.status == "queued"
assert [job.id for job in manager.list_active_jobs()] == [
"queued_other_job"
]
def test_batch_export_creates_case_and_reports_partial_success(self):
self._insert_recording("rec-front", "front_door", 100, 200)
with patch(
"frigate.api.export.start_export_job",
side_effect=lambda _config, job: job.id,
) as start_export_job:
with AuthTestClient(self.app) as client:
response = client.post(
"/exports/batch",
json={
"items": [
{
"camera": "front_door",
"start_time": 110,
"end_time": 150,
"friendly_name": "Incident - Front Door",
},
{
"camera": "backyard",
"start_time": 110,
"end_time": 150,
"friendly_name": "Incident - Backyard",
},
],
"new_case_name": "Case Alpha",
"new_case_description": "Batch export",
},
)
assert response.status_code == 202
response_json = response.json()
assert len(response_json["export_ids"]) == 1
assert response_json["results"] == [
{
"camera": "front_door",
"export_id": response_json["export_ids"][0],
"success": True,
"status": "queued",
"error": None,
"item_index": 0,
"client_item_id": None,
},
{
"camera": "backyard",
"export_id": None,
"success": False,
"status": None,
"error": "No recordings found for time range",
"item_index": 1,
"client_item_id": None,
},
]
start_export_job.assert_called_once()
case = ExportCase.get(ExportCase.id == response_json["export_case_id"])
assert case.name == "Case Alpha"
assert case.description == "Batch export"
def test_single_export_is_queued_immediately(self):
self._insert_recording("rec-front", "front_door", 100, 200)
with patch(
"frigate.api.export.start_export_job",
side_effect=lambda _config, job: job.id,
) as start_export_job:
with AuthTestClient(self.app) as client:
response = client.post(
"/export/front_door/start/110/end/150",
json={
"name": "Queued export",
},
)
assert response.status_code == 202
response_json = response.json()
assert response_json["success"] is True
assert response_json["status"] == "queued"
assert response_json["export_id"].startswith("front_door_")
start_export_job.assert_called_once()
def test_single_export_returns_503_when_queue_full(self):
self._insert_recording("rec-front", "front_door", 100, 200)
from frigate.jobs.export import ExportQueueFullError
with patch(
"frigate.api.export.start_export_job",
side_effect=ExportQueueFullError("Export queue is full"),
):
with AuthTestClient(self.app) as client:
response = client.post(
"/export/front_door/start/110/end/150",
json={
"name": "Rejected export",
},
)
assert response.status_code == 503
response_json = response.json()
assert response_json["success"] is False
assert "queue is full" in response_json["message"].lower()
def test_batch_export_returns_503_when_queue_cannot_fit_batch(self):
self._insert_recording("rec-front", "front_door", 100, 200)
self._insert_recording("rec-back", "backyard", 100, 200)
with patch(
"frigate.api.export.available_export_queue_slots",
return_value=1,
):
with patch(
"frigate.api.export.start_export_job",
side_effect=lambda _config, job: job.id,
) as start_export_job:
with AuthTestClient(self.app) as client:
response = client.post(
"/exports/batch",
json={
"items": [
{
"camera": "front_door",
"start_time": 110,
"end_time": 150,
},
{
"camera": "backyard",
"start_time": 110,
"end_time": 150,
},
],
"new_case_name": "Overflow Case",
},
)
assert response.status_code == 503
assert response.json()["success"] is False
start_export_job.assert_not_called()
# Empty case should NOT have been created
assert ExportCase.select().count() == 0
def test_get_active_export_jobs_returns_queue_state(self):
queued_job = ExportJob(
id="front_door_queued",
camera="front_door",
status="queued",
request_start_time=100,
request_end_time=150,
)
with patch(
"frigate.api.export.list_active_export_jobs",
return_value=[queued_job],
):
with AuthTestClient(self.app) as client:
response = client.get("/jobs/export")
assert response.status_code == 200
assert response.json() == [queued_job.to_dict()]
def test_reap_stale_exports_deletes_rows_with_no_file(self):
with tempfile.TemporaryDirectory() as tmpdir:
stale_video = os.path.join(tmpdir, "stale.mp4")
stale_thumb = os.path.join(tmpdir, "stale.webp")
# stale_video is intentionally NOT created
with open(stale_thumb, "w") as handle:
handle.write("thumb")
Export.create(
id="stale_no_file",
camera="front_door",
name="Stuck export",
date=100,
video_path=stale_video,
thumb_path=stale_thumb,
in_progress=True,
)
reap_stale_exports()
assert Export.get_or_none(Export.id == "stale_no_file") is None
assert not os.path.exists(stale_thumb)
def test_reap_stale_exports_recovers_rows_with_file(self):
with tempfile.TemporaryDirectory() as tmpdir:
intact_video = os.path.join(tmpdir, "intact.mp4")
intact_thumb = os.path.join(tmpdir, "intact.webp")
with open(intact_video, "wb") as handle:
handle.write(b"not actually an mp4 but non-empty")
with open(intact_thumb, "wb") as handle:
handle.write(b"thumb")
case = ExportCase.create(
id="case_for_stale",
name="Curated case",
description="",
created_at=10,
updated_at=10,
)
Export.create(
id="stale_with_file",
camera="front_door",
name="Recoverable export",
date=200,
video_path=intact_video,
thumb_path=intact_thumb,
in_progress=True,
export_case=case,
)
reap_stale_exports()
recovered = Export.get(Export.id == "stale_with_file")
assert recovered.in_progress is False
# Case link must be cleared so the user re-triages the recovered row
assert recovered.export_case is None
# The case itself is untouched
assert ExportCase.get_or_none(ExportCase.id == "case_for_stale") is not None
# Recovered files must NOT be unlinked
assert os.path.exists(intact_video)
assert os.path.exists(intact_thumb)
def test_reap_stale_exports_delete_path_severs_case_link(self):
with tempfile.TemporaryDirectory() as tmpdir:
missing_video = os.path.join(tmpdir, "missing.mp4")
# file intentionally not created
case = ExportCase.create(
id="case_losing_member",
name="Case losing a member",
description="",
created_at=20,
updated_at=20,
)
Export.create(
id="stale_in_case_no_file",
camera="front_door",
name="Stuck and in a case",
date=250,
video_path=missing_video,
thumb_path="",
in_progress=True,
export_case=case,
)
reap_stale_exports()
# The export row is gone entirely
assert Export.get_or_none(Export.id == "stale_in_case_no_file") is None
# The case stays but has no exports pointing at it
remaining_case = ExportCase.get(ExportCase.id == "case_losing_member")
assert list(remaining_case.exports) == []
def test_reap_stale_exports_deletes_rows_with_empty_file(self):
with tempfile.TemporaryDirectory() as tmpdir:
empty_video = os.path.join(tmpdir, "empty.mp4")
# Create a zero-byte file — partial ffmpeg output
open(empty_video, "w").close()
Export.create(
id="stale_empty_file",
camera="front_door",
name="Zero byte export",
date=300,
video_path=empty_video,
thumb_path="",
in_progress=True,
)
reap_stale_exports()
assert Export.get_or_none(Export.id == "stale_empty_file") is None
assert not os.path.exists(empty_video)
def test_reap_stale_exports_skips_completed_rows(self):
with tempfile.TemporaryDirectory() as tmpdir:
done_video = os.path.join(tmpdir, "done.mp4")
with open(done_video, "wb") as handle:
handle.write(b"done")
Export.create(
id="already_done",
camera="front_door",
name="Completed export",
date=400,
video_path=done_video,
thumb_path="",
in_progress=False,
)
reap_stale_exports()
row = Export.get(Export.id == "already_done")
assert row.in_progress is False
assert os.path.exists(done_video)
def test_batch_export_without_case_goes_to_uncategorized(self):
"""Exports without a case target go to uncategorized."""
self._insert_recording("rec-front", "front_door", 100, 400)
with patch(
"frigate.api.export.start_export_job",
side_effect=lambda _config, job: job.id,
):
with AuthTestClient(self.app) as client:
response = client.post(
"/exports/batch",
json={
"items": [
{
"camera": "front_door",
"start_time": 110,
"end_time": 150,
}
],
},
)
assert response.status_code == 202
response_json = response.json()
assert response_json["export_case_id"] is None
assert ExportCase.select().count() == 0
# --- /exports/batch (item-shaped multi-export) ---------------------------
def test_batch_export_happy_path_creates_case_and_queues_all(self):
self._insert_recording("rec-front", "front_door", 100, 400)
self._insert_recording("rec-back", "backyard", 100, 400)
with patch(
"frigate.api.export.start_export_job",
side_effect=lambda _config, job: job.id,
) as start_export_job:
with AuthTestClient(self.app) as client:
response = client.post(
"/exports/batch",
json={
"items": [
{
"camera": "front_door",
"start_time": 110,
"end_time": 150,
},
{
"camera": "front_door",
"start_time": 200,
"end_time": 240,
},
{
"camera": "backyard",
"start_time": 300,
"end_time": 340,
},
],
"new_case_name": "Incident Apr 11",
"new_case_description": "Review items",
},
)
assert response.status_code == 202
response_json = response.json()
assert len(response_json["export_ids"]) == 3
assert all(r["success"] for r in response_json["results"])
assert [r["item_index"] for r in response_json["results"]] == [0, 1, 2]
assert start_export_job.call_count == 3
case = ExportCase.get(ExportCase.id == response_json["export_case_id"])
assert case.name == "Incident Apr 11"
assert case.description == "Review items"
def test_batch_export_existing_case_does_not_create_new_case(self):
self._insert_recording("rec-front", "front_door", 100, 400)
ExportCase.create(
id="existing_case",
name="Existing",
description="",
created_at=10,
updated_at=10,
)
with patch(
"frigate.api.export.start_export_job",
side_effect=lambda _config, job: job.id,
):
with AuthTestClient(self.app) as client:
response = client.post(
"/exports/batch",
json={
"items": [
{
"camera": "front_door",
"start_time": 110,
"end_time": 150,
}
],
"export_case_id": "existing_case",
},
)
assert response.status_code == 202
assert response.json()["export_case_id"] == "existing_case"
# No additional case was created
assert ExportCase.select().count() == 1
def test_batch_export_empty_items_rejected(self):
with AuthTestClient(self.app) as client:
response = client.post(
"/exports/batch",
json={"items": [], "new_case_name": "Empty"},
)
assert response.status_code == 422
def test_batch_export_over_limit_rejected(self):
items = [
{"camera": "front_door", "start_time": 100 + i, "end_time": 100 + i + 5}
for i in range(51)
]
with AuthTestClient(self.app) as client:
response = client.post(
"/exports/batch",
json={"items": items, "new_case_name": "Too many"},
)
assert response.status_code == 422
def test_batch_export_end_before_start_rejected(self):
with AuthTestClient(self.app) as client:
response = client.post(
"/exports/batch",
json={
"items": [
{
"camera": "front_door",
"start_time": 200,
"end_time": 100,
}
],
"new_case_name": "Bad range",
},
)
assert response.status_code == 422
assert (
response.json()["detail"][0]["msg"]
== "Value error, end_time must be after start_time"
)
def test_batch_export_non_admin_without_case_goes_to_uncategorized(self):
"""Non-admin batch exports go to uncategorized."""
self._insert_recording("rec-front", "front_door", 100, 400)
with patch(
"frigate.api.export.start_export_job",
side_effect=lambda _config, job: job.id,
):
with AuthTestClient(self.app) as client:
response = client.post(
"/exports/batch",
headers={"remote-user": "viewer", "remote-role": "viewer"},
json={
"items": [
{
"camera": "front_door",
"start_time": 100,
"end_time": 150,
}
],
},
)
assert response.status_code == 202
response_json = response.json()
assert response_json["export_case_id"] is None
assert ExportCase.select().count() == 0
def test_batch_export_camera_access_denied_fails_closed(self):
from fastapi import Request
from frigate.api.auth import get_allowed_cameras_for_filter
self._insert_recording("rec-front", "front_door", 100, 400)
async def restricted(request: Request):
return ["front_door"]
self.app.dependency_overrides[get_allowed_cameras_for_filter] = restricted
with patch(
"frigate.api.export.start_export_job",
side_effect=lambda _config, job: job.id,
) as start_export_job:
with AuthTestClient(self.app) as client:
response = client.post(
"/exports/batch",
json={
"items": [
{
"camera": "front_door",
"start_time": 110,
"end_time": 150,
},
{
"camera": "backyard", # not in allowed list
"start_time": 110,
"end_time": 150,
},
],
"new_case_name": "Nope",
},
)
assert response.status_code == 403
start_export_job.assert_not_called()
# No case created
assert ExportCase.select().count() == 0
def test_batch_export_case_not_found(self):
self._insert_recording("rec-front", "front_door", 100, 400)
with AuthTestClient(self.app) as client:
response = client.post(
"/exports/batch",
json={
"items": [
{
"camera": "front_door",
"start_time": 110,
"end_time": 150,
}
],
"export_case_id": "does_not_exist",
},
)
assert response.status_code == 404
def test_batch_export_per_item_missing_recordings_partial_success(self):
self._insert_recording("rec-front", "front_door", 100, 200)
# backyard has no recordings at all
with patch(
"frigate.api.export.start_export_job",
side_effect=lambda _config, job: job.id,
) as start_export_job:
with AuthTestClient(self.app) as client:
response = client.post(
"/exports/batch",
json={
"items": [
{
"camera": "front_door",
"start_time": 110,
"end_time": 150,
},
{
"camera": "backyard",
"start_time": 110,
"end_time": 150,
},
],
"new_case_name": "Partial",
},
)
assert response.status_code == 202
response_json = response.json()
assert len(response_json["export_ids"]) == 1
results_by_camera = {r["camera"]: r for r in response_json["results"]}
assert results_by_camera["front_door"]["success"] is True
assert results_by_camera["backyard"]["success"] is False
assert (
results_by_camera["backyard"]["error"]
== "No recordings found for time range"
)
start_export_job.assert_called_once()
# Case is still created because at least one item succeeded
assert (
ExportCase.get(ExportCase.id == response_json["export_case_id"]) is not None
)
def test_batch_export_same_camera_different_ranges_one_missing(self):
# Recording covers 100-200 only. First item fits, second does not.
self._insert_recording("rec-front", "front_door", 100, 200)
with patch(
"frigate.api.export.start_export_job",
side_effect=lambda _config, job: job.id,
) as start_export_job:
with AuthTestClient(self.app) as client:
response = client.post(
"/exports/batch",
json={
"items": [
{
"camera": "front_door",
"start_time": 110,
"end_time": 150,
},
{
"camera": "front_door",
"start_time": 500,
"end_time": 540,
},
],
"new_case_name": "Split recordings",
},
)
assert response.status_code == 202
response_json = response.json()
assert len(response_json["export_ids"]) == 1
results = response_json["results"]
assert results[0]["success"] is True
assert results[0]["item_index"] == 0
assert results[1]["success"] is False
assert results[1]["item_index"] == 1
assert results[1]["error"] == "No recordings found for time range"
# Both results carry the same camera — item_index is the only way
# the client can tell them apart.
assert results[0]["camera"] == results[1]["camera"] == "front_door"
start_export_job.assert_called_once()
def test_batch_export_all_missing_recordings_rolls_back_case(self):
# No recordings inserted at all
with patch(
"frigate.api.export.start_export_job",
side_effect=lambda _config, job: job.id,
) as start_export_job:
with AuthTestClient(self.app) as client:
response = client.post(
"/exports/batch",
json={
"items": [
{
"camera": "front_door",
"start_time": 110,
"end_time": 150,
}
],
"new_case_name": "Should rollback",
},
)
assert response.status_code == 400
start_export_job.assert_not_called()
assert ExportCase.select().count() == 0
def test_batch_export_preflight_queue_full(self):
self._insert_recording("rec-front", "front_door", 100, 400)
self._insert_recording("rec-back", "backyard", 100, 400)
with patch(
"frigate.api.export.available_export_queue_slots",
return_value=1,
):
with patch(
"frigate.api.export.start_export_job",
side_effect=lambda _config, job: job.id,
) as start_export_job:
with AuthTestClient(self.app) as client:
response = client.post(
"/exports/batch",
json={
"items": [
{
"camera": "front_door",
"start_time": 110,
"end_time": 150,
},
{
"camera": "backyard",
"start_time": 110,
"end_time": 150,
},
],
"new_case_name": "Queue full",
},
)
assert response.status_code == 503
start_export_job.assert_not_called()
assert ExportCase.select().count() == 0
def test_batch_export_all_enqueue_calls_fail_rolls_back_case(self):
self._insert_recording("rec-front", "front_door", 100, 400)
def boom(_config, _job):
raise RuntimeError("simulated enqueue failure")
with patch(
"frigate.api.export.start_export_job",
side_effect=boom,
):
with AuthTestClient(self.app) as client:
response = client.post(
"/exports/batch",
json={
"items": [
{
"camera": "front_door",
"start_time": 110,
"end_time": 150,
}
],
"new_case_name": "Will fail",
},
)
assert response.status_code == 202
response_json = response.json()
assert response_json["export_ids"] == []
assert response_json["export_case_id"] is None
assert ExportCase.select().count() == 0
def test_batch_export_rejects_invalid_image_path(self):
self._insert_recording("rec-front", "front_door", 100, 400)
with AuthTestClient(self.app) as client:
response = client.post(
"/exports/batch",
json={
"items": [
{
"camera": "front_door",
"start_time": 110,
"end_time": 150,
"image_path": "/etc/passwd",
}
],
"new_case_name": "Bad image",
},
)
assert response.status_code == 400
assert ExportCase.select().count() == 0
def test_batch_export_non_admin_can_queue(self):
self._insert_recording("rec-front", "front_door", 100, 400)
with patch(
"frigate.api.export.start_export_job",
side_effect=lambda _config, job: job.id,
):
with AuthTestClient(self.app) as client:
response = client.post(
"/exports/batch",
headers={"remote-user": "viewer", "remote-role": "viewer"},
json={
"items": [
{
"camera": "front_door",
"start_time": 110,
"end_time": 150,
}
],
"new_case_name": "Viewer export",
},
)
assert response.status_code == 202
assert len(response.json()["export_ids"]) == 1
def test_batch_export_non_admin_cannot_attach_to_existing_case(self):
"""Non-admins can create cases via new_case_name but cannot attach
to existing cases they did not create. Closes a write-path hole that
would otherwise be reachable through the unfiltered GET /cases list.
"""
self._insert_recording("rec-front", "front_door", 100, 400)
ExportCase.create(
id="admins_only_case",
name="Admins only",
description="",
created_at=10,
updated_at=10,
)
with patch(
"frigate.api.export.start_export_job",
side_effect=lambda _config, job: job.id,
) as start_export_job:
with AuthTestClient(self.app) as client:
response = client.post(
"/exports/batch",
headers={"remote-user": "viewer", "remote-role": "viewer"},
json={
"items": [
{
"camera": "front_door",
"start_time": 110,
"end_time": 150,
}
],
"export_case_id": "admins_only_case",
},
)
assert response.status_code == 403
start_export_job.assert_not_called()
# No exports should have been created in the target case
assert Export.select().count() == 0
def test_batch_export_admin_can_attach_to_existing_case(self):
self._insert_recording("rec-front", "front_door", 100, 400)
ExportCase.create(
id="shared_case",
name="Shared",
description="",
created_at=10,
updated_at=10,
)
with patch(
"frigate.api.export.start_export_job",
side_effect=lambda _config, job: job.id,
):
with AuthTestClient(self.app) as client:
response = client.post(
"/exports/batch",
json={
"items": [
{
"camera": "front_door",
"start_time": 110,
"end_time": 150,
}
],
"export_case_id": "shared_case",
},
)
assert response.status_code == 202
assert response.json()["export_case_id"] == "shared_case"
# No additional case created
assert ExportCase.select().count() == 1
def test_batch_export_roundtrips_client_item_id(self):
self._insert_recording("rec-front", "front_door", 100, 400)
with patch(
"frigate.api.export.start_export_job",
side_effect=lambda _config, job: job.id,
):
with AuthTestClient(self.app) as client:
response = client.post(
"/exports/batch",
json={
"items": [
{
"camera": "front_door",
"start_time": 110,
"end_time": 150,
"client_item_id": "review-123",
}
],
"new_case_name": "Client id test",
},
)
assert response.status_code == 202
assert response.json()["results"][0]["client_item_id"] == "review-123"
def test_single_export_non_admin_cannot_attach_to_existing_case(self):
"""The single-export route has the same hole: non-admins should not
be able to smuggle exports into an existing case via export_case_id.
Admin-gating this matches /exports/batch.
"""
self._insert_recording("rec-front", "front_door", 100, 400)
ExportCase.create(
id="admins_only_case",
name="Admins only",
description="",
created_at=10,
updated_at=10,
)
with patch(
"frigate.api.export.start_export_job",
side_effect=lambda _config, job: job.id,
) as start_export_job:
with AuthTestClient(self.app) as client:
response = client.post(
"/export/front_door/start/110/end/150",
headers={"remote-user": "viewer", "remote-role": "viewer"},
json={"export_case_id": "admins_only_case"},
)
assert response.status_code == 403
start_export_job.assert_not_called()
assert Export.select().count() == 0
def test_single_export_non_admin_can_still_export_without_case(self):
"""Regression guard: the admin gate only applies to export_case_id,
not to single exports in general. Non-admins should still be able
to start a single export for a camera they have access to.
"""
self._insert_recording("rec-front", "front_door", 100, 400)
with patch(
"frigate.api.export.start_export_job",
side_effect=lambda _config, job: job.id,
):
with AuthTestClient(self.app) as client:
response = client.post(
"/export/front_door/start/110/end/150",
headers={"remote-user": "viewer", "remote-role": "viewer"},
json={},
)
assert response.status_code == 202
assert response.json()["success"] is True
# ── Bulk delete exports ────────────────────────────────────────
def test_bulk_delete_exports_success(self):
"""All IDs exist, none in-progress → 200, all deleted."""
Export.create(
id="exp1",
camera="front_door",
name="export_1",
date=100,
video_path="/tmp/exp1.mp4",
thumb_path="/tmp/exp1.jpg",
in_progress=False,
)
Export.create(
id="exp2",
camera="front_door",
name="export_2",
date=200,
video_path="/tmp/exp2.mp4",
thumb_path="/tmp/exp2.jpg",
in_progress=False,
)
with AuthTestClient(self.app) as client:
response = client.post(
"/exports/delete",
json={"ids": ["exp1", "exp2"]},
)
assert response.status_code == 200
assert response.json()["success"] is True
assert Export.select().count() == 0
def test_bulk_delete_exports_single_item(self):
"""Regression: single-item delete via batch endpoint."""
Export.create(
id="exp1",
camera="front_door",
name="export_1",
date=100,
video_path="/tmp/exp1.mp4",
thumb_path="/tmp/exp1.jpg",
in_progress=False,
)
with AuthTestClient(self.app) as client:
response = client.post(
"/exports/delete",
json={"ids": ["exp1"]},
)
assert response.status_code == 200
assert Export.select().count() == 0
def test_bulk_delete_exports_some_missing(self):
"""Some IDs don't exist → 404, nothing deleted."""
Export.create(
id="exp1",
camera="front_door",
name="export_1",
date=100,
video_path="/tmp/exp1.mp4",
thumb_path="/tmp/exp1.jpg",
in_progress=False,
)
with AuthTestClient(self.app) as client:
response = client.post(
"/exports/delete",
json={"ids": ["exp1", "nonexistent"]},
)
assert response.status_code == 404
# Nothing deleted
assert Export.select().count() == 1
def test_bulk_delete_exports_all_missing(self):
"""All IDs don't exist → 404."""
with AuthTestClient(self.app) as client:
response = client.post(
"/exports/delete",
json={"ids": ["nope1", "nope2"]},
)
assert response.status_code == 404
def test_bulk_delete_exports_in_progress(self):
"""Some exports in-progress → 400, nothing deleted."""
Export.create(
id="exp1",
camera="front_door",
name="export_1",
date=100,
video_path=f"{os.environ.get('EXPORT_DIR', '/media/frigate/exports')}/exp1.mp4",
thumb_path="/tmp/exp1.jpg",
in_progress=True,
)
with patch(
"frigate.api.export._get_files_in_use",
return_value={"exp1.mp4"},
):
with AuthTestClient(self.app) as client:
response = client.post(
"/exports/delete",
json={"ids": ["exp1"]},
)
assert response.status_code == 400
assert Export.select().count() == 1
def test_bulk_delete_exports_non_admin_rejected(self):
"""Non-admin users cannot bulk delete."""
Export.create(
id="exp1",
camera="front_door",
name="export_1",
date=100,
video_path="/tmp/exp1.mp4",
thumb_path="/tmp/exp1.jpg",
in_progress=False,
)
with AuthTestClient(self.app) as client:
response = client.post(
"/exports/delete",
headers={"remote-user": "viewer", "remote-role": "viewer"},
json={"ids": ["exp1"]},
)
assert response.status_code == 403
assert Export.select().count() == 1
# ── Bulk reassign exports ──────────────────────────────────────
def test_bulk_reassign_exports_to_case(self):
"""All IDs exist, case exists → 200, all reassigned."""
ExportCase.create(
id="case1",
name="Test Case",
description="",
created_at=10,
updated_at=10,
)
Export.create(
id="exp1",
camera="front_door",
name="export_1",
date=100,
video_path="/tmp/exp1.mp4",
thumb_path="/tmp/exp1.jpg",
in_progress=False,
)
Export.create(
id="exp2",
camera="front_door",
name="export_2",
date=200,
video_path="/tmp/exp2.mp4",
thumb_path="/tmp/exp2.jpg",
in_progress=False,
)
with AuthTestClient(self.app) as client:
response = client.post(
"/exports/reassign",
json={"ids": ["exp1", "exp2"], "export_case_id": "case1"},
)
assert response.status_code == 200
assert response.json()["success"] is True
for exp_id in ["exp1", "exp2"]:
exp = Export.get(Export.id == exp_id)
assert exp.export_case_id == "case1"
def test_bulk_reassign_exports_to_null(self):
"""Reassign to null (uncategorize) → 200."""
ExportCase.create(
id="case1",
name="Test Case",
description="",
created_at=10,
updated_at=10,
)
Export.create(
id="exp1",
camera="front_door",
name="export_1",
date=100,
video_path="/tmp/exp1.mp4",
thumb_path="/tmp/exp1.jpg",
in_progress=False,
export_case="case1",
)
with AuthTestClient(self.app) as client:
response = client.post(
"/exports/reassign",
json={"ids": ["exp1"], "export_case_id": None},
)
assert response.status_code == 200
exp = Export.get(Export.id == "exp1")
assert exp.export_case_id is None
def test_bulk_reassign_exports_single_item(self):
"""Regression: single-item reassign via batch endpoint."""
ExportCase.create(
id="case1",
name="Test Case",
description="",
created_at=10,
updated_at=10,
)
Export.create(
id="exp1",
camera="front_door",
name="export_1",
date=100,
video_path="/tmp/exp1.mp4",
thumb_path="/tmp/exp1.jpg",
in_progress=False,
)
with AuthTestClient(self.app) as client:
response = client.post(
"/exports/reassign",
json={"ids": ["exp1"], "export_case_id": "case1"},
)
assert response.status_code == 200
exp = Export.get(Export.id == "exp1")
assert exp.export_case_id == "case1"
def test_bulk_reassign_exports_some_missing(self):
"""Some IDs don't exist → 404, nothing reassigned."""
ExportCase.create(
id="case1",
name="Test Case",
description="",
created_at=10,
updated_at=10,
)
Export.create(
id="exp1",
camera="front_door",
name="export_1",
date=100,
video_path="/tmp/exp1.mp4",
thumb_path="/tmp/exp1.jpg",
in_progress=False,
)
with AuthTestClient(self.app) as client:
response = client.post(
"/exports/reassign",
json={
"ids": ["exp1", "nonexistent"],
"export_case_id": "case1",
},
)
assert response.status_code == 404
# Nothing reassigned
exp = Export.get(Export.id == "exp1")
assert exp.export_case_id is None
def test_bulk_reassign_exports_case_not_found(self):
"""Target case doesn't exist → 404."""
Export.create(
id="exp1",
camera="front_door",
name="export_1",
date=100,
video_path="/tmp/exp1.mp4",
thumb_path="/tmp/exp1.jpg",
in_progress=False,
)
with AuthTestClient(self.app) as client:
response = client.post(
"/exports/reassign",
json={"ids": ["exp1"], "export_case_id": "nonexistent"},
)
assert response.status_code == 404
exp = Export.get(Export.id == "exp1")
assert exp.export_case_id is None
def test_bulk_reassign_exports_non_admin_rejected(self):
"""Non-admin users cannot bulk reassign."""
Export.create(
id="exp1",
camera="front_door",
name="export_1",
date=100,
video_path="/tmp/exp1.mp4",
thumb_path="/tmp/exp1.jpg",
in_progress=False,
)
with AuthTestClient(self.app) as client:
response = client.post(
"/exports/reassign",
headers={"remote-user": "viewer", "remote-role": "viewer"},
json={"ids": ["exp1"], "export_case_id": None},
)
assert response.status_code == 403