blakeblackshear.frigate/frigate/record/maintainer.py

575 lines
22 KiB
Python
Raw Permalink Normal View History

"""Maintain recording segments in cache."""
import asyncio
2020-11-30 04:31:02 +01:00
import datetime
import logging
import os
import random
import string
2020-11-30 04:31:02 +01:00
import threading
import time
from collections import defaultdict
from multiprocessing.synchronize import Event as MpEvent
2020-11-30 04:31:02 +01:00
from pathlib import Path
from typing import Any, Optional, Tuple
2021-07-09 22:14:16 +02:00
import numpy as np
import psutil
from frigate.comms.config_updater import ConfigSubscriber
from frigate.comms.detections_updater import DetectionSubscriber, DetectionTypeEnum
from frigate.comms.inter_process import InterProcessRequestor
from frigate.config import FrigateConfig, RetainModeEnum
from frigate.const import (
CACHE_DIR,
CACHE_SEGMENT_FORMAT,
INSERT_MANY_RECORDINGS,
MAX_SEGMENT_DURATION,
2023-11-18 22:37:06 +01:00
MAX_SEGMENTS_IN_CACHE,
RECORD_DIR,
)
from frigate.models import Recordings, ReviewSegment
from frigate.util.services import get_video_properties
2020-11-30 04:31:02 +01:00
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# Timeout passed as `timeout=` to the detection subscriber's
# `check_for_update` call each time the recording maintainer polls it.
# NOTE(review): presumably kept tiny so polling an empty queue does not stall
# the maintenance loop - confirm against DetectionSubscriber.
QUEUE_READ_TIMEOUT = 0.00001 # seconds
2020-12-01 04:08:47 +01:00
class SegmentInfo:
    """Activity counters gathered for a single recording segment."""

    def __init__(
        self,
        motion_count: int,
        active_object_count: int,
        region_count: int,
        average_dBFS: int,
    ) -> None:
        """Store the per-segment counters unchanged on the instance."""
        self.motion_count = motion_count
        self.active_object_count = active_object_count
        self.region_count = region_count
        self.average_dBFS = average_dBFS

    def should_discard_segment(self, retain_mode: RetainModeEnum) -> bool:
        """Return True when the segment has nothing worth keeping for *retain_mode*.

        - ``motion``: discard only when there was neither motion nor audio level.
        - ``active_objects``: discard when no active object was counted.
        - any other mode: never discard.
        """
        if retain_mode == RetainModeEnum.motion:
            no_motion = self.motion_count == 0
            silent = self.average_dBFS == 0
            return no_motion and silent

        if retain_mode == RetainModeEnum.active_objects:
            return self.active_object_count == 0

        return False
2020-11-30 04:31:02 +01:00
class RecordingMaintainer(threading.Thread):
    """Background thread that validates cached recording segments and moves
    the ones worth keeping from ``CACHE_DIR`` into ``RECORD_DIR``.

    Each pass of :meth:`run` applies pending record-config updates, drains
    the detection subscriber into per-camera frame lists, and then runs
    :meth:`move_files`, which decides per segment whether to keep or delete.
    """

    def __init__(self, config: FrigateConfig, stop_event: MpEvent):
        """Set up subscribers and per-camera bookkeeping.

        Args:
            config: Full Frigate configuration; ``config.cameras[...].record``
                is replaced in place when a config update arrives.
            stop_event: Event that, once set, makes :meth:`run` exit.
        """
        super().__init__(name="recording_maintainer")
        self.config = config

        # create communication for retained recordings
        self.requestor = InterProcessRequestor()
        self.config_subscriber = ConfigSubscriber("config/record/")
        self.detection_subscriber = DetectionSubscriber(DetectionTypeEnum.all)

        self.stop_event = stop_event
        # camera -> list of (frame_time, tracked_objects, motion_boxes, regions)
        self.object_recordings_info: dict[str, list] = defaultdict(list)
        # camera -> list of (frame_time, dBFS, audio_detections)
        self.audio_recordings_info: dict[str, list] = defaultdict(list)
        # cache_path -> (end_time, duration) so a segment is only probed once
        self.end_time_cache: dict[str, Tuple[datetime.datetime, float]] = {}

    def _discard_cache_file(self, cache_path: str) -> None:
        """Delete a cached segment (if present) and forget its probed duration."""
        Path(cache_path).unlink(missing_ok=True)
        self.end_time_cache.pop(cache_path, None)

    def _latest_frame_time(self, camera: str) -> float:
        """Return the time of the newest object frame for *camera*, or 0 if none."""
        camera_info = self.object_recordings_info[camera]
        return camera_info[-1][0] if len(camera_info) > 0 else 0

    @staticmethod
    def _drop_frames_before(frames: list, cutoff: float) -> None:
        """Remove the leading run of frames whose time is before *cutoff*.

        Same result as repeatedly popping index 0, but with a single
        prefix deletion instead of one list shift per stale frame.
        """
        stale = 0
        while stale < len(frames) and frames[stale][0] < cutoff:
            stale += 1
        del frames[:stale]

    def _keep_most_recent(
        self, segments: list[dict[str, Any]], keep_count: int
    ) -> list[dict[str, Any]]:
        """Delete all but the last *keep_count* segments and return the kept ones."""
        for rec in segments[:-keep_count]:
            self._discard_cache_file(rec["cache_path"])
        return segments[-keep_count:]

    async def move_files(self) -> None:
        """Scan the cache directory and process every finished segment.

        Segments still opened by an ``ffmpeg`` process are skipped. Per camera,
        the cache is trimmed to ``MAX_SEGMENTS_IN_CACHE`` when the mover or the
        detector falls behind. The remaining segments are validated
        concurrently and the resulting rows are sent with
        ``INSERT_MANY_RECORDINGS``.
        """
        cache_files = [
            d
            for d in os.listdir(CACHE_DIR)
            if os.path.isfile(os.path.join(CACHE_DIR, d))
            and d.endswith(".mp4")
            and not d.startswith("preview_")
        ]

        # a set keeps the per-file "in use" test O(1)
        files_in_use: set[str] = set()
        for process in psutil.process_iter():
            try:
                if process.name() != "ffmpeg":
                    continue

                file_list = process.open_files()
                if file_list:
                    for nt in file_list:
                        if nt.path.startswith(CACHE_DIR):
                            files_in_use.add(nt.path.split("/")[-1])
            except psutil.Error:
                continue

        # group recordings by camera
        grouped_recordings: defaultdict[str, list[dict[str, Any]]] = defaultdict(list)
        for cache in cache_files:
            # Skip files currently in use
            if cache in files_in_use:
                continue

            cache_path = os.path.join(CACHE_DIR, cache)
            basename = os.path.splitext(cache)[0]
            camera, date = basename.rsplit("@", maxsplit=1)

            # important that start_time is utc because recordings are stored and compared in utc
            start_time = datetime.datetime.strptime(
                date, CACHE_SEGMENT_FORMAT
            ).astimezone(datetime.timezone.utc)

            grouped_recordings[camera].append(
                {
                    "cache_path": cache_path,
                    "start_time": start_time,
                }
            )

        # delete all cached files past the most recent MAX_SEGMENTS_IN_CACHE
        keep_count = MAX_SEGMENTS_IN_CACHE
        for camera in grouped_recordings:
            # sort based on start time
            grouped_recordings[camera] = sorted(
                grouped_recordings[camera], key=lambda s: s["start_time"]
            )

            most_recently_processed_frame_time = self._latest_frame_time(camera)
            processed_segment_count = sum(
                1
                for r in grouped_recordings[camera]
                if r["start_time"].timestamp() < most_recently_processed_frame_time
            )

            # see if the recording mover is too slow and segments need to be deleted
            if processed_segment_count > keep_count:
                logger.warning(
                    f"Unable to keep up with recording segments in cache for {camera}. Keeping the {keep_count} most recent segments out of {processed_segment_count} and discarding the rest..."
                )
                grouped_recordings[camera] = self._keep_most_recent(
                    grouped_recordings[camera], keep_count
                )

            # see if detection has failed and unprocessed segments need to be deleted
            unprocessed_segment_count = (
                len(grouped_recordings[camera]) - processed_segment_count
            )
            if unprocessed_segment_count > keep_count:
                logger.warning(
                    f"Too many unprocessed recording segments in cache for {camera}. This likely indicates an issue with the detect stream, keeping the {keep_count} most recent segments out of {unprocessed_segment_count} and discarding the rest..."
                )
                grouped_recordings[camera] = self._keep_most_recent(
                    grouped_recordings[camera], keep_count
                )

        tasks = []
        for camera, recordings in grouped_recordings.items():
            oldest_start = recordings[0]["start_time"].timestamp()

            # clear out all the object / audio recording info for old frames
            self._drop_frames_before(self.object_recordings_info[camera], oldest_start)
            self._drop_frames_before(self.audio_recordings_info[camera], oldest_start)

            # get all reviews with the end time after the start of the oldest cache file
            # or with end_time None
            # severity must be selected because validate_and_move_segment
            # reads review.severity to choose the retain mode
            reviews = (
                ReviewSegment.select(
                    ReviewSegment.start_time,
                    ReviewSegment.end_time,
                    ReviewSegment.severity,
                    ReviewSegment.data,
                )
                .where(
                    ReviewSegment.camera == camera,
                    (ReviewSegment.end_time == None)  # noqa: E711 (peewee expression)
                    | (ReviewSegment.end_time >= oldest_start),
                )
                .order_by(ReviewSegment.start_time)
            )

            tasks.extend(
                [self.validate_and_move_segment(camera, reviews, r) for r in recordings]
            )

        recordings_to_insert: list[Optional[dict[str, Any]]] = await asyncio.gather(
            *tasks
        )

        # fire and forget recordings entries
        self.requestor.send_data(
            INSERT_MANY_RECORDINGS,
            [r for r in recordings_to_insert if r is not None],
        )

    async def validate_and_move_segment(
        self, camera: str, reviews: list[ReviewSegment], recording: dict[str, Any]
    ) -> Optional[dict[str, Any]]:
        """Decide what to do with one cached segment.

        Args:
            camera: Camera name parsed from the cache file name.
            reviews: Review segments for this camera, ordered by start time.
            recording: Dict with ``cache_path`` and UTC ``start_time``.

        Returns:
            The recording row produced by :meth:`move_segment` when the
            segment was stored, otherwise ``None`` (deleted or left in cache).
        """
        cache_path = recording["cache_path"]
        start_time = recording["start_time"]

        # Just delete files if recordings are turned off.
        # The membership test must come before any indexing so a cache file
        # for a camera missing from the config is removed, not a KeyError.
        if (
            camera not in self.config.cameras
            or not self.config.cameras[camera].record.enabled
        ):
            self._discard_cache_file(cache_path)
            return None

        record_config = self.config.cameras[camera].record

        if cache_path in self.end_time_cache:
            end_time, duration = self.end_time_cache[cache_path]
        else:
            segment_info = await get_video_properties(
                self.config.ffmpeg, cache_path, get_duration=True
            )

            if segment_info["duration"]:
                duration = float(segment_info["duration"])
            else:
                duration = -1

            # ensure duration is within expected length
            if 0 < duration < MAX_SEGMENT_DURATION:
                end_time = start_time + datetime.timedelta(seconds=duration)
                self.end_time_cache[cache_path] = (end_time, duration)
            else:
                if duration == -1:
                    logger.warning(f"Failed to probe corrupt segment {cache_path}")

                logger.warning(f"Discarding a corrupt recording segment: {cache_path}")
                Path(cache_path).unlink(missing_ok=True)
                return None

        # if cached file's start_time is earlier than the retain days for the camera
        if start_time <= (
            datetime.datetime.now().astimezone(datetime.timezone.utc)
            - datetime.timedelta(days=record_config.retain.days)
        ):
            # if the cached segment overlaps with the events:
            overlaps = False
            for review in reviews:
                # if the event starts in the future, stop checking events
                # and remove this segment
                if review.start_time > end_time.timestamp():
                    overlaps = False
                    self._discard_cache_file(cache_path)
                    break

                # if the event is in progress or ends after the recording starts, keep it
                # and stop looking at events
                if review.end_time is None or review.end_time >= start_time.timestamp():
                    overlaps = True
                    break

            if overlaps:
                # `review` is the segment that triggered the break above
                record_mode = (
                    record_config.alerts.retain.mode
                    if review.severity == "alert"
                    else record_config.detections.retain.mode
                )
                # move from cache to recordings immediately
                return await self.move_segment(
                    camera,
                    start_time,
                    end_time,
                    duration,
                    cache_path,
                    record_mode,
                )
            # if it doesn't overlap with an event, go ahead and drop the segment
            # if it ends more than the configured pre_capture for the camera
            else:
                pre_capture = max(
                    record_config.alerts.pre_capture,
                    record_config.detections.pre_capture,
                )
                most_recently_processed_frame_time = self._latest_frame_time(camera)
                retain_cutoff = datetime.datetime.fromtimestamp(
                    most_recently_processed_frame_time - pre_capture
                ).astimezone(datetime.timezone.utc)
                if end_time < retain_cutoff:
                    self._discard_cache_file(cache_path)
        # else retain days includes this segment
        else:
            # assume that empty means the relevant recording info has not been received yet
            most_recently_processed_frame_time = self._latest_frame_time(camera)

            # ensure delayed segment info does not lead to lost segments
            if (
                datetime.datetime.fromtimestamp(
                    most_recently_processed_frame_time
                ).astimezone(datetime.timezone.utc)
                >= end_time
            ):
                record_mode = record_config.retain.mode
                return await self.move_segment(
                    camera, start_time, end_time, duration, cache_path, record_mode
                )

        return None

    def segment_stats(
        self, camera: str, start_time: datetime.datetime, end_time: datetime.datetime
    ) -> SegmentInfo:
        """Aggregate object, motion, region and audio activity for a segment.

        Only frames whose time lies within ``[start_time, end_time]`` are
        counted. Iteration stops at the first frame past ``end_time``.
        """
        # hoisted: these are loop invariants
        start_ts = start_time.timestamp()
        end_ts = end_time.timestamp()

        active_count = 0
        region_count = 0
        motion_count = 0
        for frame in self.object_recordings_info[camera]:
            # frame is after end time of segment
            if frame[0] > end_ts:
                break

            # frame is before start time of segment
            if frame[0] < start_ts:
                continue

            active_count += sum(
                1
                for o in frame[1]
                if not o["false_positive"] and o["motionless_count"] == 0
            )
            motion_count += len(frame[2])
            region_count += len(frame[3])

        audio_values = []
        for frame in self.audio_recordings_info[camera]:
            # frame is after end time of segment
            if frame[0] > end_ts:
                break

            # frame is before start time of segment
            if frame[0] < start_ts:
                continue

            # add active audio label count to count of active objects
            active_count += len(frame[2])

            # add sound level to audio values
            audio_values.append(frame[1])

        average_dBFS = 0 if not audio_values else np.average(audio_values)

        return SegmentInfo(
            motion_count, active_count, region_count, round(average_dBFS)
        )

    async def move_segment(
        self,
        camera: str,
        start_time: datetime.datetime,
        end_time: datetime.datetime,
        duration: float,
        cache_path: str,
        store_mode: RetainModeEnum,
    ) -> Optional[dict[str, Any]]:
        """Copy a cached segment into the recordings directory.

        The segment is remuxed with ffmpeg (``-c copy -movflags +faststart``)
        into ``RECORD_DIR/<Y-m-d>/<H>/<camera>/<M>.<S>.mp4`` (UTC), after
        which the cache file is removed.

        Returns:
            A dict keyed by ``Recordings`` field names describing the stored
            segment, or ``None`` when the segment was discarded for
            *store_mode*, ffmpeg failed, or an unexpected error occurred.
        """
        segment_info = self.segment_stats(camera, start_time, end_time)

        # check if the segment shouldn't be stored
        if segment_info.should_discard_segment(store_mode):
            self._discard_cache_file(cache_path)
            return None

        # directory will be in utc due to start_time being in utc
        directory = os.path.join(
            RECORD_DIR,
            start_time.strftime("%Y-%m-%d/%H"),
            camera,
        )
        os.makedirs(directory, exist_ok=True)

        # file will be in utc due to start_time being in utc
        file_name = f"{start_time.strftime('%M.%S.mp4')}"
        file_path = os.path.join(directory, file_name)

        try:
            if not os.path.exists(file_path):
                start_frame = datetime.datetime.now().timestamp()

                # add faststart to kept segments to improve metadata reading
                p = await asyncio.create_subprocess_exec(
                    self.config.ffmpeg.ffmpeg_path,
                    "-hide_banner",
                    "-y",
                    "-i",
                    cache_path,
                    "-c",
                    "copy",
                    "-movflags",
                    "+faststart",
                    file_path,
                    stderr=asyncio.subprocess.PIPE,
                    stdout=asyncio.subprocess.DEVNULL,
                )
                # communicate() drains stderr while waiting; wait() alone can
                # deadlock once the child fills the pipe buffer
                _, stderr = await p.communicate()

                if p.returncode != 0:
                    logger.error(f"Unable to convert {cache_path} to {file_path}")
                    # tolerate non-ASCII bytes in ffmpeg's output
                    logger.error(stderr.decode("utf-8", errors="replace"))
                    # drop any partial output so the next pass retries the
                    # conversion instead of treating a truncated file as done
                    Path(file_path).unlink(missing_ok=True)
                    return None
                else:
                    logger.debug(
                        f"Copied {file_path} in {datetime.datetime.now().timestamp()-start_frame} seconds."
                    )

            try:
                # get the segment size of the cache file
                # file without faststart is same size
                segment_size = round(
                    float(os.path.getsize(cache_path)) / pow(2, 20), 1
                )
            except OSError:
                segment_size = 0

            os.remove(cache_path)
            # the cache file is gone, so its probed duration is stale;
            # without this the end_time cache grows for every stored segment
            self.end_time_cache.pop(cache_path, None)

            rand_id = "".join(
                random.choices(string.ascii_lowercase + string.digits, k=6)
            )

            return {
                Recordings.id.name: f"{start_time.timestamp()}-{rand_id}",
                Recordings.camera.name: camera,
                Recordings.path.name: file_path,
                Recordings.start_time.name: start_time.timestamp(),
                Recordings.end_time.name: end_time.timestamp(),
                Recordings.duration.name: duration,
                Recordings.motion.name: segment_info.motion_count,
                # TODO: update this to store list of active objects at some point
                Recordings.objects.name: segment_info.active_object_count,
                Recordings.regions.name: segment_info.region_count,
                Recordings.dBFS.name: segment_info.average_dBFS,
                Recordings.segment_size.name: segment_size,
            }
        except Exception as e:
            logger.error(f"Unable to store recording segment {cache_path}")
            Path(cache_path).unlink(missing_ok=True)
            logger.error(e)

        # clear end_time cache
        self.end_time_cache.pop(cache_path, None)
        return None

    def run(self) -> None:
        """Run the maintenance loop until ``stop_event`` is set.

        Each pass applies pending record-config updates, drains the detection
        subscriber into the per-camera frame lists, runs :meth:`move_files`,
        and then waits so that passes start roughly every 5 seconds.
        """
        # Check for new files every 5 seconds
        wait_time = 0.0
        # Event.wait() returns as soon as the event is set, so shutdown does
        # not have to sit out the remainder of the sleep interval
        while not self.stop_event.wait(wait_time):
            run_start = datetime.datetime.now().timestamp()

            # check if there is an updated config
            while True:
                (
                    updated_topic,
                    updated_record_config,
                ) = self.config_subscriber.check_for_update()

                if not updated_topic:
                    break

                camera_name = updated_topic.rpartition("/")[-1]
                self.config.cameras[camera_name].record = updated_record_config

            stale_frame_count = 0
            stale_frame_count_threshold = 10

            # empty the object recordings info queue
            while True:
                (topic, data) = self.detection_subscriber.check_for_update(
                    timeout=QUEUE_READ_TIMEOUT
                )

                if not topic:
                    break

                if topic == DetectionTypeEnum.video:
                    (
                        camera,
                        frame_time,
                        current_tracked_objects,
                        motion_boxes,
                        regions,
                    ) = data

                    if self.config.cameras[camera].record.enabled:
                        self.object_recordings_info[camera].append(
                            (
                                frame_time,
                                current_tracked_objects,
                                motion_boxes,
                                regions,
                            )
                        )
                elif topic == DetectionTypeEnum.audio:
                    (
                        camera,
                        frame_time,
                        dBFS,
                        audio_detections,
                    ) = data

                    if self.config.cameras[camera].record.enabled:
                        self.audio_recordings_info[camera].append(
                            (
                                frame_time,
                                dBFS,
                                audio_detections,
                            )
                        )
                else:
                    # api updates (and any other topic) carry no frame time;
                    # skipping here keeps frame_time from being unbound or
                    # left over from a previous message
                    continue

                # count frames that arrived more than the threshold late
                if frame_time < run_start - stale_frame_count_threshold:
                    stale_frame_count += 1

            if stale_frame_count > 0:
                logger.debug(f"Found {stale_frame_count} old frames.")

            try:
                asyncio.run(self.move_files())
            except Exception as e:
                logger.error(
                    "Error occurred when attempting to maintain recording cache"
                )
                logger.error(e)

            duration = datetime.datetime.now().timestamp() - run_start
            wait_time = max(0, 5 - duration)

        self.requestor.stop()
        self.config_subscriber.stop()
        self.detection_subscriber.stop()
        logger.info("Exiting recording maintenance...")