Optimize stacked recordings (#6809)

* Make camera recordings mover asynchronous

* Formatting

* Move to using cv2 instead of external ffmpeg process

* Use ffprobe if cv2 failed

* Formatting

* Fix bad access

* Formatting

* Update frigate/record/maintainer.py

Co-authored-by: Blake Blackshear <blakeb@blakeshome.com>

* Update name of caller

---------

Co-authored-by: Blake Blackshear <blakeb@blakeshome.com>
This commit is contained in:
Nicolas Mowen 2023-06-16 07:09:13 -06:00 committed by GitHub
parent 0592cedcde
commit b66810247e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 144 additions and 107 deletions

View File

@ -1,5 +1,6 @@
"""Maintain recording segments in cache.""" """Maintain recording segments in cache."""
import asyncio
import datetime import datetime
import logging import logging
import multiprocessing as mp import multiprocessing as mp
@ -20,7 +21,7 @@ from frigate.config import FrigateConfig, RetainModeEnum
from frigate.const import CACHE_DIR, MAX_SEGMENT_DURATION, RECORD_DIR from frigate.const import CACHE_DIR, MAX_SEGMENT_DURATION, RECORD_DIR
from frigate.models import Event, Recordings from frigate.models import Event, Recordings
from frigate.types import RecordMetricsTypes from frigate.types import RecordMetricsTypes
from frigate.util import area from frigate.util import area, get_video_properties
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -42,7 +43,7 @@ class RecordingMaintainer(threading.Thread):
self.recordings_info: dict[str, Any] = defaultdict(list) self.recordings_info: dict[str, Any] = defaultdict(list)
self.end_time_cache: dict[str, Tuple[datetime.datetime, float]] = {} self.end_time_cache: dict[str, Tuple[datetime.datetime, float]] = {}
def move_files(self) -> None: async def move_files(self) -> None:
cache_files = sorted( cache_files = sorted(
[ [
d d
@ -121,9 +122,16 @@ class RecordingMaintainer(threading.Thread):
) )
.order_by(Event.start_time) .order_by(Event.start_time)
) )
for r in recordings:
cache_path = r["cache_path"] await asyncio.gather(
start_time = r["start_time"] *(self.validate_and_move_segment(camera, events, r) for r in recordings)
)
async def validate_and_move_segment(
self, camera: str, events: Event, recording: dict[str, any]
) -> None:
cache_path = recording["cache_path"]
start_time = recording["start_time"]
# Just delete files if recordings are turned off # Just delete files if recordings are turned off
if ( if (
@ -132,24 +140,15 @@ class RecordingMaintainer(threading.Thread):
): ):
Path(cache_path).unlink(missing_ok=True) Path(cache_path).unlink(missing_ok=True)
self.end_time_cache.pop(cache_path, None) self.end_time_cache.pop(cache_path, None)
continue return
if cache_path in self.end_time_cache: if cache_path in self.end_time_cache:
end_time, duration = self.end_time_cache[cache_path] end_time, duration = self.end_time_cache[cache_path]
else: else:
ffprobe_cmd = [ segment_info = get_video_properties(cache_path, get_duration=True)
"ffprobe",
"-v", if segment_info["duration"]:
"error", duration = float(segment_info["duration"])
"-show_entries",
"format=duration",
"-of",
"default=noprint_wrappers=1:nokey=1",
f"{cache_path}",
]
p = sp.run(ffprobe_cmd, capture_output=True)
if p.returncode == 0 and p.stdout.decode():
duration = float(p.stdout.decode().strip())
else: else:
duration = -1 duration = -1
@ -159,15 +158,11 @@ class RecordingMaintainer(threading.Thread):
self.end_time_cache[cache_path] = (end_time, duration) self.end_time_cache[cache_path] = (end_time, duration)
else: else:
if duration == -1: if duration == -1:
logger.warning( logger.warning(f"Failed to probe corrupt segment {cache_path}")
f"Failed to probe corrupt segment {cache_path} : {p.returncode} - {str(p.stderr)}"
)
logger.warning( logger.warning(f"Discarding a corrupt recording segment: {cache_path}")
f"Discarding a corrupt recording segment: {cache_path}"
)
Path(cache_path).unlink(missing_ok=True) Path(cache_path).unlink(missing_ok=True)
continue return
# if cached file's start_time is earlier than the retain days for the camera # if cached file's start_time is earlier than the retain days for the camera
if start_time <= ( if start_time <= (
@ -191,17 +186,12 @@ class RecordingMaintainer(threading.Thread):
# if the event is in progress or ends after the recording starts, keep it # if the event is in progress or ends after the recording starts, keep it
# and stop looking at events # and stop looking at events
if ( if event.end_time is None or event.end_time >= start_time.timestamp():
event.end_time is None
or event.end_time >= start_time.timestamp()
):
overlaps = True overlaps = True
break break
if overlaps: if overlaps:
record_mode = self.config.cameras[ record_mode = self.config.cameras[camera].record.events.retain.mode
camera
].record.events.retain.mode
# move from cache to recordings immediately # move from cache to recordings immediately
self.store_segment( self.store_segment(
camera, camera,
@ -214,12 +204,8 @@ class RecordingMaintainer(threading.Thread):
# if it doesn't overlap with an event, go ahead and drop the segment # if it doesn't overlap with an event, go ahead and drop the segment
# if it ends more than the configured pre_capture for the camera # if it ends more than the configured pre_capture for the camera
else: else:
pre_capture = self.config.cameras[ pre_capture = self.config.cameras[camera].record.events.pre_capture
camera most_recently_processed_frame_time = self.recordings_info[camera][-1][0]
].record.events.pre_capture
most_recently_processed_frame_time = self.recordings_info[
camera
][-1][0]
retain_cutoff = most_recently_processed_frame_time - pre_capture retain_cutoff = most_recently_processed_frame_time - pre_capture
if end_time.timestamp() < retain_cutoff: if end_time.timestamp() < retain_cutoff:
Path(cache_path).unlink(missing_ok=True) Path(cache_path).unlink(missing_ok=True)
@ -386,7 +372,7 @@ class RecordingMaintainer(threading.Thread):
break break
try: try:
self.move_files() asyncio.run(self.move_files())
except Exception as e: except Exception as e:
logger.error( logger.error(
"Error occurred when attempting to maintain recording cache" "Error occurred when attempting to maintain recording cache"

View File

@ -1144,3 +1144,54 @@ def to_relative_box(
(box[2] - box[0]) / width, # w (box[2] - box[0]) / width, # w
(box[3] - box[1]) / height, # h (box[3] - box[1]) / height, # h
) )
def get_video_properties(url, get_duration=False):
    """Probe a video file or stream for its frame width/height.

    When ``get_duration`` is True the result also includes a ``"duration"``
    key: computed from cv2's fps and frame count when available, otherwise
    via an ``ffprobe`` subprocess fallback, and ``-1`` if both fail.

    Returns a dict ``{"width": int, "height": int[, "duration": float]}``,
    or ``None`` if the video could not be opened at all.
    """
    # Open the video stream
    video = cv2.VideoCapture(url)

    # Check if the video stream was opened successfully
    if not video.isOpened():
        logger.debug(f"Error opening video stream {url}.")
        return None

    # Get the dimensions of frames in the video stream
    width = video.get(cv2.CAP_PROP_FRAME_WIDTH)
    height = video.get(cv2.CAP_PROP_FRAME_HEIGHT)

    if get_duration:
        # Read fps / frame count BEFORE releasing the capture — property
        # reads on a released VideoCapture return 0, which would force the
        # ffprobe fallback on every call.
        fps = video.get(cv2.CAP_PROP_FPS)
        total_frames = int(video.get(cv2.CAP_PROP_FRAME_COUNT))

    # Release the video stream
    video.release()

    result = {"width": round(width), "height": round(height)}

    if get_duration:
        if fps and total_frames:
            duration = total_frames / fps
        else:
            # cv2 could not determine duration; fall back to ffprobe
            ffprobe_cmd = [
                "ffprobe",
                "-v",
                "error",
                "-show_entries",
                "format=duration",
                "-of",
                "default=noprint_wrappers=1:nokey=1",
                f"{url}",
            ]
            p = sp.run(ffprobe_cmd, capture_output=True)

            if p.returncode == 0 and p.stdout.decode():
                duration = float(p.stdout.decode().strip())
            else:
                duration = -1

        result["duration"] = duration

    return result