blakeblackshear.frigate/frigate/output/preview.py


"""Handle outputting low res / fps preview segments from decoded frames."""
import datetime
import logging
import os
import subprocess as sp
import threading
from pathlib import Path
from typing import Any
import cv2
import numpy as np
from frigate.comms.inter_process import InterProcessRequestor
from frigate.config import CameraConfig, RecordQualityEnum
from frigate.const import CACHE_DIR, CLIPS_DIR, INSERT_PREVIEW, PREVIEW_FRAME_TYPE
from frigate.ffmpeg_presets import (
FPS_VFR_PARAM,
EncodeTypeEnum,
parse_preset_hardware_acceleration_encode,
)
from frigate.models import Previews
from frigate.object_processing import TrackedObject
from frigate.util.image import copy_yuv_to_position, get_yuv_crop
logger = logging.getLogger(__name__)
FOLDER_PREVIEW_FRAMES = "preview_frames"
PREVIEW_CACHE_DIR = os.path.join(CACHE_DIR, FOLDER_PREVIEW_FRAMES)
PREVIEW_SEGMENT_DURATION = 3600 # one hour
# a short keyframe interval is important to maintain scrubbing performance
PREVIEW_KEYFRAME_INTERVAL = 40
PREVIEW_HEIGHT = 180
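# webp quality used when caching individual preview frames, keyed by the configured preview quality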
PREVIEW_QUALITY_WEBP = {
RecordQualityEnum.very_low: 70,
RecordQualityEnum.low: 80,
RecordQualityEnum.medium: 80,
RecordQualityEnum.high: 80,
RecordQualityEnum.very_high: 86,
}
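# target bitrate passed to ffmpeg's -b:v, keyed by the configured preview quality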
PREVIEW_QUALITY_BIT_RATES = {
RecordQualityEnum.very_low: 7168,
RecordQualityEnum.low: 8196,
RecordQualityEnum.medium: 9216,
RecordQualityEnum.high: 9864,
RecordQualityEnum.very_high: 10096,
}
def get_cache_image_name(camera: str, frame_time: float) -> str:
"""Get the image name in cache."""
return os.path.join(
CACHE_DIR,
f"{FOLDER_PREVIEW_FRAMES}/preview_{camera}-{frame_time}.{PREVIEW_FRAME_TYPE}",
)
class FFMpegConverter(threading.Thread):
"""Convert a list of still frames into a vfr mp4."""
def __init__(
self,
config: CameraConfig,
frame_times: list[float],
requestor: InterProcessRequestor,
):
threading.Thread.__init__(self)
self.name = f"{config.name}_preview_converter"
self.config = config
self.frame_times = frame_times
self.requestor = requestor
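        # preview clips are written to clips/previews/<camera>/<start_time>-<end_time>.mp4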
self.path = os.path.join(
CLIPS_DIR,
f"previews/{self.config.name}/{self.frame_times[0]}-{self.frame_times[-1]}.mp4",
)
        # encode a vfr preview with a keyframe every PREVIEW_KEYFRAME_INTERVAL frames so scrubbing stays responsive
self.ffmpeg_cmd = parse_preset_hardware_acceleration_encode(
config.ffmpeg.hwaccel_args,
input="-f concat -y -protocol_whitelist pipe,file -safe 0 -i /dev/stdin",
output=f"-g {PREVIEW_KEYFRAME_INTERVAL}{' -fpsmax 2' if int(os.getenv('LIBAVFORMAT_VERSION_MAJOR', '59')) >= 59 else ''} -bf 0 -b:v {PREVIEW_QUALITY_BIT_RATES[self.config.record.preview.quality]} {FPS_VFR_PARAM} -movflags +faststart -pix_fmt yuv420p {self.path}",
type=EncodeTypeEnum.preview,
)
def run(self) -> None:
# generate input list
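        # each frame becomes a concat-demuxer "file" entry followed by a "duration"
        # entry equal to the gap to the next frame, producing variable frame durations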
item_count = len(self.frame_times)
playlist = []
for t_idx in range(0, item_count):
if t_idx == item_count - 1:
# last frame does not get a duration
playlist.append(
f"file '{get_cache_image_name(self.config.name, self.frame_times[t_idx])}'"
)
continue
playlist.append(
f"file '{get_cache_image_name(self.config.name, self.frame_times[t_idx])}'"
)
playlist.append(
f"duration {self.frame_times[t_idx + 1] - self.frame_times[t_idx]}"
)
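        # feed the playlist to ffmpeg over stdin (matches -i /dev/stdin above)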
p = sp.run(
self.ffmpeg_cmd.split(" "),
input="\n".join(playlist),
encoding="ascii",
capture_output=True,
)
start = self.frame_times[0]
end = self.frame_times[-1]
if p.returncode == 0:
logger.debug("successfully saved preview")
self.requestor.send_data(
INSERT_PREVIEW,
{
Previews.id: f"{self.config.name}_{end}",
Previews.camera: self.config.name,
Previews.path: self.path,
Previews.start_time: start,
Previews.end_time: end,
Previews.duration: end - start,
},
)
else:
logger.error(f"Error saving preview for {self.config.name} :: {p.stderr}")
# unlink files from cache
# don't delete last frame as it will be used as first frame in next segment
for t in self.frame_times[0:-1]:
Path(get_cache_image_name(self.config.name, t)).unlink(missing_ok=True)
class PreviewRecorder:
def __init__(self, config: CameraConfig) -> None:
self.config = config
self.start_time = 0
self.last_output_time = 0
self.output_frames = []
self.out_height = PREVIEW_HEIGHT
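        # scale the width to keep the detect aspect ratio, rounded down to a multiple of 4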
self.out_width = (
int((config.detect.width / config.detect.height) * self.out_height) // 4 * 4
)
# create communication for finished previews
self.requestor = InterProcessRequestor()
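        # pre-compute the Y/U/V plane regions of the full frame for copy_yuv_to_position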
y, u1, u2, v1, v2 = get_yuv_crop(
self.config.frame_shape_yuv,
(
0,
0,
self.config.frame_shape[1],
self.config.frame_shape[0],
),
)
self.channel_dims = {
"y": y,
"u1": u1,
"u2": u2,
"v1": v1,
"v2": v2,
}
# end segment at end of hour
self.segment_end = (
(datetime.datetime.now() + datetime.timedelta(hours=1))
.replace(minute=0, second=0, microsecond=0)
.timestamp()
)
Path(PREVIEW_CACHE_DIR).mkdir(exist_ok=True)
Path(os.path.join(CLIPS_DIR, f"previews/{config.name}")).mkdir(
parents=True, exist_ok=True
)
# check for existing items in cache
start_ts = (
datetime.datetime.now()
.replace(minute=0, second=0, microsecond=0)
.timestamp()
)
file_start = f"preview_{config.name}"
start_file = f"{file_start}-{start_ts}.webp"
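        # discard cached frames from previous hours and adopt frames already written
        # for the current hour so the in-progress segment is preserved across restarts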
for file in sorted(os.listdir(os.path.join(CACHE_DIR, FOLDER_PREVIEW_FRAMES))):
if not file.startswith(file_start):
continue
if file < start_file:
os.unlink(os.path.join(PREVIEW_CACHE_DIR, file))
continue
            # the timestamp is everything after the final dash, before the file extension
            ts = float(file.rsplit("-", 1)[1][: -(len(PREVIEW_FRAME_TYPE) + 1)])
if self.start_time == 0:
self.start_time = ts
self.last_output_time = ts
self.output_frames.append(ts)
def should_write_frame(
self,
        current_tracked_objects: list[dict[str, Any]],
motion_boxes: list[list[int]],
frame_time: float,
) -> bool:
"""Decide if this frame should be added to PREVIEW."""
active_objs = get_active_objects(
frame_time, self.config, current_tracked_objects
)
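        # preview fps is doubled to 2 while a car is actively moving, otherwise 1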
preview_output_fps = 2 if any(o["label"] == "car" for o in active_objs) else 1
        # limit output to the target preview fps
if (frame_time - self.last_output_time) < 1 / preview_output_fps:
return False
        # write the frame if any non-stationary (active) object is present
if len(active_objs) > 0:
self.last_output_time = frame_time
return True
if len(motion_boxes) > 0:
self.last_output_time = frame_time
return True
# ensure that at least 2 frames are written every minute
if frame_time - self.last_output_time > 30:
self.last_output_time = frame_time
return True
return False
def write_frame_to_cache(self, frame_time: float, frame) -> None:
# resize yuv frame
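        # an I420 buffer is 1.5x the frame height: the Y plane stacked above the
        # quarter-resolution U and V planes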
small_frame = np.zeros((self.out_height * 3 // 2, self.out_width), np.uint8)
copy_yuv_to_position(
small_frame,
(0, 0),
(self.out_height, self.out_width),
frame,
self.channel_dims,
cv2.INTER_AREA,
)
small_frame = cv2.cvtColor(
small_frame,
cv2.COLOR_YUV2BGR_I420,
)
cv2.imwrite(
get_cache_image_name(self.config.name, frame_time),
small_frame,
[
int(cv2.IMWRITE_WEBP_QUALITY),
PREVIEW_QUALITY_WEBP[self.config.record.preview.quality],
],
)
def write_data(
self,
        current_tracked_objects: list[dict[str, Any]],
motion_boxes: list[list[int]],
frame_time: float,
frame,
) -> None:
# always write the first frame
if self.start_time == 0:
self.start_time = frame_time
self.output_frames.append(frame_time)
self.write_frame_to_cache(frame_time, frame)
return
# check if PREVIEW clip should be generated and cached frames reset
if frame_time >= self.segment_end:
# save last frame to ensure consistent duration
self.output_frames.append(frame_time)
self.write_frame_to_cache(frame_time, frame)
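            # convert the cached frames for this hour into an mp4 on a background thread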
FFMpegConverter(
self.config,
self.output_frames,
self.requestor,
).start()
# reset frame cache
self.segment_end = (
(datetime.datetime.now() + datetime.timedelta(hours=1))
.replace(minute=0, second=0, microsecond=0)
.timestamp()
)
self.start_time = frame_time
self.last_output_time = frame_time
self.output_frames = []
# include first frame to ensure consistent duration
self.output_frames.append(frame_time)
self.write_frame_to_cache(frame_time, frame)
elif self.should_write_frame(current_tracked_objects, motion_boxes, frame_time):
self.output_frames.append(frame_time)
self.write_frame_to_cache(frame_time, frame)
def stop(self) -> None:
self.requestor.stop()
def get_active_objects(
frame_time: float, camera_config: CameraConfig, all_objects: list[TrackedObject]
) -> list[TrackedObject]:
"""get active objects for detection."""
return [
o
for o in all_objects
if o["motionless_count"] < camera_config.detect.stationary.threshold
and o["position_changes"] > 0
and o["frame_time"] == frame_time
and not o["false_positive"]
]