mirror of
https://github.com/blakeblackshear/frigate.git
synced 2025-10-04 11:15:55 +02:00
* Fix the `Any` typing hint treewide There has been confusion between the Any type[1] and the any function[2] in typing hints. [1] https://docs.python.org/3/library/typing.html#typing.Any [2] https://docs.python.org/3/library/functions.html#any * Fix typing for various frame_shape members Frame shapes are most likely defined by height and width, so a single int cannot express that. * Wrap gpu stats functions in Optional[] These can return `None`, so they need to be `Type | None`, which is what `Optional` expresses very nicely. * Fix return type in get_latest_segment_datetime Returns a datetime object, not an integer. * Make the return type of FrameManager.write optional This is necessary since the SharedMemoryFrameManager.write function can return None. * Fix total_seconds() return type in get_tz_modifiers The function returns a float, not an int. https://docs.python.org/3/library/datetime.html#datetime.timedelta.total_seconds * Account for floating point results in to_relative_box Because the function uses division the return types may either be int or float. * Resolve ruff deprecation warning The config has been split into formatter and linter, and the global options are deprecated.
603 lines
22 KiB
Python
603 lines
22 KiB
Python
"""Maintain recording segments in cache."""
|
|
|
|
import asyncio
|
|
import datetime
|
|
import logging
|
|
import os
|
|
import random
|
|
import string
|
|
import threading
|
|
import time
|
|
from collections import defaultdict
|
|
from multiprocessing.synchronize import Event as MpEvent
|
|
from pathlib import Path
|
|
from typing import Any, Optional, Tuple
|
|
|
|
import numpy as np
|
|
import psutil
|
|
|
|
from frigate.comms.config_updater import ConfigSubscriber
|
|
from frigate.comms.detections_updater import DetectionSubscriber, DetectionTypeEnum
|
|
from frigate.comms.inter_process import InterProcessRequestor
|
|
from frigate.comms.recordings_updater import (
|
|
RecordingsDataPublisher,
|
|
RecordingsDataTypeEnum,
|
|
)
|
|
from frigate.config import FrigateConfig, RetainModeEnum
|
|
from frigate.const import (
|
|
CACHE_DIR,
|
|
CACHE_SEGMENT_FORMAT,
|
|
FAST_QUEUE_TIMEOUT,
|
|
INSERT_MANY_RECORDINGS,
|
|
MAX_SEGMENT_DURATION,
|
|
MAX_SEGMENTS_IN_CACHE,
|
|
RECORD_DIR,
|
|
)
|
|
from frigate.models import Recordings, ReviewSegment
|
|
from frigate.review.types import SeverityEnum
|
|
from frigate.util.services import get_video_properties
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class SegmentInfo:
    """Activity counters collected for one recording segment."""

    def __init__(
        self,
        motion_count: int,
        active_object_count: int,
        region_count: int,
        average_dBFS: int,
    ) -> None:
        """Store the per-segment counters unchanged on the instance."""
        self.motion_count = motion_count
        self.active_object_count = active_object_count
        self.region_count = region_count
        self.average_dBFS = average_dBFS

    def should_discard_segment(self, retain_mode: RetainModeEnum) -> bool:
        """Return True when the segment has nothing worth keeping for the mode.

        * ``RetainModeEnum.motion``: discard when both the motion count and
          the average dBFS are zero.
        * ``RetainModeEnum.active_objects``: discard when no active objects
          were counted.
        * any other mode: never discard.
        """
        if retain_mode == RetainModeEnum.motion:
            return self.motion_count == 0 and self.average_dBFS == 0

        if retain_mode == RetainModeEnum.active_objects:
            return self.active_object_count == 0

        return False
|
|
|
|
|
|
class RecordingMaintainer(threading.Thread):
    """Background thread that moves finished cache segments into RECORD_DIR.

    Each cycle of :meth:`run` drains pending config and detection updates,
    then :meth:`move_files` inspects the ``.mp4`` files in ``CACHE_DIR`` and
    either drops them, leaves them for a later cycle, or remuxes them into
    the recordings directory and queues a database row for them.
    """

    def __init__(self, config: FrigateConfig, stop_event: MpEvent):
        """Create the communication endpoints and the per-camera buffers.

        Args:
            config: application config; camera record settings are updated in
                place when config updates arrive in :meth:`run`.
            stop_event: event that ends the :meth:`run` loop once set.
        """
        super().__init__(name="recording_maintainer")
        self.config = config

        # create communication for retained recordings
        self.requestor = InterProcessRequestor()
        self.config_subscriber = ConfigSubscriber("config/record/")
        self.detection_subscriber = DetectionSubscriber(DetectionTypeEnum.all)
        self.recordings_publisher = RecordingsDataPublisher(
            RecordingsDataTypeEnum.recordings_available_through
        )

        self.stop_event = stop_event
        # camera -> list of (frame_time, tracked_objects, motion_boxes, regions)
        self.object_recordings_info: dict[str, list] = defaultdict(list)
        # camera -> list of (frame_time, dBFS, audio_detections)
        self.audio_recordings_info: dict[str, list] = defaultdict(list)
        # cache_path -> (end_time, duration) so a segment is only probed once
        self.end_time_cache: dict[str, Tuple[datetime.datetime, float]] = {}

    def _latest_processed_frame_time(self, camera: str) -> float:
        """Return the frame time of the newest buffered video frame, or 0."""
        camera_info = self.object_recordings_info[camera]
        return camera_info[-1][0] if camera_info else 0

    @staticmethod
    def _drop_frames_before(frames: list, cutoff: float) -> None:
        """Remove, in place, the leading frames whose time is below ``cutoff``.

        Stops at the first frame that is not older than ``cutoff``, exactly
        like popping from the front one at a time, but with a single delete.
        """
        stale = 0
        for frame in frames:
            if frame[0] >= cutoff:
                break
            stale += 1
        del frames[:stale]

    def _keep_most_recent(
        self, segments: list[dict[str, Any]], keep_count: int
    ) -> list[dict[str, Any]]:
        """Drop all but the last ``keep_count`` segments and return the rest."""
        for rec in segments[:-keep_count]:
            self.drop_segment(rec["cache_path"])
        return segments[-keep_count:]

    async def move_files(self) -> None:
        """Run one maintenance pass over the segment files in ``CACHE_DIR``.

        Groups the cached ``.mp4`` segments by camera, prunes cameras that
        have more than ``MAX_SEGMENTS_IN_CACHE`` processed or unprocessed
        segments, validates every remaining segment concurrently and sends
        the resulting rows with ``INSERT_MANY_RECORDINGS``.
        """
        cache_files = [
            d
            for d in os.listdir(CACHE_DIR)
            if os.path.isfile(os.path.join(CACHE_DIR, d))
            and d.endswith(".mp4")
            and not d.startswith("preview_")
        ]

        # names of cache files that an ffmpeg process still has open
        files_in_use: set[str] = set()
        for process in psutil.process_iter():
            try:
                if process.name() != "ffmpeg":
                    continue

                for nt in process.open_files() or ():
                    if nt.path.startswith(CACHE_DIR):
                        files_in_use.add(os.path.basename(nt.path))
            except psutil.Error:
                continue

        # group recordings by camera
        grouped_recordings: defaultdict[str, list[dict[str, Any]]] = defaultdict(list)
        for cache in cache_files:
            # Skip files currently in use
            if cache in files_in_use:
                continue

            basename = os.path.splitext(cache)[0]

            try:
                camera, date = basename.rsplit("@", maxsplit=1)

                # important that start_time is utc because recordings are stored and compared in utc
                start_time = datetime.datetime.strptime(
                    date, CACHE_SEGMENT_FORMAT
                ).astimezone(datetime.timezone.utc)
            except ValueError:
                # a single unexpected file name must not abort the whole pass
                logger.warning(f"Skipping cache file with unexpected name: {cache}")
                continue

            grouped_recordings[camera].append(
                {
                    "cache_path": os.path.join(CACHE_DIR, cache),
                    "start_time": start_time,
                }
            )

        # delete all cached files past the most recent MAX_SEGMENTS_IN_CACHE
        keep_count = MAX_SEGMENTS_IN_CACHE
        for camera in list(grouped_recordings):
            # sort based on start time
            segments = sorted(
                grouped_recordings[camera], key=lambda s: s["start_time"]
            )

            most_recently_processed_frame_time = self._latest_processed_frame_time(
                camera
            )
            processed_segment_count = sum(
                1
                for r in segments
                if r["start_time"].timestamp() < most_recently_processed_frame_time
            )

            # see if the recording mover is too slow and segments need to be deleted
            if processed_segment_count > keep_count:
                logger.warning(
                    f"Unable to keep up with recording segments in cache for {camera}. Keeping the {keep_count} most recent segments out of {processed_segment_count} and discarding the rest..."
                )
                segments = self._keep_most_recent(segments, keep_count)

            # see if detection has failed and unprocessed segments need to be deleted
            # (processed_segment_count is deliberately the pre-prune value)
            unprocessed_segment_count = len(segments) - processed_segment_count
            if unprocessed_segment_count > keep_count:
                logger.warning(
                    f"Too many unprocessed recording segments in cache for {camera}. This likely indicates an issue with the detect stream, keeping the {keep_count} most recent segments out of {unprocessed_segment_count} and discarding the rest..."
                )
                segments = self._keep_most_recent(segments, keep_count)

            grouped_recordings[camera] = segments

        tasks = []
        for camera, recordings in grouped_recordings.items():
            oldest_start = recordings[0]["start_time"].timestamp()

            # clear out all the object and audio recording info for old frames
            self._drop_frames_before(self.object_recordings_info[camera], oldest_start)
            self._drop_frames_before(self.audio_recordings_info[camera], oldest_start)

            # get all reviews with the end time after the start of the oldest cache file
            # or with end_time None
            reviews = (
                ReviewSegment.select(
                    ReviewSegment.start_time,
                    ReviewSegment.end_time,
                    ReviewSegment.severity,
                    ReviewSegment.data,
                )
                .where(
                    ReviewSegment.camera == camera,
                    # "== None" is required here: it builds the query expression
                    (ReviewSegment.end_time == None)  # noqa: E711
                    | (ReviewSegment.end_time >= oldest_start),
                )
                .order_by(ReviewSegment.start_time)
            )

            tasks.extend(
                self.validate_and_move_segment(camera, reviews, r) for r in recordings
            )

            # publish most recently available recording time and None if disabled;
            # cameras missing from the config have nothing to publish and their
            # segments are dropped by validate_and_move_segment
            if camera in self.config.cameras:
                self.recordings_publisher.publish(
                    (
                        camera,
                        oldest_start
                        if self.config.cameras[camera].record.enabled
                        else None,
                    )
                )

        recordings_to_insert: list[Optional[Recordings]] = await asyncio.gather(*tasks)

        # fire and forget recordings entries
        self.requestor.send_data(
            INSERT_MANY_RECORDINGS,
            [r for r in recordings_to_insert if r is not None],
        )

    def drop_segment(self, cache_path: str) -> None:
        """Delete the cache file (if present) and forget its cached end time."""
        Path(cache_path).unlink(missing_ok=True)
        self.end_time_cache.pop(cache_path, None)

    async def validate_and_move_segment(
        self, camera: str, reviews: list[ReviewSegment], recording: dict[str, Any]
    ) -> Optional[Recordings]:
        """Decide what happens to one cached segment and act on it.

        Args:
            camera: camera name taken from the cache file name.
            reviews: review items for the camera, ordered by start time.
            recording: dict with ``cache_path`` and ``start_time`` (UTC).

        Returns:
            The value returned by :meth:`move_segment` when the segment is
            moved, otherwise ``None`` (segment dropped or left in the cache).
        """
        cache_path: str = recording["cache_path"]
        start_time: datetime.datetime = recording["start_time"]

        # Just delete files if recordings are turned off. This check has to
        # come before any config lookup so an unknown camera cannot KeyError.
        if (
            camera not in self.config.cameras
            or not self.config.cameras[camera].record.enabled
        ):
            self.drop_segment(cache_path)
            return None

        record_config = self.config.cameras[camera].record

        if cache_path in self.end_time_cache:
            end_time, duration = self.end_time_cache[cache_path]
        else:
            segment_info = await get_video_properties(
                self.config.ffmpeg, cache_path, get_duration=True
            )

            if segment_info["duration"]:
                duration = float(segment_info["duration"])
            else:
                duration = -1

            # ensure duration is within expected length
            if not 0 < duration < MAX_SEGMENT_DURATION:
                if duration == -1:
                    logger.warning(f"Failed to probe corrupt segment {cache_path}")

                logger.warning(f"Discarding a corrupt recording segment: {cache_path}")
                Path(cache_path).unlink(missing_ok=True)
                return None

            end_time = start_time + datetime.timedelta(seconds=duration)
            self.end_time_cache[cache_path] = (end_time, duration)

        retain_limit = datetime.datetime.now().astimezone(
            datetime.timezone.utc
        ) - datetime.timedelta(days=record_config.retain.days)

        # else retain days includes this segment
        # meaning continuous recording is enabled
        if start_time > retain_limit:
            # assume that empty means the relevant recording info has not been received yet
            latest_frame = datetime.datetime.fromtimestamp(
                self._latest_processed_frame_time(camera)
            ).astimezone(datetime.timezone.utc)

            # ensure delayed segment info does not lead to lost segments
            if latest_frame >= end_time:
                return await self.move_segment(
                    camera,
                    start_time,
                    end_time,
                    duration,
                    cache_path,
                    record_config.retain.mode,
                )

            return None

        # cached file's start_time is earlier than the retain days for the camera
        # meaning continuous recording is not enabled:
        # check whether the cached segment overlaps with the review items
        overlaps = False
        for review in reviews:
            severity = SeverityEnum[review.severity]

            # if the review item starts in the future, stop checking review items
            # and remove this segment
            if (
                review.start_time - record_config.get_review_pre_capture(severity)
            ) > end_time.timestamp():
                overlaps = False
                break

            # if the review item is in progress or ends after the recording starts, keep it
            # and stop looking at review items
            if (
                review.end_time is None
                or (review.end_time + record_config.get_review_post_capture(severity))
                >= start_time.timestamp()
            ):
                overlaps = True
                break

        if overlaps:
            # `review` is the item that produced the overlap
            record_mode = (
                record_config.alerts.retain.mode
                if review.severity == "alert"
                else record_config.detections.retain.mode
            )
            # move from cache to recordings immediately
            return await self.move_segment(
                camera,
                start_time,
                end_time,
                duration,
                cache_path,
                record_mode,
            )

        # if it doesn't overlap with a review item, go ahead and drop the segment
        # if it ends more than the configured pre_capture for the camera
        retain_cutoff = datetime.datetime.fromtimestamp(
            self._latest_processed_frame_time(camera) - record_config.event_pre_capture
        ).astimezone(datetime.timezone.utc)
        if end_time < retain_cutoff:
            self.drop_segment(cache_path)

        return None

    def segment_stats(
        self, camera: str, start_time: datetime.datetime, end_time: datetime.datetime
    ) -> SegmentInfo:
        """Summarise buffered video and audio frames inside the segment window.

        Frames with a time below ``start_time`` are skipped. Iteration stops
        at the first frame after ``end_time``, which relies on the buffers
        being in time order (they are appended to as updates arrive).
        """
        start_ts = start_time.timestamp()
        end_ts = end_time.timestamp()

        active_count = 0
        region_count = 0
        motion_count = 0
        for frame in self.object_recordings_info[camera]:
            # frame is after end time of segment
            if frame[0] > end_ts:
                break

            # frame is before start time of segment
            if frame[0] < start_ts:
                continue

            active_count += sum(
                1
                for o in frame[1]
                if not o["false_positive"] and o["motionless_count"] == 0
            )
            motion_count += len(frame[2])
            region_count += len(frame[3])

        audio_values = []
        for frame in self.audio_recordings_info[camera]:
            # frame is after end time of segment
            if frame[0] > end_ts:
                break

            # frame is before start time of segment
            if frame[0] < start_ts:
                continue

            # add active audio label count to count of active objects
            active_count += len(frame[2])

            # add sound level to audio values
            audio_values.append(frame[1])

        average_dBFS = 0 if not audio_values else np.average(audio_values)

        return SegmentInfo(
            motion_count, active_count, region_count, round(average_dBFS)
        )

    async def move_segment(
        self,
        camera: str,
        start_time: datetime.datetime,
        end_time: datetime.datetime,
        duration: float,
        cache_path: str,
        store_mode: RetainModeEnum,
    ) -> Optional[Recordings]:
        """Remux a cache segment into ``RECORD_DIR`` and describe it for the DB.

        Returns:
            A dict keyed by ``Recordings`` column names for the stored
            segment, or ``None`` when the segment was discarded, the target
            file already exists, or the ffmpeg copy failed.
        """
        segment_info = self.segment_stats(camera, start_time, end_time)

        # check if the segment shouldn't be stored
        if segment_info.should_discard_segment(store_mode):
            self.drop_segment(cache_path)
            return None

        # directory will be in utc due to start_time being in utc
        directory = os.path.join(
            RECORD_DIR,
            start_time.strftime("%Y-%m-%d/%H"),
            camera,
        )

        # exist_ok avoids failing when segments are moved concurrently and
        # another task creates the directory first
        os.makedirs(directory, exist_ok=True)

        # file will be in utc due to start_time being in utc
        file_path = os.path.join(directory, start_time.strftime("%M.%S.mp4"))

        try:
            if not os.path.exists(file_path):
                start_frame = datetime.datetime.now().timestamp()

                # add faststart to kept segments to improve metadata reading
                p = await asyncio.create_subprocess_exec(
                    self.config.ffmpeg.ffmpeg_path,
                    "-hide_banner",
                    "-y",
                    "-i",
                    cache_path,
                    "-c",
                    "copy",
                    "-movflags",
                    "+faststart",
                    file_path,
                    stderr=asyncio.subprocess.PIPE,
                    stdout=asyncio.subprocess.DEVNULL,
                )
                await p.wait()

                if p.returncode != 0:
                    logger.error(f"Unable to convert {cache_path} to {file_path}")
                    # errors="replace": non-ASCII ffmpeg output must not raise
                    # here, or the handler below would delete the segment
                    logger.error(
                        (await p.stderr.read()).decode("ascii", errors="replace")
                    )
                    return None

                logger.debug(
                    f"Copied {file_path} in {datetime.datetime.now().timestamp() - start_frame} seconds."
                )

                try:
                    # get the segment size of the cache file
                    # file without faststart is same size
                    segment_size = round(
                        float(os.path.getsize(cache_path)) / pow(2, 20), 2
                    )
                except OSError:
                    segment_size = 0

                os.remove(cache_path)

                rand_id = "".join(
                    random.choices(string.ascii_lowercase + string.digits, k=6)
                )

                return {
                    Recordings.id.name: f"{start_time.timestamp()}-{rand_id}",
                    Recordings.camera.name: camera,
                    Recordings.path.name: file_path,
                    Recordings.start_time.name: start_time.timestamp(),
                    Recordings.end_time.name: end_time.timestamp(),
                    Recordings.duration.name: duration,
                    Recordings.motion.name: segment_info.motion_count,
                    # TODO: update this to store list of active objects at some point
                    Recordings.objects.name: segment_info.active_object_count,
                    Recordings.regions.name: segment_info.region_count,
                    Recordings.dBFS.name: segment_info.average_dBFS,
                    Recordings.segment_size.name: segment_size,
                }
        except Exception as e:
            logger.error(f"Unable to store recording segment {cache_path}")
            Path(cache_path).unlink(missing_ok=True)
            logger.error(e)

        # clear end_time cache
        self.end_time_cache.pop(cache_path, None)
        return None

    def run(self) -> None:
        """Thread body: maintain the recording cache until the stop event is set."""
        # Check for new files every 5 seconds
        wait_time = 0.0
        while not self.stop_event.is_set():
            time.sleep(wait_time)

            if self.stop_event.is_set():
                break

            run_start = datetime.datetime.now().timestamp()

            # check if there is an updated config
            while True:
                (
                    updated_topic,
                    updated_record_config,
                ) = self.config_subscriber.check_for_update()

                if not updated_topic:
                    break

                camera_name = updated_topic.rpartition("/")[-1]
                self.config.cameras[camera_name].record = updated_record_config

            stale_frame_count = 0
            stale_frame_count_threshold = 10
            # empty the object recordings info queue
            while True:
                (topic, data) = self.detection_subscriber.check_for_update(
                    timeout=FAST_QUEUE_TIMEOUT
                )

                if not topic:
                    break

                if topic == DetectionTypeEnum.video:
                    (
                        camera,
                        _,
                        frame_time,
                        current_tracked_objects,
                        motion_boxes,
                        regions,
                    ) = data

                    if self.config.cameras[camera].record.enabled:
                        self.object_recordings_info[camera].append(
                            (
                                frame_time,
                                current_tracked_objects,
                                motion_boxes,
                                regions,
                            )
                        )
                elif topic == DetectionTypeEnum.audio:
                    (
                        camera,
                        frame_time,
                        dBFS,
                        audio_detections,
                    ) = data

                    if self.config.cameras[camera].record.enabled:
                        self.audio_recordings_info[camera].append(
                            (
                                frame_time,
                                dBFS,
                                audio_detections,
                            )
                        )
                else:
                    # api / lpr (and any other) updates are skipped: no
                    # frame_time is unpacked for them, so the stale check
                    # below must not run
                    continue

                if frame_time < run_start - stale_frame_count_threshold:
                    stale_frame_count += 1

            if stale_frame_count > 0:
                logger.debug(f"Found {stale_frame_count} old frames.")

            try:
                asyncio.run(self.move_files())
            except Exception as e:
                logger.error(
                    "Error occurred when attempting to maintain recording cache"
                )
                logger.error(e)

            duration = datetime.datetime.now().timestamp() - run_start
            wait_time = max(0, 5 - duration)

        self.requestor.stop()
        self.config_subscriber.stop()
        self.detection_subscriber.stop()
        self.recordings_publisher.stop()
        logger.info("Exiting recording maintenance...")
|