From cfda531f5a1779f7836ef87234f9c9b4e3686e46 Mon Sep 17 00:00:00 2001 From: Nicolas Mowen Date: Sun, 3 Dec 2023 07:16:01 -0700 Subject: [PATCH] Write a low resolution low fps stream from decoded frames (#8673) * Generate low res low fps previews for recordings viewer * Make sure previews end on the hour * Fix durations and decrase keyframe interval to ensure smooth scrubbing * Ensure minimized resolution is compatible with yuv * Add ability to configure preview quality * Fix * Clean up previews more efficiently * Use iterator --- docs/docs/configuration/reference.md | 5 + frigate/app.py | 14 +- frigate/comms/dispatcher.py | 6 +- frigate/config.py | 17 ++ frigate/const.py | 1 + frigate/ffmpeg_presets.py | 13 ++ frigate/http.py | 63 ++++- frigate/models.py | 9 + frigate/{output.py => output/birdseye.py} | 213 ++++------------- frigate/output/camera.py | 165 ++++++++++++++ frigate/output/output.py | 155 +++++++++++++ frigate/output/preview.py | 265 ++++++++++++++++++++++ frigate/record/cleanup.py | 222 +++++++++++------- frigate/test/test_birdseye.py | 2 +- frigate/util/image.py | 2 +- migrations/021_create_previews_table.py | 35 +++ 16 files changed, 939 insertions(+), 248 deletions(-) rename frigate/{output.py => output/birdseye.py} (81%) create mode 100644 frigate/output/camera.py create mode 100644 frigate/output/output.py create mode 100644 frigate/output/preview.py create mode 100644 migrations/021_create_previews_table.py diff --git a/docs/docs/configuration/reference.md b/docs/docs/configuration/reference.md index d500060a7..cb10b7bcb 100644 --- a/docs/docs/configuration/reference.md +++ b/docs/docs/configuration/reference.md @@ -325,6 +325,11 @@ record: # The -r (framerate) dictates how smooth the output video is. # So the args would be -vf setpts=0.02*PTS -r 30 in that case. timelapse_args: "-vf setpts=0.04*PTS -r 30" + # Optional: Recording Preview Settings + preview: + # Optional: Quality of recording preview (default: shown below). + # Options are: very_low, low, medium, high, very_high + quality: medium # Optional: Event recording settings events: # Optional: Number of seconds before the event to include (default: shown below) diff --git a/frigate/app.py b/frigate/app.py index 4a3cf48d6..5aa738d93 100644 --- a/frigate/app.py +++ b/frigate/app.py @@ -37,10 +37,17 @@ from frigate.events.external import ExternalEventProcessor from frigate.events.maintainer import EventProcessor from frigate.http import create_app from frigate.log import log_process, root_configurer -from frigate.models import Event, Recordings, RecordingsToDelete, Regions, Timeline +from frigate.models import ( + Event, + Previews, + Recordings, + RecordingsToDelete, + Regions, + Timeline, +) from frigate.object_detection import ObjectDetectProcess from frigate.object_processing import TrackedObjectProcessor -from frigate.output import output_frames +from frigate.output.output import output_frames from frigate.plus import PlusApi from frigate.ptz.autotrack import PtzAutoTrackerThread from frigate.ptz.onvif import OnvifController @@ -369,7 +376,7 @@ class FrigateApp: 60, 10 * len([c for c in self.config.cameras.values() if c.enabled]) ), ) - models = [Event, Recordings, RecordingsToDelete, Regions, Timeline] + models = [Event, Recordings, RecordingsToDelete, Previews, Regions, Timeline] self.db.bind(models) def init_stats(self) -> None: @@ -488,6 +495,7 @@ class FrigateApp: args=( self.config, self.video_output_queue, + self.inter_process_queue, self.camera_metrics, ), ) diff --git a/frigate/comms/dispatcher.py b/frigate/comms/dispatcher.py index 010154bef..d83371c01 100644 --- a/frigate/comms/dispatcher.py +++ b/frigate/comms/dispatcher.py @@ -5,8 +5,8 @@ from abc import ABC, abstractmethod from typing import Any, Callable from frigate.config import BirdseyeModeEnum, FrigateConfig -from frigate.const import INSERT_MANY_RECORDINGS, REQUEST_REGION_GRID -from frigate.models import Recordings +from frigate.const import INSERT_MANY_RECORDINGS, INSERT_PREVIEW, REQUEST_REGION_GRID +from frigate.models import Previews, Recordings from frigate.ptz.onvif import OnvifCommandEnum, OnvifController from frigate.types import CameraMetricsTypes, FeatureMetricsTypes, PTZMetricsTypes from frigate.util.object import get_camera_regions_grid @@ -102,6 +102,8 @@ class Dispatcher: max(self.config.model.width, self.config.model.height), ) ) + elif topic == INSERT_PREVIEW: + Previews.insert(payload).execute() else: self.publish(topic, payload, retain=False) diff --git a/frigate/config.py b/frigate/config.py index 6760ea5e6..59ca519fa 100644 --- a/frigate/config.py +++ b/frigate/config.py @@ -260,6 +260,20 @@ class RecordExportConfig(FrigateBaseModel): ) +class RecordQualityEnum(str, Enum): + very_low = "very_low" + low = "low" + medium = "medium" + high = "high" + very_high = "very_high" + + +class RecordPreviewConfig(FrigateBaseModel): + quality: RecordQualityEnum = Field( + default=RecordQualityEnum.medium, title="Quality of recording preview." + ) + + class RecordConfig(FrigateBaseModel): enabled: bool = Field(default=False, title="Enable record on all cameras.") sync_recordings: bool = Field( @@ -278,6 +292,9 @@ class RecordConfig(FrigateBaseModel): export: RecordExportConfig = Field( default_factory=RecordExportConfig, title="Recording Export Config" ) + preview: RecordPreviewConfig = Field( + default_factory=RecordPreviewConfig, title="Recording Preview Config" + ) enabled_in_config: Optional[bool] = Field( title="Keep track of original state of recording." ) diff --git a/frigate/const.py b/frigate/const.py index ebb680333..28bc95f2e 100644 --- a/frigate/const.py +++ b/frigate/const.py @@ -59,6 +59,7 @@ MAX_PLAYLIST_SECONDS = 7200 # support 2 hour segments for a single playlist to # Internal Comms Topics INSERT_MANY_RECORDINGS = "insert_many_recordings" +INSERT_PREVIEW = "insert_preview" REQUEST_REGION_GRID = "request_region_grid" # Autotracking diff --git a/frigate/ffmpeg_presets.py b/frigate/ffmpeg_presets.py index bb8848a0c..fe672b25e 100644 --- a/frigate/ffmpeg_presets.py +++ b/frigate/ffmpeg_presets.py @@ -42,6 +42,11 @@ class LibvaGpuSelector: return "" +FPS_VFR_PARAM = ( + "-fps_mode vfr" + if int(os.getenv("LIBAVFORMAT_VERSION_MAJOR", "59")) >= 59 + else "-vsync 2" +) TIMEOUT_PARAM = ( "-timeout" if int(os.getenv("LIBAVFORMAT_VERSION_MAJOR", "59")) >= 59 @@ -114,6 +119,11 @@ PRESETS_HW_ACCEL_ENCODE_TIMELAPSE = { "default": "ffmpeg -hide_banner {0} -c:v libx264 -preset:v ultrafast -tune:v zerolatency {1}", } +# encoding of previews is only done on CPU due to comparable encode times and better quality from libx264 +PRESETS_HW_ACCEL_ENCODE_PREVIEW = { + "default": "ffmpeg -hide_banner {0} -c:v libx264 -profile:v baseline -preset:v ultrafast {1}", +} + def parse_preset_hardware_acceleration_decode( arg: Any, @@ -153,6 +163,7 @@ def parse_preset_hardware_acceleration_scale( class EncodeTypeEnum(str, Enum): birdseye = "birdseye" + preview = "preview" timelapse = "timelapse" @@ -162,6 +173,8 @@ def parse_preset_hardware_acceleration_encode( """Return the correct scaling preset or default preset if none is set.""" if type == EncodeTypeEnum.birdseye: arg_map = PRESETS_HW_ACCEL_ENCODE_BIRDSEYE + elif type == EncodeTypeEnum.preview: + arg_map = PRESETS_HW_ACCEL_ENCODE_PREVIEW elif type == EncodeTypeEnum.timelapse: arg_map = PRESETS_HW_ACCEL_ENCODE_TIMELAPSE diff --git a/frigate/http.py b/frigate/http.py index d9bd5c29f..c43feee9f 100644 --- a/frigate/http.py +++ b/frigate/http.py @@ -43,7 +43,7 @@ from frigate.const import ( RECORD_DIR, ) from frigate.events.external import ExternalEventProcessor -from frigate.models import Event, Recordings, Regions, Timeline +from frigate.models import Event, Previews, Recordings, Regions, Timeline from frigate.object_processing import TrackedObject from frigate.plus import PlusApi from frigate.ptz.onvif import OnvifController @@ -1845,7 +1845,6 @@ def vod_hour_no_timezone(year_month, day, hour, camera_name): ) -# TODO make this nicer when vod module is removed @bp.route("/vod/////") def vod_hour(year_month, day, hour, camera_name, tz_name): parts = year_month.split("-") @@ -1860,6 +1859,66 @@ def vod_hour(year_month, day, hour, camera_name, tz_name): return vod_ts(camera_name, start_ts, end_ts) +@bp.route("/preview//start//end/") +@bp.route("/preview//start//end/") +def preview_ts(camera_name, start_ts, end_ts): + """Get all mp4 previews relevant for time period.""" + previews = ( + Previews.select( + Previews.path, Previews.duration, Previews.start_time, Previews.end_time + ) + .where( + Previews.start_time.between(start_ts, end_ts) + | Previews.end_time.between(start_ts, end_ts) + | ((start_ts > Previews.start_time) & (end_ts < Previews.end_time)) + ) + .where(Previews.camera == camera_name) + .order_by(Previews.start_time.asc()) + .iterator() + ) + + clips = [] + + preview: Previews + for preview in previews: + clips.append( + { + "src": preview.path.replace("/media/frigate", ""), + "type": "video/mp4", + "start": preview.start_time, + "end": preview.end_time, + } + ) + + if not clips: + logger.error("No previews found for the requested time range") + return make_response( + jsonify( + { + "success": False, + "message": "No previews found.", + } + ), + 404, + ) + + return make_response(jsonify(clips), 200) + + +@bp.route("/preview/////") +def preview_hour(year_month, day, hour, camera_name, tz_name): + parts = year_month.split("-") + start_date = ( + datetime(int(parts[0]), int(parts[1]), int(day), int(hour), tzinfo=timezone.utc) + - datetime.now(pytz.timezone(tz_name.replace(",", "/"))).utcoffset() + ) + end_date = start_date + timedelta(hours=1) - timedelta(milliseconds=1) + start_ts = start_date.timestamp() + end_ts = end_date.timestamp() + + return preview_ts(camera_name, start_ts, end_ts) + + @bp.route("/vod/event/") def vod_event(id): try: diff --git a/frigate/models.py b/frigate/models.py index 65cbfbaac..56d429b19 100644 --- a/frigate/models.py +++ b/frigate/models.py @@ -76,6 +76,15 @@ class Recordings(Model): # type: ignore[misc] segment_size = FloatField(default=0) # this should be stored as MB +class Previews(Model): # type: ignore[misc] + id = CharField(null=False, primary_key=True, max_length=30) + camera = CharField(index=True, max_length=20) + path = CharField(unique=True) + start_time = DateTimeField() + end_time = DateTimeField() + duration = FloatField() + + # Used for temporary table in record/cleanup.py class RecordingsToDelete(Model): # type: ignore[misc] id = CharField(null=False, primary_key=False, max_length=30) diff --git a/frigate/output.py b/frigate/output/birdseye.py similarity index 81% rename from frigate/output.py rename to frigate/output/birdseye.py index a70e5a804..94830d695 100644 --- a/frigate/output.py +++ b/frigate/output/birdseye.py @@ -1,3 +1,5 @@ +"""Handle outputting birdseye frames via jsmpeg and go2rtc.""" + import datetime import glob import logging @@ -5,23 +7,13 @@ import math import multiprocessing as mp import os import queue -import signal import subprocess as sp import threading import traceback -from wsgiref.simple_server import make_server import cv2 import numpy as np -from setproctitle import setproctitle -from ws4py.server.wsgirefserver import ( - WebSocketWSGIHandler, - WebSocketWSGIRequestHandler, - WSGIServer, -) -from ws4py.server.wsgiutils import WebSocketWSGIApplication -from frigate.comms.ws import WebSocket from frigate.config import BirdseyeModeEnum, FrigateConfig from frigate.const import BASE_DIR, BIRDSEYE_PIPE from frigate.types import CameraMetricsTypes @@ -672,66 +664,19 @@ class BirdsEyeFrameManager: return False -def output_frames( - config: FrigateConfig, - video_output_queue, - camera_metrics: dict[str, CameraMetricsTypes], -): - threading.current_thread().name = "output" - setproctitle("frigate.output") - - stop_event = mp.Event() - - def receiveSignal(signalNumber, frame): - stop_event.set() - - signal.signal(signal.SIGTERM, receiveSignal) - signal.signal(signal.SIGINT, receiveSignal) - - frame_manager = SharedMemoryFrameManager() - previous_frames = {} - - # start a websocket server on 8082 - WebSocketWSGIHandler.http_version = "1.1" - websocket_server = make_server( - "127.0.0.1", - 8082, - server_class=WSGIServer, - handler_class=WebSocketWSGIRequestHandler, - app=WebSocketWSGIApplication(handler_cls=WebSocket), - ) - websocket_server.initialize_websockets_manager() - websocket_thread = threading.Thread(target=websocket_server.serve_forever) - - inputs: dict[str, queue.Queue] = {} - converters = {} - broadcasters = {} - - for camera, cam_config in config.cameras.items(): - inputs[camera] = queue.Queue(maxsize=cam_config.detect.fps) - width = int( - cam_config.live.height - * (cam_config.frame_shape[1] / cam_config.frame_shape[0]) - ) - converters[camera] = FFMpegConverter( - camera, - inputs[camera], - stop_event, - cam_config.frame_shape[1], - cam_config.frame_shape[0], - width, - cam_config.live.height, - cam_config.live.quality, - ) - broadcasters[camera] = BroadcastThread( - camera, converters[camera], websocket_server, stop_event - ) - - if config.birdseye.enabled: - inputs["birdseye"] = queue.Queue(maxsize=10) - converters["birdseye"] = FFMpegConverter( +class Birdseye: + def __init__( + self, + config: FrigateConfig, + camera_metrics: dict[str, CameraMetricsTypes], + stop_event: mp.Event, + websocket_server, + ) -> None: + self.config = config + self.input = queue.Queue(maxsize=10) + self.converter = FFMpegConverter( "birdseye", - inputs["birdseye"], + self.input, stop_event, config.birdseye.width, config.birdseye.height, @@ -740,107 +685,49 @@ def output_frames( config.birdseye.quality, config.birdseye.restream, ) - broadcasters["birdseye"] = BroadcastThread( - "birdseye", - converters["birdseye"], - websocket_server, - stop_event, + self.broadcaster = BroadcastThread( + "birdseye", self.converter, websocket_server, stop_event + ) + frame_manager = SharedMemoryFrameManager() + self.birdseye_manager = BirdsEyeFrameManager( + config, frame_manager, stop_event, camera_metrics ) - websocket_thread.start() + if config.birdseye.restream: + self.birdseye_buffer = frame_manager.create( + "birdseye", + self.birdseye_manager.yuv_shape[0] * self.birdseye_manager.yuv_shape[1], + ) - for t in converters.values(): - t.start() + self.converter.start() + self.broadcaster.start() - for t in broadcasters.values(): - t.start() - - birdseye_manager = BirdsEyeFrameManager( - config, frame_manager, stop_event, camera_metrics - ) - - if config.birdseye.restream: - birdseye_buffer = frame_manager.create( - "birdseye", - birdseye_manager.yuv_shape[0] * birdseye_manager.yuv_shape[1], - ) - - while not stop_event.is_set(): - try: - ( - camera, - frame_time, - current_tracked_objects, - motion_boxes, - regions, - ) = video_output_queue.get(True, 1) - except queue.Empty: - continue - - frame_id = f"{camera}{frame_time}" - - frame = frame_manager.get(frame_id, config.cameras[camera].frame_shape_yuv) - - # send camera frame to ffmpeg process if websockets are connected - if any( - ws.environ["PATH_INFO"].endswith(camera) for ws in websocket_server.manager + def write_data( + self, + camera: str, + current_tracked_objects: list[dict[str, any]], + motion_boxes: list[list[int]], + frame_time: float, + frame, + ) -> None: + if self.birdseye_manager.update( + camera, + len([o for o in current_tracked_objects if not o["stationary"]]), + len(motion_boxes), + frame_time, + frame, ): - # write to the converter for the camera if clients are listening to the specific camera + frame_bytes = self.birdseye_manager.frame.tobytes() + + if self.config.birdseye.restream: + self.birdseye_buffer[:] = frame_bytes + try: - inputs[camera].put_nowait(frame.tobytes()) + self.input.put_nowait(frame_bytes) except queue.Full: # drop frames if queue is full pass - if config.birdseye.enabled and ( - config.birdseye.restream - or any( - ws.environ["PATH_INFO"].endswith("birdseye") - for ws in websocket_server.manager - ) - ): - if birdseye_manager.update( - camera, - len([o for o in current_tracked_objects if not o["stationary"]]), - len(motion_boxes), - frame_time, - frame, - ): - frame_bytes = birdseye_manager.frame.tobytes() - - if config.birdseye.restream: - birdseye_buffer[:] = frame_bytes - - try: - inputs["birdseye"].put_nowait(frame_bytes) - except queue.Full: - # drop frames if queue is full - pass - - if camera in previous_frames: - frame_manager.delete(f"{camera}{previous_frames[camera]}") - - previous_frames[camera] = frame_time - - while not video_output_queue.empty(): - ( - camera, - frame_time, - current_tracked_objects, - motion_boxes, - regions, - ) = video_output_queue.get(True, 10) - - frame_id = f"{camera}{frame_time}" - frame = frame_manager.get(frame_id, config.cameras[camera].frame_shape_yuv) - frame_manager.delete(frame_id) - - for b in broadcasters.values(): - b.join() - - websocket_server.manager.close_all() - websocket_server.manager.stop() - websocket_server.manager.join() - websocket_server.shutdown() - websocket_thread.join() - logger.info("exiting output process...") + def stop(self) -> None: + self.converter.join() + self.broadcaster.join() diff --git a/frigate/output/camera.py b/frigate/output/camera.py new file mode 100644 index 000000000..b63e0983a --- /dev/null +++ b/frigate/output/camera.py @@ -0,0 +1,165 @@ +"""Handle outputting individual cameras via jsmpeg.""" + +import logging +import multiprocessing as mp +import queue +import subprocess as sp +import threading + +from frigate.config import CameraConfig + +logger = logging.getLogger(__name__) + + +class FFMpegConverter(threading.Thread): + def __init__( + self, + camera: str, + input_queue: queue.Queue, + stop_event: mp.Event, + in_width: int, + in_height: int, + out_width: int, + out_height: int, + quality: int, + ): + threading.Thread.__init__(self) + self.name = f"{camera}_output_converter" + self.camera = camera + self.input_queue = input_queue + self.stop_event = stop_event + + ffmpeg_cmd = [ + "ffmpeg", + "-f", + "rawvideo", + "-pix_fmt", + "yuv420p", + "-video_size", + f"{in_width}x{in_height}", + "-i", + "pipe:", + "-f", + "mpegts", + "-s", + f"{out_width}x{out_height}", + "-codec:v", + "mpeg1video", + "-q", + f"{quality}", + "-bf", + "0", + "pipe:", + ] + + self.process = sp.Popen( + ffmpeg_cmd, + stdout=sp.PIPE, + stderr=sp.DEVNULL, + stdin=sp.PIPE, + start_new_session=True, + ) + + def __write(self, b) -> None: + self.process.stdin.write(b) + + def read(self, length): + try: + return self.process.stdout.read1(length) + except ValueError: + return False + + def exit(self): + self.process.terminate() + + try: + self.process.communicate(timeout=30) + except sp.TimeoutExpired: + self.process.kill() + self.process.communicate() + + def run(self) -> None: + while not self.stop_event.is_set(): + try: + frame = self.input_queue.get(True, timeout=1) + self.__write(frame) + except queue.Empty: + pass + + self.exit() + + +class BroadcastThread(threading.Thread): + def __init__( + self, + camera: str, + converter: FFMpegConverter, + websocket_server, + stop_event: mp.Event, + ): + super(BroadcastThread, self).__init__() + self.camera = camera + self.converter = converter + self.websocket_server = websocket_server + self.stop_event = stop_event + + def run(self): + while not self.stop_event.is_set(): + buf = self.converter.read(65536) + if buf: + manager = self.websocket_server.manager + with manager.lock: + websockets = manager.websockets.copy() + ws_iter = iter(websockets.values()) + + for ws in ws_iter: + if ( + not ws.terminated + and ws.environ["PATH_INFO"] == f"/{self.camera}" + ): + try: + ws.send(buf, binary=True) + except ValueError: + pass + except (BrokenPipeError, ConnectionResetError) as e: + logger.debug(f"Websocket unexpectedly closed {e}") + elif self.converter.process.poll() is not None: + break + + +class JsmpegCamera: + def __init__( + self, config: CameraConfig, stop_event: mp.Event, websocket_server + ) -> None: + self.config = config + self.input = queue.Queue(maxsize=config.detect.fps) + width = int( + config.live.height * (config.frame_shape[1] / config.frame_shape[0]) + ) + self.converter = FFMpegConverter( + config.name, + self.input, + stop_event, + config.frame_shape[1], + config.frame_shape[0], + width, + config.live.height, + config.live.quality, + ) + self.broadcaster = BroadcastThread( + config.name, self.converter, websocket_server, stop_event + ) + + self.converter.start() + self.broadcaster.start() + + def write_frame(self, frame_bytes) -> None: + try: + self.input.put_nowait(frame_bytes) + except queue.Full: + # drop frames if queue is full + pass + + def stop(self) -> None: + self.converter.join() + self.broadcaster.join() diff --git a/frigate/output/output.py b/frigate/output/output.py new file mode 100644 index 000000000..7284dfa0b --- /dev/null +++ b/frigate/output/output.py @@ -0,0 +1,155 @@ +"""Handle outputting raw frigate frames""" + +import logging +import multiprocessing as mp +import queue +import signal +import threading +from typing import Optional +from wsgiref.simple_server import make_server + +from setproctitle import setproctitle +from ws4py.server.wsgirefserver import ( + WebSocketWSGIHandler, + WebSocketWSGIRequestHandler, + WSGIServer, +) +from ws4py.server.wsgiutils import WebSocketWSGIApplication + +from frigate.comms.ws import WebSocket +from frigate.config import FrigateConfig +from frigate.output.birdseye import Birdseye +from frigate.output.camera import JsmpegCamera +from frigate.output.preview import PreviewRecorder +from frigate.types import CameraMetricsTypes +from frigate.util.image import SharedMemoryFrameManager + +logger = logging.getLogger(__name__) + + +def output_frames( + config: FrigateConfig, + video_output_queue: mp.Queue, + inter_process_queue: mp.Queue, + camera_metrics: dict[str, CameraMetricsTypes], +): + threading.current_thread().name = "output" + setproctitle("frigate.output") + + stop_event = mp.Event() + + def receiveSignal(signalNumber, frame): + stop_event.set() + + signal.signal(signal.SIGTERM, receiveSignal) + signal.signal(signal.SIGINT, receiveSignal) + + frame_manager = SharedMemoryFrameManager() + previous_frames = {} + + # start a websocket server on 8082 + WebSocketWSGIHandler.http_version = "1.1" + websocket_server = make_server( + "127.0.0.1", + 8082, + server_class=WSGIServer, + handler_class=WebSocketWSGIRequestHandler, + app=WebSocketWSGIApplication(handler_cls=WebSocket), + ) + websocket_server.initialize_websockets_manager() + websocket_thread = threading.Thread(target=websocket_server.serve_forever) + + jsmpeg_cameras: dict[str, JsmpegCamera] = {} + birdseye: Optional[Birdseye] = None + preview_recorders: dict[str, PreviewRecorder] = {} + + for camera, cam_config in config.cameras.items(): + if not cam_config.enabled: + continue + + jsmpeg_cameras[camera] = JsmpegCamera(cam_config, stop_event, websocket_server) + preview_recorders[camera] = PreviewRecorder(cam_config, inter_process_queue) + + if config.birdseye.enabled: + birdseye = Birdseye(config, camera_metrics, stop_event, websocket_server) + + websocket_thread.start() + + while not stop_event.is_set(): + try: + ( + camera, + frame_time, + current_tracked_objects, + motion_boxes, + regions, + ) = video_output_queue.get(True, 1) + except queue.Empty: + continue + + frame_id = f"{camera}{frame_time}" + + frame = frame_manager.get(frame_id, config.cameras[camera].frame_shape_yuv) + + # send camera frame to ffmpeg process if websockets are connected + if any( + ws.environ["PATH_INFO"].endswith(camera) for ws in websocket_server.manager + ): + # write to the converter for the camera if clients are listening to the specific camera + jsmpeg_cameras[camera].write_frame(frame.tobytes()) + + # send output data to birdseye if websocket is connected or restreaming + if config.birdseye.enabled and ( + config.birdseye.restream + or any( + ws.environ["PATH_INFO"].endswith("birdseye") + for ws in websocket_server.manager + ) + ): + birdseye.write_data( + camera, + current_tracked_objects, + motion_boxes, + frame_time, + frame, + ) + + # send frames for low fps recording + preview_recorders[camera].write_data( + current_tracked_objects, motion_boxes, frame_time, frame + ) + + # delete frames after they have been used for output + if camera in previous_frames: + frame_manager.delete(f"{camera}{previous_frames[camera]}") + + previous_frames[camera] = frame_time + + while not video_output_queue.empty(): + ( + camera, + frame_time, + current_tracked_objects, + motion_boxes, + regions, + ) = video_output_queue.get(True, 10) + + frame_id = f"{camera}{frame_time}" + frame = frame_manager.get(frame_id, config.cameras[camera].frame_shape_yuv) + frame_manager.delete(frame_id) + + for jsmpeg in jsmpeg_cameras.values(): + jsmpeg.stop() + + for preview in preview_recorders.values(): + preview.stop() + + if birdseye is not None: + birdseye.stop() + + websocket_server.manager.close_all() + websocket_server.manager.stop() + websocket_server.manager.join() + websocket_server.shutdown() + websocket_thread.join() + logger.info("exiting output process...") diff --git a/frigate/output/preview.py b/frigate/output/preview.py new file mode 100644 index 000000000..3346274d4 --- /dev/null +++ b/frigate/output/preview.py @@ -0,0 +1,265 @@ +"""Handle outputting low res / fps preview segments from decoded frames.""" + +import datetime +import logging +import multiprocessing as mp +import os +import shutil +import subprocess as sp +import threading +from pathlib import Path + +import cv2 +import numpy as np + +from frigate.config import CameraConfig, RecordQualityEnum +from frigate.const import CACHE_DIR, CLIPS_DIR, INSERT_PREVIEW +from frigate.ffmpeg_presets import ( + FPS_VFR_PARAM, + EncodeTypeEnum, + parse_preset_hardware_acceleration_encode, +) +from frigate.models import Previews +from frigate.util.image import copy_yuv_to_position, get_yuv_crop + +logger = logging.getLogger(__name__) + +FOLDER_PREVIEW_FRAMES = "preview_frames" +PREVIEW_OUTPUT_FPS = 1 +PREVIEW_SEGMENT_DURATION = 3600 # one hour +# important to have lower keyframe to maintain scrubbing performance +PREVIEW_KEYFRAME_INTERVAL = 60 +PREVIEW_BIT_RATES = { + RecordQualityEnum.very_low: 4096, + RecordQualityEnum.low: 6144, + RecordQualityEnum.medium: 8192, + RecordQualityEnum.high: 12288, + RecordQualityEnum.very_high: 16384, +} + + +def get_cache_image_name(camera: str, frame_time: float) -> str: + """Get the image name in cache.""" + return os.path.join( + CACHE_DIR, + f"{FOLDER_PREVIEW_FRAMES}/preview_{camera}-{frame_time}.jpg", + ) + + +class FFMpegConverter(threading.Thread): + """Convert a list of jpg frames into a vfr mp4.""" + + def __init__( + self, + config: CameraConfig, + frame_times: list[float], + inter_process_queue: mp.Queue, + ): + threading.Thread.__init__(self) + self.name = f"{config.name}_preview_converter" + self.config = config + self.frame_times = frame_times + self.inter_process_queue = inter_process_queue + self.path = os.path.join( + CLIPS_DIR, + f"previews/{self.config.name}/{self.frame_times[0]}-{self.frame_times[-1]}.mp4", + ) + + # write a PREVIEW at fps and 1 key frame per clip + self.ffmpeg_cmd = parse_preset_hardware_acceleration_encode( + config.ffmpeg.hwaccel_args, + input="-f concat -y -protocol_whitelist pipe,file -safe 0 -i /dev/stdin", + output=f"-g {PREVIEW_KEYFRAME_INTERVAL} -fpsmax {PREVIEW_OUTPUT_FPS} -bf 0 -b:v {PREVIEW_BIT_RATES[self.config.record.preview.quality]} {FPS_VFR_PARAM} -movflags +faststart -pix_fmt yuv420p {self.path}", + type=EncodeTypeEnum.preview, + ) + + def run(self) -> None: + # generate input list + item_count = len(self.frame_times) + playlist = [] + + for t_idx in range(0, item_count): + if t_idx == item_count - 1: + # last frame does not get a duration + playlist.append( + f"file '{get_cache_image_name(self.config.name, self.frame_times[t_idx])}'" + ) + continue + + playlist.append( + f"file '{get_cache_image_name(self.config.name, self.frame_times[t_idx])}'" + ) + playlist.append( + f"duration {self.frame_times[t_idx + 1] - self.frame_times[t_idx]}" + ) + + p = sp.run( + self.ffmpeg_cmd.split(" "), + input="\n".join(playlist), + encoding="ascii", + capture_output=True, + ) + + start = self.frame_times[0] + end = self.frame_times[-1] + + if p.returncode == 0: + logger.debug("successfully saved preview") + self.inter_process_queue.put_nowait( + ( + INSERT_PREVIEW, + { + Previews.id: f"{self.config.name}_{end}", + Previews.camera: self.config.name, + Previews.path: self.path, + Previews.start_time: start, + Previews.end_time: end, + Previews.duration: end - start, + }, + ) + ) + else: + logger.error(f"Error saving preview for {self.config.name} :: {p.stderr}") + + # unlink files from cache + # don't delete last frame as it will be used as first frame in next segment + for t in self.frame_times[0:-1]: + Path(get_cache_image_name(self.config.name, t)).unlink(missing_ok=True) + + +class PreviewRecorder: + def __init__(self, config: CameraConfig, inter_process_queue: mp.Queue) -> None: + self.config = config + self.inter_process_queue = inter_process_queue + self.start_time = 0 + self.last_output_time = 0 + self.output_frames = [] + self.out_height = 160 + self.out_width = ( + int((config.detect.width / config.detect.height) * self.out_height) // 4 * 4 + ) + + y, u1, u2, v1, v2 = get_yuv_crop( + self.config.frame_shape_yuv, + ( + 0, + 0, + self.config.frame_shape[1], + self.config.frame_shape[0], + ), + ) + self.channel_dims = { + "y": y, + "u1": u1, + "u2": u2, + "v1": v1, + "v2": v2, + } + + # end segment at end of hour + self.segment_end = ( + (datetime.datetime.now() + datetime.timedelta(hours=1)) + .replace(minute=0, second=0, microsecond=0) + .timestamp() + ) + + Path(os.path.join(CACHE_DIR, "preview_frames")).mkdir(exist_ok=True) + Path(os.path.join(CLIPS_DIR, f"previews/{config.name}")).mkdir( + parents=True, exist_ok=True + ) + + def should_write_frame( + self, + current_tracked_objects: list[dict[str, any]], + motion_boxes: list[list[int]], + frame_time: float, + ) -> bool: + """Decide if this frame should be added to PREVIEW.""" + # limit output to 1 fps + if (frame_time - self.last_output_time) < 1 / PREVIEW_OUTPUT_FPS: + return False + + # send frame if a non-stationary object is in a zone + if any( + (len(o["current_zones"]) > 0 and not o["stationary"]) + for o in current_tracked_objects + ): + self.last_output_time = frame_time + return True + + if len(motion_boxes) > 0: + self.last_output_time = frame_time + return True + + return False + + def write_frame_to_cache(self, frame_time: float, frame) -> None: + # resize yuv frame + small_frame = np.zeros((self.out_height * 3 // 2, self.out_width), np.uint8) + copy_yuv_to_position( + small_frame, + (0, 0), + (self.out_height, self.out_width), + frame, + self.channel_dims, + cv2.INTER_AREA, + ) + small_frame = cv2.cvtColor( + small_frame, + cv2.COLOR_YUV2BGR_I420, + ) + _, jpg = cv2.imencode(".jpg", small_frame) + with open( + get_cache_image_name(self.config.name, frame_time), + "wb", + ) as j: + j.write(jpg.tobytes()) + + def write_data( + self, + current_tracked_objects: list[dict[str, any]], + motion_boxes: list[list[int]], + frame_time: float, + frame, + ) -> None: + # always write the first frame + if self.start_time == 0: + self.start_time = frame_time + self.output_frames.append(frame_time) + self.write_frame_to_cache(frame_time, frame) + return + + if self.should_write_frame(current_tracked_objects, motion_boxes, frame_time): + self.output_frames.append(frame_time) + self.write_frame_to_cache(frame_time, frame) + + # check if PREVIEW clip should be generated and cached frames reset + if frame_time >= self.segment_end: + # save last frame to ensure consistent duration + self.output_frames.append(frame_time) + self.write_frame_to_cache(frame_time, frame) + FFMpegConverter( + self.config, + self.output_frames, + self.inter_process_queue, + ).start() + + # reset frame cache + self.segment_end = ( + (datetime.datetime.now() + datetime.timedelta(hours=1)) + .replace(minute=0, second=0, microsecond=0) + .timestamp() + ) + self.start_time = frame_time + self.last_output_time = frame_time + self.output_frames = [] + + # include first frame to ensure consistent duration + self.output_frames.append(frame_time) + self.write_frame_to_cache(frame_time, frame) + + def stop(self) -> None: + try: + shutil.rmtree(os.path.join(CACHE_DIR, FOLDER_PREVIEW_FRAMES)) + except FileNotFoundError: + pass diff --git a/frigate/record/cleanup.py b/frigate/record/cleanup.py index c7aa0e167..c2c7d32e7 100644 --- a/frigate/record/cleanup.py +++ b/frigate/record/cleanup.py @@ -7,9 +7,9 @@ import threading from multiprocessing.synchronize import Event as MpEvent from pathlib import Path -from frigate.config import FrigateConfig, RetainModeEnum +from frigate.config import CameraConfig, FrigateConfig, RetainModeEnum from frigate.const import CACHE_DIR, RECORD_DIR -from frigate.models import Event, Recordings +from frigate.models import Event, Previews, Recordings from frigate.record.util import remove_empty_directories, sync_recordings from frigate.util.builtin import clear_and_unlink, get_tomorrow_at_time @@ -33,10 +33,152 @@ class RecordingCleanup(threading.Thread): logger.debug("Deleting tmp clip.") clear_and_unlink(p) + def expire_existing_camera_recordings( + self, expire_date: float, config: CameraConfig, events: Event + ) -> None: + """Delete recordings for existing camera based on retention config.""" + # Get the timestamp for cutoff of retained days + + # Get recordings to check for expiration + recordings: Recordings = ( + Recordings.select( + Recordings.id, + Recordings.start_time, + Recordings.end_time, + Recordings.path, + Recordings.objects, + Recordings.motion, + ) + .where( + Recordings.camera == config.name, + Recordings.end_time < expire_date, + ) + .order_by(Recordings.start_time) + .namedtuples() + .iterator() + ) + + # loop over recordings and see if they overlap with any non-expired events + # TODO: expire segments based on segment stats according to config + event_start = 0 + deleted_recordings = set() + kept_recordings: list[tuple[float, float]] = [] + for recording in recordings: + keep = False + # Now look for a reason to keep this recording segment + for idx in range(event_start, len(events)): + event: Event = events[idx] + + # if the event starts in the future, stop checking events + # and let this recording segment expire + if event.start_time > recording.end_time: + keep = False + break + + # if the event is in progress or ends after the recording starts, keep it + # and stop looking at events + if event.end_time is None or event.end_time >= recording.start_time: + keep = True + break + + # if the event ends before this recording segment starts, skip + # this event and check the next event for an overlap. + # since the events and recordings are sorted, we can skip events + # that end before the previous recording segment started on future segments + if event.end_time < recording.start_time: + event_start = idx + + # Delete recordings outside of the retention window or based on the retention mode + if ( + not keep + or ( + config.record.events.retain.mode == RetainModeEnum.motion + and recording.motion == 0 + ) + or ( + config.record.events.retain.mode == RetainModeEnum.active_objects + and recording.objects == 0 + ) + ): + Path(recording.path).unlink(missing_ok=True) + deleted_recordings.add(recording.id) + else: + kept_recordings.append((recording.start_time, recording.end_time)) + + # expire recordings + logger.debug(f"Expiring {len(deleted_recordings)} recordings") + # delete up to 100,000 at a time + max_deletes = 100000 + deleted_recordings_list = list(deleted_recordings) + for i in range(0, len(deleted_recordings_list), max_deletes): + Recordings.delete().where( + Recordings.id << deleted_recordings_list[i : i + max_deletes] + ).execute() + + previews: Previews = ( + Previews.select( + Previews.id, + Previews.start_time, + Previews.end_time, + Previews.path, + ) + .where( + Previews.camera == config.name, + Previews.end_time < expire_date, + ) + .order_by(Previews.start_time) + .namedtuples() + .iterator() + ) + + # expire previews + recording_start = 0 + deleted_previews = set() + for preview in previews: + keep = False + # look for a reason to keep this preview + for idx in range(recording_start, len(kept_recordings)): + start_time, end_time = kept_recordings[idx] + + # if the recording starts in the future, stop checking recordings + # and let this preview expire + if start_time > preview.end_time: + keep = False + break + + # if the recording ends after the preview starts, keep it + # and stop looking at recordings + if end_time >= preview.start_time: + keep = True + break + + # if the recording ends before this preview starts, skip + # this recording and check the next recording for an overlap. + # since the kept recordings and previews are sorted, we can skip recordings + # that end before the current preview started + if end_time < preview.start_time: + recording_start = idx + + # Delete previews without any relevant recordings + if not keep: + Path(preview.path).unlink(missing_ok=True) + deleted_previews.add(preview.id) + + # expire previews + logger.debug(f"Expiring {len(deleted_previews)} previews") + # delete up to 100,000 at a time + max_deletes = 100000 + deleted_previews_list = list(deleted_previews) + for i in range(0, len(deleted_previews_list), max_deletes): + Previews.delete().where( + Previews.id << deleted_previews_list[i : i + max_deletes] + ).execute() + def expire_recordings(self) -> None: """Delete recordings based on retention config.""" logger.debug("Start expire recordings.") logger.debug("Start deleted cameras.") + # Handle deleted cameras expire_days = self.config.record.retain.days expire_before = ( @@ -73,31 +215,12 @@ class RecordingCleanup(threading.Thread): logger.debug("Start all cameras.") for camera, config in self.config.cameras.items(): logger.debug(f"Start camera: {camera}.") - # Get the timestamp for cutoff of retained days + expire_days = config.record.retain.days expire_date = ( datetime.datetime.now() - datetime.timedelta(days=expire_days) ).timestamp() - # Get recordings to check for expiration - recordings: Recordings = ( - Recordings.select( - Recordings.id, - Recordings.start_time, - Recordings.end_time, - Recordings.path, - Recordings.objects, - Recordings.motion, - ) - .where( - Recordings.camera == camera, - Recordings.end_time < expire_date, - ) - .order_by(Recordings.start_time) - .namedtuples() - .iterator() - ) - # Get all the events to check against events: Event = ( Event.select( @@ -115,60 +238,7 @@ class RecordingCleanup(threading.Thread): .namedtuples() ) - # loop over recordings and see if they overlap with any non-expired events - # TODO: expire segments based on segment stats according to config - event_start = 0 - deleted_recordings = set() - for recording in recordings: - keep = False - # Now look for a reason to keep this recording segment - for idx in range(event_start, len(events)): - event: Event = events[idx] - - # if the event starts in the future, stop checking events - # and let this recording segment expire - if event.start_time > recording.end_time: - keep = False - break - - # if the event is in progress or ends after the recording starts, keep it - # and stop looking at events - if event.end_time is None or event.end_time >= recording.start_time: - keep = True - break - - # if the event ends before this recording segment starts, skip - # this event and check the next event for an overlap. - # since the events and recordings are sorted, we can skip events - # that end before the previous recording segment started on future segments - if event.end_time < recording.start_time: - event_start = idx - - # Delete recordings outside of the retention window or based on the retention mode - if ( - not keep - or ( - config.record.events.retain.mode == RetainModeEnum.motion - and recording.motion == 0 - ) - or ( - config.record.events.retain.mode - == RetainModeEnum.active_objects - and recording.objects == 0 - ) - ): - Path(recording.path).unlink(missing_ok=True) - deleted_recordings.add(recording.id) - - logger.debug(f"Expiring {len(deleted_recordings)} recordings") - # delete up to 100,000 at a time - max_deletes = 100000 - deleted_recordings_list = list(deleted_recordings) - for i in range(0, len(deleted_recordings_list), max_deletes): - Recordings.delete().where( - Recordings.id << deleted_recordings_list[i : i + max_deletes] - ).execute() - + self.expire_existing_camera_recordings(expire_date, config, events) logger.debug(f"End camera: {camera}.") logger.debug("End all cameras.") diff --git a/frigate/test/test_birdseye.py b/frigate/test/test_birdseye.py index 8c24b48ec..33683f5c4 100644 --- a/frigate/test/test_birdseye.py +++ b/frigate/test/test_birdseye.py @@ -2,7 +2,7 @@ import unittest -from frigate.output import get_canvas_shape +from frigate.output.birdseye import get_canvas_shape class TestBirdseye(unittest.TestCase): diff --git a/frigate/util/image.py b/frigate/util/image.py index 55c745eb0..4fc3c2fd8 100644 --- a/frigate/util/image.py +++ b/frigate/util/image.py @@ -387,6 +387,7 @@ def copy_yuv_to_position( destination_shape, source_frame=None, source_channel_dim=None, + interpolation=cv2.INTER_LINEAR, ): # get the coordinates of the channels for this position in the layout y, u1, u2, v1, v2 = get_yuv_crop( @@ -435,7 +436,6 @@ def copy_yuv_to_position( uv_y_offset = y_y_offset // 4 uv_x_offset = y_x_offset // 2 - interpolation = cv2.INTER_LINEAR # resize/copy y channel destination_frame[ y[1] + y_y_offset : y[1] + y_y_offset + y_resize_height, diff --git a/migrations/021_create_previews_table.py b/migrations/021_create_previews_table.py new file mode 100644 index 000000000..3ad131e0d --- /dev/null +++ b/migrations/021_create_previews_table.py @@ -0,0 +1,35 @@ +"""Peewee migrations -- 021_create_previews_table.py. + +Some examples (model - class or model name):: + + > Model = migrator.orm['model_name'] # Return model in current state by name + + > migrator.sql(sql) # Run custom SQL + > migrator.python(func, *args, **kwargs) # Run python code + > migrator.create_model(Model) # Create a model (could be used as decorator) + > migrator.remove_model(model, cascade=True) # Remove a model + > migrator.add_fields(model, **fields) # Add fields to a model + > migrator.change_fields(model, **fields) # Change fields + > migrator.remove_fields(model, *field_names, cascade=True) + > migrator.rename_field(model, old_field_name, new_field_name) + > migrator.rename_table(model, new_table_name) + > migrator.add_index(model, *col_names, unique=False) + > migrator.drop_index(model, *col_names) + > migrator.add_not_null(model, *field_names) + > migrator.drop_not_null(model, *field_names) + > migrator.add_default(model, field_name, default) + +""" +import peewee as pw + +SQL = pw.SQL + + +def migrate(migrator, database, fake=False, **kwargs): + migrator.sql( + 'CREATE TABLE IF NOT EXISTS "previews" ("id" VARCHAR(30) NOT NULL PRIMARY KEY, "camera" VARCHAR(20) NOT NULL, "path" VARCHAR(255) NOT NULL, "start_time" DATETIME NOT NULL, "end_time" DATETIME NOT NULL, "duration" REAL NOT NULL)' + ) + + +def rollback(migrator, database, fake=False, **kwargs): + pass