"""Handle outputting low res / fps preview segments from decoded frames.""" import datetime import logging import os import shutil import subprocess as sp import threading import time from pathlib import Path import cv2 import numpy as np from frigate.comms.config_updater import ConfigSubscriber from frigate.comms.inter_process import InterProcessRequestor from frigate.config import CameraConfig, RecordQualityEnum from frigate.const import CACHE_DIR, CLIPS_DIR, INSERT_PREVIEW, PREVIEW_FRAME_TYPE from frigate.ffmpeg_presets import ( FPS_VFR_PARAM, EncodeTypeEnum, parse_preset_hardware_acceleration_encode, ) from frigate.models import Previews from frigate.object_processing import TrackedObject from frigate.util.image import copy_yuv_to_position, get_yuv_crop logger = logging.getLogger(__name__) FOLDER_PREVIEW_FRAMES = "preview_frames" PREVIEW_CACHE_DIR = os.path.join(CACHE_DIR, FOLDER_PREVIEW_FRAMES) PREVIEW_SEGMENT_DURATION = 3600 # one hour # important to have lower keyframe to maintain scrubbing performance PREVIEW_KEYFRAME_INTERVAL = 40 PREVIEW_HEIGHT = 180 PREVIEW_QUALITY_WEBP = { RecordQualityEnum.very_low: 70, RecordQualityEnum.low: 80, RecordQualityEnum.medium: 80, RecordQualityEnum.high: 80, RecordQualityEnum.very_high: 86, } PREVIEW_QUALITY_BIT_RATES = { RecordQualityEnum.very_low: 7168, RecordQualityEnum.low: 8196, RecordQualityEnum.medium: 9216, RecordQualityEnum.high: 9864, RecordQualityEnum.very_high: 10096, } def get_cache_image_name(camera: str, frame_time: float) -> str: """Get the image name in cache.""" return os.path.join( CACHE_DIR, f"{FOLDER_PREVIEW_FRAMES}/preview_{camera}-{frame_time}.{PREVIEW_FRAME_TYPE}", ) class FFMpegConverter(threading.Thread): """Convert a list of still frames into a vfr mp4.""" def __init__( self, config: CameraConfig, frame_times: list[float], requestor: InterProcessRequestor, ): threading.Thread.__init__(self) self.name = f"{config.name}_preview_converter" self.config = config self.frame_times = frame_times self.requestor = requestor self.path = os.path.join( CLIPS_DIR, f"previews/{self.config.name}/{self.frame_times[0]}-{self.frame_times[-1]}.mp4", ) # write a PREVIEW at fps and 1 key frame per clip self.ffmpeg_cmd = parse_preset_hardware_acceleration_encode( config.ffmpeg.hwaccel_args, input="-f concat -y -protocol_whitelist pipe,file -safe 0 -threads 1 -i /dev/stdin", output=f"-threads 1 -g {PREVIEW_KEYFRAME_INTERVAL} -bf 0 -b:v {PREVIEW_QUALITY_BIT_RATES[self.config.record.preview.quality]} {FPS_VFR_PARAM} -movflags +faststart -pix_fmt yuv420p {self.path}", type=EncodeTypeEnum.preview, ) def run(self) -> None: # generate input list item_count = len(self.frame_times) playlist = [] for t_idx in range(0, item_count): if t_idx == item_count - 1: # last frame does not get a duration playlist.append( f"file '{get_cache_image_name(self.config.name, self.frame_times[t_idx])}'" ) continue playlist.append( f"file '{get_cache_image_name(self.config.name, self.frame_times[t_idx])}'" ) playlist.append( f"duration {self.frame_times[t_idx + 1] - self.frame_times[t_idx]}" ) try: p = sp.run( self.ffmpeg_cmd.split(" "), input="\n".join(playlist), encoding="ascii", capture_output=True, ) except BlockingIOError: logger.warning( f"Failed to create preview for {self.config.name}, retrying..." ) time.sleep(2) p = sp.run( self.ffmpeg_cmd.split(" "), input="\n".join(playlist), encoding="ascii", capture_output=True, ) start = self.frame_times[0] end = self.frame_times[-1] if p.returncode == 0: logger.debug("successfully saved preview") self.requestor.send_data( INSERT_PREVIEW, { Previews.id.name: f"{self.config.name}_{end}", Previews.camera.name: self.config.name, Previews.path.name: self.path, Previews.start_time.name: start, Previews.end_time.name: end, Previews.duration.name: end - start, }, ) else: logger.error(f"Error saving preview for {self.config.name} :: {p.stderr}") # unlink files from cache # don't delete last frame as it will be used as first frame in next segment for t in self.frame_times[0:-1]: Path(get_cache_image_name(self.config.name, t)).unlink(missing_ok=True) class PreviewRecorder: def __init__(self, config: CameraConfig) -> None: self.config = config self.start_time = 0 self.last_output_time = 0 self.output_frames = [] if config.detect.width > config.detect.height: self.out_height = PREVIEW_HEIGHT self.out_width = ( int((config.detect.width / config.detect.height) * self.out_height) // 4 * 4 ) else: self.out_width = PREVIEW_HEIGHT self.out_height = ( int((config.detect.height / config.detect.width) * self.out_width) // 4 * 4 ) # create communication for finished previews self.requestor = InterProcessRequestor() self.config_subscriber = ConfigSubscriber(f"config/record/{self.config.name}") y, u1, u2, v1, v2 = get_yuv_crop( self.config.frame_shape_yuv, ( 0, 0, self.config.frame_shape[1], self.config.frame_shape[0], ), ) self.channel_dims = { "y": y, "u1": u1, "u2": u2, "v1": v1, "v2": v2, } # end segment at end of hour self.segment_end = ( (datetime.datetime.now() + datetime.timedelta(hours=1)) .astimezone(datetime.timezone.utc) .replace(minute=0, second=0, microsecond=0) .timestamp() ) Path(PREVIEW_CACHE_DIR).mkdir(exist_ok=True) Path(os.path.join(CLIPS_DIR, f"previews/{config.name}")).mkdir( parents=True, exist_ok=True ) # check for existing items in cache start_ts = ( datetime.datetime.now() .astimezone(datetime.timezone.utc) .replace(minute=0, second=0, microsecond=0) .timestamp() ) file_start = f"preview_{config.name}" start_file = f"{file_start}-{start_ts}.webp" for file in sorted(os.listdir(os.path.join(CACHE_DIR, FOLDER_PREVIEW_FRAMES))): if not file.startswith(file_start): continue if file < start_file: os.unlink(os.path.join(PREVIEW_CACHE_DIR, file)) continue try: file_time = file.split("-")[-1][: -(len(PREVIEW_FRAME_TYPE) + 1)] if not file_time: continue ts = float(file_time) except ValueError: continue if self.start_time == 0: self.start_time = ts self.last_output_time = ts self.output_frames.append(ts) def should_write_frame( self, current_tracked_objects: list[dict[str, any]], motion_boxes: list[list[int]], frame_time: float, ) -> bool: """Decide if this frame should be added to PREVIEW.""" if not self.config.record.enabled: return False active_objs = get_active_objects( frame_time, self.config, current_tracked_objects ) preview_output_fps = 2 if any(o["label"] == "car" for o in active_objs) else 1 # limit output to 1 fps if (frame_time - self.last_output_time) < 1 / preview_output_fps: return False # send frame if a non-stationary object is in a zone if len(active_objs) > 0: self.last_output_time = frame_time return True if len(motion_boxes) > 0: self.last_output_time = frame_time return True # ensure that at least 2 frames are written every minute if frame_time - self.last_output_time > 30: self.last_output_time = frame_time return True return False def write_frame_to_cache(self, frame_time: float, frame) -> None: # resize yuv frame small_frame = np.zeros((self.out_height * 3 // 2, self.out_width), np.uint8) copy_yuv_to_position( small_frame, (0, 0), (self.out_height, self.out_width), frame, self.channel_dims, cv2.INTER_AREA, ) small_frame = cv2.cvtColor( small_frame, cv2.COLOR_YUV2BGR_I420, ) cv2.imwrite( get_cache_image_name(self.config.name, frame_time), small_frame, [ int(cv2.IMWRITE_WEBP_QUALITY), PREVIEW_QUALITY_WEBP[self.config.record.preview.quality], ], ) def write_data( self, current_tracked_objects: list[dict[str, any]], motion_boxes: list[list[int]], frame_time: float, frame, ) -> bool: # check for updated record config _, updated_record_config = self.config_subscriber.check_for_update() if updated_record_config: self.config.record = updated_record_config # always write the first frame if self.start_time == 0: self.start_time = frame_time self.output_frames.append(frame_time) self.write_frame_to_cache(frame_time, frame) return False # check if PREVIEW clip should be generated and cached frames reset if frame_time >= self.segment_end: if len(self.output_frames) > 0: # save last frame to ensure consistent duration if self.config.record: self.output_frames.append(frame_time) self.write_frame_to_cache(frame_time, frame) # write the preview if any frames exist for this hour FFMpegConverter( self.config, self.output_frames, self.requestor, ).start() # reset frame cache self.segment_end = ( (datetime.datetime.now() + datetime.timedelta(hours=1)) .astimezone(datetime.timezone.utc) .replace(minute=0, second=0, microsecond=0) .timestamp() ) self.start_time = frame_time self.last_output_time = frame_time self.output_frames: list[float] = [] # include first frame to ensure consistent duration if self.config.record.enabled: self.output_frames.append(frame_time) self.write_frame_to_cache(frame_time, frame) return True elif self.should_write_frame(current_tracked_objects, motion_boxes, frame_time): self.output_frames.append(frame_time) self.write_frame_to_cache(frame_time, frame) return False def flag_offline(self, frame_time: float) -> None: # check if PREVIEW clip should be generated and cached frames reset if frame_time >= self.segment_end: if len(self.output_frames) == 0: return old_frame_path = get_cache_image_name( self.config.name, self.output_frames[-1] ) new_frame_path = get_cache_image_name(self.config.name, frame_time) shutil.copy(old_frame_path, new_frame_path) # save last frame to ensure consistent duration self.output_frames.append(frame_time) FFMpegConverter( self.config, self.output_frames, self.requestor, ).start() # reset frame cache self.segment_end = ( (datetime.datetime.now() + datetime.timedelta(hours=1)) .astimezone(datetime.timezone.utc) .replace(minute=0, second=0, microsecond=0) .timestamp() ) self.start_time = frame_time self.last_output_time = frame_time self.output_frames = [] def stop(self) -> None: self.requestor.stop() def get_active_objects( frame_time: float, camera_config: CameraConfig, all_objects: list[TrackedObject] ) -> list[TrackedObject]: """get active objects for detection.""" return [ o for o in all_objects if o["motionless_count"] < camera_config.detect.stationary.threshold and o["position_changes"] > 0 and o["frame_time"] == frame_time and not o["false_positive"] ]