Write a low resolution low fps stream from decoded frames (#8673)

* Generate low res low fps previews for recordings viewer

* Make sure previews end on the hour

* Fix durations and decrase keyframe interval to ensure smooth scrubbing

* Ensure minimized resolution is compatible with yuv

* Add ability to configure preview quality

* Fix

* Clean up previews more efficiently

* Use iterator
This commit is contained in:
Nicolas Mowen 2023-12-03 07:16:01 -07:00 committed by Blake Blackshear
parent c80b8140b8
commit cfda531f5a
16 changed files with 939 additions and 248 deletions

View File

@ -325,6 +325,11 @@ record:
# The -r (framerate) dictates how smooth the output video is.
# So the args would be -vf setpts=0.02*PTS -r 30 in that case.
timelapse_args: "-vf setpts=0.04*PTS -r 30"
# Optional: Recording Preview Settings
preview:
# Optional: Quality of recording preview (default: shown below).
# Options are: very_low, low, medium, high, very_high
quality: medium
# Optional: Event recording settings
events:
# Optional: Number of seconds before the event to include (default: shown below)

View File

@ -37,10 +37,17 @@ from frigate.events.external import ExternalEventProcessor
from frigate.events.maintainer import EventProcessor
from frigate.http import create_app
from frigate.log import log_process, root_configurer
from frigate.models import Event, Recordings, RecordingsToDelete, Regions, Timeline
from frigate.models import (
Event,
Previews,
Recordings,
RecordingsToDelete,
Regions,
Timeline,
)
from frigate.object_detection import ObjectDetectProcess
from frigate.object_processing import TrackedObjectProcessor
from frigate.output import output_frames
from frigate.output.output import output_frames
from frigate.plus import PlusApi
from frigate.ptz.autotrack import PtzAutoTrackerThread
from frigate.ptz.onvif import OnvifController
@ -369,7 +376,7 @@ class FrigateApp:
60, 10 * len([c for c in self.config.cameras.values() if c.enabled])
),
)
models = [Event, Recordings, RecordingsToDelete, Regions, Timeline]
models = [Event, Recordings, RecordingsToDelete, Previews, Regions, Timeline]
self.db.bind(models)
def init_stats(self) -> None:
@ -488,6 +495,7 @@ class FrigateApp:
args=(
self.config,
self.video_output_queue,
self.inter_process_queue,
self.camera_metrics,
),
)

View File

@ -5,8 +5,8 @@ from abc import ABC, abstractmethod
from typing import Any, Callable
from frigate.config import BirdseyeModeEnum, FrigateConfig
from frigate.const import INSERT_MANY_RECORDINGS, REQUEST_REGION_GRID
from frigate.models import Recordings
from frigate.const import INSERT_MANY_RECORDINGS, INSERT_PREVIEW, REQUEST_REGION_GRID
from frigate.models import Previews, Recordings
from frigate.ptz.onvif import OnvifCommandEnum, OnvifController
from frigate.types import CameraMetricsTypes, FeatureMetricsTypes, PTZMetricsTypes
from frigate.util.object import get_camera_regions_grid
@ -102,6 +102,8 @@ class Dispatcher:
max(self.config.model.width, self.config.model.height),
)
)
elif topic == INSERT_PREVIEW:
Previews.insert(payload).execute()
else:
self.publish(topic, payload, retain=False)

View File

@ -260,6 +260,20 @@ class RecordExportConfig(FrigateBaseModel):
)
class RecordQualityEnum(str, Enum):
very_low = "very_low"
low = "low"
medium = "medium"
high = "high"
very_high = "very_high"
class RecordPreviewConfig(FrigateBaseModel):
quality: RecordQualityEnum = Field(
default=RecordQualityEnum.medium, title="Quality of recording preview."
)
class RecordConfig(FrigateBaseModel):
enabled: bool = Field(default=False, title="Enable record on all cameras.")
sync_recordings: bool = Field(
@ -278,6 +292,9 @@ class RecordConfig(FrigateBaseModel):
export: RecordExportConfig = Field(
default_factory=RecordExportConfig, title="Recording Export Config"
)
preview: RecordPreviewConfig = Field(
default_factory=RecordPreviewConfig, title="Recording Preview Config"
)
enabled_in_config: Optional[bool] = Field(
title="Keep track of original state of recording."
)

View File

@ -59,6 +59,7 @@ MAX_PLAYLIST_SECONDS = 7200 # support 2 hour segments for a single playlist to
# Internal Comms Topics
INSERT_MANY_RECORDINGS = "insert_many_recordings"
INSERT_PREVIEW = "insert_preview"
REQUEST_REGION_GRID = "request_region_grid"
# Autotracking

View File

@ -42,6 +42,11 @@ class LibvaGpuSelector:
return ""
FPS_VFR_PARAM = (
"-fps_mode vfr"
if int(os.getenv("LIBAVFORMAT_VERSION_MAJOR", "59")) >= 59
else "-vsync 2"
)
TIMEOUT_PARAM = (
"-timeout"
if int(os.getenv("LIBAVFORMAT_VERSION_MAJOR", "59")) >= 59
@ -114,6 +119,11 @@ PRESETS_HW_ACCEL_ENCODE_TIMELAPSE = {
"default": "ffmpeg -hide_banner {0} -c:v libx264 -preset:v ultrafast -tune:v zerolatency {1}",
}
# encoding of previews is only done on CPU due to comparable encode times and better quality from libx264
PRESETS_HW_ACCEL_ENCODE_PREVIEW = {
"default": "ffmpeg -hide_banner {0} -c:v libx264 -profile:v baseline -preset:v ultrafast {1}",
}
def parse_preset_hardware_acceleration_decode(
arg: Any,
@ -153,6 +163,7 @@ def parse_preset_hardware_acceleration_scale(
class EncodeTypeEnum(str, Enum):
birdseye = "birdseye"
preview = "preview"
timelapse = "timelapse"
@ -162,6 +173,8 @@ def parse_preset_hardware_acceleration_encode(
"""Return the correct scaling preset or default preset if none is set."""
if type == EncodeTypeEnum.birdseye:
arg_map = PRESETS_HW_ACCEL_ENCODE_BIRDSEYE
elif type == EncodeTypeEnum.preview:
arg_map = PRESETS_HW_ACCEL_ENCODE_PREVIEW
elif type == EncodeTypeEnum.timelapse:
arg_map = PRESETS_HW_ACCEL_ENCODE_TIMELAPSE

View File

@ -43,7 +43,7 @@ from frigate.const import (
RECORD_DIR,
)
from frigate.events.external import ExternalEventProcessor
from frigate.models import Event, Recordings, Regions, Timeline
from frigate.models import Event, Previews, Recordings, Regions, Timeline
from frigate.object_processing import TrackedObject
from frigate.plus import PlusApi
from frigate.ptz.onvif import OnvifController
@ -1845,7 +1845,6 @@ def vod_hour_no_timezone(year_month, day, hour, camera_name):
)
# TODO make this nicer when vod module is removed
@bp.route("/vod/<year_month>/<day>/<hour>/<camera_name>/<tz_name>")
def vod_hour(year_month, day, hour, camera_name, tz_name):
parts = year_month.split("-")
@ -1860,6 +1859,66 @@ def vod_hour(year_month, day, hour, camera_name, tz_name):
return vod_ts(camera_name, start_ts, end_ts)
@bp.route("/preview/<camera_name>/start/<int:start_ts>/end/<int:end_ts>")
@bp.route("/preview/<camera_name>/start/<float:start_ts>/end/<float:end_ts>")
def preview_ts(camera_name, start_ts, end_ts):
"""Get all mp4 previews relevant for time period."""
previews = (
Previews.select(
Previews.path, Previews.duration, Previews.start_time, Previews.end_time
)
.where(
Previews.start_time.between(start_ts, end_ts)
| Previews.end_time.between(start_ts, end_ts)
| ((start_ts > Previews.start_time) & (end_ts < Previews.end_time))
)
.where(Previews.camera == camera_name)
.order_by(Previews.start_time.asc())
.iterator()
)
clips = []
preview: Previews
for preview in previews:
clips.append(
{
"src": preview.path.replace("/media/frigate", ""),
"type": "video/mp4",
"start": preview.start_time,
"end": preview.end_time,
}
)
if not clips:
logger.error("No previews found for the requested time range")
return make_response(
jsonify(
{
"success": False,
"message": "No previews found.",
}
),
404,
)
return make_response(jsonify(clips), 200)
@bp.route("/preview/<year_month>/<day>/<hour>/<camera_name>/<tz_name>")
def preview_hour(year_month, day, hour, camera_name, tz_name):
parts = year_month.split("-")
start_date = (
datetime(int(parts[0]), int(parts[1]), int(day), int(hour), tzinfo=timezone.utc)
- datetime.now(pytz.timezone(tz_name.replace(",", "/"))).utcoffset()
)
end_date = start_date + timedelta(hours=1) - timedelta(milliseconds=1)
start_ts = start_date.timestamp()
end_ts = end_date.timestamp()
return preview_ts(camera_name, start_ts, end_ts)
@bp.route("/vod/event/<id>")
def vod_event(id):
try:

View File

@ -76,6 +76,15 @@ class Recordings(Model): # type: ignore[misc]
segment_size = FloatField(default=0) # this should be stored as MB
class Previews(Model): # type: ignore[misc]
id = CharField(null=False, primary_key=True, max_length=30)
camera = CharField(index=True, max_length=20)
path = CharField(unique=True)
start_time = DateTimeField()
end_time = DateTimeField()
duration = FloatField()
# Used for temporary table in record/cleanup.py
class RecordingsToDelete(Model): # type: ignore[misc]
id = CharField(null=False, primary_key=False, max_length=30)

View File

@ -1,3 +1,5 @@
"""Handle outputting birdseye frames via jsmpeg and go2rtc."""
import datetime
import glob
import logging
@ -5,23 +7,13 @@ import math
import multiprocessing as mp
import os
import queue
import signal
import subprocess as sp
import threading
import traceback
from wsgiref.simple_server import make_server
import cv2
import numpy as np
from setproctitle import setproctitle
from ws4py.server.wsgirefserver import (
WebSocketWSGIHandler,
WebSocketWSGIRequestHandler,
WSGIServer,
)
from ws4py.server.wsgiutils import WebSocketWSGIApplication
from frigate.comms.ws import WebSocket
from frigate.config import BirdseyeModeEnum, FrigateConfig
from frigate.const import BASE_DIR, BIRDSEYE_PIPE
from frigate.types import CameraMetricsTypes
@ -672,66 +664,19 @@ class BirdsEyeFrameManager:
return False
def output_frames(
class Birdseye:
def __init__(
self,
config: FrigateConfig,
video_output_queue,
camera_metrics: dict[str, CameraMetricsTypes],
):
threading.current_thread().name = "output"
setproctitle("frigate.output")
stop_event = mp.Event()
def receiveSignal(signalNumber, frame):
stop_event.set()
signal.signal(signal.SIGTERM, receiveSignal)
signal.signal(signal.SIGINT, receiveSignal)
frame_manager = SharedMemoryFrameManager()
previous_frames = {}
# start a websocket server on 8082
WebSocketWSGIHandler.http_version = "1.1"
websocket_server = make_server(
"127.0.0.1",
8082,
server_class=WSGIServer,
handler_class=WebSocketWSGIRequestHandler,
app=WebSocketWSGIApplication(handler_cls=WebSocket),
)
websocket_server.initialize_websockets_manager()
websocket_thread = threading.Thread(target=websocket_server.serve_forever)
inputs: dict[str, queue.Queue] = {}
converters = {}
broadcasters = {}
for camera, cam_config in config.cameras.items():
inputs[camera] = queue.Queue(maxsize=cam_config.detect.fps)
width = int(
cam_config.live.height
* (cam_config.frame_shape[1] / cam_config.frame_shape[0])
)
converters[camera] = FFMpegConverter(
camera,
inputs[camera],
stop_event,
cam_config.frame_shape[1],
cam_config.frame_shape[0],
width,
cam_config.live.height,
cam_config.live.quality,
)
broadcasters[camera] = BroadcastThread(
camera, converters[camera], websocket_server, stop_event
)
if config.birdseye.enabled:
inputs["birdseye"] = queue.Queue(maxsize=10)
converters["birdseye"] = FFMpegConverter(
stop_event: mp.Event,
websocket_server,
) -> None:
self.config = config
self.input = queue.Queue(maxsize=10)
self.converter = FFMpegConverter(
"birdseye",
inputs["birdseye"],
self.input,
stop_event,
config.birdseye.width,
config.birdseye.height,
@ -740,107 +685,49 @@ def output_frames(
config.birdseye.quality,
config.birdseye.restream,
)
broadcasters["birdseye"] = BroadcastThread(
"birdseye",
converters["birdseye"],
websocket_server,
stop_event,
self.broadcaster = BroadcastThread(
"birdseye", self.converter, websocket_server, stop_event
)
websocket_thread.start()
for t in converters.values():
t.start()
for t in broadcasters.values():
t.start()
birdseye_manager = BirdsEyeFrameManager(
frame_manager = SharedMemoryFrameManager()
self.birdseye_manager = BirdsEyeFrameManager(
config, frame_manager, stop_event, camera_metrics
)
if config.birdseye.restream:
birdseye_buffer = frame_manager.create(
self.birdseye_buffer = frame_manager.create(
"birdseye",
birdseye_manager.yuv_shape[0] * birdseye_manager.yuv_shape[1],
self.birdseye_manager.yuv_shape[0] * self.birdseye_manager.yuv_shape[1],
)
while not stop_event.is_set():
try:
(
camera,
frame_time,
current_tracked_objects,
motion_boxes,
regions,
) = video_output_queue.get(True, 1)
except queue.Empty:
continue
self.converter.start()
self.broadcaster.start()
frame_id = f"{camera}{frame_time}"
frame = frame_manager.get(frame_id, config.cameras[camera].frame_shape_yuv)
# send camera frame to ffmpeg process if websockets are connected
if any(
ws.environ["PATH_INFO"].endswith(camera) for ws in websocket_server.manager
):
# write to the converter for the camera if clients are listening to the specific camera
try:
inputs[camera].put_nowait(frame.tobytes())
except queue.Full:
# drop frames if queue is full
pass
if config.birdseye.enabled and (
config.birdseye.restream
or any(
ws.environ["PATH_INFO"].endswith("birdseye")
for ws in websocket_server.manager
)
):
if birdseye_manager.update(
def write_data(
self,
camera: str,
current_tracked_objects: list[dict[str, any]],
motion_boxes: list[list[int]],
frame_time: float,
frame,
) -> None:
if self.birdseye_manager.update(
camera,
len([o for o in current_tracked_objects if not o["stationary"]]),
len(motion_boxes),
frame_time,
frame,
):
frame_bytes = birdseye_manager.frame.tobytes()
frame_bytes = self.birdseye_manager.frame.tobytes()
if config.birdseye.restream:
birdseye_buffer[:] = frame_bytes
if self.config.birdseye.restream:
self.birdseye_buffer[:] = frame_bytes
try:
inputs["birdseye"].put_nowait(frame_bytes)
self.input.put_nowait(frame_bytes)
except queue.Full:
# drop frames if queue is full
pass
if camera in previous_frames:
frame_manager.delete(f"{camera}{previous_frames[camera]}")
previous_frames[camera] = frame_time
while not video_output_queue.empty():
(
camera,
frame_time,
current_tracked_objects,
motion_boxes,
regions,
) = video_output_queue.get(True, 10)
frame_id = f"{camera}{frame_time}"
frame = frame_manager.get(frame_id, config.cameras[camera].frame_shape_yuv)
frame_manager.delete(frame_id)
for b in broadcasters.values():
b.join()
websocket_server.manager.close_all()
websocket_server.manager.stop()
websocket_server.manager.join()
websocket_server.shutdown()
websocket_thread.join()
logger.info("exiting output process...")
def stop(self) -> None:
self.converter.join()
self.broadcaster.join()

165
frigate/output/camera.py Normal file
View File

@ -0,0 +1,165 @@
"""Handle outputting individual cameras via jsmpeg."""
import logging
import multiprocessing as mp
import queue
import subprocess as sp
import threading
from frigate.config import CameraConfig
logger = logging.getLogger(__name__)
class FFMpegConverter(threading.Thread):
def __init__(
self,
camera: str,
input_queue: queue.Queue,
stop_event: mp.Event,
in_width: int,
in_height: int,
out_width: int,
out_height: int,
quality: int,
):
threading.Thread.__init__(self)
self.name = f"{camera}_output_converter"
self.camera = camera
self.input_queue = input_queue
self.stop_event = stop_event
ffmpeg_cmd = [
"ffmpeg",
"-f",
"rawvideo",
"-pix_fmt",
"yuv420p",
"-video_size",
f"{in_width}x{in_height}",
"-i",
"pipe:",
"-f",
"mpegts",
"-s",
f"{out_width}x{out_height}",
"-codec:v",
"mpeg1video",
"-q",
f"{quality}",
"-bf",
"0",
"pipe:",
]
self.process = sp.Popen(
ffmpeg_cmd,
stdout=sp.PIPE,
stderr=sp.DEVNULL,
stdin=sp.PIPE,
start_new_session=True,
)
def __write(self, b) -> None:
self.process.stdin.write(b)
def read(self, length):
try:
return self.process.stdout.read1(length)
except ValueError:
return False
def exit(self):
self.process.terminate()
try:
self.process.communicate(timeout=30)
except sp.TimeoutExpired:
self.process.kill()
self.process.communicate()
def run(self) -> None:
while not self.stop_event.is_set():
try:
frame = self.input_queue.get(True, timeout=1)
self.__write(frame)
except queue.Empty:
pass
self.exit()
class BroadcastThread(threading.Thread):
def __init__(
self,
camera: str,
converter: FFMpegConverter,
websocket_server,
stop_event: mp.Event,
):
super(BroadcastThread, self).__init__()
self.camera = camera
self.converter = converter
self.websocket_server = websocket_server
self.stop_event = stop_event
def run(self):
while not self.stop_event.is_set():
buf = self.converter.read(65536)
if buf:
manager = self.websocket_server.manager
with manager.lock:
websockets = manager.websockets.copy()
ws_iter = iter(websockets.values())
for ws in ws_iter:
if (
not ws.terminated
and ws.environ["PATH_INFO"] == f"/{self.camera}"
):
try:
ws.send(buf, binary=True)
except ValueError:
pass
except (BrokenPipeError, ConnectionResetError) as e:
logger.debug(f"Websocket unexpectedly closed {e}")
elif self.converter.process.poll() is not None:
break
class JsmpegCamera:
def __init__(
self, config: CameraConfig, stop_event: mp.Event, websocket_server
) -> None:
self.config = config
self.input = queue.Queue(maxsize=config.detect.fps)
width = int(
config.live.height * (config.frame_shape[1] / config.frame_shape[0])
)
self.converter = FFMpegConverter(
config.name,
self.input,
stop_event,
config.frame_shape[1],
config.frame_shape[0],
width,
config.live.height,
config.live.quality,
)
self.broadcaster = BroadcastThread(
config.name, self.converter, websocket_server, stop_event
)
self.converter.start()
self.broadcaster.start()
def write_frame(self, frame_bytes) -> None:
try:
self.input.put_nowait(frame_bytes)
except queue.Full:
# drop frames if queue is full
pass
def stop(self) -> None:
self.converter.join()
self.broadcaster.join()

155
frigate/output/output.py Normal file
View File

@ -0,0 +1,155 @@
"""Handle outputting raw frigate frames"""
import logging
import multiprocessing as mp
import queue
import signal
import threading
from typing import Optional
from wsgiref.simple_server import make_server
from setproctitle import setproctitle
from ws4py.server.wsgirefserver import (
WebSocketWSGIHandler,
WebSocketWSGIRequestHandler,
WSGIServer,
)
from ws4py.server.wsgiutils import WebSocketWSGIApplication
from frigate.comms.ws import WebSocket
from frigate.config import FrigateConfig
from frigate.output.birdseye import Birdseye
from frigate.output.camera import JsmpegCamera
from frigate.output.preview import PreviewRecorder
from frigate.types import CameraMetricsTypes
from frigate.util.image import SharedMemoryFrameManager
logger = logging.getLogger(__name__)
def output_frames(
config: FrigateConfig,
video_output_queue: mp.Queue,
inter_process_queue: mp.Queue,
camera_metrics: dict[str, CameraMetricsTypes],
):
threading.current_thread().name = "output"
setproctitle("frigate.output")
stop_event = mp.Event()
def receiveSignal(signalNumber, frame):
stop_event.set()
signal.signal(signal.SIGTERM, receiveSignal)
signal.signal(signal.SIGINT, receiveSignal)
frame_manager = SharedMemoryFrameManager()
previous_frames = {}
# start a websocket server on 8082
WebSocketWSGIHandler.http_version = "1.1"
websocket_server = make_server(
"127.0.0.1",
8082,
server_class=WSGIServer,
handler_class=WebSocketWSGIRequestHandler,
app=WebSocketWSGIApplication(handler_cls=WebSocket),
)
websocket_server.initialize_websockets_manager()
websocket_thread = threading.Thread(target=websocket_server.serve_forever)
jsmpeg_cameras: dict[str, JsmpegCamera] = {}
birdseye: Optional[Birdseye] = None
preview_recorders: dict[str, PreviewRecorder] = {}
for camera, cam_config in config.cameras.items():
if not cam_config.enabled:
continue
jsmpeg_cameras[camera] = JsmpegCamera(cam_config, stop_event, websocket_server)
preview_recorders[camera] = PreviewRecorder(cam_config, inter_process_queue)
if config.birdseye.enabled:
birdseye = Birdseye(config, camera_metrics, stop_event, websocket_server)
websocket_thread.start()
while not stop_event.is_set():
try:
(
camera,
frame_time,
current_tracked_objects,
motion_boxes,
regions,
) = video_output_queue.get(True, 1)
except queue.Empty:
continue
frame_id = f"{camera}{frame_time}"
frame = frame_manager.get(frame_id, config.cameras[camera].frame_shape_yuv)
# send camera frame to ffmpeg process if websockets are connected
if any(
ws.environ["PATH_INFO"].endswith(camera) for ws in websocket_server.manager
):
# write to the converter for the camera if clients are listening to the specific camera
jsmpeg_cameras[camera].write_frame(frame.tobytes())
# send output data to birdseye if websocket is connected or restreaming
if config.birdseye.enabled and (
config.birdseye.restream
or any(
ws.environ["PATH_INFO"].endswith("birdseye")
for ws in websocket_server.manager
)
):
birdseye.write_data(
camera,
current_tracked_objects,
motion_boxes,
frame_time,
frame,
)
# send frames for low fps recording
preview_recorders[camera].write_data(
current_tracked_objects, motion_boxes, frame_time, frame
)
# delete frames after they have been used for output
if camera in previous_frames:
frame_manager.delete(f"{camera}{previous_frames[camera]}")
previous_frames[camera] = frame_time
while not video_output_queue.empty():
(
camera,
frame_time,
current_tracked_objects,
motion_boxes,
regions,
) = video_output_queue.get(True, 10)
frame_id = f"{camera}{frame_time}"
frame = frame_manager.get(frame_id, config.cameras[camera].frame_shape_yuv)
frame_manager.delete(frame_id)
for jsmpeg in jsmpeg_cameras.values():
jsmpeg.stop()
for preview in preview_recorders.values():
preview.stop()
if birdseye is not None:
birdseye.stop()
websocket_server.manager.close_all()
websocket_server.manager.stop()
websocket_server.manager.join()
websocket_server.shutdown()
websocket_thread.join()
logger.info("exiting output process...")

265
frigate/output/preview.py Normal file
View File

@ -0,0 +1,265 @@
"""Handle outputting low res / fps preview segments from decoded frames."""
import datetime
import logging
import multiprocessing as mp
import os
import shutil
import subprocess as sp
import threading
from pathlib import Path
import cv2
import numpy as np
from frigate.config import CameraConfig, RecordQualityEnum
from frigate.const import CACHE_DIR, CLIPS_DIR, INSERT_PREVIEW
from frigate.ffmpeg_presets import (
FPS_VFR_PARAM,
EncodeTypeEnum,
parse_preset_hardware_acceleration_encode,
)
from frigate.models import Previews
from frigate.util.image import copy_yuv_to_position, get_yuv_crop
logger = logging.getLogger(__name__)
FOLDER_PREVIEW_FRAMES = "preview_frames"
PREVIEW_OUTPUT_FPS = 1
PREVIEW_SEGMENT_DURATION = 3600 # one hour
# important to have lower keyframe to maintain scrubbing performance
PREVIEW_KEYFRAME_INTERVAL = 60
PREVIEW_BIT_RATES = {
RecordQualityEnum.very_low: 4096,
RecordQualityEnum.low: 6144,
RecordQualityEnum.medium: 8192,
RecordQualityEnum.high: 12288,
RecordQualityEnum.very_high: 16384,
}
def get_cache_image_name(camera: str, frame_time: float) -> str:
"""Get the image name in cache."""
return os.path.join(
CACHE_DIR,
f"{FOLDER_PREVIEW_FRAMES}/preview_{camera}-{frame_time}.jpg",
)
class FFMpegConverter(threading.Thread):
"""Convert a list of jpg frames into a vfr mp4."""
def __init__(
self,
config: CameraConfig,
frame_times: list[float],
inter_process_queue: mp.Queue,
):
threading.Thread.__init__(self)
self.name = f"{config.name}_preview_converter"
self.config = config
self.frame_times = frame_times
self.inter_process_queue = inter_process_queue
self.path = os.path.join(
CLIPS_DIR,
f"previews/{self.config.name}/{self.frame_times[0]}-{self.frame_times[-1]}.mp4",
)
# write a PREVIEW at fps and 1 key frame per clip
self.ffmpeg_cmd = parse_preset_hardware_acceleration_encode(
config.ffmpeg.hwaccel_args,
input="-f concat -y -protocol_whitelist pipe,file -safe 0 -i /dev/stdin",
output=f"-g {PREVIEW_KEYFRAME_INTERVAL} -fpsmax {PREVIEW_OUTPUT_FPS} -bf 0 -b:v {PREVIEW_BIT_RATES[self.config.record.preview.quality]} {FPS_VFR_PARAM} -movflags +faststart -pix_fmt yuv420p {self.path}",
type=EncodeTypeEnum.preview,
)
def run(self) -> None:
# generate input list
item_count = len(self.frame_times)
playlist = []
for t_idx in range(0, item_count):
if t_idx == item_count - 1:
# last frame does not get a duration
playlist.append(
f"file '{get_cache_image_name(self.config.name, self.frame_times[t_idx])}'"
)
continue
playlist.append(
f"file '{get_cache_image_name(self.config.name, self.frame_times[t_idx])}'"
)
playlist.append(
f"duration {self.frame_times[t_idx + 1] - self.frame_times[t_idx]}"
)
p = sp.run(
self.ffmpeg_cmd.split(" "),
input="\n".join(playlist),
encoding="ascii",
capture_output=True,
)
start = self.frame_times[0]
end = self.frame_times[-1]
if p.returncode == 0:
logger.debug("successfully saved preview")
self.inter_process_queue.put_nowait(
(
INSERT_PREVIEW,
{
Previews.id: f"{self.config.name}_{end}",
Previews.camera: self.config.name,
Previews.path: self.path,
Previews.start_time: start,
Previews.end_time: end,
Previews.duration: end - start,
},
)
)
else:
logger.error(f"Error saving preview for {self.config.name} :: {p.stderr}")
# unlink files from cache
# don't delete last frame as it will be used as first frame in next segment
for t in self.frame_times[0:-1]:
Path(get_cache_image_name(self.config.name, t)).unlink(missing_ok=True)
class PreviewRecorder:
def __init__(self, config: CameraConfig, inter_process_queue: mp.Queue) -> None:
self.config = config
self.inter_process_queue = inter_process_queue
self.start_time = 0
self.last_output_time = 0
self.output_frames = []
self.out_height = 160
self.out_width = (
int((config.detect.width / config.detect.height) * self.out_height) // 4 * 4
)
y, u1, u2, v1, v2 = get_yuv_crop(
self.config.frame_shape_yuv,
(
0,
0,
self.config.frame_shape[1],
self.config.frame_shape[0],
),
)
self.channel_dims = {
"y": y,
"u1": u1,
"u2": u2,
"v1": v1,
"v2": v2,
}
# end segment at end of hour
self.segment_end = (
(datetime.datetime.now() + datetime.timedelta(hours=1))
.replace(minute=0, second=0, microsecond=0)
.timestamp()
)
Path(os.path.join(CACHE_DIR, "preview_frames")).mkdir(exist_ok=True)
Path(os.path.join(CLIPS_DIR, f"previews/{config.name}")).mkdir(
parents=True, exist_ok=True
)
def should_write_frame(
self,
current_tracked_objects: list[dict[str, any]],
motion_boxes: list[list[int]],
frame_time: float,
) -> bool:
"""Decide if this frame should be added to PREVIEW."""
# limit output to 1 fps
if (frame_time - self.last_output_time) < 1 / PREVIEW_OUTPUT_FPS:
return False
# send frame if a non-stationary object is in a zone
if any(
(len(o["current_zones"]) > 0 and not o["stationary"])
for o in current_tracked_objects
):
self.last_output_time = frame_time
return True
if len(motion_boxes) > 0:
self.last_output_time = frame_time
return True
return False
def write_frame_to_cache(self, frame_time: float, frame) -> None:
# resize yuv frame
small_frame = np.zeros((self.out_height * 3 // 2, self.out_width), np.uint8)
copy_yuv_to_position(
small_frame,
(0, 0),
(self.out_height, self.out_width),
frame,
self.channel_dims,
cv2.INTER_AREA,
)
small_frame = cv2.cvtColor(
small_frame,
cv2.COLOR_YUV2BGR_I420,
)
_, jpg = cv2.imencode(".jpg", small_frame)
with open(
get_cache_image_name(self.config.name, frame_time),
"wb",
) as j:
j.write(jpg.tobytes())
def write_data(
self,
current_tracked_objects: list[dict[str, any]],
motion_boxes: list[list[int]],
frame_time: float,
frame,
) -> None:
# always write the first frame
if self.start_time == 0:
self.start_time = frame_time
self.output_frames.append(frame_time)
self.write_frame_to_cache(frame_time, frame)
return
if self.should_write_frame(current_tracked_objects, motion_boxes, frame_time):
self.output_frames.append(frame_time)
self.write_frame_to_cache(frame_time, frame)
# check if PREVIEW clip should be generated and cached frames reset
if frame_time >= self.segment_end:
# save last frame to ensure consistent duration
self.output_frames.append(frame_time)
self.write_frame_to_cache(frame_time, frame)
FFMpegConverter(
self.config,
self.output_frames,
self.inter_process_queue,
).start()
# reset frame cache
self.segment_end = (
(datetime.datetime.now() + datetime.timedelta(hours=1))
.replace(minute=0, second=0, microsecond=0)
.timestamp()
)
self.start_time = frame_time
self.last_output_time = frame_time
self.output_frames = []
# include first frame to ensure consistent duration
self.output_frames.append(frame_time)
self.write_frame_to_cache(frame_time, frame)
def stop(self) -> None:
try:
shutil.rmtree(os.path.join(CACHE_DIR, FOLDER_PREVIEW_FRAMES))
except FileNotFoundError:
pass

View File

@ -7,9 +7,9 @@ import threading
from multiprocessing.synchronize import Event as MpEvent
from pathlib import Path
from frigate.config import FrigateConfig, RetainModeEnum
from frigate.config import CameraConfig, FrigateConfig, RetainModeEnum
from frigate.const import CACHE_DIR, RECORD_DIR
from frigate.models import Event, Recordings
from frigate.models import Event, Previews, Recordings
from frigate.record.util import remove_empty_directories, sync_recordings
from frigate.util.builtin import clear_and_unlink, get_tomorrow_at_time
@ -33,10 +33,152 @@ class RecordingCleanup(threading.Thread):
logger.debug("Deleting tmp clip.")
clear_and_unlink(p)
def expire_existing_camera_recordings(
self, expire_date: float, config: CameraConfig, events: Event
) -> None:
"""Delete recordings for existing camera based on retention config."""
# Get the timestamp for cutoff of retained days
# Get recordings to check for expiration
recordings: Recordings = (
Recordings.select(
Recordings.id,
Recordings.start_time,
Recordings.end_time,
Recordings.path,
Recordings.objects,
Recordings.motion,
)
.where(
Recordings.camera == config.name,
Recordings.end_time < expire_date,
)
.order_by(Recordings.start_time)
.namedtuples()
.iterator()
)
# loop over recordings and see if they overlap with any non-expired events
# TODO: expire segments based on segment stats according to config
event_start = 0
deleted_recordings = set()
kept_recordings: list[tuple[float, float]] = []
for recording in recordings:
keep = False
# Now look for a reason to keep this recording segment
for idx in range(event_start, len(events)):
event: Event = events[idx]
# if the event starts in the future, stop checking events
# and let this recording segment expire
if event.start_time > recording.end_time:
keep = False
break
# if the event is in progress or ends after the recording starts, keep it
# and stop looking at events
if event.end_time is None or event.end_time >= recording.start_time:
keep = True
break
# if the event ends before this recording segment starts, skip
# this event and check the next event for an overlap.
# since the events and recordings are sorted, we can skip events
# that end before the previous recording segment started on future segments
if event.end_time < recording.start_time:
event_start = idx
# Delete recordings outside of the retention window or based on the retention mode
if (
not keep
or (
config.record.events.retain.mode == RetainModeEnum.motion
and recording.motion == 0
)
or (
config.record.events.retain.mode == RetainModeEnum.active_objects
and recording.objects == 0
)
):
Path(recording.path).unlink(missing_ok=True)
deleted_recordings.add(recording.id)
else:
kept_recordings.append((recording.start_time, recording.end_time))
# expire recordings
logger.debug(f"Expiring {len(deleted_recordings)} recordings")
# delete up to 100,000 at a time
max_deletes = 100000
deleted_recordings_list = list(deleted_recordings)
for i in range(0, len(deleted_recordings_list), max_deletes):
Recordings.delete().where(
Recordings.id << deleted_recordings_list[i : i + max_deletes]
).execute()
previews: Previews = (
Previews.select(
Previews.id,
Previews.start_time,
Previews.end_time,
Previews.path,
)
.where(
Previews.camera == config.name,
Previews.end_time < expire_date,
)
.order_by(Previews.start_time)
.namedtuples()
.iterator()
)
# expire previews
recording_start = 0
deleted_previews = set()
for preview in previews:
keep = False
# look for a reason to keep this preview
for idx in range(recording_start, len(kept_recordings)):
start_time, end_time = kept_recordings[idx]
# if the recording starts in the future, stop checking recordings
# and let this preview expire
if start_time > preview.end_time:
keep = False
break
# if the recording ends after the preview starts, keep it
# and stop looking at recordings
if end_time >= preview.start_time:
keep = True
break
# if the recording ends before this preview starts, skip
# this recording and check the next recording for an overlap.
# since the kept recordings and previews are sorted, we can skip recordings
# that end before the current preview started
if end_time < preview.start_time:
recording_start = idx
# Delete previews without any relevant recordings
if not keep:
Path(preview.path).unlink(missing_ok=True)
deleted_previews.add(preview.id)
# expire previews
logger.debug(f"Expiring {len(deleted_previews)} previews")
# delete up to 100,000 at a time
max_deletes = 100000
deleted_previews_list = list(deleted_previews)
for i in range(0, len(deleted_previews_list), max_deletes):
Previews.delete().where(
Previews.id << deleted_previews_list[i : i + max_deletes]
).execute()
def expire_recordings(self) -> None:
"""Delete recordings based on retention config."""
logger.debug("Start expire recordings.")
logger.debug("Start deleted cameras.")
# Handle deleted cameras
expire_days = self.config.record.retain.days
expire_before = (
@ -73,31 +215,12 @@ class RecordingCleanup(threading.Thread):
logger.debug("Start all cameras.")
for camera, config in self.config.cameras.items():
logger.debug(f"Start camera: {camera}.")
# Get the timestamp for cutoff of retained days
expire_days = config.record.retain.days
expire_date = (
datetime.datetime.now() - datetime.timedelta(days=expire_days)
).timestamp()
# Get recordings to check for expiration
recordings: Recordings = (
Recordings.select(
Recordings.id,
Recordings.start_time,
Recordings.end_time,
Recordings.path,
Recordings.objects,
Recordings.motion,
)
.where(
Recordings.camera == camera,
Recordings.end_time < expire_date,
)
.order_by(Recordings.start_time)
.namedtuples()
.iterator()
)
# Get all the events to check against
events: Event = (
Event.select(
@ -115,60 +238,7 @@ class RecordingCleanup(threading.Thread):
.namedtuples()
)
# loop over recordings and see if they overlap with any non-expired events
# TODO: expire segments based on segment stats according to config
event_start = 0
deleted_recordings = set()
for recording in recordings:
keep = False
# Now look for a reason to keep this recording segment
for idx in range(event_start, len(events)):
event: Event = events[idx]
# if the event starts in the future, stop checking events
# and let this recording segment expire
if event.start_time > recording.end_time:
keep = False
break
# if the event is in progress or ends after the recording starts, keep it
# and stop looking at events
if event.end_time is None or event.end_time >= recording.start_time:
keep = True
break
# if the event ends before this recording segment starts, skip
# this event and check the next event for an overlap.
# since the events and recordings are sorted, we can skip events
# that end before the previous recording segment started on future segments
if event.end_time < recording.start_time:
event_start = idx
# Delete recordings outside of the retention window or based on the retention mode
if (
not keep
or (
config.record.events.retain.mode == RetainModeEnum.motion
and recording.motion == 0
)
or (
config.record.events.retain.mode
== RetainModeEnum.active_objects
and recording.objects == 0
)
):
Path(recording.path).unlink(missing_ok=True)
deleted_recordings.add(recording.id)
logger.debug(f"Expiring {len(deleted_recordings)} recordings")
# delete up to 100,000 at a time
max_deletes = 100000
deleted_recordings_list = list(deleted_recordings)
for i in range(0, len(deleted_recordings_list), max_deletes):
Recordings.delete().where(
Recordings.id << deleted_recordings_list[i : i + max_deletes]
).execute()
self.expire_existing_camera_recordings(expire_date, config, events)
logger.debug(f"End camera: {camera}.")
logger.debug("End all cameras.")

View File

@ -2,7 +2,7 @@
import unittest
from frigate.output import get_canvas_shape
from frigate.output.birdseye import get_canvas_shape
class TestBirdseye(unittest.TestCase):

View File

@ -387,6 +387,7 @@ def copy_yuv_to_position(
destination_shape,
source_frame=None,
source_channel_dim=None,
interpolation=cv2.INTER_LINEAR,
):
# get the coordinates of the channels for this position in the layout
y, u1, u2, v1, v2 = get_yuv_crop(
@ -435,7 +436,6 @@ def copy_yuv_to_position(
uv_y_offset = y_y_offset // 4
uv_x_offset = y_x_offset // 2
interpolation = cv2.INTER_LINEAR
# resize/copy y channel
destination_frame[
y[1] + y_y_offset : y[1] + y_y_offset + y_resize_height,

View File

@ -0,0 +1,35 @@
"""Peewee migrations -- 021_create_previews_table.py.
Some examples (model - class or model name)::
> Model = migrator.orm['model_name'] # Return model in current state by name
> migrator.sql(sql) # Run custom SQL
> migrator.python(func, *args, **kwargs) # Run python code
> migrator.create_model(Model) # Create a model (could be used as decorator)
> migrator.remove_model(model, cascade=True) # Remove a model
> migrator.add_fields(model, **fields) # Add fields to a model
> migrator.change_fields(model, **fields) # Change fields
> migrator.remove_fields(model, *field_names, cascade=True)
> migrator.rename_field(model, old_field_name, new_field_name)
> migrator.rename_table(model, new_table_name)
> migrator.add_index(model, *col_names, unique=False)
> migrator.drop_index(model, *col_names)
> migrator.add_not_null(model, *field_names)
> migrator.drop_not_null(model, *field_names)
> migrator.add_default(model, field_name, default)
"""
import peewee as pw
SQL = pw.SQL
def migrate(migrator, database, fake=False, **kwargs):
migrator.sql(
'CREATE TABLE IF NOT EXISTS "previews" ("id" VARCHAR(30) NOT NULL PRIMARY KEY, "camera" VARCHAR(20) NOT NULL, "path" VARCHAR(255) NOT NULL, "start_time" DATETIME NOT NULL, "end_time" DATETIME NOT NULL, "duration" REAL NOT NULL)'
)
def rollback(migrator, database, fake=False, **kwargs):
pass