mirror of
https://github.com/blakeblackshear/frigate.git
synced 2025-01-07 00:06:57 +01:00
3879fde06d
* Don't allow unlimited unprocessed frames to stay in cache * Formatting
575 lines
22 KiB
Python
575 lines
22 KiB
Python
"""Maintain recording segments in cache."""
|
|
|
|
import asyncio
|
|
import datetime
|
|
import logging
|
|
import os
|
|
import random
|
|
import string
|
|
import threading
|
|
import time
|
|
from collections import defaultdict
|
|
from multiprocessing.synchronize import Event as MpEvent
|
|
from pathlib import Path
|
|
from typing import Any, Optional, Tuple
|
|
|
|
import numpy as np
|
|
import psutil
|
|
|
|
from frigate.comms.config_updater import ConfigSubscriber
|
|
from frigate.comms.detections_updater import DetectionSubscriber, DetectionTypeEnum
|
|
from frigate.comms.inter_process import InterProcessRequestor
|
|
from frigate.config import FrigateConfig, RetainModeEnum
|
|
from frigate.const import (
|
|
CACHE_DIR,
|
|
CACHE_SEGMENT_FORMAT,
|
|
INSERT_MANY_RECORDINGS,
|
|
MAX_SEGMENT_DURATION,
|
|
MAX_SEGMENTS_IN_CACHE,
|
|
RECORD_DIR,
|
|
)
|
|
from frigate.models import Recordings, ReviewSegment
|
|
from frigate.util.services import get_video_properties
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# Timeout passed to the detection subscriber's check_for_update() each time
# RecordingMaintainer.run polls it while draining pending detection updates.
QUEUE_READ_TIMEOUT = 0.00001 # seconds
|
|
|
|
|
|
class SegmentInfo:
    """Activity totals collected for a single recording segment."""

    def __init__(
        self,
        motion_count: int,
        active_object_count: int,
        region_count: int,
        average_dBFS: int,
    ) -> None:
        # plain value holder: the arguments are stored as given
        self.average_dBFS = average_dBFS
        self.region_count = region_count
        self.active_object_count = active_object_count
        self.motion_count = motion_count

    def should_discard_segment(self, retain_mode: RetainModeEnum) -> bool:
        """Tell whether this segment has nothing that `retain_mode` keeps.

        - motion mode: discard when there was neither motion nor audio level
          (both `motion_count` and `average_dBFS` are zero).
        - active_objects mode: discard when no active object was counted.
        - any other mode: never discard.
        """
        if retain_mode == RetainModeEnum.motion:
            return self.motion_count == 0 and self.average_dBFS == 0

        if retain_mode == RetainModeEnum.active_objects:
            return self.active_object_count == 0

        return False
|
|
|
|
|
|
class RecordingMaintainer(threading.Thread):
|
|
def __init__(self, config: FrigateConfig, stop_event: MpEvent):
    """Set up the maintainer thread, its communication channels and caches.

    `config` is the Frigate configuration consulted for per-camera record
    settings; `stop_event` is polled by `run` to know when to shut down.
    """
    super().__init__(name="recording_maintainer")
    self.config = config
    self.stop_event = stop_event

    # create communication for retained recordings
    self.requestor = InterProcessRequestor()
    self.config_subscriber = ConfigSubscriber("config/record/")
    self.detection_subscriber = DetectionSubscriber(DetectionTypeEnum.all)

    # per-camera frame info appended by run(); each entry starts with the
    # frame time
    self.object_recordings_info: dict[str, list] = defaultdict(list)
    self.audio_recordings_info: dict[str, list] = defaultdict(list)

    # cache path -> (end time, duration) for segments that were already probed
    self.end_time_cache: dict[str, Tuple[datetime.datetime, float]] = {}
|
|
|
|
async def move_files(self) -> None:
    """Scan the cache directory and process every finished recording segment.

    Steps:
    1. List `.mp4` files in CACHE_DIR, ignoring `preview_` files and files
       that an ffmpeg process still has open.
    2. Group the segments per camera (file names are `<camera>@<timestamp>`)
       and, when a camera has more than MAX_SEGMENTS_IN_CACHE processed or
       unprocessed segments, delete all but the most recent ones.
    3. Drop object/audio frame info older than the oldest remaining segment.
    4. Validate and move every segment concurrently, then send the resulting
       recording rows to INSERT_MANY_RECORDINGS.
    """
    cache_files = [
        d
        for d in os.listdir(CACHE_DIR)
        if os.path.isfile(os.path.join(CACHE_DIR, d))
        and d.endswith(".mp4")
        and not d.startswith("preview_")
    ]

    # names of cache files still held open by ffmpeg; a set keeps the
    # membership test in the loop below constant-time
    files_in_use: set[str] = set()
    for process in psutil.process_iter():
        try:
            if process.name() != "ffmpeg":
                continue

            for nt in process.open_files() or ():
                if nt.path.startswith(CACHE_DIR):
                    files_in_use.add(nt.path.split("/")[-1])
        except psutil.Error:
            continue

    # group recordings by camera
    grouped_recordings: defaultdict[str, list[dict[str, Any]]] = defaultdict(list)
    for cache in cache_files:
        # Skip files currently in use
        if cache in files_in_use:
            continue

        cache_path = os.path.join(CACHE_DIR, cache)
        basename = os.path.splitext(cache)[0]

        try:
            camera, date = basename.rsplit("@", maxsplit=1)

            # important that start_time is utc because recordings are stored and compared in utc
            start_time = datetime.datetime.strptime(
                date, CACHE_SEGMENT_FORMAT
            ).astimezone(datetime.timezone.utc)
        except ValueError:
            # a file that does not follow <camera>@<timestamp>.mp4 must not
            # abort maintenance for every other segment
            logger.debug(f"Skipping cache file with unexpected name: {cache_path}")
            continue

        grouped_recordings[camera].append(
            {
                "cache_path": cache_path,
                "start_time": start_time,
            }
        )

    # delete all cached files past the most recent MAX_SEGMENTS_IN_CACHE
    keep_count = MAX_SEGMENTS_IN_CACHE

    def discard_all_but_newest(segments: list[dict[str, Any]]) -> list[dict[str, Any]]:
        # unlink everything except the last keep_count entries and forget
        # their cached end times
        for rec in segments[:-keep_count]:
            stale_path = rec["cache_path"]
            Path(stale_path).unlink(missing_ok=True)
            self.end_time_cache.pop(stale_path, None)

        return segments[-keep_count:]

    for camera in grouped_recordings:
        # sort based on start time
        segments = sorted(grouped_recordings[camera], key=lambda s: s["start_time"])

        camera_info = self.object_recordings_info[camera]
        most_recently_processed_frame_time = (
            camera_info[-1][0] if len(camera_info) > 0 else 0
        )

        processed_segment_count = sum(
            1
            for r in segments
            if r["start_time"].timestamp() < most_recently_processed_frame_time
        )

        # see if the recording mover is too slow and segments need to be deleted
        if processed_segment_count > keep_count:
            logger.warning(
                f"Unable to keep up with recording segments in cache for {camera}. Keeping the {keep_count} most recent segments out of {processed_segment_count} and discarding the rest..."
            )
            segments = discard_all_but_newest(segments)

        # see if detection has failed and unprocessed segments need to be deleted
        unprocessed_segment_count = len(segments) - processed_segment_count
        if unprocessed_segment_count > keep_count:
            logger.warning(
                f"Too many unprocessed recording segments in cache for {camera}. This likely indicates an issue with the detect stream, keeping the {keep_count} most recent segments out of {unprocessed_segment_count} and discarding the rest..."
            )
            segments = discard_all_but_newest(segments)

        grouped_recordings[camera] = segments

    tasks = []
    for camera, recordings in grouped_recordings.items():
        oldest_start = recordings[0]["start_time"].timestamp()

        # clear out all the object and audio recording info for frames older
        # than the oldest cached segment; one slice delete instead of
        # repeated pop(0)
        for info in (
            self.object_recordings_info[camera],
            self.audio_recordings_info[camera],
        ):
            stale = 0
            while stale < len(info) and info[stale][0] < oldest_start:
                stale += 1
            del info[:stale]

        # get all reviews with the end time after the start of the oldest cache file
        # or with end_time None
        # severity is selected because validate_and_move_segment reads
        # review.severity to choose between the alert and detection modes
        reviews = (
            ReviewSegment.select(
                ReviewSegment.start_time,
                ReviewSegment.end_time,
                ReviewSegment.severity,
                ReviewSegment.data,
            )
            .where(
                ReviewSegment.camera == camera,
                (ReviewSegment.end_time == None)  # noqa: E711
                | (ReviewSegment.end_time >= oldest_start),
            )
            .order_by(ReviewSegment.start_time)
        )

        tasks.extend(
            [self.validate_and_move_segment(camera, reviews, r) for r in recordings]
        )

    recordings_to_insert: list[Optional[Recordings]] = await asyncio.gather(*tasks)

    # fire and forget recordings entries
    self.requestor.send_data(
        INSERT_MANY_RECORDINGS,
        [r for r in recordings_to_insert if r is not None],
    )
|
|
|
|
async def validate_and_move_segment(
    self, camera: str, reviews: list[ReviewSegment], recording: dict[str, Any]
) -> Optional[Recordings]:
    """Decide what happens to one cached segment: delete, keep waiting, or store.

    Args:
        camera: name of the camera the segment belongs to.
        reviews: review segments for this camera, iterated in order.
        recording: dict with the segment's "cache_path" and "start_time".

    Returns:
        Whatever `move_segment` returns when the segment is stored, otherwise
        None (segment deleted, or left in the cache for a later pass).
    """
    cache_path = recording["cache_path"]
    start_time = recording["start_time"]

    # Just delete files if recordings are turned off
    # (the guard must run before any config lookup so that a segment from a
    # camera missing from the config is removed instead of raising KeyError)
    if (
        camera not in self.config.cameras
        or not self.config.cameras[camera].record.enabled
    ):
        Path(cache_path).unlink(missing_ok=True)
        self.end_time_cache.pop(cache_path, None)
        return None

    record_config = self.config.cameras[camera].record

    if cache_path in self.end_time_cache:
        end_time, duration = self.end_time_cache[cache_path]
    else:
        segment_info = await get_video_properties(
            self.config.ffmpeg, cache_path, get_duration=True
        )

        if segment_info["duration"]:
            duration = float(segment_info["duration"])
        else:
            duration = -1

        # ensure duration is within expected length
        if 0 < duration < MAX_SEGMENT_DURATION:
            end_time = start_time + datetime.timedelta(seconds=duration)
            self.end_time_cache[cache_path] = (end_time, duration)
        else:
            if duration == -1:
                logger.warning(f"Failed to probe corrupt segment {cache_path}")

            logger.warning(f"Discarding a corrupt recording segment: {cache_path}")
            Path(cache_path).unlink(missing_ok=True)
            return None

    camera_info = self.object_recordings_info[camera]
    most_recently_processed_frame_time = (
        camera_info[-1][0] if len(camera_info) > 0 else 0
    )

    # if cached file's start_time is earlier than the retain days for the camera
    retain_limit = datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(
        days=record_config.retain.days
    )
    if start_time <= retain_limit:
        # if the cached segment overlaps with the events:
        overlaps = False
        for review in reviews:
            # if the event starts in the future, stop checking events
            # and remove this segment
            if review.start_time > end_time.timestamp():
                overlaps = False
                Path(cache_path).unlink(missing_ok=True)
                self.end_time_cache.pop(cache_path, None)
                break

            # if the event is in progress or ends after the recording starts, keep it
            # and stop looking at events
            if review.end_time is None or review.end_time >= start_time.timestamp():
                overlaps = True
                break

        if overlaps:
            record_mode = (
                record_config.alerts.retain.mode
                if review.severity == "alert"
                else record_config.detections.retain.mode
            )
            # move from cache to recordings immediately
            return await self.move_segment(
                camera,
                start_time,
                end_time,
                duration,
                cache_path,
                record_mode,
            )

        # if it doesn't overlap with an event, go ahead and drop the segment
        # if it ends more than the configured pre_capture for the camera
        pre_capture = max(
            record_config.alerts.pre_capture,
            record_config.detections.pre_capture,
        )
        retain_cutoff = datetime.datetime.fromtimestamp(
            most_recently_processed_frame_time - pre_capture,
            tz=datetime.timezone.utc,
        )
        if end_time < retain_cutoff:
            Path(cache_path).unlink(missing_ok=True)
            self.end_time_cache.pop(cache_path, None)

        return None

    # else retain days includes this segment
    # assume that empty means the relevant recording info has not been received yet
    # ensure delayed segment info does not lead to lost segments
    processed_until = datetime.datetime.fromtimestamp(
        most_recently_processed_frame_time, tz=datetime.timezone.utc
    )
    if processed_until >= end_time:
        record_mode = record_config.retain.mode
        return await self.move_segment(
            camera, start_time, end_time, duration, cache_path, record_mode
        )

    # frame info has not caught up with this segment yet; leave it cached
    return None
|
|
|
|
def segment_stats(
    self, camera: str, start_time: datetime.datetime, end_time: datetime.datetime
) -> SegmentInfo:
    """Summarise the frame info recorded for `camera` between two instants.

    Walks the stored object frames and audio frames whose frame time falls
    inside [start_time, end_time] and returns a SegmentInfo with the motion
    box count, the active object count (tracked objects that are neither
    false positives nor motionless, plus audio detections), the region count
    and the rounded average of the audio levels (0 when there were none).
    """
    window_start = start_time.timestamp()
    window_end = end_time.timestamp()

    active_total = 0
    region_total = 0
    motion_total = 0
    for entry in self.object_recordings_info[camera]:
        stamp = entry[0]

        # frame is after end time of segment
        if stamp > window_end:
            break

        # frame is before start time of segment
        if stamp < window_start:
            continue

        active_total += sum(
            1
            for obj in entry[1]
            if not obj["false_positive"] and obj["motionless_count"] == 0
        )
        motion_total += len(entry[2])
        region_total += len(entry[3])

    levels = []
    for entry in self.audio_recordings_info[camera]:
        stamp = entry[0]

        # frame is after end time of segment
        if stamp > window_end:
            break

        # frame is before start time of segment
        if stamp < window_start:
            continue

        # add active audio label count to count of active objects
        active_total += len(entry[2])

        # add sound level to audio values
        levels.append(entry[1])

    if levels:
        average_dBFS = np.average(levels)
    else:
        average_dBFS = 0

    return SegmentInfo(motion_total, active_total, region_total, round(average_dBFS))
|
|
|
|
async def move_segment(
    self,
    camera: str,
    start_time: datetime.datetime,
    end_time: datetime.datetime,
    duration: float,
    cache_path: str,
    store_mode: RetainModeEnum,
) -> Optional[Recordings]:
    """Copy a cached segment into the recordings directory with ffmpeg.

    The segment is deleted instead when `store_mode` says it holds nothing
    worth keeping. Otherwise it is remuxed (stream copy, `+faststart`) to
    RECORD_DIR/<YYYY-MM-DD>/<HH>/<camera>/<MM.SS>.mp4 and the cache file is
    removed.

    Returns:
        A mapping of Recordings column names to values describing the stored
        segment, or None when nothing was stored (segment discarded, the
        destination file already exists, the conversion failed, or an error
        occurred).
    """
    segment_info = self.segment_stats(camera, start_time, end_time)

    # check if the segment shouldn't be stored
    if segment_info.should_discard_segment(store_mode):
        Path(cache_path).unlink(missing_ok=True)
        self.end_time_cache.pop(cache_path, None)
        return None

    # directory will be in utc due to start_time being in utc
    directory = os.path.join(
        RECORD_DIR,
        start_time.strftime("%Y-%m-%d/%H"),
        camera,
    )
    os.makedirs(directory, exist_ok=True)

    # file will be in utc due to start_time being in utc
    file_name = f"{start_time.strftime('%M.%S.mp4')}"
    file_path = os.path.join(directory, file_name)

    try:
        if not os.path.exists(file_path):
            start_frame = datetime.datetime.now().timestamp()

            # add faststart to kept segments to improve metadata reading
            p = await asyncio.create_subprocess_exec(
                self.config.ffmpeg.ffmpeg_path,
                "-hide_banner",
                "-y",
                "-i",
                cache_path,
                "-c",
                "copy",
                "-movflags",
                "+faststart",
                file_path,
                stderr=asyncio.subprocess.PIPE,
                stdout=asyncio.subprocess.DEVNULL,
            )
            # communicate() drains stderr while waiting; wait() alone can
            # deadlock once the child fills the pipe buffer
            _, stderr_output = await p.communicate()

            if p.returncode != 0:
                logger.error(f"Unable to convert {cache_path} to {file_path}")
                # ffmpeg output is not guaranteed to be ASCII; never let the
                # decode itself raise
                logger.error(stderr_output.decode("utf-8", errors="replace"))
                # remove any partial output so the existence check above
                # does not block a retry on the next pass
                Path(file_path).unlink(missing_ok=True)
                return None

            logger.debug(
                f"Copied {file_path} in {datetime.datetime.now().timestamp()-start_frame} seconds."
            )

            try:
                # get the segment size of the cache file
                # file without faststart is same size
                segment_size = round(
                    float(os.path.getsize(cache_path)) / pow(2, 20), 1
                )
            except OSError:
                segment_size = 0

            os.remove(cache_path)

            rand_id = "".join(
                random.choices(string.ascii_lowercase + string.digits, k=6)
            )

            return {
                Recordings.id.name: f"{start_time.timestamp()}-{rand_id}",
                Recordings.camera.name: camera,
                Recordings.path.name: file_path,
                Recordings.start_time.name: start_time.timestamp(),
                Recordings.end_time.name: end_time.timestamp(),
                Recordings.duration.name: duration,
                Recordings.motion.name: segment_info.motion_count,
                # TODO: update this to store list of active objects at some point
                Recordings.objects.name: segment_info.active_object_count,
                Recordings.regions.name: segment_info.region_count,
                Recordings.dBFS.name: segment_info.average_dBFS,
                Recordings.segment_size.name: segment_size,
            }
    except Exception as e:
        logger.error(f"Unable to store recording segment {cache_path}")
        Path(cache_path).unlink(missing_ok=True)
        logger.error(e)

    # clear end_time cache
    self.end_time_cache.pop(cache_path, None)
    return None
|
|
|
|
def run(self) -> None:
    """Thread main loop: ingest detection updates and maintain the cache.

    Each pass applies pending record config updates, drains the detection
    subscriber into the per-camera object/audio frame info, and runs
    `move_files`. Passes are spaced so that one starts roughly every 5
    seconds. When the stop event is set the loop ends and the communication
    objects are stopped.
    """
    # Check for new files every 5 seconds
    wait_time = 0.0

    # waiting on the stop event (instead of sleeping) lets shutdown interrupt
    # the pause between passes
    while not self.stop_event.wait(wait_time):
        run_start = datetime.datetime.now().timestamp()

        # check if there is an updated config
        while True:
            (
                updated_topic,
                updated_record_config,
            ) = self.config_subscriber.check_for_update()

            if not updated_topic:
                break

            camera_name = updated_topic.rpartition("/")[-1]
            self.config.cameras[camera_name].record = updated_record_config

        stale_frame_count = 0
        # frames older than this many seconds before the pass started are
        # counted as stale
        stale_frame_count_threshold = 10

        # empty the object recordings info queue
        while True:
            (topic, data) = self.detection_subscriber.check_for_update(
                timeout=QUEUE_READ_TIMEOUT
            )

            if not topic:
                break

            if topic == DetectionTypeEnum.video:
                (
                    camera,
                    frame_time,
                    current_tracked_objects,
                    motion_boxes,
                    regions,
                ) = data

                if self.config.cameras[camera].record.enabled:
                    self.object_recordings_info[camera].append(
                        (
                            frame_time,
                            current_tracked_objects,
                            motion_boxes,
                            regions,
                        )
                    )
            elif topic == DetectionTypeEnum.audio:
                (
                    camera,
                    frame_time,
                    dBFS,
                    audio_detections,
                ) = data

                if self.config.cameras[camera].record.enabled:
                    self.audio_recordings_info[camera].append(
                        (
                            frame_time,
                            dBFS,
                            audio_detections,
                        )
                    )
            else:
                # api updates and any other topic are not unpacked here, so
                # there is no frame_time to check for staleness
                continue

            if frame_time < run_start - stale_frame_count_threshold:
                stale_frame_count += 1

        if stale_frame_count > 0:
            logger.debug(f"Found {stale_frame_count} old frames.")

        try:
            asyncio.run(self.move_files())
        except Exception as e:
            logger.error(
                "Error occurred when attempting to maintain recording cache"
            )
            logger.error(e)

        duration = datetime.datetime.now().timestamp() - run_start
        wait_time = max(0, 5 - duration)

    self.requestor.stop()
    self.config_subscriber.stop()
    self.detection_subscriber.stop()
    logger.info("Exiting recording maintenance...")
|