mirror of
				https://github.com/blakeblackshear/frigate.git
				synced 2025-10-27 10:52:11 +01:00 
			
		
		
		
	* reload the window on 401 * backend apis for auth * add login page * re-enable web linter * fix login page routing * bypass csrf for internal auth endpoint * disable healthcheck in devcontainer target * include login page in vite build * redirect to login page on 401 * implement config for users and settings * implement JWT actual secret * add brute force protection on login * add support for redirecting from auth failures on api calls * return location for redirect * default cookie name should pass regex test * set hash iterations to current OWASP recommendation * move users to database instead of config * config option to reset admin password on startup * user management UI * check for deleted user on refresh * validate username and fixes * remove password constraint * cleanup * fix user check on refresh * web fixes * implement auth via new external port * use x-forwarded-for to rate limit login attempts by ip * implement logout and profile * fixes * lint fixes * add support for user passthru from upstream proxies * add support for specifying a logout url * add documentation * Update docs/docs/configuration/authentication.md Co-authored-by: Nicolas Mowen <nickmowen213@gmail.com> * Update docs/docs/configuration/authentication.md Co-authored-by: Nicolas Mowen <nickmowen213@gmail.com> --------- Co-authored-by: Nicolas Mowen <nickmowen213@gmail.com>
		
			
				
	
	
		
			714 lines
		
	
	
		
			20 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			714 lines
		
	
	
		
			20 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
"""Main api runner."""
 | 
						|
 | 
						|
import copy
 | 
						|
import json
 | 
						|
import logging
 | 
						|
import os
 | 
						|
import traceback
 | 
						|
from datetime import datetime, timedelta
 | 
						|
from functools import reduce
 | 
						|
 | 
						|
import requests
 | 
						|
from flask import Blueprint, Flask, current_app, jsonify, make_response, request
 | 
						|
from markupsafe import escape
 | 
						|
from peewee import operator
 | 
						|
from playhouse.sqliteq import SqliteQueueDatabase
 | 
						|
from werkzeug.middleware.proxy_fix import ProxyFix
 | 
						|
 | 
						|
from frigate.api.auth import AuthBp, get_jwt_secret, limiter
 | 
						|
from frigate.api.event import EventBp
 | 
						|
from frigate.api.export import ExportBp
 | 
						|
from frigate.api.media import MediaBp
 | 
						|
from frigate.api.preview import PreviewBp
 | 
						|
from frigate.api.review import ReviewBp
 | 
						|
from frigate.config import AuthModeEnum, FrigateConfig
 | 
						|
from frigate.const import CONFIG_DIR
 | 
						|
from frigate.events.external import ExternalEventProcessor
 | 
						|
from frigate.models import Event, Timeline
 | 
						|
from frigate.plus import PlusApi
 | 
						|
from frigate.ptz.onvif import OnvifController
 | 
						|
from frigate.stats.emitter import StatsEmitter
 | 
						|
from frigate.storage import StorageMaintainer
 | 
						|
from frigate.util.builtin import (
 | 
						|
    clean_camera_user_pass,
 | 
						|
    get_tz_modifiers,
 | 
						|
    update_yaml_from_url,
 | 
						|
)
 | 
						|
from frigate.util.services import ffprobe_stream, restart_frigate, vainfo_hwaccel
 | 
						|
from frigate.version import VERSION
 | 
						|
 | 
						|
logger = logging.getLogger(__name__)
 | 
						|
 | 
						|
 | 
						|
bp = Blueprint("frigate", __name__)
 | 
						|
bp.register_blueprint(EventBp)
 | 
						|
bp.register_blueprint(ExportBp)
 | 
						|
bp.register_blueprint(MediaBp)
 | 
						|
bp.register_blueprint(PreviewBp)
 | 
						|
bp.register_blueprint(ReviewBp)
 | 
						|
bp.register_blueprint(AuthBp)
 | 
						|
 | 
						|
 | 
						|
def create_app(
 | 
						|
    frigate_config,
 | 
						|
    database: SqliteQueueDatabase,
 | 
						|
    detected_frames_processor,
 | 
						|
    storage_maintainer: StorageMaintainer,
 | 
						|
    onvif: OnvifController,
 | 
						|
    external_processor: ExternalEventProcessor,
 | 
						|
    plus_api: PlusApi,
 | 
						|
    stats_emitter: StatsEmitter,
 | 
						|
):
 | 
						|
    app = Flask(__name__)
 | 
						|
 | 
						|
    @app.before_request
 | 
						|
    def check_csrf():
 | 
						|
        if request.method in ["GET", "HEAD", "OPTIONS", "TRACE"]:
 | 
						|
            pass
 | 
						|
        if "origin" in request.headers and "x-csrf-token" not in request.headers:
 | 
						|
            return jsonify({"success": False, "message": "Missing CSRF header"}), 401
 | 
						|
 | 
						|
    @app.before_request
 | 
						|
    def _db_connect():
 | 
						|
        if database.is_closed():
 | 
						|
            database.connect()
 | 
						|
 | 
						|
    @app.teardown_request
 | 
						|
    def _db_close(exc):
 | 
						|
        if not database.is_closed():
 | 
						|
            database.close()
 | 
						|
 | 
						|
    app.frigate_config = frigate_config
 | 
						|
    app.detected_frames_processor = detected_frames_processor
 | 
						|
    app.storage_maintainer = storage_maintainer
 | 
						|
    app.onvif = onvif
 | 
						|
    app.external_processor = external_processor
 | 
						|
    app.plus_api = plus_api
 | 
						|
    app.camera_error_image = None
 | 
						|
    app.stats_emitter = stats_emitter
 | 
						|
    app.jwt_token = (
 | 
						|
        get_jwt_secret() if frigate_config.auth.mode == AuthModeEnum.native else None
 | 
						|
    )
 | 
						|
    # update the request_address with the x-forwarded-for header from nginx
 | 
						|
    app.wsgi_app = ProxyFix(app.wsgi_app, x_for=1)
 | 
						|
    # initialize the rate limiter for the login endpoint
 | 
						|
    limiter.init_app(app)
 | 
						|
    if frigate_config.auth.failed_login_rate_limit is None:
 | 
						|
        limiter.enabled = False
 | 
						|
 | 
						|
    app.register_blueprint(bp)
 | 
						|
 | 
						|
    return app
 | 
						|
 | 
						|
 | 
						|
@bp.route("/")
 | 
						|
def is_healthy():
 | 
						|
    return "Frigate is running. Alive and healthy!"
 | 
						|
 | 
						|
 | 
						|
@bp.route("/config/schema.json")
 | 
						|
def config_schema():
 | 
						|
    return current_app.response_class(
 | 
						|
        current_app.frigate_config.schema_json(), mimetype="application/json"
 | 
						|
    )
 | 
						|
 | 
						|
 | 
						|
@bp.route("/go2rtc/streams")
 | 
						|
def go2rtc_streams():
 | 
						|
    r = requests.get("http://127.0.0.1:1984/api/streams")
 | 
						|
    if not r.ok:
 | 
						|
        logger.error("Failed to fetch streams from go2rtc")
 | 
						|
        return make_response(
 | 
						|
            jsonify({"success": False, "message": "Error fetching stream data"}),
 | 
						|
            500,
 | 
						|
        )
 | 
						|
    stream_data = r.json()
 | 
						|
    for data in stream_data.values():
 | 
						|
        for producer in data.get("producers", []):
 | 
						|
            producer["url"] = clean_camera_user_pass(producer.get("url", ""))
 | 
						|
    return jsonify(stream_data)
 | 
						|
 | 
						|
 | 
						|
@bp.route("/go2rtc/streams/<camera_name>")
 | 
						|
def go2rtc_camera_stream(camera_name: str):
 | 
						|
    r = requests.get(
 | 
						|
        f"http://127.0.0.1:1984/api/streams?src={camera_name}&video=all&audio=allµphone"
 | 
						|
    )
 | 
						|
    if not r.ok:
 | 
						|
        logger.error("Failed to fetch streams from go2rtc")
 | 
						|
        return make_response(
 | 
						|
            jsonify({"success": False, "message": "Error fetching stream data"}),
 | 
						|
            500,
 | 
						|
        )
 | 
						|
    stream_data = r.json()
 | 
						|
    for producer in stream_data.get("producers", []):
 | 
						|
        producer["url"] = clean_camera_user_pass(producer.get("url", ""))
 | 
						|
    return jsonify(stream_data)
 | 
						|
 | 
						|
 | 
						|
@bp.route("/version")
 | 
						|
def version():
 | 
						|
    return VERSION
 | 
						|
 | 
						|
 | 
						|
@bp.route("/stats")
 | 
						|
def stats():
 | 
						|
    return jsonify(current_app.stats_emitter.get_latest_stats())
 | 
						|
 | 
						|
 | 
						|
@bp.route("/stats/history")
 | 
						|
def stats_history():
 | 
						|
    keys = request.args.get("keys", default=None)
 | 
						|
 | 
						|
    if keys:
 | 
						|
        keys = keys.split(",")
 | 
						|
 | 
						|
    return jsonify(current_app.stats_emitter.get_stats_history(keys))
 | 
						|
 | 
						|
 | 
						|
@bp.route("/config")
 | 
						|
def config():
 | 
						|
    config_obj: FrigateConfig = current_app.frigate_config
 | 
						|
    config: dict[str, dict[str, any]] = config_obj.model_dump(
 | 
						|
        mode="json", warnings="none", exclude_none=True
 | 
						|
    )
 | 
						|
 | 
						|
    # remove the mqtt password
 | 
						|
    config["mqtt"].pop("password", None)
 | 
						|
 | 
						|
    for camera_name, camera in current_app.frigate_config.cameras.items():
 | 
						|
        camera_dict = config["cameras"][camera_name]
 | 
						|
 | 
						|
        # clean paths
 | 
						|
        for input in camera_dict.get("ffmpeg", {}).get("inputs", []):
 | 
						|
            input["path"] = clean_camera_user_pass(input["path"])
 | 
						|
 | 
						|
        # add clean ffmpeg_cmds
 | 
						|
        camera_dict["ffmpeg_cmds"] = copy.deepcopy(camera.ffmpeg_cmds)
 | 
						|
        for cmd in camera_dict["ffmpeg_cmds"]:
 | 
						|
            cmd["cmd"] = clean_camera_user_pass(" ".join(cmd["cmd"]))
 | 
						|
 | 
						|
        # ensure that zones are relative
 | 
						|
        for zone_name, zone in config_obj.cameras[camera_name].zones.items():
 | 
						|
            camera_dict["zones"][zone_name]["color"] = zone.color
 | 
						|
 | 
						|
    config["plus"] = {"enabled": current_app.plus_api.is_active()}
 | 
						|
    config["model"]["colormap"] = config_obj.model.colormap
 | 
						|
 | 
						|
    for detector_config in config["detectors"].values():
 | 
						|
        detector_config["model"]["labelmap"] = (
 | 
						|
            current_app.frigate_config.model.merged_labelmap
 | 
						|
        )
 | 
						|
 | 
						|
    return jsonify(config)
 | 
						|
 | 
						|
 | 
						|
@bp.route("/config/raw")
 | 
						|
def config_raw():
 | 
						|
    config_file = os.environ.get("CONFIG_FILE", "/config/config.yml")
 | 
						|
 | 
						|
    # Check if we can use .yaml instead of .yml
 | 
						|
    config_file_yaml = config_file.replace(".yml", ".yaml")
 | 
						|
 | 
						|
    if os.path.isfile(config_file_yaml):
 | 
						|
        config_file = config_file_yaml
 | 
						|
 | 
						|
    if not os.path.isfile(config_file):
 | 
						|
        return make_response(
 | 
						|
            jsonify({"success": False, "message": "Could not find file"}), 404
 | 
						|
        )
 | 
						|
 | 
						|
    with open(config_file, "r") as f:
 | 
						|
        raw_config = f.read()
 | 
						|
        f.close()
 | 
						|
 | 
						|
        return raw_config, 200
 | 
						|
 | 
						|
 | 
						|
@bp.route("/config/save", methods=["POST"])
 | 
						|
def config_save():
 | 
						|
    save_option = request.args.get("save_option")
 | 
						|
 | 
						|
    new_config = request.get_data().decode()
 | 
						|
 | 
						|
    if not new_config:
 | 
						|
        return make_response(
 | 
						|
            jsonify(
 | 
						|
                {"success": False, "message": "Config with body param is required"}
 | 
						|
            ),
 | 
						|
            400,
 | 
						|
        )
 | 
						|
 | 
						|
    # Validate the config schema
 | 
						|
    try:
 | 
						|
        FrigateConfig.parse_raw(new_config)
 | 
						|
    except Exception:
 | 
						|
        return make_response(
 | 
						|
            jsonify(
 | 
						|
                {
 | 
						|
                    "success": False,
 | 
						|
                    "message": f"\nConfig Error:\n\n{escape(str(traceback.format_exc()))}",
 | 
						|
                }
 | 
						|
            ),
 | 
						|
            400,
 | 
						|
        )
 | 
						|
 | 
						|
    # Save the config to file
 | 
						|
    try:
 | 
						|
        config_file = os.environ.get("CONFIG_FILE", "/config/config.yml")
 | 
						|
 | 
						|
        # Check if we can use .yaml instead of .yml
 | 
						|
        config_file_yaml = config_file.replace(".yml", ".yaml")
 | 
						|
 | 
						|
        if os.path.isfile(config_file_yaml):
 | 
						|
            config_file = config_file_yaml
 | 
						|
 | 
						|
        with open(config_file, "w") as f:
 | 
						|
            f.write(new_config)
 | 
						|
            f.close()
 | 
						|
    except Exception:
 | 
						|
        return make_response(
 | 
						|
            jsonify(
 | 
						|
                {
 | 
						|
                    "success": False,
 | 
						|
                    "message": "Could not write config file, be sure that Frigate has write permission on the config file.",
 | 
						|
                }
 | 
						|
            ),
 | 
						|
            400,
 | 
						|
        )
 | 
						|
 | 
						|
    if save_option == "restart":
 | 
						|
        try:
 | 
						|
            restart_frigate()
 | 
						|
        except Exception as e:
 | 
						|
            logging.error(f"Error restarting Frigate: {e}")
 | 
						|
            return make_response(
 | 
						|
                jsonify(
 | 
						|
                    {
 | 
						|
                        "success": True,
 | 
						|
                        "message": "Config successfully saved, unable to restart Frigate",
 | 
						|
                    }
 | 
						|
                ),
 | 
						|
                200,
 | 
						|
            )
 | 
						|
 | 
						|
        return make_response(
 | 
						|
            jsonify(
 | 
						|
                {
 | 
						|
                    "success": True,
 | 
						|
                    "message": "Config successfully saved, restarting (this can take up to one minute)...",
 | 
						|
                }
 | 
						|
            ),
 | 
						|
            200,
 | 
						|
        )
 | 
						|
    else:
 | 
						|
        return make_response(
 | 
						|
            jsonify({"success": True, "message": "Config successfully saved."}),
 | 
						|
            200,
 | 
						|
        )
 | 
						|
 | 
						|
 | 
						|
@bp.route("/config/set", methods=["PUT"])
 | 
						|
def config_set():
 | 
						|
    config_file = os.environ.get("CONFIG_FILE", f"{CONFIG_DIR}/config.yml")
 | 
						|
 | 
						|
    # Check if we can use .yaml instead of .yml
 | 
						|
    config_file_yaml = config_file.replace(".yml", ".yaml")
 | 
						|
 | 
						|
    if os.path.isfile(config_file_yaml):
 | 
						|
        config_file = config_file_yaml
 | 
						|
 | 
						|
    with open(config_file, "r") as f:
 | 
						|
        old_raw_config = f.read()
 | 
						|
        f.close()
 | 
						|
 | 
						|
    try:
 | 
						|
        update_yaml_from_url(config_file, request.url)
 | 
						|
        with open(config_file, "r") as f:
 | 
						|
            new_raw_config = f.read()
 | 
						|
            f.close()
 | 
						|
        # Validate the config schema
 | 
						|
        try:
 | 
						|
            config_obj = FrigateConfig.parse_raw(new_raw_config)
 | 
						|
        except Exception:
 | 
						|
            with open(config_file, "w") as f:
 | 
						|
                f.write(old_raw_config)
 | 
						|
                f.close()
 | 
						|
            logger.error(f"\nConfig Error:\n\n{str(traceback.format_exc())}")
 | 
						|
            return make_response(
 | 
						|
                jsonify(
 | 
						|
                    {
 | 
						|
                        "success": False,
 | 
						|
                        "message": "Error parsing config. Check logs for error message.",
 | 
						|
                    }
 | 
						|
                ),
 | 
						|
                400,
 | 
						|
            )
 | 
						|
    except Exception as e:
 | 
						|
        logging.error(f"Error updating config: {e}")
 | 
						|
        return make_response(
 | 
						|
            jsonify({"success": False, "message": "Error updating config"}),
 | 
						|
            500,
 | 
						|
        )
 | 
						|
 | 
						|
    json = request.get_json(silent=True) or {}
 | 
						|
 | 
						|
    if json.get("requires_restart", 1) == 0:
 | 
						|
        current_app.frigate_config = FrigateConfig.runtime_config(
 | 
						|
            config_obj, current_app.plus_api
 | 
						|
        )
 | 
						|
 | 
						|
    return make_response(
 | 
						|
        jsonify(
 | 
						|
            {
 | 
						|
                "success": True,
 | 
						|
                "message": "Config successfully updated, restart to apply",
 | 
						|
            }
 | 
						|
        ),
 | 
						|
        200,
 | 
						|
    )
 | 
						|
 | 
						|
 | 
						|
@bp.route("/ffprobe", methods=["GET"])
 | 
						|
def ffprobe():
 | 
						|
    path_param = request.args.get("paths", "")
 | 
						|
 | 
						|
    if not path_param:
 | 
						|
        return make_response(
 | 
						|
            jsonify({"success": False, "message": "Path needs to be provided."}), 404
 | 
						|
        )
 | 
						|
 | 
						|
    if path_param.startswith("camera"):
 | 
						|
        camera = path_param[7:]
 | 
						|
 | 
						|
        if camera not in current_app.frigate_config.cameras.keys():
 | 
						|
            return make_response(
 | 
						|
                jsonify(
 | 
						|
                    {"success": False, "message": f"{camera} is not a valid camera."}
 | 
						|
                ),
 | 
						|
                404,
 | 
						|
            )
 | 
						|
 | 
						|
        if not current_app.frigate_config.cameras[camera].enabled:
 | 
						|
            return make_response(
 | 
						|
                jsonify({"success": False, "message": f"{camera} is not enabled."}), 404
 | 
						|
            )
 | 
						|
 | 
						|
        paths = map(
 | 
						|
            lambda input: input.path,
 | 
						|
            current_app.frigate_config.cameras[camera].ffmpeg.inputs,
 | 
						|
        )
 | 
						|
    elif "," in clean_camera_user_pass(path_param):
 | 
						|
        paths = path_param.split(",")
 | 
						|
    else:
 | 
						|
        paths = [path_param]
 | 
						|
 | 
						|
    # user has multiple streams
 | 
						|
    output = []
 | 
						|
 | 
						|
    for path in paths:
 | 
						|
        ffprobe = ffprobe_stream(path.strip())
 | 
						|
        output.append(
 | 
						|
            {
 | 
						|
                "return_code": ffprobe.returncode,
 | 
						|
                "stderr": (
 | 
						|
                    ffprobe.stderr.decode("unicode_escape").strip()
 | 
						|
                    if ffprobe.returncode != 0
 | 
						|
                    else ""
 | 
						|
                ),
 | 
						|
                "stdout": (
 | 
						|
                    json.loads(ffprobe.stdout.decode("unicode_escape").strip())
 | 
						|
                    if ffprobe.returncode == 0
 | 
						|
                    else ""
 | 
						|
                ),
 | 
						|
            }
 | 
						|
        )
 | 
						|
 | 
						|
    return jsonify(output)
 | 
						|
 | 
						|
 | 
						|
@bp.route("/vainfo", methods=["GET"])
 | 
						|
def vainfo():
 | 
						|
    vainfo = vainfo_hwaccel()
 | 
						|
    return jsonify(
 | 
						|
        {
 | 
						|
            "return_code": vainfo.returncode,
 | 
						|
            "stderr": (
 | 
						|
                vainfo.stderr.decode("unicode_escape").strip()
 | 
						|
                if vainfo.returncode != 0
 | 
						|
                else ""
 | 
						|
            ),
 | 
						|
            "stdout": (
 | 
						|
                vainfo.stdout.decode("unicode_escape").strip()
 | 
						|
                if vainfo.returncode == 0
 | 
						|
                else ""
 | 
						|
            ),
 | 
						|
        }
 | 
						|
    )
 | 
						|
 | 
						|
 | 
						|
@bp.route("/logs/<service>", methods=["GET"])
 | 
						|
def logs(service: str):
 | 
						|
    log_locations = {
 | 
						|
        "frigate": "/dev/shm/logs/frigate/current",
 | 
						|
        "go2rtc": "/dev/shm/logs/go2rtc/current",
 | 
						|
        "nginx": "/dev/shm/logs/nginx/current",
 | 
						|
    }
 | 
						|
    service_location = log_locations.get(service)
 | 
						|
 | 
						|
    if not service_location:
 | 
						|
        return make_response(
 | 
						|
            jsonify({"success": False, "message": "Not a valid service"}),
 | 
						|
            404,
 | 
						|
        )
 | 
						|
 | 
						|
    start = request.args.get("start", type=int, default=0)
 | 
						|
    end = request.args.get("end", type=int)
 | 
						|
 | 
						|
    try:
 | 
						|
        file = open(service_location, "r")
 | 
						|
        contents = file.read()
 | 
						|
        file.close()
 | 
						|
 | 
						|
        # use the start timestamp to group logs together``
 | 
						|
        logLines = []
 | 
						|
        keyLength = 0
 | 
						|
        dateEnd = 0
 | 
						|
        currentKey = ""
 | 
						|
        currentLine = ""
 | 
						|
 | 
						|
        for rawLine in contents.splitlines():
 | 
						|
            cleanLine = rawLine.strip()
 | 
						|
 | 
						|
            if len(cleanLine) < 10:
 | 
						|
                continue
 | 
						|
 | 
						|
            if dateEnd == 0:
 | 
						|
                dateEnd = cleanLine.index("  ")
 | 
						|
                keyLength = dateEnd - (6 if service_location == "frigate" else 0)
 | 
						|
 | 
						|
            newKey = cleanLine[0:keyLength]
 | 
						|
 | 
						|
            if newKey == currentKey:
 | 
						|
                currentLine += f"\n{cleanLine[dateEnd:].strip()}"
 | 
						|
                continue
 | 
						|
            else:
 | 
						|
                if len(currentLine) > 0:
 | 
						|
                    logLines.append(currentLine)
 | 
						|
 | 
						|
                currentKey = newKey
 | 
						|
                currentLine = cleanLine
 | 
						|
 | 
						|
        logLines.append(currentLine)
 | 
						|
 | 
						|
        return make_response(
 | 
						|
            jsonify({"totalLines": len(logLines), "lines": logLines[start:end]}),
 | 
						|
            200,
 | 
						|
        )
 | 
						|
    except FileNotFoundError as e:
 | 
						|
        logger.error(e)
 | 
						|
        return make_response(
 | 
						|
            jsonify({"success": False, "message": "Could not find log file"}),
 | 
						|
            500,
 | 
						|
        )
 | 
						|
 | 
						|
 | 
						|
@bp.route("/restart", methods=["POST"])
 | 
						|
def restart():
 | 
						|
    try:
 | 
						|
        restart_frigate()
 | 
						|
    except Exception as e:
 | 
						|
        logging.error(f"Error restarting Frigate: {e}")
 | 
						|
        return make_response(
 | 
						|
            jsonify(
 | 
						|
                {
 | 
						|
                    "success": False,
 | 
						|
                    "message": "Unable to restart Frigate.",
 | 
						|
                }
 | 
						|
            ),
 | 
						|
            500,
 | 
						|
        )
 | 
						|
 | 
						|
    return make_response(
 | 
						|
        jsonify(
 | 
						|
            {
 | 
						|
                "success": True,
 | 
						|
                "message": "Restarting (this can take up to one minute)...",
 | 
						|
            }
 | 
						|
        ),
 | 
						|
        200,
 | 
						|
    )
 | 
						|
 | 
						|
 | 
						|
@bp.route("/labels")
 | 
						|
def get_labels():
 | 
						|
    camera = request.args.get("camera", type=str, default="")
 | 
						|
 | 
						|
    try:
 | 
						|
        if camera:
 | 
						|
            events = Event.select(Event.label).where(Event.camera == camera).distinct()
 | 
						|
        else:
 | 
						|
            events = Event.select(Event.label).distinct()
 | 
						|
    except Exception as e:
 | 
						|
        logger.error(e)
 | 
						|
        return make_response(
 | 
						|
            jsonify({"success": False, "message": "Failed to get labels"}), 404
 | 
						|
        )
 | 
						|
 | 
						|
    labels = sorted([e.label for e in events])
 | 
						|
    return jsonify(labels)
 | 
						|
 | 
						|
 | 
						|
@bp.route("/sub_labels")
 | 
						|
def get_sub_labels():
 | 
						|
    split_joined = request.args.get("split_joined", type=int)
 | 
						|
 | 
						|
    try:
 | 
						|
        events = Event.select(Event.sub_label).distinct()
 | 
						|
    except Exception:
 | 
						|
        return make_response(
 | 
						|
            jsonify({"success": False, "message": "Failed to get sub_labels"}),
 | 
						|
            404,
 | 
						|
        )
 | 
						|
 | 
						|
    sub_labels = [e.sub_label for e in events]
 | 
						|
 | 
						|
    if None in sub_labels:
 | 
						|
        sub_labels.remove(None)
 | 
						|
 | 
						|
    if split_joined:
 | 
						|
        original_labels = sub_labels.copy()
 | 
						|
 | 
						|
        for label in original_labels:
 | 
						|
            if "," in label:
 | 
						|
                sub_labels.remove(label)
 | 
						|
                parts = label.split(",")
 | 
						|
 | 
						|
                for part in parts:
 | 
						|
                    if part.strip() not in sub_labels:
 | 
						|
                        sub_labels.append(part.strip())
 | 
						|
 | 
						|
    sub_labels.sort()
 | 
						|
    return jsonify(sub_labels)
 | 
						|
 | 
						|
 | 
						|
@bp.route("/timeline")
 | 
						|
def timeline():
 | 
						|
    camera = request.args.get("camera", "all")
 | 
						|
    source_id = request.args.get("source_id", type=str)
 | 
						|
    limit = request.args.get("limit", 100)
 | 
						|
 | 
						|
    clauses = []
 | 
						|
 | 
						|
    selected_columns = [
 | 
						|
        Timeline.timestamp,
 | 
						|
        Timeline.camera,
 | 
						|
        Timeline.source,
 | 
						|
        Timeline.source_id,
 | 
						|
        Timeline.class_type,
 | 
						|
        Timeline.data,
 | 
						|
    ]
 | 
						|
 | 
						|
    if camera != "all":
 | 
						|
        clauses.append((Timeline.camera == camera))
 | 
						|
 | 
						|
    if source_id:
 | 
						|
        clauses.append((Timeline.source_id == source_id))
 | 
						|
 | 
						|
    if len(clauses) == 0:
 | 
						|
        clauses.append((True))
 | 
						|
 | 
						|
    timeline = (
 | 
						|
        Timeline.select(*selected_columns)
 | 
						|
        .where(reduce(operator.and_, clauses))
 | 
						|
        .order_by(Timeline.timestamp.asc())
 | 
						|
        .limit(limit)
 | 
						|
        .dicts()
 | 
						|
    )
 | 
						|
 | 
						|
    return jsonify([t for t in timeline])
 | 
						|
 | 
						|
 | 
						|
@bp.route("/timeline/hourly")
 | 
						|
def hourly_timeline():
 | 
						|
    """Get hourly summary for timeline."""
 | 
						|
    cameras = request.args.get("cameras", "all")
 | 
						|
    labels = request.args.get("labels", "all")
 | 
						|
    before = request.args.get("before", type=float)
 | 
						|
    after = request.args.get("after", type=float)
 | 
						|
    limit = request.args.get("limit", 200)
 | 
						|
    tz_name = request.args.get("timezone", default="utc", type=str)
 | 
						|
 | 
						|
    _, minute_modifier, _ = get_tz_modifiers(tz_name)
 | 
						|
    minute_offset = int(minute_modifier.split(" ")[0])
 | 
						|
 | 
						|
    clauses = []
 | 
						|
 | 
						|
    if cameras != "all":
 | 
						|
        camera_list = cameras.split(",")
 | 
						|
        clauses.append((Timeline.camera << camera_list))
 | 
						|
 | 
						|
    if labels != "all":
 | 
						|
        label_list = labels.split(",")
 | 
						|
        clauses.append((Timeline.data["label"] << label_list))
 | 
						|
 | 
						|
    if before:
 | 
						|
        clauses.append((Timeline.timestamp < before))
 | 
						|
 | 
						|
    if after:
 | 
						|
        clauses.append((Timeline.timestamp > after))
 | 
						|
 | 
						|
    if len(clauses) == 0:
 | 
						|
        clauses.append((True))
 | 
						|
 | 
						|
    timeline = (
 | 
						|
        Timeline.select(
 | 
						|
            Timeline.camera,
 | 
						|
            Timeline.timestamp,
 | 
						|
            Timeline.data,
 | 
						|
            Timeline.class_type,
 | 
						|
            Timeline.source_id,
 | 
						|
            Timeline.source,
 | 
						|
        )
 | 
						|
        .where(reduce(operator.and_, clauses))
 | 
						|
        .order_by(Timeline.timestamp.desc())
 | 
						|
        .limit(limit)
 | 
						|
        .dicts()
 | 
						|
        .iterator()
 | 
						|
    )
 | 
						|
 | 
						|
    count = 0
 | 
						|
    start = 0
 | 
						|
    end = 0
 | 
						|
    hours: dict[str, list[dict[str, any]]] = {}
 | 
						|
 | 
						|
    for t in timeline:
 | 
						|
        if count == 0:
 | 
						|
            start = t["timestamp"]
 | 
						|
        else:
 | 
						|
            end = t["timestamp"]
 | 
						|
 | 
						|
        count += 1
 | 
						|
 | 
						|
        hour = (
 | 
						|
            datetime.fromtimestamp(t["timestamp"]).replace(
 | 
						|
                minute=0, second=0, microsecond=0
 | 
						|
            )
 | 
						|
            + timedelta(
 | 
						|
                minutes=minute_offset,
 | 
						|
            )
 | 
						|
        ).timestamp()
 | 
						|
        if hour not in hours:
 | 
						|
            hours[hour] = [t]
 | 
						|
        else:
 | 
						|
            hours[hour].insert(0, t)
 | 
						|
 | 
						|
    return jsonify(
 | 
						|
        {
 | 
						|
            "start": start,
 | 
						|
            "end": end,
 | 
						|
            "count": count,
 | 
						|
            "hours": hours,
 | 
						|
        }
 | 
						|
    )
 |