blakeblackshear.frigate/frigate/api/export.py
Josh Hawkins 74ca009b0b
UI viewer role (#16978)
* db migration

* db model

* assign admin role on password reset

* add role to jwt and api responses

* don't restrict api access for admins yet

* use json response

* frontend auth context

* update auth form for profile endpoint

* add access denied page

* add protected routes

* auth hook

* dialogs

* user settings view

* restrict viewer access to settings

* restrict camera functions for viewer role

* add password dialog to account menu

* spacing tweak

* migrator default to admin

* escape quotes in migrator

* ui tweaks

* tweaks

* colors

* colors

* fix merge conflict

* fix icons

* add api layer enforcement

* ui tweaks

* fix error message

* debug

* clean up

* remove print

* guard apis for admin only

* fix tests

* fix review tests

* use correct error responses from api in toasts

* add role to account menu
2025-03-08 10:01:08 -06:00

226 lines
6.4 KiB
Python

"""Export apis."""
import logging
import random
import string
from pathlib import Path
import psutil
from fastapi import APIRouter, Depends, Request
from fastapi.responses import JSONResponse
from peewee import DoesNotExist
from playhouse.shortcuts import model_to_dict
from frigate.api.auth import require_role
from frigate.api.defs.request.export_recordings_body import ExportRecordingsBody
from frigate.api.defs.request.export_rename_body import ExportRenameBody
from frigate.api.defs.tags import Tags
from frigate.const import EXPORT_DIR
from frigate.models import Export, Previews, Recordings
from frigate.record.export import (
PlaybackFactorEnum,
PlaybackSourceEnum,
RecordingExporter,
)
from frigate.util.builtin import is_current_hour
logger = logging.getLogger(__name__)
router = APIRouter(tags=[Tags.export])
@router.get("/exports")
def get_exports():
exports = Export.select().order_by(Export.date.desc()).dicts().iterator()
return JSONResponse(content=[e for e in exports])
@router.post("/export/{camera_name}/start/{start_time}/end/{end_time}")
def export_recording(
request: Request,
camera_name: str,
start_time: float,
end_time: float,
body: ExportRecordingsBody,
):
if not camera_name or not request.app.frigate_config.cameras.get(camera_name):
return JSONResponse(
content=(
{"success": False, "message": f"{camera_name} is not a valid camera."}
),
status_code=404,
)
playback_factor = body.playback
playback_source = body.source
friendly_name = body.name
existing_image = body.image_path
if playback_source == "recordings":
recordings_count = (
Recordings.select()
.where(
Recordings.start_time.between(start_time, end_time)
| Recordings.end_time.between(start_time, end_time)
| (
(start_time > Recordings.start_time)
& (end_time < Recordings.end_time)
)
)
.where(Recordings.camera == camera_name)
.count()
)
if recordings_count <= 0:
return JSONResponse(
content=(
{"success": False, "message": "No recordings found for time range"}
),
status_code=400,
)
else:
previews_count = (
Previews.select()
.where(
Previews.start_time.between(start_time, end_time)
| Previews.end_time.between(start_time, end_time)
| ((start_time > Previews.start_time) & (end_time < Previews.end_time))
)
.where(Previews.camera == camera_name)
.count()
)
if not is_current_hour(start_time) and previews_count <= 0:
return JSONResponse(
content=(
{"success": False, "message": "No previews found for time range"}
),
status_code=400,
)
export_id = f"{camera_name}_{''.join(random.choices(string.ascii_lowercase + string.digits, k=6))}"
exporter = RecordingExporter(
request.app.frigate_config,
export_id,
camera_name,
friendly_name,
existing_image,
int(start_time),
int(end_time),
(
PlaybackFactorEnum[playback_factor]
if playback_factor in PlaybackFactorEnum.__members__.values()
else PlaybackFactorEnum.realtime
),
(
PlaybackSourceEnum[playback_source]
if playback_source in PlaybackSourceEnum.__members__.values()
else PlaybackSourceEnum.recordings
),
)
exporter.start()
return JSONResponse(
content=(
{
"success": True,
"message": "Starting export of recording.",
"export_id": export_id,
}
),
status_code=200,
)
@router.patch(
"/export/{event_id}/rename", dependencies=[Depends(require_role(["admin"]))]
)
def export_rename(event_id: str, body: ExportRenameBody):
try:
export: Export = Export.get(Export.id == event_id)
except DoesNotExist:
return JSONResponse(
content=(
{
"success": False,
"message": "Export not found.",
}
),
status_code=404,
)
export.name = body.name
export.save()
return JSONResponse(
content=(
{
"success": True,
"message": "Successfully renamed export.",
}
),
status_code=200,
)
@router.delete("/export/{event_id}", dependencies=[Depends(require_role(["admin"]))])
def export_delete(event_id: str):
try:
export: Export = Export.get(Export.id == event_id)
except DoesNotExist:
return JSONResponse(
content=(
{
"success": False,
"message": "Export not found.",
}
),
status_code=404,
)
files_in_use = []
for process in psutil.process_iter():
try:
if process.name() != "ffmpeg":
continue
file_list = process.open_files()
if file_list:
for nt in file_list:
if nt.path.startswith(EXPORT_DIR):
files_in_use.append(nt.path.split("/")[-1])
except psutil.Error:
continue
if export.video_path.split("/")[-1] in files_in_use:
return JSONResponse(
content=(
{"success": False, "message": "Can not delete in progress export."}
),
status_code=400,
)
Path(export.video_path).unlink(missing_ok=True)
if export.thumb_path:
Path(export.thumb_path).unlink(missing_ok=True)
export.delete_instance()
return JSONResponse(
content=(
{
"success": True,
"message": "Successfully deleted export.",
}
),
status_code=200,
)
@router.get("/exports/{export_id}")
def get_export(export_id: str):
try:
return JSONResponse(content=model_to_dict(Export.get(Export.id == export_id)))
except DoesNotExist:
return JSONResponse(
content={"success": False, "message": "Export not found"},
status_code=404,
)