From e451f44cedda3a4f16be662d77674868d87268bc Mon Sep 17 00:00:00 2001 From: Nicolas Mowen Date: Wed, 26 Apr 2023 07:25:26 -0600 Subject: [PATCH] Move recording management to separate process (#6248) * Move recordings management to own process and ensure db multiprocess access * remove reference to old threads * Cleanup directory remover * Mypy fixes * Fix mypy * Add support back for setting record via MQTT and WS * Formatting * Fix rebase issue --- frigate/app.py | 46 +-- frigate/comms/dispatcher.py | 10 +- frigate/const.py | 6 +- frigate/record/cleanup.py | 248 ++++++++++++++++ frigate/{record.py => record/maintainer.py} | 310 +++----------------- frigate/record/record.py | 53 ++++ frigate/record/util.py | 19 ++ frigate/types.py | 4 + 8 files changed, 402 insertions(+), 294 deletions(-) create mode 100644 frigate/record/cleanup.py rename frigate/{record.py => record/maintainer.py} (56%) create mode 100644 frigate/record/record.py create mode 100644 frigate/record/util.py diff --git a/frigate/app.py b/frigate/app.py index 54d2825c8..095a51e36 100644 --- a/frigate/app.py +++ b/frigate/app.py @@ -28,14 +28,14 @@ from frigate.object_processing import TrackedObjectProcessor from frigate.output import output_frames from frigate.plus import PlusApi from frigate.ptz import OnvifController -from frigate.record import RecordingCleanup, RecordingMaintainer +from frigate.record.record import manage_recordings from frigate.stats import StatsEmitter, stats_init from frigate.storage import StorageMaintainer from frigate.timeline import TimelineProcessor from frigate.version import VERSION from frigate.video import capture_camera, track_camera from frigate.watchdog import FrigateWatchdog -from frigate.types import CameraMetricsTypes +from frigate.types import CameraMetricsTypes, RecordMetricsTypes logger = logging.getLogger(__name__) @@ -50,6 +50,7 @@ class FrigateApp: self.log_queue: Queue = mp.Queue() self.plus_api = PlusApi() self.camera_metrics: dict[str, CameraMetricsTypes] = {} + self.record_metrics: dict[str, RecordMetricsTypes] = {} def set_environment_vars(self) -> None: for key, value in self.config.environment_vars.items(): @@ -109,6 +110,11 @@ class FrigateApp: "capture_process": None, "process": None, } + self.record_metrics[camera_name] = { + "record_enabled": mp.Value( + "i", self.config.cameras[camera_name].record.enabled + ) + } def set_log_levels(self) -> None: logging.getLogger().setLevel(self.config.logger.default.value.upper()) @@ -158,6 +164,20 @@ class FrigateApp: migrate_db.close() + def init_recording_manager(self) -> None: + recording_process = mp.Process( + target=manage_recordings, + name="recording_manager", + args=(self.config, self.recordings_info_queue, self.record_metrics), + ) + recording_process.daemon = True + self.recording_process = recording_process + recording_process.start() + logger.info(f"Recording process started: {recording_process.pid}") + + def bind_database(self) -> None: + """Bind db to the main process.""" + # NOTE: all db accessing processes need to be created before the db can be bound to the main process self.db = SqliteQueueDatabase(self.config.database.path) models = [Event, Recordings, Timeline] self.db.bind(models) @@ -189,7 +209,11 @@ class FrigateApp: comms.append(WebSocketClient(self.config)) self.dispatcher = Dispatcher( - self.config, self.onvif_controller, self.camera_metrics, comms + self.config, + self.onvif_controller, + self.camera_metrics, + self.record_metrics, + comms, ) def start_detectors(self) -> None: @@ -318,16 +342,6 @@ class FrigateApp: self.event_cleanup = EventCleanup(self.config, self.stop_event) self.event_cleanup.start() - def start_recording_maintainer(self) -> None: - self.recording_maintainer = RecordingMaintainer( - self.config, self.recordings_info_queue, self.stop_event - ) - self.recording_maintainer.start() - - def start_recording_cleanup(self) -> None: - self.recording_cleanup = RecordingCleanup(self.config, self.stop_event) - self.recording_cleanup.start() - def start_storage_maintainer(self) -> None: self.storage_maintainer = StorageMaintainer(self.config, self.stop_event) self.storage_maintainer.start() @@ -390,6 +404,8 @@ class FrigateApp: self.init_queues() self.init_database() self.init_onvif() + self.init_recording_manager() + self.bind_database() self.init_dispatcher() except Exception as e: print(e) @@ -406,8 +422,6 @@ class FrigateApp: self.start_timeline_processor() self.start_event_processor() self.start_event_cleanup() - self.start_recording_maintainer() - self.start_recording_cleanup() self.start_stats_emitter() self.start_watchdog() self.check_shm() @@ -443,8 +457,6 @@ class FrigateApp: self.detected_frames_processor.join() self.event_processor.join() self.event_cleanup.join() - self.recording_maintainer.join() - self.recording_cleanup.join() self.stats_emitter.join() self.frigate_watchdog.join() self.db.stop() diff --git a/frigate/comms/dispatcher.py b/frigate/comms/dispatcher.py index 7a2c98392..4f3bdd2fd 100644 --- a/frigate/comms/dispatcher.py +++ b/frigate/comms/dispatcher.py @@ -8,7 +8,7 @@ from abc import ABC, abstractmethod from frigate.config import FrigateConfig from frigate.ptz import OnvifController, OnvifCommandEnum -from frigate.types import CameraMetricsTypes +from frigate.types import CameraMetricsTypes, RecordMetricsTypes from frigate.util import restart_frigate @@ -42,11 +42,13 @@ class Dispatcher: config: FrigateConfig, onvif: OnvifController, camera_metrics: dict[str, CameraMetricsTypes], + record_metrics: dict[str, RecordMetricsTypes], communicators: list[Communicator], ) -> None: self.config = config self.onvif = onvif self.camera_metrics = camera_metrics + self.record_metrics = record_metrics self.comms = communicators for comm in self.comms: @@ -192,13 +194,15 @@ class Dispatcher: record_settings = self.config.cameras[camera_name].record if payload == "ON": - if not record_settings.enabled: + if not self.record_metrics[camera_name]["record_enabled"].value: logger.info(f"Turning on recordings for {camera_name}") record_settings.enabled = True + self.record_metrics[camera_name]["record_enabled"].value = True elif payload == "OFF": - if record_settings.enabled: + if self.record_metrics[camera_name]["record_enabled"].value: logger.info(f"Turning off recordings for {camera_name}") record_settings.enabled = False + self.record_metrics[camera_name]["record_enabled"].value = False self.publish(f"{camera_name}/recordings/state", payload, retain=True) diff --git a/frigate/const.py b/frigate/const.py index f0d76d940..8e1e42bb9 100644 --- a/frigate/const.py +++ b/frigate/const.py @@ -8,7 +8,6 @@ CACHE_DIR = "/tmp/cache" YAML_EXT = (".yaml", ".yml") PLUS_ENV_VAR = "PLUS_API_KEY" PLUS_API_HOST = "https://api.frigate.video" -MAX_SEGMENT_DURATION = 600 BTBN_PATH = "/usr/lib/btbn-ffmpeg" # Regex Consts @@ -23,3 +22,8 @@ DRIVER_ENV_VAR = "LIBVA_DRIVER_NAME" DRIVER_AMD = "radeonsi" DRIVER_INTEL_i965 = "i965" DRIVER_INTEL_iHD = "iHD" + +# Record Values + +MAX_SEGMENT_DURATION = 600 +SECONDS_IN_DAY = 60 * 60 * 24 diff --git a/frigate/record/cleanup.py b/frigate/record/cleanup.py new file mode 100644 index 000000000..74c7eadf2 --- /dev/null +++ b/frigate/record/cleanup.py @@ -0,0 +1,248 @@ +"""Cleanup recordings that are expired based on retention config.""" + +import datetime +import itertools +import logging +import subprocess as sp +import threading +from pathlib import Path + +from peewee import DoesNotExist +from multiprocessing.synchronize import Event as MpEvent + +from frigate.config import RetainModeEnum, FrigateConfig +from frigate.const import RECORD_DIR, SECONDS_IN_DAY +from frigate.models import Event, Recordings +from frigate.record.util import remove_empty_directories + +logger = logging.getLogger(__name__) + + +class RecordingCleanup(threading.Thread): + """Cleanup existing recordings based on retention config.""" + + def __init__(self, config: FrigateConfig, stop_event: MpEvent) -> None: + threading.Thread.__init__(self) + self.name = "recording_cleanup" + self.config = config + self.stop_event = stop_event + + def clean_tmp_clips(self) -> None: + # delete any clips more than 5 minutes old + for p in Path("/tmp/cache").rglob("clip_*.mp4"): + logger.debug(f"Checking tmp clip {p}.") + if p.stat().st_mtime < (datetime.datetime.now().timestamp() - 60 * 1): + logger.debug("Deleting tmp clip.") + + # empty contents of file before unlinking https://github.com/blakeblackshear/frigate/issues/4769 + with open(p, "w"): + pass + p.unlink(missing_ok=True) + + def expire_recordings(self) -> None: + logger.debug("Start expire recordings (new).") + + logger.debug("Start deleted cameras.") + # Handle deleted cameras + expire_days = self.config.record.retain.days + expire_before = ( + datetime.datetime.now() - datetime.timedelta(days=expire_days) + ).timestamp() + no_camera_recordings: Recordings = Recordings.select().where( + Recordings.camera.not_in(list(self.config.cameras.keys())), + Recordings.end_time < expire_before, + ) + + deleted_recordings = set() + for recording in no_camera_recordings: + Path(recording.path).unlink(missing_ok=True) + deleted_recordings.add(recording.id) + + logger.debug(f"Expiring {len(deleted_recordings)} recordings") + Recordings.delete().where(Recordings.id << deleted_recordings).execute() + logger.debug("End deleted cameras.") + + logger.debug("Start all cameras.") + for camera, config in self.config.cameras.items(): + logger.debug(f"Start camera: {camera}.") + # Get the timestamp for cutoff of retained days + expire_days = config.record.retain.days + expire_date = ( + datetime.datetime.now() - datetime.timedelta(days=expire_days) + ).timestamp() + + # Get recordings to check for expiration + recordings: Recordings = ( + Recordings.select() + .where( + Recordings.camera == camera, + Recordings.end_time < expire_date, + ) + .order_by(Recordings.start_time) + ) + + # Get all the events to check against + events: Event = ( + Event.select() + .where( + Event.camera == camera, + # need to ensure segments for all events starting + # before the expire date are included + Event.start_time < expire_date, + Event.has_clip, + ) + .order_by(Event.start_time) + .objects() + ) + + # loop over recordings and see if they overlap with any non-expired events + # TODO: expire segments based on segment stats according to config + event_start = 0 + deleted_recordings = set() + for recording in recordings.objects().iterator(): + keep = False + # Now look for a reason to keep this recording segment + for idx in range(event_start, len(events)): + event = events[idx] + + # if the event starts in the future, stop checking events + # and let this recording segment expire + if event.start_time > recording.end_time: + keep = False + break + + # if the event is in progress or ends after the recording starts, keep it + # and stop looking at events + if event.end_time is None or event.end_time >= recording.start_time: + keep = True + break + + # if the event ends before this recording segment starts, skip + # this event and check the next event for an overlap. + # since the events and recordings are sorted, we can skip events + # that end before the previous recording segment started on future segments + if event.end_time < recording.start_time: + event_start = idx + + # Delete recordings outside of the retention window or based on the retention mode + if ( + not keep + or ( + config.record.events.retain.mode == RetainModeEnum.motion + and recording.motion == 0 + ) + or ( + config.record.events.retain.mode + == RetainModeEnum.active_objects + and recording.objects == 0 + ) + ): + Path(recording.path).unlink(missing_ok=True) + deleted_recordings.add(recording.id) + + logger.debug(f"Expiring {len(deleted_recordings)} recordings") + # delete up to 100,000 at a time + max_deletes = 100000 + deleted_recordings_list = list(deleted_recordings) + for i in range(0, len(deleted_recordings_list), max_deletes): + Recordings.delete().where( + Recordings.id << deleted_recordings_list[i : i + max_deletes] + ).execute() + + logger.debug(f"End camera: {camera}.") + + logger.debug("End all cameras.") + logger.debug("End expire recordings (new).") + + def expire_files(self) -> None: + logger.debug("Start expire files (legacy).") + + default_expire = ( + datetime.datetime.now().timestamp() + - SECONDS_IN_DAY * self.config.record.retain.days + ) + delete_before = {} + + for name, camera in self.config.cameras.items(): + delete_before[name] = ( + datetime.datetime.now().timestamp() + - SECONDS_IN_DAY * camera.record.retain.days + ) + + # find all the recordings older than the oldest recording in the db + try: + oldest_recording = Recordings.select().order_by(Recordings.start_time).get() + + p = Path(oldest_recording.path) + oldest_timestamp = p.stat().st_mtime - 1 + except DoesNotExist: + oldest_timestamp = datetime.datetime.now().timestamp() + except FileNotFoundError: + logger.warning(f"Unable to find file from recordings database: {p}") + Recordings.delete().where(Recordings.id == oldest_recording.id).execute() + return + + logger.debug(f"Oldest recording in the db: {oldest_timestamp}") + process = sp.run( + ["find", RECORD_DIR, "-type", "f", "!", "-newermt", f"@{oldest_timestamp}"], + capture_output=True, + text=True, + ) + files_to_check = process.stdout.splitlines() + + for f in files_to_check: + p = Path(f) + try: + if p.stat().st_mtime < delete_before.get(p.parent.name, default_expire): + p.unlink(missing_ok=True) + except FileNotFoundError: + logger.warning(f"Attempted to expire missing file: {f}") + + logger.debug("End expire files (legacy).") + + def sync_recordings(self) -> None: + logger.debug("Start sync recordings.") + + # get all recordings in the db + recordings: Recordings = Recordings.select() + + # get all recordings files on disk + process = sp.run( + ["find", RECORD_DIR, "-type", "f"], + capture_output=True, + text=True, + ) + files_on_disk = process.stdout.splitlines() + + recordings_to_delete = [] + for recording in recordings.objects().iterator(): + if not recording.path in files_on_disk: + recordings_to_delete.append(recording.id) + + logger.debug( + f"Deleting {len(recordings_to_delete)} recordings with missing files" + ) + # delete up to 100,000 at a time + max_deletes = 100000 + for i in range(0, len(recordings_to_delete), max_deletes): + Recordings.delete().where( + Recordings.id << recordings_to_delete[i : i + max_deletes] + ).execute() + + logger.debug("End sync recordings.") + + def run(self) -> None: + # on startup sync recordings with disk (disabled due to too much CPU usage) + # self.sync_recordings() + + # Expire tmp clips every minute, recordings and clean directories every hour. + for counter in itertools.cycle(range(self.config.record.expire_interval)): + if self.stop_event.wait(60): + logger.info(f"Exiting recording cleanup...") + break + self.clean_tmp_clips() + + if counter == 0: + self.expire_recordings() + self.expire_files() + remove_empty_directories(RECORD_DIR) diff --git a/frigate/record.py b/frigate/record/maintainer.py similarity index 56% rename from frigate/record.py rename to frigate/record/maintainer.py index 4ec3ff9a1..651bd1214 100644 --- a/frigate/record.py +++ b/frigate/record/maintainer.py @@ -1,5 +1,6 @@ +"""Maintain recording segments in cache.""" + import datetime -import itertools import logging import multiprocessing as mp import os @@ -8,51 +9,40 @@ import random import string import subprocess as sp import threading -from collections import defaultdict -from pathlib import Path - import psutil -from peewee import JOIN, DoesNotExist + +from collections import defaultdict +from multiprocessing.synchronize import Event as MpEvent +from pathlib import Path +from typing import Any, Tuple from frigate.config import RetainModeEnum, FrigateConfig from frigate.const import CACHE_DIR, MAX_SEGMENT_DURATION, RECORD_DIR from frigate.models import Event, Recordings +from frigate.types import RecordMetricsTypes from frigate.util import area logger = logging.getLogger(__name__) -SECONDS_IN_DAY = 60 * 60 * 24 - - -def remove_empty_directories(directory): - # list all directories recursively and sort them by path, - # longest first - paths = sorted( - [x[0] for x in os.walk(RECORD_DIR)], - key=lambda p: len(str(p)), - reverse=True, - ) - for path in paths: - # don't delete the parent - if path == RECORD_DIR: - continue - if len(os.listdir(path)) == 0: - os.rmdir(path) - class RecordingMaintainer(threading.Thread): def __init__( - self, config: FrigateConfig, recordings_info_queue: mp.Queue, stop_event + self, + config: FrigateConfig, + recordings_info_queue: mp.Queue, + process_info: dict[str, RecordMetricsTypes], + stop_event: MpEvent, ): threading.Thread.__init__(self) - self.name = "recording_maint" + self.name = "recording_maintainer" self.config = config self.recordings_info_queue = recordings_info_queue + self.process_info = process_info self.stop_event = stop_event - self.recordings_info = defaultdict(list) - self.end_time_cache = {} + self.recordings_info: dict[str, Any] = defaultdict(list) + self.end_time_cache: dict[str, Tuple[datetime.datetime, float]] = {} - def move_files(self): + def move_files(self) -> None: cache_files = sorted( [ d @@ -77,14 +67,14 @@ class RecordingMaintainer(threading.Thread): continue # group recordings by camera - grouped_recordings = defaultdict(list) - for f in cache_files: + grouped_recordings: defaultdict[str, list[dict[str, Any]]] = defaultdict(list) + for cache in cache_files: # Skip files currently in use - if f in files_in_use: + if cache in files_in_use: continue - cache_path = os.path.join(CACHE_DIR, f) - basename = os.path.splitext(f)[0] + cache_path = os.path.join(CACHE_DIR, cache) + basename = os.path.splitext(cache)[0] camera, date = basename.rsplit("-", maxsplit=1) start_time = datetime.datetime.strptime(date, "%Y%m%d%H%M%S") @@ -104,8 +94,8 @@ class RecordingMaintainer(threading.Thread): f"Unable to keep up with recording segments in cache for {camera}. Keeping the {keep_count} most recent segments out of {segment_count} and discarding the rest..." ) to_remove = grouped_recordings[camera][:-keep_count] - for f in to_remove: - cache_path = f["cache_path"] + for rec in to_remove: + cache_path = rec["cache_path"] Path(cache_path).unlink(missing_ok=True) self.end_time_cache.pop(cache_path, None) grouped_recordings[camera] = grouped_recordings[camera][-keep_count:] @@ -138,7 +128,7 @@ class RecordingMaintainer(threading.Thread): # Just delete files if recordings are turned off if ( not camera in self.config.cameras - or not self.config.cameras[camera].record.enabled + or not self.process_info[camera]["record_enabled"].value ): Path(cache_path).unlink(missing_ok=True) self.end_time_cache.pop(cache_path, None) @@ -170,7 +160,7 @@ class RecordingMaintainer(threading.Thread): else: if duration == -1: logger.warning( - f"Failed to probe corrupt segment {cache_path}: {p.returncode} - {p.stderr}" + f"Failed to probe corrupt segment {cache_path} : {p.returncode} - {str(p.stderr)}" ) logger.warning( @@ -241,7 +231,9 @@ class RecordingMaintainer(threading.Thread): camera, start_time, end_time, duration, cache_path, record_mode ) - def segment_stats(self, camera, start_time, end_time): + def segment_stats( + self, camera: str, start_time: datetime.datetime, end_time: datetime.datetime + ) -> Tuple[int, int]: active_count = 0 motion_count = 0 for frame in self.recordings_info[camera]: @@ -266,13 +258,13 @@ class RecordingMaintainer(threading.Thread): def store_segment( self, - camera, + camera: str, start_time: datetime.datetime, end_time: datetime.datetime, - duration, - cache_path, + duration: float, + cache_path: str, store_mode: RetainModeEnum, - ): + ) -> None: motion_count, active_count = self.segment_stats(camera, start_time, end_time) # check if the segment shouldn't be stored @@ -363,9 +355,9 @@ class RecordingMaintainer(threading.Thread): # clear end_time cache self.end_time_cache.pop(cache_path, None) - def run(self): + def run(self) -> None: # Check for new files every 5 seconds - wait_time = 5 + wait_time = 5.0 while not self.stop_event.wait(wait_time): run_start = datetime.datetime.now().timestamp() @@ -380,7 +372,7 @@ class RecordingMaintainer(threading.Thread): regions, ) = self.recordings_info_queue.get(False) - if self.config.cameras[camera].record.enabled: + if self.process_info[camera]["record_enabled"].value: self.recordings_info[camera].append( ( frame_time, @@ -403,231 +395,3 @@ class RecordingMaintainer(threading.Thread): wait_time = max(0, 5 - duration) logger.info(f"Exiting recording maintenance...") - - -class RecordingCleanup(threading.Thread): - def __init__(self, config: FrigateConfig, stop_event): - threading.Thread.__init__(self) - self.name = "recording_cleanup" - self.config = config - self.stop_event = stop_event - - def clean_tmp_clips(self): - # delete any clips more than 5 minutes old - for p in Path("/tmp/cache").rglob("clip_*.mp4"): - logger.debug(f"Checking tmp clip {p}.") - if p.stat().st_mtime < (datetime.datetime.now().timestamp() - 60 * 1): - logger.debug("Deleting tmp clip.") - - # empty contents of file before unlinking https://github.com/blakeblackshear/frigate/issues/4769 - with open(p, "w"): - pass - p.unlink(missing_ok=True) - - def expire_recordings(self): - logger.debug("Start expire recordings (new).") - - logger.debug("Start deleted cameras.") - # Handle deleted cameras - expire_days = self.config.record.retain.days - expire_before = ( - datetime.datetime.now() - datetime.timedelta(days=expire_days) - ).timestamp() - no_camera_recordings: Recordings = Recordings.select().where( - Recordings.camera.not_in(list(self.config.cameras.keys())), - Recordings.end_time < expire_before, - ) - - deleted_recordings = set() - for recording in no_camera_recordings: - Path(recording.path).unlink(missing_ok=True) - deleted_recordings.add(recording.id) - - logger.debug(f"Expiring {len(deleted_recordings)} recordings") - Recordings.delete().where(Recordings.id << deleted_recordings).execute() - logger.debug("End deleted cameras.") - - logger.debug("Start all cameras.") - for camera, config in self.config.cameras.items(): - logger.debug(f"Start camera: {camera}.") - # Get the timestamp for cutoff of retained days - expire_days = config.record.retain.days - expire_date = ( - datetime.datetime.now() - datetime.timedelta(days=expire_days) - ).timestamp() - - # Get recordings to check for expiration - recordings: Recordings = ( - Recordings.select() - .where( - Recordings.camera == camera, - Recordings.end_time < expire_date, - ) - .order_by(Recordings.start_time) - ) - - # Get all the events to check against - events: Event = ( - Event.select() - .where( - Event.camera == camera, - # need to ensure segments for all events starting - # before the expire date are included - Event.start_time < expire_date, - Event.has_clip, - ) - .order_by(Event.start_time) - .objects() - ) - - # loop over recordings and see if they overlap with any non-expired events - # TODO: expire segments based on segment stats according to config - event_start = 0 - deleted_recordings = set() - for recording in recordings.objects().iterator(): - keep = False - # Now look for a reason to keep this recording segment - for idx in range(event_start, len(events)): - event = events[idx] - - # if the event starts in the future, stop checking events - # and let this recording segment expire - if event.start_time > recording.end_time: - keep = False - break - - # if the event is in progress or ends after the recording starts, keep it - # and stop looking at events - if event.end_time is None or event.end_time >= recording.start_time: - keep = True - break - - # if the event ends before this recording segment starts, skip - # this event and check the next event for an overlap. - # since the events and recordings are sorted, we can skip events - # that end before the previous recording segment started on future segments - if event.end_time < recording.start_time: - event_start = idx - - # Delete recordings outside of the retention window or based on the retention mode - if ( - not keep - or ( - config.record.events.retain.mode == RetainModeEnum.motion - and recording.motion == 0 - ) - or ( - config.record.events.retain.mode - == RetainModeEnum.active_objects - and recording.objects == 0 - ) - ): - Path(recording.path).unlink(missing_ok=True) - deleted_recordings.add(recording.id) - - logger.debug(f"Expiring {len(deleted_recordings)} recordings") - # delete up to 100,000 at a time - max_deletes = 100000 - deleted_recordings_list = list(deleted_recordings) - for i in range(0, len(deleted_recordings_list), max_deletes): - Recordings.delete().where( - Recordings.id << deleted_recordings_list[i : i + max_deletes] - ).execute() - - logger.debug(f"End camera: {camera}.") - - logger.debug("End all cameras.") - logger.debug("End expire recordings (new).") - - def expire_files(self): - logger.debug("Start expire files (legacy).") - - default_expire = ( - datetime.datetime.now().timestamp() - - SECONDS_IN_DAY * self.config.record.retain.days - ) - delete_before = {} - - for name, camera in self.config.cameras.items(): - delete_before[name] = ( - datetime.datetime.now().timestamp() - - SECONDS_IN_DAY * camera.record.retain.days - ) - - # find all the recordings older than the oldest recording in the db - try: - oldest_recording = Recordings.select().order_by(Recordings.start_time).get() - - p = Path(oldest_recording.path) - oldest_timestamp = p.stat().st_mtime - 1 - except DoesNotExist: - oldest_timestamp = datetime.datetime.now().timestamp() - except FileNotFoundError: - logger.warning(f"Unable to find file from recordings database: {p}") - Recordings.delete().where(Recordings.id == oldest_recording.id).execute() - return - - logger.debug(f"Oldest recording in the db: {oldest_timestamp}") - process = sp.run( - ["find", RECORD_DIR, "-type", "f", "!", "-newermt", f"@{oldest_timestamp}"], - capture_output=True, - text=True, - ) - files_to_check = process.stdout.splitlines() - - for f in files_to_check: - p = Path(f) - try: - if p.stat().st_mtime < delete_before.get(p.parent.name, default_expire): - p.unlink(missing_ok=True) - except FileNotFoundError: - logger.warning(f"Attempted to expire missing file: {f}") - - logger.debug("End expire files (legacy).") - - def sync_recordings(self): - logger.debug("Start sync recordings.") - - # get all recordings in the db - recordings: Recordings = Recordings.select() - - # get all recordings files on disk - process = sp.run( - ["find", RECORD_DIR, "-type", "f"], - capture_output=True, - text=True, - ) - files_on_disk = process.stdout.splitlines() - - recordings_to_delete = [] - for recording in recordings.objects().iterator(): - if not recording.path in files_on_disk: - recordings_to_delete.append(recording.id) - - logger.debug( - f"Deleting {len(recordings_to_delete)} recordings with missing files" - ) - # delete up to 100,000 at a time - max_deletes = 100000 - for i in range(0, len(recordings_to_delete), max_deletes): - Recordings.delete().where( - Recordings.id << recordings_to_delete[i : i + max_deletes] - ).execute() - - logger.debug("End sync recordings.") - - def run(self): - # on startup sync recordings with disk (disabled due to too much CPU usage) - # self.sync_recordings() - - # Expire tmp clips every minute, recordings and clean directories every hour. - for counter in itertools.cycle(range(self.config.record.expire_interval)): - if self.stop_event.wait(60): - logger.info(f"Exiting recording cleanup...") - break - self.clean_tmp_clips() - - if counter == 0: - self.expire_recordings() - self.expire_files() - remove_empty_directories(RECORD_DIR) diff --git a/frigate/record/record.py b/frigate/record/record.py new file mode 100644 index 000000000..59fda095b --- /dev/null +++ b/frigate/record/record.py @@ -0,0 +1,53 @@ +"""Run recording maintainer and cleanup.""" + +import logging +import multiprocessing as mp +import signal +import threading + +from setproctitle import setproctitle +from types import FrameType +from typing import Optional + +from playhouse.sqliteq import SqliteQueueDatabase + +from frigate.config import FrigateConfig +from frigate.models import Event, Recordings, Timeline +from frigate.record.cleanup import RecordingCleanup +from frigate.record.maintainer import RecordingMaintainer +from frigate.types import RecordMetricsTypes +from frigate.util import listen + +logger = logging.getLogger(__name__) + + +def manage_recordings( + config: FrigateConfig, + recordings_info_queue: mp.Queue, + process_info: dict[str, RecordMetricsTypes], +) -> None: + stop_event = mp.Event() + + def receiveSignal(signalNumber: int, frame: Optional[FrameType]) -> None: + stop_event.set() + + signal.signal(signal.SIGTERM, receiveSignal) + signal.signal(signal.SIGINT, receiveSignal) + + threading.current_thread().name = "process:recording_manager" + setproctitle("frigate.recording_manager") + listen() + + db = SqliteQueueDatabase(config.database.path) + models = [Event, Recordings, Timeline] + db.bind(models) + + maintainer = RecordingMaintainer( + config, recordings_info_queue, process_info, stop_event + ) + maintainer.start() + + cleanup = RecordingCleanup(config, stop_event) + cleanup.start() + + logger.info("recording_manager: exiting subprocess") diff --git a/frigate/record/util.py b/frigate/record/util.py new file mode 100644 index 000000000..d9692c25e --- /dev/null +++ b/frigate/record/util.py @@ -0,0 +1,19 @@ +"""Recordings Utilities.""" + +import os + + +def remove_empty_directories(directory: str) -> None: + # list all directories recursively and sort them by path, + # longest first + paths = sorted( + [x[0] for x in os.walk(directory)], + key=lambda p: len(str(p)), + reverse=True, + ) + for path in paths: + # don't delete the parent + if path == directory: + continue + if len(os.listdir(path)) == 0: + os.rmdir(path) diff --git a/frigate/types.py b/frigate/types.py index 04339e366..9da4027c9 100644 --- a/frigate/types.py +++ b/frigate/types.py @@ -24,6 +24,10 @@ class CameraMetricsTypes(TypedDict): skipped_fps: Synchronized +class RecordMetricsTypes(TypedDict): + record_enabled: Synchronized + + class StatsTrackingTypes(TypedDict): camera_metrics: dict[str, CameraMetricsTypes] detectors: dict[str, ObjectDetectProcess]