import base64 import copy import glob import json import logging import os import re import subprocess as sp import time import traceback from collections import defaultdict from datetime import datetime, timedelta, timezone from functools import reduce from pathlib import Path from urllib.parse import unquote import cv2 import numpy as np import pandas as pd import pytz import requests from flask import ( Blueprint, Flask, Response, current_app, escape, jsonify, make_response, request, ) from peewee import DoesNotExist, fn, operator from playhouse.shortcuts import model_to_dict from playhouse.sqliteq import SqliteQueueDatabase from tzlocal import get_localzone_name from werkzeug.utils import secure_filename from frigate.config import FrigateConfig from frigate.const import ( CACHE_DIR, CLIPS_DIR, CONFIG_DIR, EXPORT_DIR, MAX_SEGMENT_DURATION, RECORD_DIR, ) from frigate.events.external import ExternalEventProcessor from frigate.models import Event, Previews, Recordings, Regions, Timeline from frigate.object_processing import TrackedObject from frigate.plus import PlusApi from frigate.ptz.onvif import OnvifController from frigate.record.export import PlaybackFactorEnum, RecordingExporter from frigate.stats import stats_snapshot from frigate.storage import StorageMaintainer from frigate.util.builtin import ( clean_camera_user_pass, get_tz_modifiers, update_yaml_from_url, ) from frigate.util.services import ffprobe_stream, restart_frigate, vainfo_hwaccel from frigate.version import VERSION logger = logging.getLogger(__name__) DEFAULT_TIME_RANGE = "00:00,24:00" bp = Blueprint("frigate", __name__) def create_app( frigate_config, database: SqliteQueueDatabase, stats_tracking, detected_frames_processor, storage_maintainer: StorageMaintainer, onvif: OnvifController, external_processor: ExternalEventProcessor, plus_api: PlusApi, ): app = Flask(__name__) @app.before_request def check_csrf(): if request.method in ["GET", "HEAD", "OPTIONS", "TRACE"]: pass if "origin" in request.headers and "x-csrf-token" not in request.headers: return jsonify({"success": False, "message": "Missing CSRF header"}), 401 @app.before_request def _db_connect(): if database.is_closed(): database.connect() @app.teardown_request def _db_close(exc): if not database.is_closed(): database.close() app.frigate_config = frigate_config app.stats_tracking = stats_tracking app.detected_frames_processor = detected_frames_processor app.storage_maintainer = storage_maintainer app.onvif = onvif app.external_processor = external_processor app.plus_api = plus_api app.camera_error_image = None app.hwaccel_errors = [] app.register_blueprint(bp) return app @bp.route("/") def is_healthy(): return "Frigate is running. Alive and healthy!" @bp.route("/events/summary") def events_summary(): tz_name = request.args.get("timezone", default="utc", type=str) hour_modifier, minute_modifier, seconds_offset = get_tz_modifiers(tz_name) has_clip = request.args.get("has_clip", type=int) has_snapshot = request.args.get("has_snapshot", type=int) clauses = [] if has_clip is not None: clauses.append((Event.has_clip == has_clip)) if has_snapshot is not None: clauses.append((Event.has_snapshot == has_snapshot)) if len(clauses) == 0: clauses.append((True)) groups = ( Event.select( Event.camera, Event.label, Event.sub_label, fn.strftime( "%Y-%m-%d", fn.datetime( Event.start_time, "unixepoch", hour_modifier, minute_modifier ), ).alias("day"), Event.zones, fn.COUNT(Event.id).alias("count"), ) .where(reduce(operator.and_, clauses)) .group_by( Event.camera, Event.label, Event.sub_label, (Event.start_time + seconds_offset).cast("int") / (3600 * 24), Event.zones, ) ) return jsonify([e for e in groups.dicts()]) @bp.route("/events/", methods=("GET",)) def event(id): try: return model_to_dict(Event.get(Event.id == id)) except DoesNotExist: return "Event not found", 404 @bp.route("/events//retain", methods=("POST",)) def set_retain(id): try: event = Event.get(Event.id == id) except DoesNotExist: return make_response( jsonify({"success": False, "message": "Event " + id + " not found"}), 404 ) event.retain_indefinitely = True event.save() return make_response( jsonify({"success": True, "message": "Event " + id + " retained"}), 200 ) @bp.route("/events//plus", methods=("POST",)) def send_to_plus(id): if not current_app.plus_api.is_active(): message = "PLUS_API_KEY environment variable is not set" logger.error(message) return make_response( jsonify( { "success": False, "message": message, } ), 400, ) include_annotation = ( request.json.get("include_annotation") if request.is_json else None ) try: event = Event.get(Event.id == id) except DoesNotExist: message = f"Event {id} not found" logger.error(message) return make_response(jsonify({"success": False, "message": message}), 404) # events from before the conversion to relative dimensions cant include annotations if event.data.get("box") is None: include_annotation = None if event.end_time is None: logger.error(f"Unable to load clean png for in-progress event: {event.id}") return make_response( jsonify( { "success": False, "message": "Unable to load clean png for in-progress event", } ), 400, ) if event.plus_id: message = "Already submitted to plus" logger.error(message) return make_response(jsonify({"success": False, "message": message}), 400) # load clean.png try: filename = f"{event.camera}-{event.id}-clean.png" image = cv2.imread(os.path.join(CLIPS_DIR, filename)) except Exception: logger.error(f"Unable to load clean png for event: {event.id}") return make_response( jsonify( {"success": False, "message": "Unable to load clean png for event"} ), 400, ) if image is None or image.size == 0: logger.error(f"Unable to load clean png for event: {event.id}") return make_response( jsonify( {"success": False, "message": "Unable to load clean png for event"} ), 400, ) try: plus_id = current_app.plus_api.upload_image(image, event.camera) except Exception as ex: logger.exception(ex) return make_response( jsonify({"success": False, "message": "Error uploading image"}), 400, ) # store image id in the database event.plus_id = plus_id event.save() if include_annotation is not None: box = event.data["box"] try: current_app.plus_api.add_annotation( event.plus_id, box, event.label, ) except Exception as ex: logger.exception(ex) return make_response( jsonify({"success": False, "message": "Error uploading annotation"}), 400, ) return make_response(jsonify({"success": True, "plus_id": plus_id}), 200) @bp.route("/events//false_positive", methods=("PUT",)) def false_positive(id): if not current_app.plus_api.is_active(): message = "PLUS_API_KEY environment variable is not set" logger.error(message) return make_response( jsonify( { "success": False, "message": message, } ), 400, ) try: event = Event.get(Event.id == id) except DoesNotExist: message = f"Event {id} not found" logger.error(message) return make_response(jsonify({"success": False, "message": message}), 404) # events from before the conversion to relative dimensions cant include annotations if event.data.get("box") is None: message = "Events prior to 0.13 cannot be submitted as false positives" logger.error(message) return make_response(jsonify({"success": False, "message": message}), 400) if event.false_positive: message = "False positive already submitted to Frigate+" logger.error(message) return make_response(jsonify({"success": False, "message": message}), 400) if not event.plus_id: plus_response = send_to_plus(id) if plus_response.status_code != 200: return plus_response # need to refetch the event now that it has a plus_id event = Event.get(Event.id == id) region = event.data["region"] box = event.data["box"] # provide top score if score is unavailable score = ( (event.data["top_score"] if event.data["top_score"] else event.top_score) if event.data["score"] is None else event.data["score"] ) try: current_app.plus_api.add_false_positive( event.plus_id, region, box, score, event.label, event.model_hash, event.model_type, event.detector_type, ) except Exception as ex: logger.exception(ex) return make_response( jsonify({"success": False, "message": "Error uploading false positive"}), 400, ) event.false_positive = True event.save() return make_response(jsonify({"success": True, "plus_id": event.plus_id}), 200) @bp.route("/events//retain", methods=("DELETE",)) def delete_retain(id): try: event = Event.get(Event.id == id) except DoesNotExist: return make_response( jsonify({"success": False, "message": "Event " + id + " not found"}), 404 ) event.retain_indefinitely = False event.save() return make_response( jsonify({"success": True, "message": "Event " + id + " un-retained"}), 200 ) @bp.route("/events//sub_label", methods=("POST",)) def set_sub_label(id): try: event: Event = Event.get(Event.id == id) except DoesNotExist: return make_response( jsonify({"success": False, "message": "Event " + id + " not found"}), 404 ) json: dict[str, any] = request.get_json(silent=True) or {} new_sub_label = json.get("subLabel") new_score = json.get("subLabelScore") if new_sub_label is None: return make_response( jsonify( { "success": False, "message": "A sub label must be supplied", } ), 400, ) if new_sub_label and len(new_sub_label) > 100: return make_response( jsonify( { "success": False, "message": new_sub_label + " exceeds the 100 character limit for sub_label", } ), 400, ) if new_score is not None and (new_score > 1.0 or new_score < 0): return make_response( jsonify( { "success": False, "message": new_score + " does not fit within the expected bounds 0 <= score <= 1.0", } ), 400, ) if not event.end_time: # update tracked object tracked_obj: TrackedObject = ( current_app.detected_frames_processor.camera_states[ event.camera ].tracked_objects.get(event.id) ) if tracked_obj: tracked_obj.obj_data["sub_label"] = (new_sub_label, new_score) # update timeline items Timeline.update( data=Timeline.data.update({"sub_label": (new_sub_label, new_score)}) ).where(Timeline.source_id == id).execute() event.sub_label = new_sub_label if new_score: data = event.data data["sub_label_score"] = new_score event.data = data event.save() return make_response( jsonify( { "success": True, "message": "Event " + id + " sub label set to " + new_sub_label, } ), 200, ) @bp.route("/labels") def get_labels(): camera = request.args.get("camera", type=str, default="") try: if camera: events = Event.select(Event.label).where(Event.camera == camera).distinct() else: events = Event.select(Event.label).distinct() except Exception as e: logger.error(e) return make_response( jsonify({"success": False, "message": "Failed to get labels"}), 404 ) labels = sorted([e.label for e in events]) return jsonify(labels) @bp.route("/sub_labels") def get_sub_labels(): split_joined = request.args.get("split_joined", type=int) try: events = Event.select(Event.sub_label).distinct() except Exception: return make_response( jsonify({"success": False, "message": "Failed to get sub_labels"}), 404, ) sub_labels = [e.sub_label for e in events] if None in sub_labels: sub_labels.remove(None) if split_joined: original_labels = sub_labels.copy() for label in original_labels: if "," in label: sub_labels.remove(label) parts = label.split(",") for part in parts: if part.strip() not in sub_labels: sub_labels.append(part.strip()) sub_labels.sort() return jsonify(sub_labels) @bp.route("/events/", methods=("DELETE",)) def delete_event(id): try: event = Event.get(Event.id == id) except DoesNotExist: return make_response( jsonify({"success": False, "message": "Event " + id + " not found"}), 404 ) media_name = f"{event.camera}-{event.id}" if event.has_snapshot: media = Path(f"{os.path.join(CLIPS_DIR, media_name)}.jpg") media.unlink(missing_ok=True) media = Path(f"{os.path.join(CLIPS_DIR, media_name)}-clean.png") media.unlink(missing_ok=True) if event.has_clip: media = Path(f"{os.path.join(CLIPS_DIR, media_name)}.mp4") media.unlink(missing_ok=True) event.delete_instance() Timeline.delete().where(Timeline.source_id == id).execute() return make_response( jsonify({"success": True, "message": "Event " + id + " deleted"}), 200 ) @bp.route("/events//thumbnail.jpg") def event_thumbnail(id, max_cache_age=2592000): format = request.args.get("format", "ios") thumbnail_bytes = None event_complete = False try: event = Event.get(Event.id == id) if event.end_time is not None: event_complete = True thumbnail_bytes = base64.b64decode(event.thumbnail) except DoesNotExist: # see if the object is currently being tracked try: camera_states = current_app.detected_frames_processor.camera_states.values() for camera_state in camera_states: if id in camera_state.tracked_objects: tracked_obj = camera_state.tracked_objects.get(id) if tracked_obj is not None: thumbnail_bytes = tracked_obj.get_thumbnail() except Exception: return make_response( jsonify({"success": False, "message": "Event not found"}), 404 ) if thumbnail_bytes is None: return make_response( jsonify({"success": False, "message": "Event not found"}), 404 ) # android notifications prefer a 2:1 ratio if format == "android": jpg_as_np = np.frombuffer(thumbnail_bytes, dtype=np.uint8) img = cv2.imdecode(jpg_as_np, flags=1) thumbnail = cv2.copyMakeBorder( img, 0, 0, int(img.shape[1] * 0.5), int(img.shape[1] * 0.5), cv2.BORDER_CONSTANT, (0, 0, 0), ) ret, jpg = cv2.imencode(".jpg", thumbnail, [int(cv2.IMWRITE_JPEG_QUALITY), 70]) thumbnail_bytes = jpg.tobytes() response = make_response(thumbnail_bytes) response.headers["Content-Type"] = "image/jpeg" if event_complete: response.headers["Cache-Control"] = f"private, max-age={max_cache_age}" else: response.headers["Cache-Control"] = "no-store" return response @bp.route("/events//preview.gif") def event_preview(id: str, max_cache_age=2592000): try: event: Event = Event.get(Event.id == id) except DoesNotExist: return make_response( jsonify({"success": False, "message": "Event not found"}), 404 ) start_ts = event.start_time end_ts = ( start_ts + min(event.end_time - event.start_time, 20) if event.end_time else 20 ) if datetime.fromtimestamp(event.start_time) < datetime.now().replace( minute=0, second=0 ): # has preview mp4 preview: Previews = ( Previews.select( Previews.camera, Previews.path, Previews.duration, Previews.start_time, Previews.end_time, ) .where( Previews.start_time.between(start_ts, end_ts) | Previews.end_time.between(start_ts, end_ts) | ((start_ts > Previews.start_time) & (end_ts < Previews.end_time)) ) .where(Previews.camera == event.camera) .limit(1) .get() ) if not preview: return make_response( jsonify({"success": False, "message": "Preview not found"}), 404 ) diff = event.start_time - preview.start_time minutes = int(diff / 60) seconds = int(diff % 60) ffmpeg_cmd = [ "ffmpeg", "-hide_banner", "-loglevel", "warning", "-ss", f"00:{minutes}:{seconds}", "-t", f"{end_ts - start_ts}", "-i", preview.path, "-r", "8", "-vf", "setpts=0.12*PTS", "-loop", "0", "-c:v", "gif", "-f", "gif", "-", ] process = sp.run( ffmpeg_cmd, capture_output=True, ) if process.returncode != 0: logger.error(process.stderr) return make_response( jsonify({"success": False, "message": "Unable to create preview gif"}), 500, ) gif_bytes = process.stdout else: # need to generate from existing images preview_dir = os.path.join(CACHE_DIR, "preview_frames") file_start = f"preview_{event.camera}" start_file = f"{file_start}-{start_ts}.jpg" end_file = f"{file_start}-{end_ts}.jpg" selected_previews = [] for file in sorted(os.listdir(preview_dir)): if not file.startswith(file_start): continue if file < start_file: continue if file > end_file: break selected_previews.append(f"file '/tmp/cache/preview_frames/{file}'") selected_previews.append("duration 0.12") if not selected_previews: return make_response( jsonify({"success": False, "message": "Preview not found"}), 404 ) last_file = selected_previews[-2] selected_previews.append(last_file) ffmpeg_cmd = [ "ffmpeg", "-hide_banner", "-loglevel", "warning", "-f", "concat", "-y", "-protocol_whitelist", "pipe,file", "-safe", "0", "-i", "/dev/stdin", "-loop", "0", "-c:v", "gif", "-f", "gif", "-", ] process = sp.run( ffmpeg_cmd, input=str.encode("\n".join(selected_previews)), capture_output=True, ) if process.returncode != 0: logger.error(process.stderr) return make_response( jsonify({"success": False, "message": "Unable to create preview gif"}), 500, ) gif_bytes = process.stdout response = make_response(gif_bytes) response.headers["Content-Type"] = "image/gif" response.headers["Cache-Control"] = f"private, max-age={max_cache_age}" return response @bp.route("/timeline") def timeline(): camera = request.args.get("camera", "all") source_id = request.args.get("source_id", type=str) limit = request.args.get("limit", 100) clauses = [] selected_columns = [ Timeline.timestamp, Timeline.camera, Timeline.source, Timeline.source_id, Timeline.class_type, Timeline.data, ] if camera != "all": clauses.append((Timeline.camera == camera)) if source_id: clauses.append((Timeline.source_id == source_id)) if len(clauses) == 0: clauses.append((True)) timeline = ( Timeline.select(*selected_columns) .where(reduce(operator.and_, clauses)) .order_by(Timeline.timestamp.asc()) .limit(limit) .dicts() ) return jsonify([t for t in timeline]) @bp.route("/timeline/hourly") def hourly_timeline(): """Get hourly summary for timeline.""" cameras = request.args.get("cameras", "all") labels = request.args.get("labels", "all") before = request.args.get("before", type=float) after = request.args.get("after", type=float) limit = request.args.get("limit", 200) tz_name = request.args.get("timezone", default="utc", type=str) _, minute_modifier, _ = get_tz_modifiers(tz_name) minute_offset = int(minute_modifier.split(" ")[0]) clauses = [] if cameras != "all": camera_list = cameras.split(",") clauses.append((Timeline.camera << camera_list)) if labels != "all": label_list = labels.split(",") clauses.append((Timeline.data["label"] << label_list)) if before: clauses.append((Timeline.timestamp < before)) if after: clauses.append((Timeline.timestamp > after)) if len(clauses) == 0: clauses.append((True)) timeline = ( Timeline.select( Timeline.camera, Timeline.timestamp, Timeline.data, Timeline.class_type, Timeline.source_id, Timeline.source, ) .where(reduce(operator.and_, clauses)) .order_by(Timeline.timestamp.desc()) .limit(limit) .dicts() .iterator() ) count = 0 start = 0 end = 0 hours: dict[str, list[dict[str, any]]] = {} for t in timeline: if count == 0: start = t["timestamp"] else: end = t["timestamp"] count += 1 hour = ( datetime.fromtimestamp(t["timestamp"]).replace( minute=0, second=0, microsecond=0 ) + timedelta( minutes=minute_offset, ) ).timestamp() if hour not in hours: hours[hour] = [t] else: hours[hour].insert(0, t) return jsonify( { "start": start, "end": end, "count": count, "hours": hours, } ) @bp.route("//recording/hourly/activity") def hourly_timeline_activity(camera_name: str): """Get hourly summary for timeline.""" if camera_name not in current_app.frigate_config.cameras: return make_response( jsonify({"success": False, "message": "Camera not found"}), 404, ) before = request.args.get("before", type=float, default=datetime.now()) after = request.args.get( "after", type=float, default=datetime.now() - timedelta(hours=1) ) tz_name = request.args.get("timezone", default="utc", type=str) _, minute_modifier, _ = get_tz_modifiers(tz_name) minute_offset = int(minute_modifier.split(" ")[0]) all_recordings: list[Recordings] = ( Recordings.select( Recordings.start_time, Recordings.duration, Recordings.objects, Recordings.motion, ) .where(Recordings.camera == camera_name) .where(Recordings.motion > 0) .where((Recordings.start_time > after) & (Recordings.end_time < before)) .order_by(Recordings.start_time.asc()) .iterator() ) # data format is ex: # {timestamp: [{ date: 1, count: 1, type: motion }]}] }} hours: dict[int, list[dict[str, any]]] = defaultdict(list) key = datetime.fromtimestamp(after).replace(second=0, microsecond=0) + timedelta( minutes=minute_offset ) check = (key + timedelta(hours=1)).timestamp() # set initial start so data is representative of full hour hours[int(key.timestamp())].append( [ key.timestamp(), 0, False, ] ) for recording in all_recordings: if recording.start_time > check: hours[int(key.timestamp())].append( [ (key + timedelta(minutes=59, seconds=59)).timestamp(), 0, False, ] ) key = key + timedelta(hours=1) check = (key + timedelta(hours=1)).timestamp() hours[int(key.timestamp())].append( [ key.timestamp(), 0, False, ] ) data_type = recording.objects > 0 count = recording.motion + recording.objects hours[int(key.timestamp())].append( [ recording.start_time + (recording.duration / 2), 0 if count == 0 else np.log2(count), data_type, ] ) # resample data using pandas to get activity on minute to minute basis for key, data in hours.items(): df = pd.DataFrame(data, columns=["date", "count", "hasObjects"]) # set date as datetime index df["date"] = pd.to_datetime(df["date"], unit="s") df.set_index(["date"], inplace=True) # normalize data df = df.resample("T").mean().fillna(0) # change types for output df.index = df.index.astype(int) // (10**9) df["count"] = df["count"].astype(int) df["hasObjects"] = df["hasObjects"].astype(bool) hours[key] = df.reset_index().to_dict("records") return jsonify(hours) @bp.route("//