blakeblackshear.frigate/frigate/record/maintainer.py

513 lines
19 KiB
Python
Raw Normal View History

"""Maintain recording segments in cache."""
import asyncio
2020-11-30 04:31:02 +01:00
import datetime
import logging
import multiprocessing as mp
2020-11-30 04:31:02 +01:00
import os
import queue
import random
import string
2020-11-30 04:31:02 +01:00
import threading
from collections import defaultdict
from multiprocessing.synchronize import Event as MpEvent
2020-11-30 04:31:02 +01:00
from pathlib import Path
from typing import Any, Optional, Tuple
2021-07-09 22:14:16 +02:00
import numpy as np
import psutil
from frigate.config import FrigateConfig, RetainModeEnum
from frigate.const import (
CACHE_DIR,
INSERT_MANY_RECORDINGS,
MAX_SEGMENT_DURATION,
RECORD_DIR,
)
2021-07-09 22:14:16 +02:00
from frigate.models import Event, Recordings
from frigate.types import FeatureMetricsTypes
from frigate.util.image import area
from frigate.util.services import get_video_properties
2020-11-30 04:31:02 +01:00
logger = logging.getLogger(__name__)
2020-12-01 04:08:47 +01:00
class SegmentInfo:
def __init__(
self, motion_box_count: int, active_object_count: int, average_dBFS: int
) -> None:
self.motion_box_count = motion_box_count
self.active_object_count = active_object_count
self.average_dBFS = average_dBFS
def should_discard_segment(self, retain_mode: RetainModeEnum) -> bool:
return (
retain_mode == RetainModeEnum.motion
and self.motion_box_count == 0
and self.average_dBFS == 0
) or (
retain_mode == RetainModeEnum.active_objects
and self.active_object_count == 0
)
2020-11-30 04:31:02 +01:00
class RecordingMaintainer(threading.Thread):
def __init__(
self,
config: FrigateConfig,
inter_process_queue: mp.Queue,
object_recordings_info_queue: mp.Queue,
audio_recordings_info_queue: Optional[mp.Queue],
process_info: dict[str, FeatureMetricsTypes],
stop_event: MpEvent,
):
2020-11-30 04:31:02 +01:00
threading.Thread.__init__(self)
self.name = "recording_maintainer"
2020-11-30 04:31:02 +01:00
self.config = config
self.inter_process_queue = inter_process_queue
self.object_recordings_info_queue = object_recordings_info_queue
self.audio_recordings_info_queue = audio_recordings_info_queue
self.process_info = process_info
2020-11-30 04:31:02 +01:00
self.stop_event = stop_event
self.object_recordings_info: dict[str, list] = defaultdict(list)
self.audio_recordings_info: dict[str, list] = defaultdict(list)
self.end_time_cache: dict[str, Tuple[datetime.datetime, float]] = {}
2020-11-30 04:31:02 +01:00
async def move_files(self) -> None:
cache_files = sorted(
[
d
for d in os.listdir(CACHE_DIR)
if os.path.isfile(os.path.join(CACHE_DIR, d))
and d.endswith(".mp4")
and not d.startswith("clip_")
]
)
2020-11-30 04:31:02 +01:00
files_in_use = []
for process in psutil.process_iter():
try:
2021-02-17 14:23:32 +01:00
if process.name() != "ffmpeg":
2020-12-24 21:23:59 +01:00
continue
2020-11-30 04:31:02 +01:00
flist = process.open_files()
if flist:
for nt in flist:
2021-07-09 22:14:16 +02:00
if nt.path.startswith(CACHE_DIR):
2021-02-17 14:23:32 +01:00
files_in_use.append(nt.path.split("/")[-1])
except psutil.Error:
2020-11-30 04:31:02 +01:00
continue
# group recordings by camera
grouped_recordings: defaultdict[str, list[dict[str, Any]]] = defaultdict(list)
for cache in cache_files:
# Skip files currently in use
if cache in files_in_use:
2020-11-30 04:31:02 +01:00
continue
cache_path = os.path.join(CACHE_DIR, cache)
basename = os.path.splitext(cache)[0]
camera, date = basename.rsplit("-", maxsplit=1)
start_time = datetime.datetime.strptime(date, "%Y%m%d%H%M%S")
grouped_recordings[camera].append(
{
"cache_path": cache_path,
"start_time": start_time,
}
2021-02-17 14:23:32 +01:00
)
2020-11-30 04:31:02 +01:00
2021-11-19 14:16:29 +01:00
# delete all cached files past the most recent 5
keep_count = 5
2021-11-17 15:57:57 +01:00
for camera in grouped_recordings.keys():
segment_count = len(grouped_recordings[camera])
if segment_count > keep_count:
logger.warning(
f"Unable to keep up with recording segments in cache for {camera}. Keeping the {keep_count} most recent segments out of {segment_count} and discarding the rest..."
)
to_remove = grouped_recordings[camera][:-keep_count]
for rec in to_remove:
cache_path = rec["cache_path"]
Path(cache_path).unlink(missing_ok=True)
self.end_time_cache.pop(cache_path, None)
grouped_recordings[camera] = grouped_recordings[camera][-keep_count:]
2021-11-11 04:12:41 +01:00
tasks = []
for camera, recordings in grouped_recordings.items():
# clear out all the object recording info for old frames
while (
len(self.object_recordings_info[camera]) > 0
and self.object_recordings_info[camera][0][0]
< recordings[0]["start_time"].timestamp()
):
self.object_recordings_info[camera].pop(0)
# clear out all the audio recording info for old frames
while (
len(self.audio_recordings_info[camera]) > 0
and self.audio_recordings_info[camera][0][0]
< recordings[0]["start_time"].timestamp()
):
self.audio_recordings_info[camera].pop(0)
# get all events with the end time after the start of the oldest cache file
# or with end_time None
events: Event = (
Event.select(
Event.start_time,
Event.end_time,
)
.where(
Event.camera == camera,
(Event.end_time == None)
| (Event.end_time >= recordings[0]["start_time"].timestamp()),
Event.has_clip,
)
.order_by(Event.start_time)
.namedtuples()
.iterator()
)
2020-11-30 04:31:02 +01:00
tasks.extend(
[self.validate_and_move_segment(camera, events, r) for r in recordings]
)
recordings_to_insert: list[Optional[Recordings]] = await asyncio.gather(*tasks)
# fire and forget recordings entries
self.inter_process_queue.put(
(INSERT_MANY_RECORDINGS, [r for r in recordings_to_insert if r is not None])
)
async def validate_and_move_segment(
self, camera: str, events: Event, recording: dict[str, any]
) -> None:
cache_path = recording["cache_path"]
start_time = recording["start_time"]
2021-10-22 14:23:18 +02:00
# Just delete files if recordings are turned off
if (
camera not in self.config.cameras
or not self.process_info[camera]["record_enabled"].value
):
Path(cache_path).unlink(missing_ok=True)
self.end_time_cache.pop(cache_path, None)
return
if cache_path in self.end_time_cache:
end_time, duration = self.end_time_cache[cache_path]
else:
segment_info = await get_video_properties(cache_path, get_duration=True)
if segment_info["duration"]:
duration = float(segment_info["duration"])
else:
duration = -1
# ensure duration is within expected length
if 0 < duration < MAX_SEGMENT_DURATION:
end_time = start_time + datetime.timedelta(seconds=duration)
self.end_time_cache[cache_path] = (end_time, duration)
else:
if duration == -1:
logger.warning(f"Failed to probe corrupt segment {cache_path}")
logger.warning(f"Discarding a corrupt recording segment: {cache_path}")
Path(cache_path).unlink(missing_ok=True)
return
# if cached file's start_time is earlier than the retain days for the camera
if start_time <= (
(
datetime.datetime.now()
- datetime.timedelta(
days=self.config.cameras[camera].record.retain.days
)
)
):
# if the cached segment overlaps with the events:
overlaps = False
for event in events:
# if the event starts in the future, stop checking events
# and remove this segment
if event.start_time > end_time.timestamp():
overlaps = False
Path(cache_path).unlink(missing_ok=True)
self.end_time_cache.pop(cache_path, None)
break
# if the event is in progress or ends after the recording starts, keep it
# and stop looking at events
if event.end_time is None or event.end_time >= start_time.timestamp():
overlaps = True
break
if overlaps:
record_mode = self.config.cameras[camera].record.events.retain.mode
# move from cache to recordings immediately
return await self.move_segment(
camera,
start_time,
end_time,
duration,
cache_path,
record_mode,
)
# if it doesn't overlap with an event, go ahead and drop the segment
# if it ends more than the configured pre_capture for the camera
else:
pre_capture = self.config.cameras[camera].record.events.pre_capture
2023-10-26 02:42:13 +02:00
camera_info = self.object_recordings_info[camera]
most_recently_processed_frame_time = (
camera_info[-1][0] if len(camera_info) > 0 else 0
)
retain_cutoff = most_recently_processed_frame_time - pre_capture
if end_time.timestamp() < retain_cutoff:
Path(cache_path).unlink(missing_ok=True)
self.end_time_cache.pop(cache_path, None)
# else retain days includes this segment
else:
2023-10-26 02:23:15 +02:00
# assume that empty means the relevant recording info has not been received yet
camera_info = self.object_recordings_info[camera]
most_recently_processed_frame_time = (
camera_info[-1][0] if len(camera_info) > 0 else 0
)
2023-10-25 21:06:57 +02:00
# ensure delayed segment info does not lead to lost segments
if most_recently_processed_frame_time >= end_time.timestamp():
2023-10-25 21:06:57 +02:00
record_mode = self.config.cameras[camera].record.retain.mode
return await self.move_segment(
camera, start_time, end_time, duration, cache_path, record_mode
)
def segment_stats(
self, camera: str, start_time: datetime.datetime, end_time: datetime.datetime
) -> SegmentInfo:
2021-12-11 20:11:39 +01:00
active_count = 0
motion_count = 0
for frame in self.object_recordings_info[camera]:
2021-12-11 20:11:39 +01:00
# frame is after end time of segment
if frame[0] > end_time.timestamp():
break
# frame is before start time of segment
if frame[0] < start_time.timestamp():
continue
active_count += len(
[
o
for o in frame[1]
2022-02-06 16:56:06 +01:00
if not o["false_positive"] and o["motionless_count"] == 0
2021-12-11 20:11:39 +01:00
]
)
motion_count += sum([area(box) for box in frame[2]])
audio_values = []
for frame in self.audio_recordings_info[camera]:
# frame is after end time of segment
if frame[0] > end_time.timestamp():
break
# frame is before start time of segment
if frame[0] < start_time.timestamp():
continue
# add active audio label count to count of active objects
active_count += len(frame[2])
# add sound level to audio values
audio_values.append(frame[1])
average_dBFS = 0 if not audio_values else np.average(audio_values)
return SegmentInfo(motion_count, active_count, round(average_dBFS))
2021-12-11 20:11:39 +01:00
async def move_segment(
2021-12-11 20:11:39 +01:00
self,
camera: str,
start_time: datetime.datetime,
end_time: datetime.datetime,
duration: float,
cache_path: str,
2021-12-11 20:11:39 +01:00
store_mode: RetainModeEnum,
) -> Optional[Recordings]:
segment_info = self.segment_stats(camera, start_time, end_time)
2021-12-11 20:11:39 +01:00
# check if the segment shouldn't be stored
if segment_info.should_discard_segment(store_mode):
2021-12-11 20:11:39 +01:00
Path(cache_path).unlink(missing_ok=True)
self.end_time_cache.pop(cache_path, None)
return
directory = os.path.join(
RECORD_DIR,
start_time.astimezone(tz=datetime.timezone.utc).strftime("%Y-%m-%d/%H"),
camera,
)
if not os.path.exists(directory):
os.makedirs(directory)
file_name = (
f"{start_time.replace(tzinfo=datetime.timezone.utc).strftime('%M.%S.mp4')}"
)
file_path = os.path.join(directory, file_name)
try:
if not os.path.exists(file_path):
start_frame = datetime.datetime.now().timestamp()
# add faststart to kept segments to improve metadata reading
p = await asyncio.create_subprocess_exec(
"ffmpeg",
"-hide_banner",
"-y",
"-i",
cache_path,
"-c",
"copy",
"-movflags",
"+faststart",
file_path,
stderr=asyncio.subprocess.PIPE,
stdout=asyncio.subprocess.DEVNULL,
)
await p.wait()
if p.returncode != 0:
logger.error(f"Unable to convert {cache_path} to {file_path}")
logger.error((await p.stderr.read()).decode("ascii"))
return None
else:
logger.debug(
f"Copied {file_path} in {datetime.datetime.now().timestamp()-start_frame} seconds."
)
Limit recording retention to available storage (#3942) * Add field and migration for segment size * Store the segment size in db * Add comment * Add default * Fix size parsing * Include segment size in recordings endpoint * Start adding storage maintainer * Add storage maintainer and calculate average sizes * Update comment * Store segment and hour avg sizes per camera * Formatting * Keep track of total segment and hour averages * Remove unused files * Cleanup 2 hours of recordings at a time * Formatting * Fix bug * Round segment size * Cleanup some comments * Handle case where segments are not deleted on initial run or is only retained segments * Improve cleanup log * Formatting * Fix typo and improve logging * Catch case where no recordings exist for camera * Specifically define sort * Handle edge case for cameras that only record part time * Increase definition of part time recorder * Remove warning about not supported storage based retention * Add note about storage based retention to recording docs * Add tests for storage maintenance calculation and cleanup * Format tests * Don't run for a camera with no recording segments * Get size of file from cache * Rework camera stats to be more efficient * Remove total and other inefficencies * Rewrite storage cleanup logic to be much more efficient * Fix existing tests * Fix bugs from tests * Add another test * Improve logging * Formatting * Set back correct loop time * Update name * Update comment * Only include segments that have a nonzero size * Catch case where camera has 0 nonzero segment durations * Add test to cover zero bandwidth migration case * Fix test * Incorrect boolean logic * Formatting * Explicity re-define iterator
2022-10-09 13:28:26 +02:00
try:
# get the segment size of the cache file
# file without faststart is same size
Limit recording retention to available storage (#3942) * Add field and migration for segment size * Store the segment size in db * Add comment * Add default * Fix size parsing * Include segment size in recordings endpoint * Start adding storage maintainer * Add storage maintainer and calculate average sizes * Update comment * Store segment and hour avg sizes per camera * Formatting * Keep track of total segment and hour averages * Remove unused files * Cleanup 2 hours of recordings at a time * Formatting * Fix bug * Round segment size * Cleanup some comments * Handle case where segments are not deleted on initial run or is only retained segments * Improve cleanup log * Formatting * Fix typo and improve logging * Catch case where no recordings exist for camera * Specifically define sort * Handle edge case for cameras that only record part time * Increase definition of part time recorder * Remove warning about not supported storage based retention * Add note about storage based retention to recording docs * Add tests for storage maintenance calculation and cleanup * Format tests * Don't run for a camera with no recording segments * Get size of file from cache * Rework camera stats to be more efficient * Remove total and other inefficencies * Rewrite storage cleanup logic to be much more efficient * Fix existing tests * Fix bugs from tests * Add another test * Improve logging * Formatting * Set back correct loop time * Update name * Update comment * Only include segments that have a nonzero size * Catch case where camera has 0 nonzero segment durations * Add test to cover zero bandwidth migration case * Fix test * Incorrect boolean logic * Formatting * Explicity re-define iterator
2022-10-09 13:28:26 +02:00
segment_size = round(
float(os.path.getsize(cache_path)) / pow(2, 20), 1
Limit recording retention to available storage (#3942) * Add field and migration for segment size * Store the segment size in db * Add comment * Add default * Fix size parsing * Include segment size in recordings endpoint * Start adding storage maintainer * Add storage maintainer and calculate average sizes * Update comment * Store segment and hour avg sizes per camera * Formatting * Keep track of total segment and hour averages * Remove unused files * Cleanup 2 hours of recordings at a time * Formatting * Fix bug * Round segment size * Cleanup some comments * Handle case where segments are not deleted on initial run or is only retained segments * Improve cleanup log * Formatting * Fix typo and improve logging * Catch case where no recordings exist for camera * Specifically define sort * Handle edge case for cameras that only record part time * Increase definition of part time recorder * Remove warning about not supported storage based retention * Add note about storage based retention to recording docs * Add tests for storage maintenance calculation and cleanup * Format tests * Don't run for a camera with no recording segments * Get size of file from cache * Rework camera stats to be more efficient * Remove total and other inefficencies * Rewrite storage cleanup logic to be much more efficient * Fix existing tests * Fix bugs from tests * Add another test * Improve logging * Formatting * Set back correct loop time * Update name * Update comment * Only include segments that have a nonzero size * Catch case where camera has 0 nonzero segment durations * Add test to cover zero bandwidth migration case * Fix test * Incorrect boolean logic * Formatting * Explicity re-define iterator
2022-10-09 13:28:26 +02:00
)
except OSError:
segment_size = 0
os.remove(cache_path)
rand_id = "".join(
random.choices(string.ascii_lowercase + string.digits, k=6)
)
return {
Recordings.id: f"{start_time.timestamp()}-{rand_id}",
Recordings.camera: camera,
Recordings.path: file_path,
Recordings.start_time: start_time.timestamp(),
Recordings.end_time: end_time.timestamp(),
Recordings.duration: duration,
Recordings.motion: segment_info.motion_box_count,
# TODO: update this to store list of active objects at some point
Recordings.objects: segment_info.active_object_count,
Recordings.dBFS: segment_info.average_dBFS,
Recordings.segment_size: segment_size,
}
except Exception as e:
logger.error(f"Unable to store recording segment {cache_path}")
Path(cache_path).unlink(missing_ok=True)
logger.error(e)
2020-11-30 04:31:02 +01:00
# clear end_time cache
self.end_time_cache.pop(cache_path, None)
return None
def run(self) -> None:
camera_count = sum(camera.enabled for camera in self.config.cameras.values())
# Check for new files every 5 seconds
wait_time = 0.0
2021-10-22 14:23:18 +02:00
while not self.stop_event.wait(wait_time):
run_start = datetime.datetime.now().timestamp()
stale_frame_count = 0
stale_frame_count_threshold = 10
# empty the object recordings info queue
while True:
try:
(
camera,
frame_time,
current_tracked_objects,
motion_boxes,
regions,
) = self.object_recordings_info_queue.get(True, timeout=0.01)
if frame_time < run_start - stale_frame_count_threshold:
stale_frame_count += 1
if self.process_info[camera]["record_enabled"].value:
self.object_recordings_info[camera].append(
(
frame_time,
current_tracked_objects,
motion_boxes,
regions,
)
)
except queue.Empty:
q_size = self.object_recordings_info_queue.qsize()
if q_size > camera_count:
logger.debug(
f"object_recordings_info loop queue not empty ({q_size})."
)
break
if stale_frame_count > 0:
2023-10-26 23:50:31 +02:00
logger.debug(f"Found {stale_frame_count} old frames.")
# empty the audio recordings info queue if audio is enabled
if self.audio_recordings_info_queue:
stale_frame_count = 0
while True:
try:
(
camera,
frame_time,
dBFS,
audio_detections,
) = self.audio_recordings_info_queue.get(True, timeout=0.01)
if frame_time < run_start - stale_frame_count_threshold:
stale_frame_count += 1
if self.process_info[camera]["record_enabled"].value:
self.audio_recordings_info[camera].append(
(
frame_time,
dBFS,
audio_detections,
)
)
except queue.Empty:
q_size = self.audio_recordings_info_queue.qsize()
if q_size > camera_count:
logger.debug(
f"object_recordings_info loop audio queue not empty ({q_size})."
)
break
if stale_frame_count > 0:
logger.error(
f"Found {stale_frame_count} old audio frames, segments from recordings may be missing"
)
try:
asyncio.run(self.move_files())
except Exception as e:
logger.error(
"Error occurred when attempting to maintain recording cache"
)
logger.error(e)
2021-11-17 15:57:57 +01:00
duration = datetime.datetime.now().timestamp() - run_start
wait_time = max(0, 5 - duration)
logger.info("Exiting recording maintenance...")