"""Image and video apis.""" import base64 import glob import logging import os import re import subprocess as sp import time from datetime import datetime, timedelta, timezone from urllib.parse import unquote import cv2 import numpy as np import pytz from flask import ( Blueprint, Response, current_app, jsonify, make_response, request, ) from peewee import DoesNotExist, fn from tzlocal import get_localzone_name from werkzeug.utils import secure_filename from frigate.const import ( CACHE_DIR, CLIPS_DIR, EXPORT_DIR, MAX_SEGMENT_DURATION, RECORD_DIR, ) from frigate.models import Event, Previews, Recordings, Regions, ReviewSegment from frigate.record.export import PlaybackFactorEnum, RecordingExporter from frigate.util.builtin import ( get_tz_modifiers, ) logger = logging.getLogger(__name__) MediaBp = Blueprint("media", __name__) @MediaBp.route("/") def mjpeg_feed(camera_name): fps = int(request.args.get("fps", "3")) height = int(request.args.get("h", "360")) draw_options = { "bounding_boxes": request.args.get("bbox", type=int), "timestamp": request.args.get("timestamp", type=int), "zones": request.args.get("zones", type=int), "mask": request.args.get("mask", type=int), "motion_boxes": request.args.get("motion", type=int), "regions": request.args.get("regions", type=int), } if camera_name in current_app.frigate_config.cameras: # return a multipart response return Response( imagestream( current_app.detected_frames_processor, camera_name, fps, height, draw_options, ), mimetype="multipart/x-mixed-replace; boundary=frame", ) else: return make_response( jsonify({"success": False, "message": "Camera not found"}), 404, ) def imagestream(detected_frames_processor, camera_name, fps, height, draw_options): while True: # max out at specified FPS time.sleep(1 / fps) frame = detected_frames_processor.get_current_frame(camera_name, draw_options) if frame is None: frame = np.zeros((height, int(height * 16 / 9), 3), np.uint8) width = int(height * frame.shape[1] / frame.shape[0]) frame = cv2.resize(frame, dsize=(width, height), interpolation=cv2.INTER_LINEAR) ret, jpg = cv2.imencode(".jpg", frame, [int(cv2.IMWRITE_JPEG_QUALITY), 70]) yield ( b"--frame\r\n" b"Content-Type: image/jpeg\r\n\r\n" + jpg.tobytes() + b"\r\n\r\n" ) @MediaBp.route("//ptz/info") def camera_ptz_info(camera_name): if camera_name in current_app.frigate_config.cameras: return jsonify(current_app.onvif.get_camera_info(camera_name)) else: return make_response( jsonify({"success": False, "message": "Camera not found"}), 404, ) @MediaBp.route("//latest.jpg") def latest_frame(camera_name): draw_options = { "bounding_boxes": request.args.get("bbox", type=int), "timestamp": request.args.get("timestamp", type=int), "zones": request.args.get("zones", type=int), "mask": request.args.get("mask", type=int), "motion_boxes": request.args.get("motion", type=int), "regions": request.args.get("regions", type=int), } resize_quality = request.args.get("quality", default=70, type=int) if camera_name in current_app.frigate_config.cameras: frame = current_app.detected_frames_processor.get_current_frame( camera_name, draw_options ) retry_interval = float( current_app.frigate_config.cameras.get(camera_name).ffmpeg.retry_interval or 10 ) if frame is None or datetime.now().timestamp() > ( current_app.detected_frames_processor.get_current_frame_time(camera_name) + retry_interval ): if current_app.camera_error_image is None: error_image = glob.glob("/opt/frigate/frigate/images/camera-error.jpg") if len(error_image) > 0: current_app.camera_error_image = cv2.imread( error_image[0], cv2.IMREAD_UNCHANGED ) frame = current_app.camera_error_image height = int(request.args.get("h", str(frame.shape[0]))) width = int(height * frame.shape[1] / frame.shape[0]) if frame is None: return make_response( jsonify({"success": False, "message": "Unable to get valid frame"}), 500, ) if height < 1 or width < 1: return ( "Invalid height / width requested :: {} / {}".format(height, width), 400, ) frame = cv2.resize(frame, dsize=(width, height), interpolation=cv2.INTER_AREA) ret, jpg = cv2.imencode( ".jpg", frame, [int(cv2.IMWRITE_JPEG_QUALITY), resize_quality] ) response = make_response(jpg.tobytes()) response.headers["Content-Type"] = "image/jpeg" response.headers["Cache-Control"] = "no-store" return response elif camera_name == "birdseye" and current_app.frigate_config.birdseye.restream: frame = cv2.cvtColor( current_app.detected_frames_processor.get_current_frame(camera_name), cv2.COLOR_YUV2BGR_I420, ) height = int(request.args.get("h", str(frame.shape[0]))) width = int(height * frame.shape[1] / frame.shape[0]) frame = cv2.resize(frame, dsize=(width, height), interpolation=cv2.INTER_AREA) ret, jpg = cv2.imencode( ".jpg", frame, [int(cv2.IMWRITE_JPEG_QUALITY), resize_quality] ) response = make_response(jpg.tobytes()) response.headers["Content-Type"] = "image/jpeg" response.headers["Cache-Control"] = "no-store" return response else: return make_response( jsonify({"success": False, "message": "Camera not found"}), 404, ) @MediaBp.route("//recordings//snapshot.png") def get_snapshot_from_recording(camera_name: str, frame_time: str): if camera_name not in current_app.frigate_config.cameras: return make_response( jsonify({"success": False, "message": "Camera not found"}), 404, ) frame_time = float(frame_time) recording_query = ( Recordings.select( Recordings.path, Recordings.start_time, ) .where( ( (frame_time >= Recordings.start_time) & (frame_time <= Recordings.end_time) ) ) .where(Recordings.camera == camera_name) .order_by(Recordings.start_time.desc()) .limit(1) ) try: recording: Recordings = recording_query.get() time_in_segment = frame_time - recording.start_time ffmpeg_cmd = [ "ffmpeg", "-hide_banner", "-loglevel", "warning", "-ss", f"00:00:{time_in_segment}", "-i", recording.path, "-frames:v", "1", "-c:v", "png", "-f", "image2pipe", "-", ] process = sp.run( ffmpeg_cmd, capture_output=True, ) response = make_response(process.stdout) response.headers["Content-Type"] = "image/png" return response except DoesNotExist: return make_response( jsonify( { "success": False, "message": "Recording not found at {}".format(frame_time), } ), 404, ) @MediaBp.route("/recordings/storage", methods=["GET"]) def get_recordings_storage_usage(): recording_stats = current_app.stats_emitter.get_latest_stats()["service"][ "storage" ][RECORD_DIR] if not recording_stats: return jsonify({}) total_mb = recording_stats["total"] camera_usages: dict[str, dict] = ( current_app.storage_maintainer.calculate_camera_usages() ) for camera_name in camera_usages.keys(): if camera_usages.get(camera_name, {}).get("usage"): camera_usages[camera_name]["usage_percent"] = ( camera_usages.get(camera_name, {}).get("usage", 0) / total_mb ) * 100 return jsonify(camera_usages) # return hourly summary for recordings of camera @MediaBp.route("//recordings/summary") def recordings_summary(camera_name): tz_name = request.args.get("timezone", default="utc", type=str) hour_modifier, minute_modifier, seconds_offset = get_tz_modifiers(tz_name) recording_groups = ( Recordings.select( fn.strftime( "%Y-%m-%d %H", fn.datetime( Recordings.start_time, "unixepoch", hour_modifier, minute_modifier ), ).alias("hour"), fn.SUM(Recordings.duration).alias("duration"), fn.SUM(Recordings.motion).alias("motion"), fn.SUM(Recordings.objects).alias("objects"), ) .where(Recordings.camera == camera_name) .group_by((Recordings.start_time + seconds_offset).cast("int") / 3600) .order_by(Recordings.start_time.desc()) .namedtuples() ) event_groups = ( Event.select( fn.strftime( "%Y-%m-%d %H", fn.datetime( Event.start_time, "unixepoch", hour_modifier, minute_modifier ), ).alias("hour"), fn.COUNT(Event.id).alias("count"), ) .where(Event.camera == camera_name, Event.has_clip) .group_by((Event.start_time + seconds_offset).cast("int") / 3600) .namedtuples() ) event_map = {g.hour: g.count for g in event_groups} days = {} for recording_group in recording_groups: parts = recording_group.hour.split() hour = parts[1] day = parts[0] events_count = event_map.get(recording_group.hour, 0) hour_data = { "hour": hour, "events": events_count, "motion": recording_group.motion, "objects": recording_group.objects, "duration": round(recording_group.duration), } if day not in days: days[day] = {"events": events_count, "hours": [hour_data], "day": day} else: days[day]["events"] += events_count days[day]["hours"].append(hour_data) return jsonify(list(days.values())) # return hour of recordings data for camera @MediaBp.route("//recordings") def recordings(camera_name): after = request.args.get( "after", type=float, default=(datetime.now() - timedelta(hours=1)).timestamp() ) before = request.args.get("before", type=float, default=datetime.now().timestamp()) recordings = ( Recordings.select( Recordings.id, Recordings.start_time, Recordings.end_time, Recordings.segment_size, Recordings.motion, Recordings.objects, Recordings.duration, ) .where( Recordings.camera == camera_name, Recordings.end_time >= after, Recordings.start_time <= before, ) .order_by(Recordings.start_time) .dicts() .iterator() ) return jsonify(list(recordings)) @MediaBp.route("//start//end//clip.mp4") @MediaBp.route("//start//end//clip.mp4") def recording_clip(camera_name, start_ts, end_ts): download = request.args.get("download", type=bool) recordings = ( Recordings.select( Recordings.path, Recordings.start_time, Recordings.end_time, ) .where( (Recordings.start_time.between(start_ts, end_ts)) | (Recordings.end_time.between(start_ts, end_ts)) | ((start_ts > Recordings.start_time) & (end_ts < Recordings.end_time)) ) .where(Recordings.camera == camera_name) .order_by(Recordings.start_time.asc()) ) playlist_lines = [] clip: Recordings for clip in recordings: playlist_lines.append(f"file '{clip.path}'") # if this is the starting clip, add an inpoint if clip.start_time < start_ts: playlist_lines.append(f"inpoint {int(start_ts - clip.start_time)}") # if this is the ending clip, add an outpoint if clip.end_time > end_ts: playlist_lines.append(f"outpoint {int(end_ts - clip.start_time)}") file_name = secure_filename(f"clip_{camera_name}_{start_ts}-{end_ts}.mp4") path = os.path.join(CACHE_DIR, file_name) if not os.path.exists(path): ffmpeg_cmd = [ "ffmpeg", "-hide_banner", "-y", "-protocol_whitelist", "pipe,file", "-f", "concat", "-safe", "0", "-i", "/dev/stdin", "-c", "copy", "-movflags", "+faststart", path, ] p = sp.run( ffmpeg_cmd, input="\n".join(playlist_lines), encoding="ascii", capture_output=True, ) if p.returncode != 0: logger.error(p.stderr) return make_response( jsonify( { "success": False, "message": "Could not create clip from recordings", } ), 500, ) else: logger.debug( f"Ignoring subsequent request for {path} as it already exists in the cache." ) response = make_response() response.headers["Content-Description"] = "File Transfer" response.headers["Cache-Control"] = "no-cache" response.headers["Content-Type"] = "video/mp4" if download: response.headers["Content-Disposition"] = "attachment; filename=%s" % file_name response.headers["Content-Length"] = os.path.getsize(path) response.headers["X-Accel-Redirect"] = ( f"/cache/{file_name}" # nginx: https://nginx.org/en/docs/http/ngx_http_proxy_module.html#proxy_ignore_headers ) return response @MediaBp.route("/vod//start//end/") @MediaBp.route("/vod//start//end/") def vod_ts(camera_name, start_ts, end_ts): recordings = ( Recordings.select(Recordings.path, Recordings.duration, Recordings.end_time) .where( Recordings.start_time.between(start_ts, end_ts) | Recordings.end_time.between(start_ts, end_ts) | ((start_ts > Recordings.start_time) & (end_ts < Recordings.end_time)) ) .where(Recordings.camera == camera_name) .order_by(Recordings.start_time.asc()) .iterator() ) clips = [] durations = [] max_duration_ms = MAX_SEGMENT_DURATION * 1000 recording: Recordings for recording in recordings: clip = {"type": "source", "path": recording.path} duration = int(recording.duration * 1000) # Determine if we need to end the last clip early if recording.end_time > end_ts: duration -= int((recording.end_time - end_ts) * 1000) if 0 < duration < max_duration_ms: clip["keyFrameDurations"] = [duration] clips.append(clip) durations.append(duration) else: logger.warning(f"Recording clip is missing or empty: {recording.path}") if not clips: logger.error("No recordings found for the requested time range") return make_response( jsonify( { "success": False, "message": "No recordings found.", } ), 404, ) hour_ago = datetime.now() - timedelta(hours=1) return jsonify( { "cache": hour_ago.timestamp() > start_ts, "discontinuity": False, "consistentSequenceMediaInfo": True, "durations": durations, "segment_duration": max(durations), "sequences": [{"clips": clips}], } ) @MediaBp.route("/vod////") def vod_hour_no_timezone(year_month, day, hour, camera_name): return vod_hour( year_month, day, hour, camera_name, get_localzone_name().replace("/", ",") ) @MediaBp.route("/vod/////") def vod_hour(year_month, day, hour, camera_name, tz_name): parts = year_month.split("-") start_date = ( datetime(int(parts[0]), int(parts[1]), int(day), int(hour), tzinfo=timezone.utc) - datetime.now(pytz.timezone(tz_name.replace(",", "/"))).utcoffset() ) end_date = start_date + timedelta(hours=1) - timedelta(milliseconds=1) start_ts = start_date.timestamp() end_ts = end_date.timestamp() return vod_ts(camera_name, start_ts, end_ts) @MediaBp.route("/vod/event/") def vod_event(id): try: event: Event = Event.get(Event.id == id) except DoesNotExist: logger.error(f"Event not found: {id}") return make_response( jsonify( { "success": False, "message": "Event not found.", } ), 404, ) if not event.has_clip: logger.error(f"Event does not have recordings: {id}") return make_response( jsonify( { "success": False, "message": "Recordings not available.", } ), 404, ) clip_path = os.path.join(CLIPS_DIR, f"{event.camera}-{event.id}.mp4") if not os.path.isfile(clip_path): end_ts = ( datetime.now().timestamp() if event.end_time is None else event.end_time ) vod_response = vod_ts(event.camera, event.start_time, end_ts) # If the recordings are not found and the event started more than 5 minutes ago, set has_clip to false if ( event.start_time < datetime.now().timestamp() - 300 and type(vod_response) == tuple and len(vod_response) == 2 and vod_response[1] == 404 ): Event.update(has_clip=False).where(Event.id == id).execute() return vod_response duration = int((event.end_time - event.start_time) * 1000) return jsonify( { "cache": True, "discontinuity": False, "durations": [duration], "sequences": [{"clips": [{"type": "source", "path": clip_path}]}], } ) @MediaBp.route( "/export//start//end/", methods=["POST"] ) @MediaBp.route( "/export//start//end/", methods=["POST"], ) def export_recording(camera_name: str, start_time, end_time): if not camera_name or not current_app.frigate_config.cameras.get(camera_name): return make_response( jsonify( {"success": False, "message": f"{camera_name} is not a valid camera."} ), 404, ) json: dict[str, any] = request.get_json(silent=True) or {} playback_factor = json.get("playback", "realtime") recordings_count = ( Recordings.select() .where( Recordings.start_time.between(start_time, end_time) | Recordings.end_time.between(start_time, end_time) | ((start_time > Recordings.start_time) & (end_time < Recordings.end_time)) ) .where(Recordings.camera == camera_name) .count() ) if recordings_count <= 0: return make_response( jsonify( {"success": False, "message": "No recordings found for time range"} ), 400, ) exporter = RecordingExporter( current_app.frigate_config, camera_name, int(start_time), int(end_time), ( PlaybackFactorEnum[playback_factor] if playback_factor in PlaybackFactorEnum.__members__.values() else PlaybackFactorEnum.realtime ), ) exporter.start() return make_response( jsonify( { "success": True, "message": "Starting export of recording.", } ), 200, ) def export_filename_check_extension(filename: str): if filename.endswith(".mp4"): return filename else: return filename + ".mp4" def export_filename_is_valid(filename: str): if re.search(r"[^:_A-Za-z0-9]", filename) or filename.startswith("in_progress."): return False else: return True @MediaBp.route("/export//", methods=["PATCH"]) def export_rename(file_name_current, file_name_new: str): safe_file_name_current = secure_filename( export_filename_check_extension(file_name_current) ) file_current = os.path.join(EXPORT_DIR, safe_file_name_current) if not os.path.exists(file_current): return make_response( jsonify({"success": False, "message": f"{file_name_current} not found."}), 404, ) if not export_filename_is_valid(file_name_new): return make_response( jsonify( { "success": False, "message": f"{file_name_new} contains illegal characters.", } ), 400, ) safe_file_name_new = secure_filename(export_filename_check_extension(file_name_new)) file_new = os.path.join(EXPORT_DIR, safe_file_name_new) if os.path.exists(file_new): return make_response( jsonify({"success": False, "message": f"{file_name_new} already exists."}), 400, ) os.rename(file_current, file_new) return make_response( jsonify( { "success": True, "message": "Successfully renamed file.", } ), 200, ) @MediaBp.route("/export/", methods=["DELETE"]) def export_delete(file_name: str): safe_file_name = secure_filename(export_filename_check_extension(file_name)) file = os.path.join(EXPORT_DIR, safe_file_name) if not os.path.exists(file): return make_response( jsonify({"success": False, "message": f"{file_name} not found."}), 404, ) os.unlink(file) return make_response( jsonify( { "success": True, "message": "Successfully deleted file.", } ), 200, ) @MediaBp.route("//