Optimization of Sync Records: Implementing Pagination and Temporary Table (#6585)

* Update pull_request.yml

* Add temporary table for deletion and use pagination to process recordings in chunks for deletion of recordings with missing files

* move RecordingsToDelete class to models.py

* recording cleanup: bugfixes

* Update cleanup.py

* improve log message in cleanup.py

Co-authored-by: Nicolas Mowen <nickmowen213@gmail.com>

---------

Co-authored-by: Nicolas Mowen <nickmowen213@gmail.com>
This commit is contained in:
Sergey Krashevich 2023-06-11 16:01:50 +03:00 committed by GitHub
parent 7459a1cdde
commit 5c27cb7e9b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 58 additions and 22 deletions

View File

@ -65,3 +65,11 @@ class Recordings(Model): # type: ignore[misc]
motion = IntegerField(null=True)
objects = IntegerField(null=True)
segment_size = FloatField(default=0) # this should be stored as MB
# Used for temporary table in record/cleanup.py
class RecordingsToDelete(Model): # type: ignore[misc]
id = CharField(null=False, primary_key=False, max_length=30)
class Meta:
temporary = True

View File

@ -8,11 +8,11 @@ import threading
from multiprocessing.synchronize import Event as MpEvent
from pathlib import Path
from peewee import DoesNotExist
from peewee import DatabaseError, DoesNotExist, chunked
from frigate.config import FrigateConfig, RetainModeEnum
from frigate.const import RECORD_DIR, SECONDS_IN_DAY
from frigate.models import Event, Recordings, Timeline
from frigate.models import Event, Recordings, RecordingsToDelete, Timeline
from frigate.record.util import remove_empty_directories
logger = logging.getLogger(__name__)
@ -217,34 +217,62 @@ class RecordingCleanup(threading.Thread):
logger.debug("Start sync recordings.")
# get all recordings in the db
recordings: Recordings = Recordings.select()
recordings = Recordings.select(Recordings.id, Recordings.path)
# get all recordings files on disk
files_on_disk = []
for root, _, files in os.walk(RECORD_DIR):
for file in files:
files_on_disk.append(os.path.join(root, file))
# get all recordings files on disk and put them in a set
files_on_disk = {
os.path.join(root, file)
for root, _, files in os.walk(RECORD_DIR)
for file in files
}
recordings_to_delete = []
for recording in recordings.objects().iterator():
# Use pagination to process records in chunks
page_size = 1000
num_pages = (recordings.count() + page_size - 1) // page_size
recordings_to_delete = set()
for page in range(num_pages):
for recording in recordings.paginate(page, page_size):
if recording.path not in files_on_disk:
recordings_to_delete.append(recording.id)
recordings_to_delete.add(recording.id)
# convert back to list of dictionaries for insertion
recordings_to_delete = [
{"id": recording_id} for recording_id in recordings_to_delete
]
if len(recordings_to_delete) / recordings.count() > 0.5:
logger.debug(
f"Deleting {(len(recordings_to_delete) / recordings.count()):2f}% of recordings could be due to configuration error. Aborting..."
)
return
logger.debug(
f"Deleting {len(recordings_to_delete)} recordings with missing files"
)
# delete up to 100,000 at a time
max_deletes = 100000
for i in range(0, len(recordings_to_delete), max_deletes):
Recordings.delete().where(
Recordings.id << recordings_to_delete[i : i + max_deletes]
).execute()
# create a temporary table for deletion
RecordingsToDelete.create_table(temporary=True)
# insert ids to the temporary table
max_inserts = 1000
for batch in chunked(recordings_to_delete, max_inserts):
RecordingsToDelete.insert_many(batch).execute()
try:
# delete records in the main table that exist in the temporary table
query = Recordings.delete().where(
Recordings.id.in_(RecordingsToDelete.select(RecordingsToDelete.id))
)
query.execute()
except DatabaseError as e:
logger.error(f"Database error during delete: {e}")
logger.debug("End sync recordings.")
def run(self) -> None:
# on startup sync recordings with disk (disabled due to too much CPU usage)
# self.sync_recordings()
# on startup sync recordings with disk
self.sync_recordings()
# Expire tmp clips every minute, recordings and clean directories every hour.
for counter in itertools.cycle(range(self.config.record.expire_interval)):

View File

@ -11,7 +11,7 @@ from playhouse.sqliteq import SqliteQueueDatabase
from setproctitle import setproctitle
from frigate.config import FrigateConfig
from frigate.models import Event, Recordings, Timeline
from frigate.models import Event, Recordings, RecordingsToDelete, Timeline
from frigate.record.cleanup import RecordingCleanup
from frigate.record.maintainer import RecordingMaintainer
from frigate.types import RecordMetricsTypes
@ -46,7 +46,7 @@ def manage_recordings(
},
timeout=60,
)
models = [Event, Recordings, Timeline]
models = [Event, Recordings, Timeline, RecordingsToDelete]
db.bind(models)
maintainer = RecordingMaintainer(