From 5c27cb7e9b922d011dcac0c62cf94b0dc9904143 Mon Sep 17 00:00:00 2001 From: Sergey Krashevich Date: Sun, 11 Jun 2023 16:01:50 +0300 Subject: [PATCH] Optimization of Sync Records: Implementing Pagination and Temporary Table (#6585) * Update pull_request.yml * Add temporary table for deletion and use pagination to process recordings in chunks for deletion of recordings with missing files * move RecordingsToDelete class to models.py * recording cleanup: bugfixes * Update cleanup.py * improve log message in cleanup.py Co-authored-by: Nicolas Mowen --------- Co-authored-by: Nicolas Mowen --- frigate/models.py | 8 +++++ frigate/record/cleanup.py | 68 +++++++++++++++++++++++++++------------ frigate/record/record.py | 4 +-- 3 files changed, 58 insertions(+), 22 deletions(-) diff --git a/frigate/models.py b/frigate/models.py index 5f6ac7339..8591b33b5 100644 --- a/frigate/models.py +++ b/frigate/models.py @@ -65,3 +65,11 @@ class Recordings(Model): # type: ignore[misc] motion = IntegerField(null=True) objects = IntegerField(null=True) segment_size = FloatField(default=0) # this should be stored as MB + + +# Used for temporary table in record/cleanup.py +class RecordingsToDelete(Model): # type: ignore[misc] + id = CharField(null=False, primary_key=False, max_length=30) + + class Meta: + temporary = True diff --git a/frigate/record/cleanup.py b/frigate/record/cleanup.py index f2e9a4fb5..75a1f9508 100644 --- a/frigate/record/cleanup.py +++ b/frigate/record/cleanup.py @@ -8,11 +8,11 @@ import threading from multiprocessing.synchronize import Event as MpEvent from pathlib import Path -from peewee import DoesNotExist +from peewee import DatabaseError, DoesNotExist, chunked from frigate.config import FrigateConfig, RetainModeEnum from frigate.const import RECORD_DIR, SECONDS_IN_DAY -from frigate.models import Event, Recordings, Timeline +from frigate.models import Event, Recordings, RecordingsToDelete, Timeline from frigate.record.util import remove_empty_directories logger = 
logging.getLogger(__name__) @@ -217,34 +217,62 @@ class RecordingCleanup(threading.Thread): logger.debug("Start sync recordings.") # get all recordings in the db - recordings: Recordings = Recordings.select() + recordings = Recordings.select(Recordings.id, Recordings.path) - # get all recordings files on disk - files_on_disk = [] - for root, _, files in os.walk(RECORD_DIR): - for file in files: - files_on_disk.append(os.path.join(root, file)) + # get all recording files on disk and put them in a set + files_on_disk = { + os.path.join(root, file) + for root, _, files in os.walk(RECORD_DIR) + for file in files + } - recordings_to_delete = [] - for recording in recordings.objects().iterator(): - if recording.path not in files_on_disk: - recordings_to_delete.append(recording.id) + # Use pagination to process records in chunks + page_size = 1000 + num_pages = (recordings.count() + page_size - 1) // page_size + recordings_to_delete = set() + + for page in range(num_pages): + for recording in recordings.paginate(page, page_size): + if recording.path not in files_on_disk: + recordings_to_delete.add(recording.id) + + # convert back to list of dictionaries for insertion + recordings_to_delete = [ + {"id": recording_id} for recording_id in recordings_to_delete + ] + + if len(recordings_to_delete) / recordings.count() > 0.5: + logger.debug( + f"Deleting {(len(recordings_to_delete) / recordings.count() * 100):.2f}% of recordings could be due to configuration error. Aborting..." 
+ ) + return logger.debug( f"Deleting {len(recordings_to_delete)} recordings with missing files" ) - # delete up to 100,000 at a time - max_deletes = 100000 - for i in range(0, len(recordings_to_delete), max_deletes): - Recordings.delete().where( - Recordings.id << recordings_to_delete[i : i + max_deletes] - ).execute() + + # create a temporary table for deletion + RecordingsToDelete.create_table(temporary=True) + + # insert ids to the temporary table + max_inserts = 1000 + for batch in chunked(recordings_to_delete, max_inserts): + RecordingsToDelete.insert_many(batch).execute() + + try: + # delete records in the main table that exist in the temporary table + query = Recordings.delete().where( + Recordings.id.in_(RecordingsToDelete.select(RecordingsToDelete.id)) + ) + query.execute() + except DatabaseError as e: + logger.error(f"Database error during delete: {e}") logger.debug("End sync recordings.") def run(self) -> None: - # on startup sync recordings with disk (disabled due to too much CPU usage) - # self.sync_recordings() + # on startup sync recordings with disk + self.sync_recordings() # Expire tmp clips every minute, recordings and clean directories every hour. 
for counter in itertools.cycle(range(self.config.record.expire_interval)): diff --git a/frigate/record/record.py b/frigate/record/record.py index 9d3106d0f..ab6cd3450 100644 --- a/frigate/record/record.py +++ b/frigate/record/record.py @@ -11,7 +11,7 @@ from playhouse.sqliteq import SqliteQueueDatabase from setproctitle import setproctitle from frigate.config import FrigateConfig -from frigate.models import Event, Recordings, Timeline +from frigate.models import Event, Recordings, RecordingsToDelete, Timeline from frigate.record.cleanup import RecordingCleanup from frigate.record.maintainer import RecordingMaintainer from frigate.types import RecordMetricsTypes @@ -46,7 +46,7 @@ def manage_recordings( }, timeout=60, ) - models = [Event, Recordings, Timeline] + models = [Event, Recordings, Timeline, RecordingsToDelete] db.bind(models) maintainer = RecordingMaintainer(