From 63233a583088d5f794f0d44002b5fd1e1412be26 Mon Sep 17 00:00:00 2001 From: Nicolas Mowen Date: Fri, 3 Nov 2023 20:21:29 -0600 Subject: [PATCH] Periodically sync for stale recordings (#8433) * Periodically cleanup recordings files / DB * Make automatic sync limited ot last 36 hours --- frigate/record/cleanup.py | 75 ++++-------------------- frigate/record/util.py | 116 ++++++++++++++++++++++++++++++++++++++ frigate/util/builtin.py | 5 +- frigate/video.py | 6 +- 4 files changed, 133 insertions(+), 69 deletions(-) diff --git a/frigate/record/cleanup.py b/frigate/record/cleanup.py index 5a81c91ae..e6bb41da3 100644 --- a/frigate/record/cleanup.py +++ b/frigate/record/cleanup.py @@ -3,17 +3,15 @@ import datetime import itertools import logging -import os import threading from multiprocessing.synchronize import Event as MpEvent from pathlib import Path -from peewee import DatabaseError, chunked - from frigate.config import FrigateConfig, RetainModeEnum from frigate.const import CACHE_DIR, RECORD_DIR -from frigate.models import Event, Recordings, RecordingsToDelete -from frigate.record.util import remove_empty_directories +from frigate.models import Event, Recordings +from frigate.record.util import remove_empty_directories, sync_recordings +from frigate.util.builtin import get_tomorrow_at_time logger = logging.getLogger(__name__) @@ -180,76 +178,25 @@ class RecordingCleanup(threading.Thread): logger.debug("End all cameras.") logger.debug("End expire recordings.") - def sync_recordings(self) -> None: - """Check the db for stale recordings entries that don't exist in the filesystem.""" - logger.debug("Start sync recordings.") - - # get all recordings in the db - recordings = Recordings.select(Recordings.id, Recordings.path) - - # get all recordings files on disk and put them in a set - files_on_disk = { - os.path.join(root, file) - for root, _, files in os.walk(RECORD_DIR) - for file in files - } - - # Use pagination to process records in chunks - page_size = 1000 - 
num_pages = (recordings.count() + page_size - 1) // page_size - recordings_to_delete = set() - - for page in range(num_pages): - for recording in recordings.paginate(page, page_size): - if recording.path not in files_on_disk: - recordings_to_delete.add(recording.id) - - # convert back to list of dictionaries for insertion - recordings_to_delete = [ - {"id": recording_id} for recording_id in recordings_to_delete - ] - - if len(recordings_to_delete) / max(1, recordings.count()) > 0.5: - logger.debug( - f"Deleting {(len(recordings_to_delete) / recordings.count()):2f}% of recordings could be due to configuration error. Aborting..." - ) - return - - logger.debug( - f"Deleting {len(recordings_to_delete)} recordings with missing files" - ) - - # create a temporary table for deletion - RecordingsToDelete.create_table(temporary=True) - - # insert ids to the temporary table - max_inserts = 1000 - for batch in chunked(recordings_to_delete, max_inserts): - RecordingsToDelete.insert_many(batch).execute() - - try: - # delete records in the main table that exist in the temporary table - query = Recordings.delete().where( - Recordings.id.in_(RecordingsToDelete.select(RecordingsToDelete.id)) - ) - query.execute() - except DatabaseError as e: - logger.error(f"Database error during delete: {e}") - - logger.debug("End sync recordings.") - def run(self) -> None: # on startup sync recordings with disk if enabled if self.config.record.sync_on_startup: - self.sync_recordings() + sync_recordings(limited=False) + + next_sync = get_tomorrow_at_time(3) # Expire tmp clips every minute, recordings and clean directories every hour. 
for counter in itertools.cycle(range(self.config.record.expire_interval)): if self.stop_event.wait(60): logger.info("Exiting recording cleanup...") break + self.clean_tmp_clips() + if datetime.datetime.now().astimezone(datetime.timezone.utc) > next_sync: + sync_recordings(limited=True) + next_sync = get_tomorrow_at_time(3) + if counter == 0: self.expire_recordings() remove_empty_directories(RECORD_DIR) diff --git a/frigate/record/util.py b/frigate/record/util.py index d9692c25e..4710b063e 100644 --- a/frigate/record/util.py +++ b/frigate/record/util.py @@ -1,7 +1,16 @@ """Recordings Utilities.""" +import datetime +import logging import os +from peewee import DatabaseError, chunked + +from frigate.const import RECORD_DIR +from frigate.models import Recordings, RecordingsToDelete + +logger = logging.getLogger(__name__) + def remove_empty_directories(directory: str) -> None: # list all directories recursively and sort them by path, @@ -17,3 +26,110 @@ def remove_empty_directories(directory: str) -> None: continue if len(os.listdir(path)) == 0: os.rmdir(path) + + +def sync_recordings(limited: bool) -> None: + """Check the db for stale recordings entries that don't exist in the filesystem.""" + + def delete_db_entries_without_file(files_on_disk: list[str]) -> bool: + """Delete db entries where file was deleted outside of frigate.""" + + if limited: + recordings = Recordings.select(Recordings.id, Recordings.path).where( + Recordings.start_time + >= (datetime.datetime.now() - datetime.timedelta(hours=36)).timestamp() + ) + else: + # get all recordings in the db + recordings = Recordings.select(Recordings.id, Recordings.path) + + # Use pagination to process records in chunks + page_size = 1000 + num_pages = (recordings.count() + page_size - 1) // page_size + recordings_to_delete = set() + + for page in range(num_pages): + for recording in recordings.paginate(page, page_size): + if recording.path not in files_on_disk: + recordings_to_delete.add(recording.id) + + # convert 
back to list of dictionaries for insertion + recordings_to_delete = [ + {"id": recording_id} for recording_id in recordings_to_delete + ] + + if float(len(recordings_to_delete)) / max(1, recordings.count()) > 0.5: + logger.debug( + f"Deleting {(float(len(recordings_to_delete)) / recordings.count()):.2%} of recordings DB entries, could be due to configuration error. Aborting..." + ) + return False + + logger.debug( + f"Deleting {len(recordings_to_delete)} recording DB entries with missing files" + ) + + # create a temporary table for deletion + RecordingsToDelete.create_table(temporary=True) + + # insert ids to the temporary table + max_inserts = 1000 + for batch in chunked(recordings_to_delete, max_inserts): + RecordingsToDelete.insert_many(batch).execute() + + try: + # delete records in the main table that exist in the temporary table + query = Recordings.delete().where( + Recordings.id.in_(RecordingsToDelete.select(RecordingsToDelete.id)) + ) + query.execute() + except DatabaseError as e: + logger.error(f"Database error during recordings db cleanup: {e}") + + return True + + def delete_files_without_db_entry(files_on_disk: list[str]): + """Delete files where file is not inside frigate db.""" + files_to_delete = [] + + for file in files_on_disk: + if not Recordings.select().where(Recordings.path == file).exists(): + files_to_delete.append(file) + + if float(len(files_to_delete)) / max(1, len(files_on_disk)) > 0.5: + logger.debug( + f"Deleting {(float(len(files_to_delete)) / len(files_on_disk)):.2%} of recording files, could be due to configuration error. Aborting..." 
+ ) + return + + for file in files_to_delete: + os.unlink(file) + + logger.debug("Start sync recordings.") + + if limited: + # get recording files from last 36 hours + hour_check = ( + datetime.datetime.now().astimezone(datetime.timezone.utc) + - datetime.timedelta(hours=36) + ).strftime("%Y-%m-%d/%H") + files_on_disk = { + os.path.join(root, file) + for root, _, files in os.walk(RECORD_DIR) + for file in files + if file > hour_check + } + else: + # get all recordings files on disk and put them in a set + files_on_disk = { + os.path.join(root, file) + for root, _, files in os.walk(RECORD_DIR) + for file in files + } + + db_success = delete_db_entries_without_file(files_on_disk) + + # only try to cleanup files if db cleanup was successful + if db_success: + delete_files_without_db_entry(files_on_disk) + + logger.debug("End sync recordings.") diff --git a/frigate/record/util.py b/frigate/record/util.py index d9692c25e..4710b063e 100644 --- a/frigate/util/builtin.py +++ b/frigate/util/builtin.py @@ -263,8 +263,9 @@ def find_by_key(dictionary, target_key): return None -def get_tomorrow_at_2() -> datetime.datetime: +def get_tomorrow_at_time(hour: int) -> datetime.datetime: + """Returns the datetime of the following day at the given hour, in UTC.""" tomorrow = datetime.datetime.now(get_localzone()) + datetime.timedelta(days=1) - return tomorrow.replace(hour=2, minute=0, second=0).astimezone( + return tomorrow.replace(hour=hour, minute=0, second=0).astimezone( datetime.timezone.utc ) diff --git a/frigate/video.py b/frigate/video.py index 23b3481cf..7ab41a5a6 100755 --- a/frigate/video.py +++ b/frigate/video.py @@ -26,7 +26,7 @@ from frigate.ptz.autotrack import ptz_moving_at_frame_time from frigate.track import ObjectTracker from frigate.track.norfair_tracker import NorfairTracker from frigate.types import PTZMetricsTypes -from frigate.util.builtin import EventsPerSecond, get_tomorrow_at_2 +from frigate.util.builtin import EventsPerSecond, get_tomorrow_at_time from frigate.util.image 
import ( FrameManager, SharedMemoryFrameManager, @@ -528,7 +528,7 @@ def process_frames( fps = process_info["process_fps"] detection_fps = process_info["detection_fps"] current_frame_time = process_info["detection_frame"] - next_region_update = get_tomorrow_at_2() + next_region_update = get_tomorrow_at_time(2) fps_tracker = EventsPerSecond() fps_tracker.start() @@ -550,7 +550,7 @@ def process_frames( except queue.Empty: logger.error(f"Unable to get updated region grid for {camera_name}") - next_region_update = get_tomorrow_at_2() + next_region_update = get_tomorrow_at_time(2) try: if exit_on_empty: