Mirror of https://github.com/blakeblackshear/frigate.git
Periodically sync for stale recordings (#8433)
* Periodically clean up recordings files / DB
* Make automatic sync limited to last 36 hours
parent 4f7b710112
commit 63233a5830
frigate/record/cleanup.py

@@ -3,17 +3,15 @@
 import datetime
 import itertools
 import logging
 import os
 import threading
 from multiprocessing.synchronize import Event as MpEvent
 from pathlib import Path
 
-from peewee import DatabaseError, chunked
-
 from frigate.config import FrigateConfig, RetainModeEnum
 from frigate.const import CACHE_DIR, RECORD_DIR
-from frigate.models import Event, Recordings, RecordingsToDelete
-from frigate.record.util import remove_empty_directories
+from frigate.models import Event, Recordings
+from frigate.record.util import remove_empty_directories, sync_recordings
+from frigate.util.builtin import get_tomorrow_at_time
 
 logger = logging.getLogger(__name__)
 
@@ -180,76 +178,25 @@ class RecordingCleanup(threading.Thread):
         logger.debug("End all cameras.")
         logger.debug("End expire recordings.")
 
-    def sync_recordings(self) -> None:
-        """Check the db for stale recordings entries that don't exist in the filesystem."""
-        logger.debug("Start sync recordings.")
-
-        # get all recordings in the db
-        recordings = Recordings.select(Recordings.id, Recordings.path)
-
-        # get all recordings files on disk and put them in a set
-        files_on_disk = {
-            os.path.join(root, file)
-            for root, _, files in os.walk(RECORD_DIR)
-            for file in files
-        }
-
-        # Use pagination to process records in chunks
-        page_size = 1000
-        num_pages = (recordings.count() + page_size - 1) // page_size
-        recordings_to_delete = set()
-
-        for page in range(num_pages):
-            for recording in recordings.paginate(page, page_size):
-                if recording.path not in files_on_disk:
-                    recordings_to_delete.add(recording.id)
-
-        # convert back to list of dictionaries for insertion
-        recordings_to_delete = [
-            {"id": recording_id} for recording_id in recordings_to_delete
-        ]
-
-        if len(recordings_to_delete) / max(1, recordings.count()) > 0.5:
-            logger.debug(
-                f"Deleting {(len(recordings_to_delete) / recordings.count()):2f}% of recordings could be due to configuration error. Aborting..."
-            )
-            return
-
-        logger.debug(
-            f"Deleting {len(recordings_to_delete)} recordings with missing files"
-        )
-
-        # create a temporary table for deletion
-        RecordingsToDelete.create_table(temporary=True)
-
-        # insert ids to the temporary table
-        max_inserts = 1000
-        for batch in chunked(recordings_to_delete, max_inserts):
-            RecordingsToDelete.insert_many(batch).execute()
-
-        try:
-            # delete records in the main table that exist in the temporary table
-            query = Recordings.delete().where(
-                Recordings.id.in_(RecordingsToDelete.select(RecordingsToDelete.id))
-            )
-            query.execute()
-        except DatabaseError as e:
-            logger.error(f"Database error during delete: {e}")
-
-        logger.debug("End sync recordings.")
-
     def run(self) -> None:
         # on startup sync recordings with disk if enabled
         if self.config.record.sync_on_startup:
-            self.sync_recordings()
+            sync_recordings(limited=False)
+
+        next_sync = get_tomorrow_at_time(3)
 
         # Expire tmp clips every minute, recordings and clean directories every hour.
         for counter in itertools.cycle(range(self.config.record.expire_interval)):
             if self.stop_event.wait(60):
                 logger.info("Exiting recording cleanup...")
                 break
 
             self.clean_tmp_clips()
 
+            if datetime.datetime.now().astimezone(datetime.timezone.utc) > next_sync:
+                sync_recordings(limited=True)
+                next_sync = get_tomorrow_at_time(3)
+
             if counter == 0:
                 self.expire_recordings()
                 remove_empty_directories(RECORD_DIR)
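The method removed above is not dropped outright: it moves into frigate/record/util.py as a standalone sync_recordings (next hunks), so the startup path and the new nightly sync share one implementation. Its deletion strategy stages stale ids in a temporary table and removes them with a single joined DELETE instead of one DELETE per row. Below is a minimal standalone sketch of that peewee pattern against an in-memory SQLite database; the toy models and sample ids are stand-ins, not Frigate's real schema.

from peewee import Model, SqliteDatabase, TextField, chunked

db = SqliteDatabase(":memory:")  # toy database; Frigate uses its own SQLite file


class Recordings(Model):
    # stand-in for Frigate's Recordings model
    id = TextField(primary_key=True)

    class Meta:
        database = db


class RecordingsToDelete(Model):
    # staging table holding only the ids marked for deletion
    id = TextField(primary_key=True)

    class Meta:
        database = db


db.connect()
Recordings.create_table()
Recordings.insert_many([{"id": f"rec-{i}"} for i in range(10)]).execute()

# stage the stale ids in a temporary table, in bounded batches
RecordingsToDelete.create_table(temporary=True)
stale_ids = [{"id": f"rec-{i}"} for i in range(0, 10, 2)]
for batch in chunked(stale_ids, 1000):
    RecordingsToDelete.insert_many(batch).execute()

# one DELETE joined against the staged ids instead of one DELETE per row
Recordings.delete().where(
    Recordings.id.in_(RecordingsToDelete.select(RecordingsToDelete.id))
).execute()

print(Recordings.select().count())  # 5 rows left

Batching the inserts with chunked() bounds each statement's size, presumably to stay within SQLite's bound-parameter limits; the real code caps max_inserts at 1000 for the same reason.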
frigate/record/util.py

@@ -1,7 +1,16 @@
 """Recordings Utilities."""
 
+import datetime
+import logging
 import os
 
+from peewee import DatabaseError, chunked
+
+from frigate.const import RECORD_DIR
+from frigate.models import Recordings, RecordingsToDelete
+
+logger = logging.getLogger(__name__)
+
 
 def remove_empty_directories(directory: str) -> None:
     # list all directories recursively and sort them by path,
@@ -17,3 +26,110 @@ def remove_empty_directories(directory: str) -> None:
             continue
         if len(os.listdir(path)) == 0:
             os.rmdir(path)
+
+
+def sync_recordings(limited: bool) -> None:
+    """Check the db for stale recordings entries that don't exist in the filesystem."""
+
+    def delete_db_entries_without_file(files_on_disk: list[str]) -> bool:
+        """Delete db entries where file was deleted outside of frigate."""
+        if limited:
+            recordings = Recordings.select(Recordings.id, Recordings.path).where(
+                Recordings.start_time
+                >= (datetime.datetime.now() - datetime.timedelta(hours=36)).timestamp()
+            )
+        else:
+            # get all recordings in the db
+            recordings = Recordings.select(Recordings.id, Recordings.path)
+
+        # Use pagination to process records in chunks
+        page_size = 1000
+        num_pages = (recordings.count() + page_size - 1) // page_size
+        recordings_to_delete = set()
+
+        for page in range(num_pages):
+            for recording in recordings.paginate(page, page_size):
+                if recording.path not in files_on_disk:
+                    recordings_to_delete.add(recording.id)
+
+        # convert back to list of dictionaries for insertion
+        recordings_to_delete = [
+            {"id": recording_id} for recording_id in recordings_to_delete
+        ]
+
+        if float(len(recordings_to_delete)) / max(1, recordings.count()) > 0.5:
+            logger.debug(
+                f"Deleting {(float(len(recordings_to_delete)) / recordings.count()):2f}% of recordings DB entries, could be due to configuration error. Aborting..."
+            )
+            return False
+
+        logger.debug(
+            f"Deleting {len(recordings_to_delete)} recording DB entries with missing files"
+        )
+
+        # create a temporary table for deletion
+        RecordingsToDelete.create_table(temporary=True)
+
+        # insert ids to the temporary table
+        max_inserts = 1000
+        for batch in chunked(recordings_to_delete, max_inserts):
+            RecordingsToDelete.insert_many(batch).execute()
+
+        try:
+            # delete records in the main table that exist in the temporary table
+            query = Recordings.delete().where(
+                Recordings.id.in_(RecordingsToDelete.select(RecordingsToDelete.id))
+            )
+            query.execute()
+        except DatabaseError as e:
+            logger.error(f"Database error during recordings db cleanup: {e}")
+
+        return True
+
+    def delete_files_without_db_entry(files_on_disk: list[str]):
+        """Delete files where file is not inside frigate db."""
+        files_to_delete = []
+
+        for file in files_on_disk:
+            if not Recordings.select().where(Recordings.path == file).exists():
+                files_to_delete.append(file)
+
+        if float(len(files_to_delete)) / max(1, len(files_on_disk)) > 0.5:
+            logger.debug(
+                f"Deleting {(float(len(files_to_delete)) / len(files_on_disk)):2f}% of recordings DB entries, could be due to configuration error. Aborting..."
+            )
+            return
+
+        for file in files_to_delete:
+            os.unlink(file)
+
+    logger.debug("Start sync recordings.")
+
+    if limited:
+        # get recording files from last 36 hours
+        hour_check = (
+            datetime.datetime.now().astimezone(datetime.timezone.utc)
+            - datetime.timedelta(hours=36)
+        ).strftime("%Y-%m-%d/%H")
+        files_on_disk = {
+            os.path.join(root, file)
+            for root, _, files in os.walk(RECORD_DIR)
+            for file in files
+            if file > hour_check
+        }
+    else:
+        # get all recordings files on disk and put them in a set
+        files_on_disk = {
+            os.path.join(root, file)
+            for root, _, files in os.walk(RECORD_DIR)
+            for file in files
+        }
+
+    db_success = delete_db_entries_without_file(files_on_disk)
+
+    # only try to cleanup files if db cleanup was successful
+    if db_success:
+        delete_files_without_db_entry(files_on_disk)
+
+    logger.debug("End sync recordings.")
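One detail in the limited branch above: hour_check is a plain string built with strftime("%Y-%m-%d/%H"), and candidate file names are compared against it with an ordinary >. That only makes sense because zero-padded timestamp strings sort lexicographically in the same order as the times they encode. A small self-contained illustration of that property (the dates here are arbitrary examples, not Frigate data):

import datetime

FMT = "%Y-%m-%d/%H"

# a fixed "now" and the 36-hour cutoff derived from it
cutoff = (
    datetime.datetime(2023, 11, 5, 12, tzinfo=datetime.timezone.utc)
    - datetime.timedelta(hours=36)
).strftime(FMT)  # '2023-11-04/00'

# zero-padded fields make string order agree with chronological order
assert datetime.datetime(2023, 11, 3, 23).strftime(FMT) < cutoff
assert datetime.datetime(2023, 11, 5, 9).strftime(FMT) > cutoff
print(cutoff)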
frigate/util/builtin.py

@@ -263,8 +263,9 @@ def find_by_key(dictionary, target_key):
     return None
 
 
-def get_tomorrow_at_2() -> datetime.datetime:
+def get_tomorrow_at_time(hour: int) -> datetime.datetime:
     """Returns the datetime of the following day at 2am."""
     tomorrow = datetime.datetime.now(get_localzone()) + datetime.timedelta(days=1)
-    return tomorrow.replace(hour=2, minute=0, second=0).astimezone(
+    return tomorrow.replace(hour=hour, minute=0, second=0).astimezone(
         datetime.timezone.utc
     )
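Generalizing this helper lets the commit arm two different daily jobs from one function: the recordings sync at 3am (cleanup.py above) and the region-grid refresh at 2am (video.py below). A short usage sketch, runnable inside a Frigate checkout; note the helper takes a local-time hour but returns a UTC datetime, so it compares cleanly against datetime.datetime.now().astimezone(datetime.timezone.utc):

import datetime

from frigate.util.builtin import get_tomorrow_at_time

next_sync = get_tomorrow_at_time(3)           # nightly recordings sync
next_region_update = get_tomorrow_at_time(2)  # daily region grid refresh

# both are timezone-aware UTC datetimes
assert next_sync.tzinfo == datetime.timezone.utc

now = datetime.datetime.now().astimezone(datetime.timezone.utc)
if now > next_sync:
    # time to run sync_recordings(limited=True), then re-arm for tomorrow
    next_sync = get_tomorrow_at_time(3)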
frigate/video.py

@@ -26,7 +26,7 @@ from frigate.ptz.autotrack import ptz_moving_at_frame_time
 from frigate.track import ObjectTracker
 from frigate.track.norfair_tracker import NorfairTracker
 from frigate.types import PTZMetricsTypes
-from frigate.util.builtin import EventsPerSecond, get_tomorrow_at_2
+from frigate.util.builtin import EventsPerSecond, get_tomorrow_at_time
 from frigate.util.image import (
     FrameManager,
     SharedMemoryFrameManager,
@@ -528,7 +528,7 @@ def process_frames(
     fps = process_info["process_fps"]
     detection_fps = process_info["detection_fps"]
     current_frame_time = process_info["detection_frame"]
-    next_region_update = get_tomorrow_at_2()
+    next_region_update = get_tomorrow_at_time(2)
 
     fps_tracker = EventsPerSecond()
     fps_tracker.start()
@@ -550,7 +550,7 @@
         except queue.Empty:
             logger.error(f"Unable to get updated region grid for {camera_name}")
 
-            next_region_update = get_tomorrow_at_2()
+            next_region_update = get_tomorrow_at_time(2)
 
         try:
             if exit_on_empty: