diff --git a/frigate/record/maintainer.py b/frigate/record/maintainer.py index 2d2d93f08..3ed6540d0 100644 --- a/frigate/record/maintainer.py +++ b/frigate/record/maintainer.py @@ -1,5 +1,6 @@ """Maintain recording segments in cache.""" +import asyncio import datetime import logging import multiprocessing as mp @@ -20,7 +21,7 @@ from frigate.config import FrigateConfig, RetainModeEnum from frigate.const import CACHE_DIR, MAX_SEGMENT_DURATION, RECORD_DIR from frigate.models import Event, Recordings from frigate.types import RecordMetricsTypes -from frigate.util import area +from frigate.util import area, get_video_properties logger = logging.getLogger(__name__) @@ -42,7 +43,7 @@ class RecordingMaintainer(threading.Thread): self.recordings_info: dict[str, Any] = defaultdict(list) self.end_time_cache: dict[str, Tuple[datetime.datetime, float]] = {} - def move_files(self) -> None: + async def move_files(self) -> None: cache_files = sorted( [ d @@ -121,115 +122,100 @@ class RecordingMaintainer(threading.Thread): ) .order_by(Event.start_time) ) - for r in recordings: - cache_path = r["cache_path"] - start_time = r["start_time"] - # Just delete files if recordings are turned off - if ( - camera not in self.config.cameras - or not self.process_info[camera]["record_enabled"].value - ): + await asyncio.gather( + *(self.validate_and_move_segment(camera, events, r) for r in recordings) + ) + + async def validate_and_move_segment( + self, camera: str, events: Event, recording: dict[str, any] + ) -> None: + cache_path = recording["cache_path"] + start_time = recording["start_time"] + + # Just delete files if recordings are turned off + if ( + camera not in self.config.cameras + or not self.process_info[camera]["record_enabled"].value + ): + Path(cache_path).unlink(missing_ok=True) + self.end_time_cache.pop(cache_path, None) + return + + if cache_path in self.end_time_cache: + end_time, duration = self.end_time_cache[cache_path] + else: + segment_info = get_video_properties(cache_path, get_duration=True) + + if segment_info["duration"]: + duration = float(segment_info["duration"]) + else: + duration = -1 + + # ensure duration is within expected length + if 0 < duration < MAX_SEGMENT_DURATION: + end_time = start_time + datetime.timedelta(seconds=duration) + self.end_time_cache[cache_path] = (end_time, duration) + else: + if duration == -1: + logger.warning(f"Failed to probe corrupt segment {cache_path}") + + logger.warning(f"Discarding a corrupt recording segment: {cache_path}") + Path(cache_path).unlink(missing_ok=True) + return + + # if cached file's start_time is earlier than the retain days for the camera + if start_time <= ( + ( + datetime.datetime.now() + - datetime.timedelta( + days=self.config.cameras[camera].record.retain.days + ) + ) + ): + # if the cached segment overlaps with the events: + overlaps = False + for event in events: + # if the event starts in the future, stop checking events + # and remove this segment + if event.start_time > end_time.timestamp(): + overlaps = False Path(cache_path).unlink(missing_ok=True) self.end_time_cache.pop(cache_path, None) - continue + break - if cache_path in self.end_time_cache: - end_time, duration = self.end_time_cache[cache_path] - else: - ffprobe_cmd = [ - "ffprobe", - "-v", - "error", - "-show_entries", - "format=duration", - "-of", - "default=noprint_wrappers=1:nokey=1", - f"{cache_path}", - ] - p = sp.run(ffprobe_cmd, capture_output=True) - if p.returncode == 0 and p.stdout.decode(): - duration = float(p.stdout.decode().strip()) - else: - duration = -1 + # if the event is in progress or ends after the recording starts, keep it + # and stop looking at events + if event.end_time is None or event.end_time >= start_time.timestamp(): + overlaps = True + break - # ensure duration is within expected length - if 0 < duration < MAX_SEGMENT_DURATION: - end_time = start_time + datetime.timedelta(seconds=duration) - self.end_time_cache[cache_path] = (end_time, duration) - else: - if duration == -1: - logger.warning( - f"Failed to probe corrupt segment {cache_path} : {p.returncode} - {str(p.stderr)}" - ) - - logger.warning( - f"Discarding a corrupt recording segment: {cache_path}" - ) - Path(cache_path).unlink(missing_ok=True) - continue - - # if cached file's start_time is earlier than the retain days for the camera - if start_time <= ( - ( - datetime.datetime.now() - - datetime.timedelta( - days=self.config.cameras[camera].record.retain.days - ) - ) - ): - # if the cached segment overlaps with the events: - overlaps = False - for event in events: - # if the event starts in the future, stop checking events - # and remove this segment - if event.start_time > end_time.timestamp(): - overlaps = False - Path(cache_path).unlink(missing_ok=True) - self.end_time_cache.pop(cache_path, None) - break - - # if the event is in progress or ends after the recording starts, keep it - # and stop looking at events - if ( - event.end_time is None - or event.end_time >= start_time.timestamp() - ): - overlaps = True - break - - if overlaps: - record_mode = self.config.cameras[ - camera - ].record.events.retain.mode - # move from cache to recordings immediately - self.store_segment( - camera, - start_time, - end_time, - duration, - cache_path, - record_mode, - ) - # if it doesn't overlap with an event, go ahead and drop the segment - # if it ends more than the configured pre_capture for the camera - else: - pre_capture = self.config.cameras[ - camera - ].record.events.pre_capture - most_recently_processed_frame_time = self.recordings_info[ - camera - ][-1][0] - retain_cutoff = most_recently_processed_frame_time - pre_capture - if end_time.timestamp() < retain_cutoff: - Path(cache_path).unlink(missing_ok=True) - self.end_time_cache.pop(cache_path, None) - # else retain days includes this segment - else: - record_mode = self.config.cameras[camera].record.retain.mode - self.store_segment( - camera, start_time, end_time, duration, cache_path, record_mode - ) + if overlaps: + record_mode = self.config.cameras[camera].record.events.retain.mode + # move from cache to recordings immediately + self.store_segment( + camera, + start_time, + end_time, + duration, + cache_path, + record_mode, + ) + # if it doesn't overlap with an event, go ahead and drop the segment + # if it ends more than the configured pre_capture for the camera + else: + pre_capture = self.config.cameras[camera].record.events.pre_capture + most_recently_processed_frame_time = self.recordings_info[camera][-1][0] + retain_cutoff = most_recently_processed_frame_time - pre_capture + if end_time.timestamp() < retain_cutoff: + Path(cache_path).unlink(missing_ok=True) + self.end_time_cache.pop(cache_path, None) + # else retain days includes this segment + else: + record_mode = self.config.cameras[camera].record.retain.mode + self.store_segment( + camera, start_time, end_time, duration, cache_path, record_mode + ) def segment_stats( self, camera: str, start_time: datetime.datetime, end_time: datetime.datetime @@ -386,7 +372,7 @@ class RecordingMaintainer(threading.Thread): break try: - self.move_files() + asyncio.run(self.move_files()) except Exception as e: logger.error( "Error occurred when attempting to maintain recording cache" diff --git a/frigate/util.py b/frigate/util.py index 396a0f874..f18d3279e 100755 --- a/frigate/util.py +++ b/frigate/util.py @@ -1144,3 +1144,54 @@ def to_relative_box( (box[2] - box[0]) / width, # w (box[3] - box[1]) / height, # h ) + + +def get_video_properties(url, get_duration=False): + width = height = 0 + # Open the video stream + video = cv2.VideoCapture(url) + + # Check if the video stream was opened successfully + if not video.isOpened(): + logger.debug(f"Error opening video stream {url}.") + return None + + # Get the width of frames in the video stream + width = video.get(cv2.CAP_PROP_FRAME_WIDTH) + + # Get the height of frames in the video stream + height = video.get(cv2.CAP_PROP_FRAME_HEIGHT) + + # Release the video stream + video.release() + + result = {"width": round(width), "height": round(height)} + + if get_duration: + # Get the frames per second (fps) of the video stream + fps = video.get(cv2.CAP_PROP_FPS) + total_frames = int(video.get(cv2.CAP_PROP_FRAME_COUNT)) + + if fps and total_frames: + duration = total_frames / fps + else: + # if cv2 failed need to use ffprobe + ffprobe_cmd = [ + "ffprobe", + "-v", + "error", + "-show_entries", + "format=duration", + "-of", + "default=noprint_wrappers=1:nokey=1", + f"{url}", + ] + p = sp.run(ffprobe_cmd, capture_output=True) + if p.returncode == 0 and p.stdout.decode(): + duration = float(p.stdout.decode().strip()) + else: + duration = -1 + + result["duration"] = duration + + return result