blakeblackshear.frigate/frigate/api/app.py
Josh Hawkins 9d85136f8f
Add Camera Wizard (#20461)
* fetch more from ffprobe

* add detailed param to ffprobe endpoint

* add dots variant to step indicator

* add classname

* tweak colors for dark mode to match figma

* add step 1 form

* add helper function for ffmpeg snapshot

* add go2rtc stream add and ffprobe snapshot endpoints

* add camera image and stream details on successful test

* step 1 tweaks

* step 2 and i18n

* types

* step 1 and 2 tweaks

* add wizard to camera settings view

* add data unit i18n keys

* restream tweak

* fix type

* implement rough idea for step 3

* add api endpoint to delete stream from go2rtc

* add main wizard dialog component

* extract logic for friendly_name and use in wizard

* add i18n and popover for brand url

* add camera name to top

* consolidate validation logic

* prevent dialog from closing when clicking outside

* center camera name on mobile

* add help/docs link popovers

* keep spaces in friendly name

* add stream details to overlay like stats in liveplayer

* add validation results pane to step 3

* ensure test is invalidated if stream is changed

* only display validation results and enable save button if all streams have been tested

* tweaks

* normalize camera name to lower case and improve hash generation

* move wizard to subfolder

* tweaks

* match look of camera edit form to wizard

* move wizard and edit form to its own component

* move enabled/disabled switch to management section

* clean up

* fixes

* fix mobile
2025-10-13 10:52:08 -06:00

1069 lines
33 KiB
Python

"""Main api runner."""
import asyncio
import copy
import json
import logging
import os
import traceback
import urllib
from datetime import datetime, timedelta
from functools import reduce
from io import StringIO
from pathlib import Path as FilePath
from typing import Any, Dict, List, Optional
import aiofiles
import requests
import ruamel.yaml
from fastapi import APIRouter, Body, Path, Request, Response
from fastapi.encoders import jsonable_encoder
from fastapi.params import Depends
from fastapi.responses import JSONResponse, PlainTextResponse, StreamingResponse
from markupsafe import escape
from peewee import SQL, fn, operator
from pydantic import ValidationError
from frigate.api.auth import require_role
from frigate.api.defs.query.app_query_parameters import AppTimelineHourlyQueryParameters
from frigate.api.defs.request.app_body import AppConfigSetBody
from frigate.api.defs.tags import Tags
from frigate.config import FrigateConfig
from frigate.config.camera.updater import (
CameraConfigUpdateEnum,
CameraConfigUpdateTopic,
)
from frigate.models import Event, Timeline
from frigate.stats.prometheus import get_metrics, update_metrics
from frigate.util.builtin import (
clean_camera_user_pass,
flatten_config_data,
get_tz_modifiers,
process_config_query_string,
update_yaml_file_bulk,
)
from frigate.util.config import find_config_file
from frigate.util.image import run_ffmpeg_snapshot
from frigate.util.services import (
ffprobe_stream,
get_nvidia_driver_info,
process_logs,
restart_frigate,
vainfo_hwaccel,
)
from frigate.version import VERSION
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# Router on which every endpoint in this file is registered, tagged with Tags.app.
router = APIRouter(tags=[Tags.app])
@router.get("/", response_class=PlainTextResponse)
def is_healthy():
    """Return a fixed plain-text message confirming the API is responding."""
    status_message = "Frigate is running. Alive and healthy!"
    return status_message
@router.get("/config/schema.json")
def config_schema(request: Request):
    """Return the output of the active config's `schema_json()` as JSON."""
    schema_json = request.app.frigate_config.schema_json()
    return Response(content=schema_json, media_type="application/json")
@router.get("/go2rtc/streams")
def go2rtc_streams():
    """Return every stream reported by the local go2rtc API.

    Each producer's ``url`` is passed through ``clean_camera_user_pass`` before
    the data is returned.

    Returns:
        JSONResponse with the go2rtc stream data, or a 500 JSON error when
        go2rtc cannot be reached or answers with a non-OK status.
    """
    try:
        # Bound the wait so an unresponsive go2rtc cannot hang this request,
        # matching the add/delete stream endpoints.
        r = requests.get("http://127.0.0.1:1984/api/streams", timeout=10)
    except requests.RequestException as e:
        logger.error(f"Error communicating with go2rtc: {e}")
        return JSONResponse(
            content=({"success": False, "message": "Error fetching stream data"}),
            status_code=500,
        )

    if not r.ok:
        logger.error("Failed to fetch streams from go2rtc")
        return JSONResponse(
            content=({"success": False, "message": "Error fetching stream data"}),
            status_code=500,
        )

    stream_data = r.json()
    for data in stream_data.values():
        # "producers" may be missing or null; treat both as "no producers".
        for producer in data.get("producers") or []:
            producer["url"] = clean_camera_user_pass(producer.get("url", ""))

    return JSONResponse(content=stream_data)
@router.get("/go2rtc/streams/{camera_name}")
def go2rtc_camera_stream(request: Request, camera_name: str):
    """Return go2rtc stream details for a single camera.

    Each producer's ``url`` is passed through ``clean_camera_user_pass`` before
    the data is returned.

    Args:
        request: Incoming request; used to look up the camera in the config.
        camera_name: Name used as the go2rtc ``src`` query value.

    Returns:
        JSONResponse with the stream data, or a 500 JSON error when go2rtc
        cannot be reached or answers with a non-OK status.
    """
    from urllib.parse import quote

    # Percent-encode the name so characters such as "&" or "#" cannot alter
    # the query string sent to go2rtc.
    encoded_name = quote(camera_name, safe="")

    try:
        r = requests.get(
            f"http://127.0.0.1:1984/api/streams?src={encoded_name}&video=all&audio=all&microphone",
            timeout=10,
        )
    except requests.RequestException as e:
        logger.error(f"Error communicating with go2rtc: {e}")
        return JSONResponse(
            content=({"success": False, "message": "Error fetching stream data"}),
            status_code=500,
        )

    if not r.ok:
        camera_config = request.app.frigate_config.cameras.get(camera_name)

        # Only log when the camera is known and enabled; otherwise the failure
        # is not treated as an error worth logging.
        if camera_config and camera_config.enabled:
            logger.error("Failed to fetch streams from go2rtc")

        return JSONResponse(
            content=({"success": False, "message": "Error fetching stream data"}),
            status_code=500,
        )

    stream_data = r.json()
    # "producers" may be missing or null; treat both as "no producers",
    # consistent with the all-streams endpoint.
    for producer in stream_data.get("producers") or []:
        producer["url"] = clean_camera_user_pass(producer.get("url", ""))

    return JSONResponse(content=stream_data)
@router.put(
    "/go2rtc/streams/{stream_name}", dependencies=[Depends(require_role(["admin"]))]
)
def go2rtc_add_stream(request: Request, stream_name: str, src: str = ""):
    """Create or update a stream in go2rtc.

    Args:
        request: Incoming request (unused by the body).
        stream_name: Sent to go2rtc as the ``name`` query parameter.
        src: Optional source; only sent when non-empty.

    Returns:
        JSONResponse reporting success, the go2rtc error (with go2rtc's own
        status code), or a 500 when go2rtc cannot be reached.
    """
    query = {"name": stream_name}
    if src:
        query["src"] = src

    try:
        response = requests.put(
            "http://127.0.0.1:1984/api/streams",
            params=query,
            timeout=10,
        )

        if response.ok:
            return JSONResponse(
                content={"success": True, "message": "Stream added successfully"}
            )

        # go2rtc rejected the request: relay its message and status code.
        logger.error(f"Failed to add go2rtc stream {stream_name}: {response.text}")
        failure = {
            "success": False,
            "message": f"Failed to add stream: {response.text}",
        }
        return JSONResponse(content=failure, status_code=response.status_code)
    except requests.RequestException as err:
        logger.error(f"Error communicating with go2rtc: {err}")
        unreachable = {
            "success": False,
            "message": "Error communicating with go2rtc",
        }
        return JSONResponse(content=unreachable, status_code=500)
@router.delete(
    "/go2rtc/streams/{stream_name}", dependencies=[Depends(require_role(["admin"]))]
)
def go2rtc_delete_stream(stream_name: str):
    """Remove a stream from go2rtc.

    Args:
        stream_name: Sent to go2rtc as the ``src`` query parameter.

    Returns:
        JSONResponse reporting success, the go2rtc error (with go2rtc's own
        status code), or a 500 when go2rtc cannot be reached.
    """
    try:
        response = requests.delete(
            "http://127.0.0.1:1984/api/streams",
            params={"src": stream_name},
            timeout=10,
        )

        if response.ok:
            return JSONResponse(
                content={"success": True, "message": "Stream deleted successfully"}
            )

        # go2rtc rejected the request: relay its message and status code.
        logger.error(f"Failed to delete go2rtc stream {stream_name}: {response.text}")
        failure = {
            "success": False,
            "message": f"Failed to delete stream: {response.text}",
        }
        return JSONResponse(content=failure, status_code=response.status_code)
    except requests.RequestException as err:
        logger.error(f"Error communicating with go2rtc: {err}")
        unreachable = {
            "success": False,
            "message": "Error communicating with go2rtc",
        }
        return JSONResponse(content=unreachable, status_code=500)
@router.get("/version", response_class=PlainTextResponse)
def version():
    """Return the imported ``VERSION`` constant as plain text."""
    current_version = VERSION
    return current_version
@router.get("/stats")
def stats(request: Request):
    """Return the most recent stats held by the app's stats emitter."""
    latest = request.app.stats_emitter.get_latest_stats()
    return JSONResponse(content=latest)
@router.get("/stats/history")
def stats_history(request: Request, keys: str = None):
    """Return the stats history, optionally limited to selected keys.

    Args:
        request: Incoming request; provides the app's stats emitter.
        keys: Comma-separated key names. A falsy value is forwarded unchanged.
    """
    # Only split when something was supplied; None or "" is passed through as-is.
    selected = keys.split(",") if keys else keys
    history = request.app.stats_emitter.get_stats_history(selected)
    return JSONResponse(content=history)
@router.get("/metrics")
def metrics(request: Request):
    """Refresh the Prometheus metrics from the latest stats and return them."""
    latest_stats = request.app.stats_emitter.get_latest_stats()

    # Event counts grouped by (camera, label), fetched as dict rows.
    grouped_events = Event.select(Event.camera, Event.label, fn.Count()).group_by(
        Event.camera, Event.label
    )
    event_counts: List[Dict[str, Any]] = grouped_events.dicts()

    update_metrics(stats=latest_stats, event_counts=event_counts)

    payload, payload_type = get_metrics()
    return Response(content=payload, media_type=payload_type)
@router.get("/config")
def config(request: Request):
    """Return the active configuration as JSON with secrets removed.

    The dumped config has the MQTT password and proxy auth secret dropped, and
    camera input paths, ffmpeg commands and go2rtc stream sources are passed
    through ``clean_camera_user_pass``. Frigate+ status, model attributes and
    the merged labelmap are added before returning.
    """
    frigate_config: FrigateConfig = request.app.frigate_config
    payload: dict[str, dict[str, Any]] = frigate_config.model_dump(
        mode="json", warnings="none", exclude_none=True
    )

    # Drop secrets that must never reach the client.
    payload["mqtt"].pop("password", None)
    payload["proxy"].pop("auth_secret", None)

    for camera_name, camera in request.app.frigate_config.cameras.items():
        camera_payload = payload["cameras"][camera_name]

        # Clean each ffmpeg input path.
        for ffmpeg_input in camera_payload.get("ffmpeg", {}).get("inputs", []):
            ffmpeg_input["path"] = clean_camera_user_pass(ffmpeg_input["path"])

        # Attach a cleaned copy of the generated ffmpeg commands.
        cleaned_cmds = copy.deepcopy(camera.ffmpeg_cmds)
        camera_payload["ffmpeg_cmds"] = cleaned_cmds
        for ffmpeg_cmd in cleaned_cmds:
            ffmpeg_cmd["cmd"] = clean_camera_user_pass(" ".join(ffmpeg_cmd["cmd"]))

        # Copy each zone's color from the live config object.
        for zone_name, zone in frigate_config.cameras[camera_name].zones.items():
            camera_payload["zones"][zone_name]["color"] = zone.color

    # Clean go2rtc stream sources (each stream is a string or a list of strings).
    go2rtc_dump: dict[str, Any] = frigate_config.go2rtc.model_dump(
        mode="json", warnings="none", exclude_none=True
    )
    for stream_name, source in go2rtc_dump.get("streams", {}).items():
        if source is None:
            continue

        if isinstance(source, str):
            sanitized = clean_camera_user_pass(source)
        else:
            sanitized = [clean_camera_user_pass(entry) for entry in source]

        payload["go2rtc"]["streams"][stream_name] = sanitized

    payload["plus"] = {"enabled": request.app.frigate_config.plus_api.is_active()}

    model_config = frigate_config.model
    payload["model"]["colormap"] = model_config.colormap
    payload["model"]["all_attributes"] = model_config.all_attributes
    payload["model"]["non_logo_attributes"] = model_config.non_logo_attributes

    # When Frigate+ is active, load the JSON file that sits next to the model.
    if payload["plus"]["enabled"]:
        model_path = payload.get("model", {}).get("path")

        if not model_path:
            payload["model"]["plus"] = None
        else:
            sidecar_path = FilePath(model_path).with_suffix(".json")

            try:
                with open(sidecar_path, "r") as sidecar:
                    plus_data = json.load(sidecar)
                payload["model"]["plus"] = plus_data
            except (FileNotFoundError, json.JSONDecodeError):
                payload["model"]["plus"] = None

    # Use the merged labelmap for every detector.
    for detector_payload in payload["detectors"].values():
        detector_payload["model"]["labelmap"] = (
            request.app.frigate_config.model.merged_labelmap
        )

    return JSONResponse(content=payload)
@router.get("/config/raw")
def config_raw():
    """Return the raw text of the config file.

    Returns:
        JSONResponse carrying the file contents (served with a ``text/plain``
        media type), or a 404 JSON error when the config file does not exist.
    """
    not_found = JSONResponse(
        content=({"success": False, "message": "Could not find file"}),
        status_code=404,
    )

    config_file = find_config_file()

    if not os.path.isfile(config_file):
        return not_found

    try:
        # The context manager closes the file; no explicit close() is needed.
        with open(config_file, "r") as f:
            raw_config = f.read()
    except FileNotFoundError:
        # The file disappeared between the isfile() check and open().
        return not_found

    return JSONResponse(content=raw_config, media_type="text/plain", status_code=200)
@router.post("/config/save", dependencies=[Depends(require_role(["admin"]))])
def config_save(save_option: str, body: Any = Body(media_type="text/plain")):
    """Validate a new config document, write it to disk and optionally restart.

    Args:
        save_option: ``"restart"`` triggers a restart after saving; any other
            value only saves.
        body: Raw request body containing the new config text.

    Returns:
        JSONResponse describing the outcome. Validation and write failures
        return status 400; a failed restart after a successful save still
        returns 200 with ``success: True``.
    """
    new_config = body.decode()
    if not new_config:
        return JSONResponse(
            content=(
                {"success": False, "message": "Config with body param is required"}
            ),
            status_code=400,
        )

    # Validate the config schema
    try:
        # Use ruamel to parse and preserve line numbers
        yaml_config = ruamel.yaml.YAML()
        yaml_config.preserve_quotes = True
        full_config = yaml_config.load(StringIO(new_config))

        FrigateConfig.parse_yaml(new_config)
    except ValidationError as e:
        error_message = []

        for error in e.errors():
            error_path = error["loc"]
            line_number = _find_config_error_line(full_config, error_path)
            error_message.append(
                f"Line {line_number}: {' -> '.join(map(str, error_path))} - {error.get('msg', error.get('type', 'Unknown'))}"
            )

        return JSONResponse(
            content=(
                {
                    "success": False,
                    "message": "Your configuration is invalid.\nSee the official documentation at docs.frigate.video.\n\n"
                    + "\n".join(error_message),
                }
            ),
            status_code=400,
        )
    except Exception:
        return JSONResponse(
            content=(
                {
                    "success": False,
                    "message": f"\nYour configuration is invalid.\nSee the official documentation at docs.frigate.video.\n\n{escape(str(traceback.format_exc()))}",
                }
            ),
            status_code=400,
        )

    # Save the config to file
    try:
        config_file = find_config_file()
        # The context manager closes the file; no explicit close() is needed.
        with open(config_file, "w") as f:
            f.write(new_config)
    except Exception:
        return JSONResponse(
            content=(
                {
                    "success": False,
                    "message": "Could not write config file, be sure that Frigate has write permission on the config file.",
                }
            ),
            status_code=400,
        )

    if save_option == "restart":
        try:
            restart_frigate()
        except Exception as e:
            # Use the module logger so the record carries this module's name.
            logger.error(f"Error restarting Frigate: {e}")
            return JSONResponse(
                content=(
                    {
                        "success": True,
                        "message": "Config successfully saved, unable to restart Frigate",
                    }
                ),
                status_code=200,
            )

        return JSONResponse(
            content=(
                {
                    "success": True,
                    "message": "Config successfully saved, restarting (this can take up to one minute)...",
                }
            ),
            status_code=200,
        )
    else:
        return JSONResponse(
            content=({"success": True, "message": "Config successfully saved."}),
            status_code=200,
        )


def _find_config_error_line(full_config: Any, error_path) -> Any:
    """Walk a validation error path through the parsed YAML to find its line.

    Args:
        full_config: Document returned by ruamel's ``YAML().load``.
        error_path: The ``loc`` sequence of one validation error; components
            may be strings or integers.

    Returns:
        The ``lc.line`` value of the deepest node along the path that exposes
        ``lc``, ``"Unknown"`` when no node does, or ``"Unable to determine"``
        when the path cannot be followed.
    """
    current = full_config
    line_number = "Unknown"

    try:
        for part in error_path:
            # Integer components (list indices) are used directly; only digit
            # strings need converting. Calling .isdigit() on an int would raise.
            key = int(part) if isinstance(part, str) and part.isdigit() else part

            if isinstance(current, (ruamel.yaml.comments.CommentedMap, list)):
                current = current[key]

            # Remember the line of the deepest node that carries position info.
            if hasattr(current, "lc"):
                line_number = current.lc.line
    except Exception:
        line_number = "Unable to determine"

    return line_number
@router.put("/config/set", dependencies=[Depends(require_role(["admin"]))])
def config_set(request: Request, body: AppConfigSetBody):
    """Apply config updates to the config file and optionally publish them.

    Updates come from the query string when it has any non-empty key,
    otherwise from ``body.config_data``. The file is updated, re-read and
    parsed; if parsing fails the previous file contents are written back.

    Args:
        request: Incoming request; provides the URL, app config and publisher.
        body: Update payload (``config_data``, ``requires_restart``,
            ``update_topic``).

    Returns:
        JSONResponse: 200 on success, 400 when no updates were supplied or the
        resulting config does not parse, 500 on any other error while updating.
    """
    config_file = find_config_file()

    # Keep the current contents so they can be restored if validation fails.
    with open(config_file, "r") as f:
        old_raw_config = f.read()

    try:
        updates = {}

        # process query string parameters (takes precedence over body.config_data)
        parsed_url = urllib.parse.urlparse(str(request.url))
        query_string = urllib.parse.parse_qs(parsed_url.query, keep_blank_values=True)

        # Filter out empty keys but keep blank values for non-empty keys
        query_string = {k: v for k, v in query_string.items() if k}

        if query_string:
            updates = process_config_query_string(query_string)
        elif body.config_data:
            updates = flatten_config_data(body.config_data)

        if not updates:
            return JSONResponse(
                content=(
                    {"success": False, "message": "No configuration data provided"}
                ),
                status_code=400,
            )

        # apply all updates in a single operation
        update_yaml_file_bulk(config_file, updates)

        # validate the updated config
        with open(config_file, "r") as f:
            new_raw_config = f.read()

        try:
            config = FrigateConfig.parse(new_raw_config)
        except Exception:
            # Roll the file back; the context manager closes it.
            with open(config_file, "w") as f:
                f.write(old_raw_config)
            logger.error(f"\nConfig Error:\n\n{str(traceback.format_exc())}")
            return JSONResponse(
                content=(
                    {
                        "success": False,
                        "message": "Error parsing config. Check logs for error message.",
                    }
                ),
                status_code=400,
            )
    except Exception as e:
        # Use the module logger so the record carries this module's name.
        logger.error(f"Error updating config: {e}")
        return JSONResponse(
            content=({"success": False, "message": "Error updating config"}),
            status_code=500,
        )

    if body.requires_restart == 0 or body.update_topic:
        old_config: FrigateConfig = request.app.frigate_config
        request.app.frigate_config = config

        if body.update_topic and body.update_topic.startswith("config/cameras/"):
            # The topic is unpacked as four "/"-separated parts; the last two
            # are the camera name and the updated field.
            _, _, camera, field = body.update_topic.split("/")

            if field == "add":
                settings = config.cameras[camera]
            elif field == "remove":
                # The camera no longer exists in the new config.
                settings = old_config.cameras[camera]
            else:
                settings = config.get_nested_object(body.update_topic)

            request.app.config_publisher.publish_update(
                CameraConfigUpdateTopic(CameraConfigUpdateEnum[field], camera),
                settings,
            )

    return JSONResponse(
        content=(
            {
                "success": True,
                "message": "Config successfully updated, restart to apply",
            }
        ),
        status_code=200,
    )
@router.get("/ffprobe")
def ffprobe(request: Request, paths: str = "", detailed: bool = False):
    """Run ffprobe against one or more stream paths and return the results.

    Args:
        request: Incoming request; provides the Frigate config.
        paths: A value starting with ``camera`` (the name is read from index 7
            onwards), a comma-separated list of paths, or a single path.
        detailed: Forwarded to ``ffprobe_stream``; when true a ``metadata``
            summary is attached to each successful probe.

    Returns:
        JSONResponse with one result dict per probed path, or a 404 JSON error
        for a missing path or an unknown/disabled camera.
    """
    requested = paths

    if not requested:
        return JSONResponse(
            content=({"success": False, "message": "Path needs to be provided."}),
            status_code=404,
        )

    if requested.startswith("camera"):
        camera = requested[7:]

        if camera not in request.app.frigate_config.cameras.keys():
            return JSONResponse(
                content=(
                    {"success": False, "message": f"{camera} is not a valid camera."}
                ),
                status_code=404,
            )

        if not request.app.frigate_config.cameras[camera].enabled:
            return JSONResponse(
                content=({"success": False, "message": f"{camera} is not enabled."}),
                status_code=404,
            )

        # Probe every ffmpeg input configured for the camera (lazily evaluated).
        targets = (
            camera_input.path
            for camera_input in request.app.frigate_config.cameras[camera].ffmpeg.inputs
        )
    elif "," in clean_camera_user_pass(requested):
        # Look for the separator in the cleaned string, then split the raw one.
        targets = requested.split(",")
    else:
        targets = [requested]

    output = []

    for target in targets:
        probe = ffprobe_stream(
            request.app.frigate_config.ffmpeg, target.strip(), detailed=detailed
        )
        succeeded = probe.returncode == 0

        if succeeded:
            stderr_text = ""
            stdout_data = json.loads(probe.stdout.decode("unicode_escape").strip())
        else:
            stderr_text = probe.stderr.decode("unicode_escape").strip()
            stdout_data = ""

        result = {
            "return_code": probe.returncode,
            "stderr": stderr_text,
            "stdout": stdout_data,
        }

        # Attach the summary only for a successful probe in detailed mode.
        if detailed and succeeded and stdout_data:
            try:
                result["metadata"] = _ffprobe_detailed_metadata(stdout_data)
            except Exception as e:
                # Continue without metadata if parsing fails.
                logger.warning(f"Failed to extract detailed metadata: {e}")

        output.append(result)

    return JSONResponse(content=output)


def _ffprobe_detailed_metadata(probe_data):
    """Build the video/audio/container summary from parsed ffprobe output."""
    video_stream = None
    audio_stream = None

    # When several streams share a type, the last one listed is used.
    for stream in probe_data.get("streams", []):
        codec_type = stream.get("codec_type")
        if codec_type == "video":
            video_stream = stream
        elif codec_type == "audio":
            audio_stream = stream

    metadata = {}

    if video_stream:
        video_info = {
            "codec": video_stream.get("codec_name"),
            "width": video_stream.get("width"),
            "height": video_stream.get("height"),
            "fps": _extract_fps(video_stream.get("r_frame_rate")),
            "pixel_format": video_stream.get("pix_fmt"),
            "profile": video_stream.get("profile"),
            "level": video_stream.get("level"),
        }
        metadata["video"] = video_info

        # Add "WIDTHxHEIGHT" only when both dimensions are present.
        if video_stream.get("width") and video_stream.get("height"):
            video_info["resolution"] = (
                f"{video_stream['width']}x{video_stream['height']}"
            )

    if audio_stream:
        metadata["audio"] = {
            "codec": audio_stream.get("codec_name"),
            "channels": audio_stream.get("channels"),
            "sample_rate": audio_stream.get("sample_rate"),
            "channel_layout": audio_stream.get("channel_layout"),
        }

    if probe_data.get("format"):
        container = probe_data["format"]
        metadata["container"] = {
            "format": container.get("format_name"),
            "duration": container.get("duration"),
            "size": container.get("size"),
        }

    return metadata
@router.get("/ffprobe/snapshot", dependencies=[Depends(require_role(["admin"]))])
def ffprobe_snapshot(request: Request, url: str = "", timeout: int = 10):
    """Capture a single JPEG frame from a stream URL via ffmpeg.

    Args:
        request: Incoming request; provides the Frigate ffmpeg config.
        url: Stream URL to capture from.
        timeout: Forwarded to ``run_ffmpeg_snapshot``.

    Returns:
        The image as ``image/jpeg``, or a JSON error: 400 when ``url`` is
        empty, 408 when the helper reports ``"timeout"``, 500 otherwise.
    """
    if not url:
        missing_url = {"success": False, "message": "URL parameter is required"}
        return JSONResponse(content=missing_url, status_code=400)

    frigate_config: FrigateConfig = request.app.frigate_config
    image_data, error = run_ffmpeg_snapshot(
        frigate_config.ffmpeg, url, "mjpeg", timeout=timeout
    )

    if image_data:
        return Response(
            image_data,
            media_type="image/jpeg",
            headers={"Cache-Control": "no-store"},
        )

    if error == "timeout":
        timed_out = {"success": False, "message": "Timeout capturing snapshot"}
        return JSONResponse(content=timed_out, status_code=408)

    logger.error(f"ffmpeg failed: {error}")
    failed = {"success": False, "message": "Failed to capture snapshot"}
    return JSONResponse(content=failed, status_code=500)
def _extract_fps(r_frame_rate: str) -> float | None:
"""Extract FPS from ffprobe r_frame_rate string (e.g., '30/1' -> 30.0)"""
if not r_frame_rate:
return None
try:
num, den = r_frame_rate.split("/")
return round(float(num) / float(den), 2)
except (ValueError, ZeroDivisionError):
return None
@router.get("/vainfo")
def vainfo():
    """Run ``vainfo_hwaccel`` and return its exit code and decoded output.

    ``stdout`` is only populated when the return code is 0 and ``stderr`` only
    when it is non-zero; the other field is an empty string.
    """
    process = vainfo_hwaccel()

    if process.returncode == 0:
        stderr_text = ""
        stdout_text = process.stdout.decode("unicode_escape").strip()
    else:
        stderr_text = process.stderr.decode("unicode_escape").strip()
        stdout_text = ""

    return JSONResponse(
        content={
            "return_code": process.returncode,
            "stderr": stderr_text,
            "stdout": stdout_text,
        }
    )
@router.get("/nvinfo")
def nvinfo():
    """Return whatever ``get_nvidia_driver_info`` reports, as JSON."""
    driver_info = get_nvidia_driver_info()
    return JSONResponse(content=driver_info)
@router.get("/logs/{service}", tags=[Tags.logs])
async def logs(
    service: str = Path(enum=["frigate", "nginx", "go2rtc"]),
    download: Optional[str] = None,
    stream: Optional[bool] = False,
    start: Optional[int] = 0,
    end: Optional[int] = None,
):
    """Get logs for the requested service (frigate/nginx/go2rtc)

    Args:
        service: Which service's log file to read.
        download: When truthy, return the whole raw file contents.
        stream: When truthy, stream newly appended log lines as plain text.
        start: Forwarded to ``process_logs`` for the default (paged) mode.
        end: Forwarded to ``process_logs`` for the default (paged) mode.

    Returns:
        A JSONResponse or StreamingResponse depending on the mode; 404 for an
        unknown service and 500 when the log file cannot be found.
    """

    def download_logs(service_location: str):
        """Return the entire log file as a JSON-encoded string."""
        try:
            # The context manager releases the handle even if read() raises.
            with open(service_location, "r") as file:
                contents = file.read()
            return JSONResponse(jsonable_encoder(contents))
        except FileNotFoundError as e:
            logger.error(e)
            return JSONResponse(
                content={"success": False, "message": "Could not find log file"},
                status_code=500,
            )

    async def stream_logs(file_path: str):
        """Asynchronously stream log lines."""
        buffer = ""
        try:
            async with aiofiles.open(file_path, "r") as file:
                # Start at the end of the file so only new lines are streamed.
                await file.seek(0, 2)
                while True:
                    line = await file.readline()
                    if line:
                        buffer += line
                        # Process the buffer only once it holds a complete line.
                        if "\n" in buffer:
                            _, processed_lines = process_logs(buffer, service)
                            buffer = ""
                            for processed_line in processed_lines:
                                yield f"{processed_line}\n"
                    else:
                        # Nothing new yet; wait briefly before polling again.
                        await asyncio.sleep(0.1)
        except FileNotFoundError:
            yield "Log file not found.\n"

    log_locations = {
        "frigate": "/dev/shm/logs/frigate/current",
        "go2rtc": "/dev/shm/logs/go2rtc/current",
        "nginx": "/dev/shm/logs/nginx/current",
    }
    service_location = log_locations.get(service)

    if not service_location:
        return JSONResponse(
            content={"success": False, "message": "Not a valid service"},
            status_code=404,
        )

    if download:
        return download_logs(service_location)

    if stream:
        return StreamingResponse(stream_logs(service_location), media_type="text/plain")

    # For full logs initially
    try:
        async with aiofiles.open(service_location, "r") as file:
            contents = await file.read()

        total_lines, log_lines = process_logs(contents, service, start, end)
        return JSONResponse(
            content={"totalLines": total_lines, "lines": log_lines},
            status_code=200,
        )
    except FileNotFoundError as e:
        logger.error(e)
        return JSONResponse(
            content={"success": False, "message": "Could not find log file"},
            status_code=500,
        )
@router.post("/restart", dependencies=[Depends(require_role(["admin"]))])
def restart():
    """Request a Frigate restart.

    Returns:
        JSONResponse: 200 when the restart call returned normally, 500 when it
        raised.
    """
    try:
        restart_frigate()
    except Exception as e:
        # Use the module logger so the record carries this module's name.
        logger.error(f"Error restarting Frigate: {e}")
        return JSONResponse(
            content=(
                {
                    "success": False,
                    "message": "Unable to restart Frigate.",
                }
            ),
            status_code=500,
        )

    return JSONResponse(
        content=(
            {
                "success": True,
                "message": "Restarting (this can take up to one minute)...",
            }
        ),
        status_code=200,
    )
@router.get("/labels")
def get_labels(camera: str = ""):
    """Return the sorted distinct event labels, optionally for one camera.

    Args:
        camera: When non-empty, only events from this camera are considered.

    Returns:
        JSONResponse with the sorted label list, or a 404 JSON error when the
        query fails.
    """
    try:
        if camera:
            events = Event.select(Event.label).where(Event.camera == camera).distinct()
        else:
            events = Event.select(Event.label).distinct()

        # The query only runs when iterated, so iterate inside the try block;
        # otherwise database errors would bypass the handler below.
        labels = sorted([e.label for e in events])
    except Exception as e:
        logger.error(e)
        return JSONResponse(
            content=({"success": False, "message": "Failed to get labels"}),
            status_code=404,
        )

    return JSONResponse(content=labels)
@router.get("/sub_labels")
def get_sub_labels(split_joined: Optional[int] = None):
    """Return the sorted distinct event sub labels.

    Args:
        split_joined: When truthy, sub labels containing commas are replaced by
            their individual, whitespace-stripped parts (de-duplicated).

    Returns:
        JSONResponse with the sorted sub label list, or a 404 JSON error when
        the query fails.
    """
    try:
        # The query only runs when iterated, so iterate inside the try block;
        # otherwise database errors would bypass the handler below.
        sub_labels = [e.sub_label for e in Event.select(Event.sub_label).distinct()]
    except Exception as e:
        logger.error(e)
        return JSONResponse(
            content=({"success": False, "message": "Failed to get sub_labels"}),
            status_code=404,
        )

    # Events without a sub label contribute None; drop it.
    sub_labels = [label for label in sub_labels if label is not None]

    if split_joined:
        # A set gives de-duplication without repeated list scans/removals.
        expanded = set()
        for label in sub_labels:
            if "," in label:
                expanded.update(part.strip() for part in label.split(","))
            else:
                expanded.add(label)
        sub_labels = list(expanded)

    sub_labels.sort()
    return JSONResponse(content=sub_labels)
@router.get("/plus/models")
def plusModels(request: Request, filterByCurrentModelDetector: bool = False):
    """List Frigate+ models, newest ``trainDate`` first.

    Args:
        request: Incoming request; provides the Frigate config and Plus API.
        filterByCurrentModelDetector: When true, keep only models whose
            ``supportedDetectors`` contains the first configured detector's
            type and whose ``type`` contains the configured model type.

    Returns:
        JSONResponse with the model list, or a 400 JSON error when Frigate+ is
        not active or no models are available.
    """
    frigate_config = request.app.frigate_config

    if not frigate_config.plus_api.is_active():
        return JSONResponse(
            content=({"success": False, "message": "Frigate+ is not enabled"}),
            status_code=400,
        )

    models: dict[Any, Any] = frigate_config.plus_api.get_models()

    if not models["list"]:
        return JSONResponse(
            content=({"success": False, "message": "No models found"}),
            status_code=400,
        )

    available = models["list"]

    # Current model type and the type of the first configured detector.
    current_model_type = frigate_config.model.model_type
    current_detector_type = list(frigate_config.detectors.values())[0].type

    def is_valid(candidate):
        """Accept everything unless filtering by the current model/detector."""
        if not filterByCurrentModelDetector:
            return True
        return (
            current_detector_type in candidate["supportedDetectors"]
            and current_model_type in candidate["type"]
        )

    valid_models = [candidate for candidate in available if is_valid(candidate)]
    valid_models.sort(key=lambda candidate: candidate["trainDate"], reverse=True)

    return JSONResponse(content=valid_models)
@router.get("/recognized_license_plates")
def get_recognized_license_plates(split_joined: Optional[int] = None):
    """Return the sorted, de-duplicated recognized license plates from events.

    Args:
        split_joined: When truthy, plates containing commas are replaced by
            their individual, whitespace-stripped parts.

    Returns:
        JSONResponse with the sorted plate list, or a 404 JSON error when the
        query fails.
    """
    try:
        plate_query = (
            Event.select(
                SQL("json_extract(data, '$.recognized_license_plate') AS plate")
            )
            .where(SQL("json_extract(data, '$.recognized_license_plate') IS NOT NULL"))
            .distinct()
        )
        plates = [row[0] for row in plate_query.tuples()]
    except Exception:
        return JSONResponse(
            content=(
                {"success": False, "message": "Failed to get recognized license plates"}
            ),
            status_code=404,
        )

    if split_joined:
        # Swap each comma-joined entry for its stripped parts; keep the rest.
        unique_plates = set()
        for plate in plates:
            if plate and "," in plate:
                unique_plates.update(part.strip() for part in plate.split(","))
            else:
                unique_plates.add(plate)
    else:
        unique_plates = set(plates)

    ordered_plates = list(unique_plates)
    ordered_plates.sort()
    return JSONResponse(content=ordered_plates)
@router.get("/timeline")
def timeline(camera: str = "all", limit: int = 100, source_id: Optional[str] = None):
    """Return timeline entries, oldest first.

    Args:
        camera: Camera to filter on; ``"all"`` disables the camera filter.
        limit: Maximum number of rows returned.
        source_id: When given, only entries with this source id are returned.
    """
    filters = []

    if camera != "all":
        filters.append((Timeline.camera == camera))

    if source_id:
        filters.append((Timeline.source_id == source_id))

    # With no filters, fall back to a single always-true clause.
    if not filters:
        filters.append((True))

    where_clause = reduce(operator.and_, filters)

    rows = (
        Timeline.select(
            Timeline.timestamp,
            Timeline.camera,
            Timeline.source,
            Timeline.source_id,
            Timeline.class_type,
            Timeline.data,
        )
        .where(where_clause)
        .order_by(Timeline.timestamp.asc())
        .limit(limit)
        .dicts()
    )

    return JSONResponse(content=[row for row in rows])
@router.get("/timeline/hourly")
def hourly_timeline(params: AppTimelineHourlyQueryParameters = Depends()):
    """Get hourly summary for timeline.

    Rows are read newest first and grouped by the timestamp of their hour,
    shifted by the timezone's minute offset.

    Returns:
        JSONResponse with ``start`` (timestamp of the first row read), ``end``
        (timestamp of the last row read; 0 when fewer than two rows), ``count``
        and ``hours`` (hour timestamp -> entries).
    """
    _, minute_modifier, _ = get_tz_modifiers(params.timezone)
    # The offset is the leading integer of the modifier string.
    minute_offset = int(minute_modifier.split(" ")[0])

    filters = []

    if params.cameras != "all":
        filters.append((Timeline.camera << params.cameras.split(",")))

    if params.labels != "all":
        filters.append((Timeline.data["label"] << params.labels.split(",")))

    if params.before:
        filters.append((Timeline.timestamp < params.before))

    if params.after:
        filters.append((Timeline.timestamp > params.after))

    # With no filters, fall back to a single always-true clause.
    if not filters:
        filters.append((True))

    rows = (
        Timeline.select(
            Timeline.camera,
            Timeline.timestamp,
            Timeline.data,
            Timeline.class_type,
            Timeline.source_id,
            Timeline.source,
        )
        .where(reduce(operator.and_, filters))
        .order_by(Timeline.timestamp.desc())
        .limit(params.limit)
        .dicts()
        .iterator()
    )

    count = 0
    start = 0
    end = 0
    hours: dict[str, list[dict[str, Any]]] = {}

    for index, entry in enumerate(rows):
        if index == 0:
            start = entry["timestamp"]
        else:
            end = entry["timestamp"]

        count = index + 1

        hour_start = datetime.fromtimestamp(entry["timestamp"]).replace(
            minute=0, second=0, microsecond=0
        )
        hour = (hour_start + timedelta(minutes=minute_offset)).timestamp()

        # Rows arrive newest first; prepending keeps each bucket oldest first.
        if hour in hours:
            hours[hour].insert(0, entry)
        else:
            hours[hour] = [entry]

    return JSONResponse(
        content={
            "start": start,
            "end": end,
            "count": count,
            "hours": hours,
        }
    )