Files
blakeblackshear.frigate/frigate/api/export.py
Josh Hawkins e7e6f87682 Export improvements (#22867)
* backend

* frontend + i18n

* tests + api spec

* tweak backend to use Job infrastructure for exports

* frontend tweaks and Job infrastructure

* tests

* tweaks

- add ability to remove from case
- change location of counts in case card

* add stale export reaper on startup

* fix toaster close button color

* improve add dialog

* formatting

* hide max_concurrent from camera config export settings

* remove border

* refactor batch endpoint for multiple review items

* frontend

* tests and fastapi spec

* fix deletion of in-progress exports in a case

* tweaks

- hide cases when filtering cameras that have no exports from those cameras
- remove description from case card
- use textarea instead of input for case description in add new case dialog

* add auth exceptions for exports

* add e2e test for deleting cases with exports

* refactor delete and case endpoints

allow bulk deleting and reassigning

* frontend

- bulk selection like Review
- gate admin-only actions
- consolidate dialogs
- spacing/padding tweaks

* i18n and tests

* update openapi spec

* tweaks

- add None to case selection list
- allow new case creation from single cam export dialog

* fix codeql

* fix i18n

* remove unused

* fix frontend tests
2026-04-14 08:19:50 -06:00

1004 lines
32 KiB
Python

"""Export apis."""
import datetime
import logging
import random
import string
import time
from pathlib import Path
from typing import List, Optional
import psutil
from fastapi import APIRouter, Depends, Query, Request
from fastapi.responses import JSONResponse
from pathvalidate import sanitize_filepath
from peewee import DoesNotExist
from playhouse.shortcuts import model_to_dict
from frigate.api.auth import (
allow_any_authenticated,
get_allowed_cameras_for_filter,
get_current_user,
require_camera_access,
require_role,
)
from frigate.api.defs.request.batch_export_body import (
BatchExportBody,
BatchExportItem,
)
from frigate.api.defs.request.export_bulk_body import (
ExportBulkDeleteBody,
ExportBulkReassignBody,
)
from frigate.api.defs.request.export_case_body import (
ExportCaseCreateBody,
ExportCaseUpdateBody,
)
from frigate.api.defs.request.export_recordings_body import (
ExportRecordingsBody,
ExportRecordingsCustomBody,
)
from frigate.api.defs.request.export_rename_body import ExportRenameBody
from frigate.api.defs.response.export_case_response import (
ExportCaseModel,
ExportCasesResponse,
)
from frigate.api.defs.response.export_response import (
BatchExportResponse,
ExportJobModel,
ExportJobsResponse,
ExportModel,
ExportsResponse,
StartExportResponse,
)
from frigate.api.defs.response.generic_response import GenericResponse
from frigate.api.defs.tags import Tags
from frigate.const import CLIPS_DIR, EXPORT_DIR
from frigate.jobs.export import (
ExportJob,
ExportQueueFullError,
available_export_queue_slots,
cancel_queued_export_jobs_for_case,
get_export_job,
list_active_export_jobs,
start_export_job,
)
from frigate.models import Export, ExportCase, Previews, Recordings
from frigate.record.export import (
DEFAULT_TIME_LAPSE_FFMPEG_ARGS,
PlaybackSourceEnum,
validate_ffmpeg_args,
)
from frigate.util.time import is_current_hour
logger = logging.getLogger(__name__)
router = APIRouter(tags=[Tags.export])
def _generate_id(length: int = 12) -> str:
return "".join(random.choices(string.ascii_lowercase + string.digits, k=length))
def _generate_export_id(camera_name: str) -> str:
return f"{camera_name}_{_generate_id(6)}"
def _create_export_case_record(
name: str,
description: Optional[str],
) -> ExportCase:
now = datetime.datetime.fromtimestamp(time.time())
return ExportCase.create(
id=_generate_id(),
name=name,
description=description,
created_at=now,
updated_at=now,
)
def _validate_camera_name(request: Request, camera_name: str) -> Optional[JSONResponse]:
if camera_name and request.app.frigate_config.cameras.get(camera_name):
return None
return JSONResponse(
content={"success": False, "message": f"{camera_name} is not a valid camera."},
status_code=404,
)
def _validate_export_case(export_case_id: Optional[str]) -> Optional[JSONResponse]:
if export_case_id is None:
return None
try:
ExportCase.get(ExportCase.id == export_case_id)
except DoesNotExist:
return JSONResponse(
content={"success": False, "message": "Export case not found"},
status_code=404,
)
return None
def _sanitize_existing_image(
image_path: Optional[str],
) -> tuple[Optional[str], Optional[JSONResponse]]:
existing_image = sanitize_filepath(image_path) if image_path else None
if existing_image and not existing_image.startswith(CLIPS_DIR):
return None, JSONResponse(
content={"success": False, "message": "Invalid image path"},
status_code=400,
)
return existing_image, None
def _validate_export_source(
camera_name: str,
start_time: float,
end_time: float,
playback_source: PlaybackSourceEnum,
) -> Optional[str]:
if playback_source == PlaybackSourceEnum.recordings:
recordings_count = (
Recordings.select()
.where(
Recordings.start_time.between(start_time, end_time)
| Recordings.end_time.between(start_time, end_time)
| (
(start_time > Recordings.start_time)
& (end_time < Recordings.end_time)
)
)
.where(Recordings.camera == camera_name)
.count()
)
if recordings_count <= 0:
return "No recordings found for time range"
return None
previews_count = (
Previews.select()
.where(
Previews.start_time.between(start_time, end_time)
| Previews.end_time.between(start_time, end_time)
| ((start_time > Previews.start_time) & (end_time < Previews.end_time))
)
.where(Previews.camera == camera_name)
.count()
)
if not is_current_hour(start_time) and previews_count <= 0:
return "No previews found for time range"
return None
def _get_item_recording_export_errors(
request: Request,
items: list[BatchExportItem],
) -> dict[int, str]:
"""Return {item_index: error message} for items with invalid state.
Checks camera configuration and recording presence per item. Groups by
camera and issues one query per unique camera covering that camera's
full requested range, then checks each item's range against the returned
rows in Python. This avoids O(N) DB round-trips on large batches.
"""
configured_cameras = request.app.frigate_config.cameras
errors: dict[int, str] = {}
# Validate camera configuration first
item_ranges_by_camera: dict[str, list[tuple[int, float, float]]] = {}
for index, item in enumerate(items):
if not configured_cameras.get(item.camera):
errors[index] = f"{item.camera} is not a valid camera."
continue
item_ranges_by_camera.setdefault(item.camera, []).append(
(index, item.start_time, item.end_time)
)
if not item_ranges_by_camera:
return errors
# For each camera, fetch recordings that cover the union of ranges
for camera_name, indexed_ranges in item_ranges_by_camera.items():
min_start = min(r[1] for r in indexed_ranges)
max_end = max(r[2] for r in indexed_ranges)
recording_ranges = list(
Recordings.select(Recordings.start_time, Recordings.end_time)
.where(
Recordings.camera == camera_name,
Recordings.start_time.between(min_start, max_end)
| Recordings.end_time.between(min_start, max_end)
| (
(min_start > Recordings.start_time)
& (max_end < Recordings.end_time)
),
)
.iterator()
)
for index, start_time, end_time in indexed_ranges:
has_recording = any(
(
start_time <= rec.start_time <= end_time
or start_time <= rec.end_time <= end_time
or (start_time > rec.start_time and end_time < rec.end_time)
)
for rec in recording_ranges
)
if not has_recording:
errors[index] = "No recordings found for time range"
return errors
def _build_export_job(
camera_name: str,
start_time: float,
end_time: float,
friendly_name: Optional[str],
existing_image: Optional[str],
playback_source: PlaybackSourceEnum,
export_case_id: Optional[str],
ffmpeg_input_args: Optional[str] = None,
ffmpeg_output_args: Optional[str] = None,
cpu_fallback: bool = False,
) -> ExportJob:
return ExportJob(
id=_generate_export_id(camera_name),
camera=camera_name,
name=friendly_name,
image_path=existing_image,
export_case_id=export_case_id,
request_start_time=int(start_time),
request_end_time=int(end_time),
playback_source=playback_source.value,
ffmpeg_input_args=ffmpeg_input_args,
ffmpeg_output_args=ffmpeg_output_args,
cpu_fallback=cpu_fallback,
)
def _export_case_to_dict(case: ExportCase) -> dict[str, object]:
case_dict = model_to_dict(case)
for field in ("created_at", "updated_at"):
value = case_dict.get(field)
if isinstance(value, datetime.datetime):
case_dict[field] = value.timestamp()
return case_dict
@router.get(
"/exports",
response_model=ExportsResponse,
dependencies=[Depends(allow_any_authenticated())],
summary="Get exports",
description="""Gets all exports from the database for cameras the user has access to.
Returns a list of exports ordered by date (most recent first).""",
)
def get_exports(
allowed_cameras: List[str] = Depends(get_allowed_cameras_for_filter),
export_case_id: Optional[str] = None,
cameras: Optional[str] = Query(default="all"),
start_date: Optional[float] = None,
end_date: Optional[float] = None,
):
query = Export.select().where(Export.camera << allowed_cameras)
if export_case_id is not None:
if export_case_id == "unassigned":
query = query.where(Export.export_case.is_null(True))
else:
query = query.where(Export.export_case == export_case_id)
if cameras and cameras != "all":
requested = set(cameras.split(","))
filtered_cameras = list(requested.intersection(allowed_cameras))
if not filtered_cameras:
return JSONResponse(content=[])
query = query.where(Export.camera << filtered_cameras)
if start_date is not None:
query = query.where(Export.date >= start_date)
if end_date is not None:
query = query.where(Export.date <= end_date)
exports = query.order_by(Export.date.desc()).dicts().iterator()
return JSONResponse(content=[e for e in exports])
@router.get(
"/cases",
response_model=ExportCasesResponse,
dependencies=[Depends(allow_any_authenticated())],
summary="Get export cases",
description="Gets all export cases from the database.",
)
def get_export_cases():
cases = ExportCase.select().order_by(ExportCase.created_at.desc()).iterator()
return JSONResponse(content=[_export_case_to_dict(case) for case in cases])
@router.post(
"/cases",
response_model=ExportCaseModel,
dependencies=[Depends(require_role(["admin"]))],
summary="Create export case",
description="Creates a new export case.",
)
def create_export_case(body: ExportCaseCreateBody):
case = _create_export_case_record(body.name, body.description)
return JSONResponse(content=_export_case_to_dict(case))
@router.get(
"/cases/{case_id}",
response_model=ExportCaseModel,
dependencies=[Depends(allow_any_authenticated())],
summary="Get a single export case",
description="Gets a specific export case by ID.",
)
def get_export_case(case_id: str):
try:
case = ExportCase.get(ExportCase.id == case_id)
return JSONResponse(content=_export_case_to_dict(case))
except DoesNotExist:
return JSONResponse(
content={"success": False, "message": "Export case not found"},
status_code=404,
)
@router.patch(
"/cases/{case_id}",
response_model=GenericResponse,
dependencies=[Depends(require_role(["admin"]))],
summary="Update export case",
description="Updates an existing export case.",
)
def update_export_case(case_id: str, body: ExportCaseUpdateBody):
try:
case = ExportCase.get(ExportCase.id == case_id)
except DoesNotExist:
return JSONResponse(
content={"success": False, "message": "Export case not found"},
status_code=404,
)
if body.name is not None:
case.name = body.name
if body.description is not None:
case.description = body.description
case.updated_at = datetime.datetime.fromtimestamp(time.time())
case.save()
return JSONResponse(
content={"success": True, "message": "Successfully updated export case."}
)
@router.delete(
"/cases/{case_id}",
response_model=GenericResponse,
dependencies=[Depends(require_role(["admin"]))],
summary="Delete export case",
description="""Deletes an export case.\n Exports that reference this case will have their export_case set to null.\n """,
)
def delete_export_case(case_id: str, request: Request, delete_exports: bool = False):
try:
case = ExportCase.get(ExportCase.id == case_id)
except DoesNotExist:
return JSONResponse(
content={"success": False, "message": "Export case not found"},
status_code=404,
)
if delete_exports:
cancel_queued_export_jobs_for_case(request.app.frigate_config, case_id)
exports = list(Export.select().where(Export.export_case == case_id))
for export in exports:
Path(export.video_path).unlink(missing_ok=True)
if export.thumb_path:
Path(export.thumb_path).unlink(missing_ok=True)
export.delete_instance()
else:
# Unassign exports from this case but keep the exports themselves
Export.update(export_case=None).where(Export.export_case == case_id).execute()
case.delete_instance()
return JSONResponse(
content={"success": True, "message": "Successfully deleted export case."}
)
@router.get(
"/jobs/export",
response_model=ExportJobsResponse,
dependencies=[Depends(allow_any_authenticated())],
summary="Get active export jobs",
description="Gets queued and running export jobs.",
)
def get_active_export_jobs(
request: Request,
allowed_cameras: List[str] = Depends(get_allowed_cameras_for_filter),
):
jobs = list_active_export_jobs(request.app.frigate_config)
return JSONResponse(
content=[job.to_dict() for job in jobs if job.camera in allowed_cameras]
)
@router.get(
"/jobs/export/{export_id}",
response_model=ExportJobModel,
dependencies=[Depends(allow_any_authenticated())],
summary="Get export job status",
description="Gets queued, running, or completed status for a specific export job.",
)
async def get_export_job_status(export_id: str, request: Request):
job = get_export_job(request.app.frigate_config, export_id)
if job is None:
return JSONResponse(
content={"success": False, "message": "Job not found"},
status_code=404,
)
await require_camera_access(job.camera, request=request)
return JSONResponse(content=job.to_dict())
@router.post(
"/exports/batch",
response_model=BatchExportResponse,
dependencies=[Depends(allow_any_authenticated())],
summary="Start recording export batch",
description=(
"Starts recording exports for a batch of items, each with its own camera "
"and time range, and assigns them to a single export case. Attaching to "
"an existing case is temporarily admin-only until case-level ACLs exist."
),
)
def export_recordings_batch(
request: Request,
body: BatchExportBody,
allowed_cameras: List[str] = Depends(get_allowed_cameras_for_filter),
current_user: dict = Depends(get_current_user),
):
if isinstance(current_user, JSONResponse):
return current_user
# Stopgap: attaching to an existing case remains admin-only until
# case-level ACLs exist. Non-admins can still create a fresh case
# as a side effect of queueing items they already have camera access to.
if body.export_case_id is not None and current_user["role"] != "admin":
return JSONResponse(
content={
"success": False,
"message": "Only admins can attach exports to an existing case.",
},
status_code=403,
)
case_validation_error = _validate_export_case(body.export_case_id)
if case_validation_error is not None:
return case_validation_error
# Fail-closed camera access: any item referencing an inaccessible
# camera rejects the whole request. The UI's review list is already
# filtered by camera access, so reaching this branch implies a stale
# session or a crafted request — reject loudly rather than silently
# dropping items.
allowed_camera_set = set(allowed_cameras)
for item in body.items:
if item.camera not in allowed_camera_set:
return JSONResponse(
content={
"success": False,
"message": f"Cannot export from {item.camera}: access denied",
},
status_code=403,
)
# Sanitize each item's image_path up front. A bad path in any item
# kills the whole request, consistent with single-export behavior.
sanitized_images: list[Optional[str]] = []
for item in body.items:
existing_image, image_validation_error = _sanitize_existing_image(
item.image_path
)
if image_validation_error is not None:
return image_validation_error
sanitized_images.append(existing_image)
item_errors = _get_item_recording_export_errors(request, body.items)
queueable_indexes = [
index for index in range(len(body.items)) if index not in item_errors
]
if not queueable_indexes:
return JSONResponse(
content={
"success": False,
"message": (
"No exports could be queued: no recordings found for the "
"requested ranges."
),
},
status_code=400,
)
# Preflight admission: reject the whole batch if we can't fit every
# queueable item. Prevents partial batches where the tail fails with
# "queue full" after we've already created a case.
if available_export_queue_slots(request.app.frigate_config) < len(
queueable_indexes
):
return JSONResponse(
content={
"success": False,
"message": "Export queue is full. Try again once current exports finish.",
},
status_code=503,
)
export_case = None
export_case_id = body.export_case_id
if export_case_id is None and body.new_case_name:
export_case = _create_export_case_record(
body.new_case_name,
body.new_case_description,
)
export_case_id = export_case.id
export_ids: list[str] = []
results: list[dict[str, Optional[str] | bool | int]] = []
for index, item in enumerate(body.items):
if index in item_errors:
results.append(
{
"camera": item.camera,
"export_id": None,
"success": False,
"status": None,
"error": item_errors[index],
"item_index": index,
"client_item_id": item.client_item_id,
}
)
continue
export_job = _build_export_job(
item.camera,
item.start_time,
item.end_time,
item.friendly_name,
sanitized_images[index],
PlaybackSourceEnum.recordings,
export_case_id,
)
try:
start_export_job(request.app.frigate_config, export_job)
except Exception:
logger.exception("Failed to queue export job %s", export_job.id)
results.append(
{
"camera": item.camera,
"export_id": None,
"success": False,
"status": None,
"error": "Failed to queue export job",
"item_index": index,
"client_item_id": item.client_item_id,
}
)
continue
export_ids.append(export_job.id)
results.append(
{
"camera": item.camera,
"export_id": export_job.id,
"success": True,
"status": "queued",
"error": None,
"item_index": index,
"client_item_id": item.client_item_id,
}
)
if export_case is not None and not export_ids:
export_case.delete_instance()
export_case_id = None
return JSONResponse(
content={
"export_case_id": export_case_id,
"export_ids": export_ids,
"results": results,
},
status_code=202,
)
@router.post(
"/export/{camera_name}/start/{start_time}/end/{end_time}",
response_model=StartExportResponse,
dependencies=[Depends(require_camera_access)],
summary="Start recording export",
description="""Starts an export of a recording for the specified time range.
The export can be from recordings or preview footage. Returns the export ID if
successful, or an error message if the camera is invalid or no recordings/previews
are found for the time range.""",
)
def export_recording(
request: Request,
camera_name: str,
start_time: float,
end_time: float,
body: ExportRecordingsBody,
current_user: dict = Depends(get_current_user),
):
if isinstance(current_user, JSONResponse):
return current_user
camera_validation_error = _validate_camera_name(request, camera_name)
if camera_validation_error is not None:
return camera_validation_error
playback_source = body.source
friendly_name = body.name
existing_image, image_validation_error = _sanitize_existing_image(body.image_path)
if image_validation_error is not None:
return image_validation_error
export_case_id = body.export_case_id
# Attaching to an existing case requires admin. Single-export for
# cameras the user can access is otherwise non-admin; we only gate
# the case-attachment side effect.
if export_case_id is not None and current_user["role"] != "admin":
return JSONResponse(
content={
"success": False,
"message": "Only admins can attach exports to an existing case.",
},
status_code=403,
)
case_validation_error = _validate_export_case(export_case_id)
if case_validation_error is not None:
return case_validation_error
source_error = _validate_export_source(
camera_name,
start_time,
end_time,
playback_source,
)
if source_error is not None:
return JSONResponse(
content={"success": False, "message": source_error},
status_code=400,
)
export_job = _build_export_job(
camera_name,
start_time,
end_time,
friendly_name,
existing_image,
playback_source,
export_case_id,
)
try:
start_export_job(request.app.frigate_config, export_job)
except ExportQueueFullError:
logger.warning("Export queue is full; rejecting %s", export_job.id)
return JSONResponse(
content={
"success": False,
"message": "Export queue is full. Try again once current exports finish.",
},
status_code=503,
)
return JSONResponse(
content=(
{
"success": True,
"message": "Export queued.",
"export_id": export_job.id,
"status": "queued",
}
),
status_code=202,
)
@router.patch(
"/export/{event_id}/rename",
response_model=GenericResponse,
dependencies=[Depends(require_role(["admin"]))],
summary="Rename export",
description="""Renames an export.
NOTE: This changes the friendly name of the export, not the filename.
""",
)
async def export_rename(event_id: str, body: ExportRenameBody, request: Request):
try:
export: Export = Export.get(Export.id == event_id)
await require_camera_access(export.camera, request=request)
except DoesNotExist:
return JSONResponse(
content=(
{
"success": False,
"message": "Export not found.",
}
),
status_code=404,
)
export.name = body.name
export.save()
return JSONResponse(
content=(
{
"success": True,
"message": "Successfully renamed export.",
}
),
status_code=200,
)
@router.post(
"/export/custom/{camera_name}/start/{start_time}/end/{end_time}",
response_model=StartExportResponse,
dependencies=[Depends(require_camera_access)],
summary="Start custom recording export",
description="""Starts an export of a recording for the specified time range using custom FFmpeg arguments.
The export can be from recordings or preview footage. Returns the export ID if
successful, or an error message if the camera is invalid or no recordings/previews
are found for the time range. If ffmpeg_input_args and ffmpeg_output_args are not provided,
defaults to timelapse export settings.""",
)
def export_recording_custom(
request: Request,
camera_name: str,
start_time: float,
end_time: float,
body: ExportRecordingsCustomBody,
):
camera_validation_error = _validate_camera_name(request, camera_name)
if camera_validation_error is not None:
return camera_validation_error
playback_source = body.source
friendly_name = body.name
existing_image, image_validation_error = _sanitize_existing_image(body.image_path)
if image_validation_error is not None:
return image_validation_error
ffmpeg_input_args = body.ffmpeg_input_args
ffmpeg_output_args = body.ffmpeg_output_args
cpu_fallback = body.cpu_fallback
export_case_id = body.export_case_id
case_validation_error = _validate_export_case(export_case_id)
if case_validation_error is not None:
return case_validation_error
source_error = _validate_export_source(
camera_name,
start_time,
end_time,
playback_source,
)
if source_error is not None:
return JSONResponse(
content={"success": False, "message": source_error},
status_code=400,
)
# Validate user-provided ffmpeg args to prevent injection.
# Admin users are trusted and skip validation.
is_admin = request.headers.get("remote-role", "") == "admin"
if not is_admin:
for args_label, args_value in [
("input", ffmpeg_input_args),
("output", ffmpeg_output_args),
]:
if args_value is not None:
valid, message = validate_ffmpeg_args(args_value)
if not valid:
return JSONResponse(
content=(
{
"success": False,
"message": f"Invalid ffmpeg {args_label} arguments: {message}",
}
),
status_code=400,
)
# Set default values if not provided (timelapse defaults)
if ffmpeg_input_args is None:
ffmpeg_input_args = ""
if ffmpeg_output_args is None:
ffmpeg_output_args = DEFAULT_TIME_LAPSE_FFMPEG_ARGS
export_job = _build_export_job(
camera_name,
start_time,
end_time,
friendly_name,
existing_image,
playback_source,
export_case_id,
ffmpeg_input_args,
ffmpeg_output_args,
cpu_fallback,
)
try:
start_export_job(request.app.frigate_config, export_job)
except ExportQueueFullError:
logger.warning("Export queue is full; rejecting %s", export_job.id)
return JSONResponse(
content={
"success": False,
"message": "Export queue is full. Try again once current exports finish.",
},
status_code=503,
)
return JSONResponse(
content=(
{
"success": True,
"message": "Export queued.",
"export_id": export_job.id,
"status": "queued",
}
),
status_code=202,
)
@router.get(
"/exports/{export_id}",
response_model=ExportModel,
dependencies=[Depends(allow_any_authenticated())],
summary="Get a single export",
description="""Gets a specific export by ID. The user must have access to the camera
associated with the export.""",
)
async def get_export(export_id: str, request: Request):
try:
export = Export.get(Export.id == export_id)
await require_camera_access(export.camera, request=request)
return JSONResponse(content=model_to_dict(export))
except DoesNotExist:
return JSONResponse(
content={"success": False, "message": "Export not found"},
status_code=404,
)
def _get_files_in_use() -> set[str]:
"""Get set of export filenames currently in use by ffmpeg."""
files_in_use: set[str] = set()
for process in psutil.process_iter():
try:
if process.name() != "ffmpeg":
continue
file_list = process.open_files()
if file_list:
for nt in file_list:
if nt.path.startswith(EXPORT_DIR):
files_in_use.add(nt.path.split("/")[-1])
except psutil.Error:
continue
return files_in_use
@router.post(
"/exports/delete",
response_model=GenericResponse,
dependencies=[Depends(require_role(["admin"]))],
summary="Bulk delete exports",
description="Deletes one or more exports by ID. All IDs must exist and none can be in-progress.",
)
def bulk_delete_exports(body: ExportBulkDeleteBody):
exports = list(Export.select().where(Export.id << body.ids))
if len(exports) != len(body.ids):
return JSONResponse(
content={"success": False, "message": "One or more exports not found."},
status_code=404,
)
files_in_use = _get_files_in_use()
for export in exports:
if export.video_path.split("/")[-1] in files_in_use:
return JSONResponse(
content={
"success": False,
"message": "Can not delete in-progress export.",
},
status_code=400,
)
for export in exports:
Path(export.video_path).unlink(missing_ok=True)
if export.thumb_path:
Path(export.thumb_path).unlink(missing_ok=True)
Export.delete().where(Export.id << body.ids).execute()
return JSONResponse(
content={
"success": True,
"message": f"Successfully deleted {len(exports)} export(s).",
},
status_code=200,
)
@router.post(
"/exports/reassign",
response_model=GenericResponse,
dependencies=[Depends(require_role(["admin"]))],
summary="Bulk reassign exports to a case",
description="Assigns or unassigns one or more exports to/from a case. All IDs must exist.",
)
def bulk_reassign_exports(body: ExportBulkReassignBody):
exports = list(Export.select().where(Export.id << body.ids))
if len(exports) != len(body.ids):
return JSONResponse(
content={"success": False, "message": "One or more exports not found."},
status_code=404,
)
if body.export_case_id is not None:
try:
ExportCase.get(ExportCase.id == body.export_case_id)
except DoesNotExist:
return JSONResponse(
content={"success": False, "message": "Export case not found."},
status_code=404,
)
Export.update(export_case=body.export_case_id).where(
Export.id << body.ids
).execute()
return JSONResponse(
content={
"success": True,
"message": f"Successfully updated {len(exports)} export(s).",
},
status_code=200,
)