diff --git a/frigate/app.py b/frigate/app.py index 5abf954d8..eca56f49f 100644 --- a/frigate/app.py +++ b/frigate/app.py @@ -313,7 +313,9 @@ class FrigateApp: "cache_size": -512 * 1000, # 512MB of cache, "synchronous": "NORMAL", # Safe when using WAL https://www.sqlite.org/pragma.html#pragma_synchronous }, - timeout=10 * len([c for c in self.config.cameras.values() if c.enabled]), + timeout=max( + 60, 10 * len([c for c in self.config.cameras.values() if c.enabled]) + ), ) models = [Event, Recordings, Timeline] self.db.bind(models) diff --git a/frigate/events/cleanup.py b/frigate/events/cleanup.py index 43fb5f8dc..d70a290d7 100644 --- a/frigate/events/cleanup.py +++ b/frigate/events/cleanup.py @@ -4,16 +4,22 @@ import datetime import logging import os import threading +from enum import Enum from multiprocessing.synchronize import Event as MpEvent from pathlib import Path from frigate.config import FrigateConfig from frigate.const import CLIPS_DIR -from frigate.models import Event +from frigate.models import Event, Timeline logger = logging.getLogger(__name__) +class EventCleanupType(str, Enum): + clips = "clips" + snapshots = "snapshots" + + class EventCleanup(threading.Thread): def __init__(self, config: FrigateConfig, stop_event: MpEvent): threading.Thread.__init__(self) @@ -21,25 +27,54 @@ class EventCleanup(threading.Thread): self.config = config self.stop_event = stop_event self.camera_keys = list(self.config.cameras.keys()) + self.removed_camera_labels: list[str] = None + self.camera_labels: dict[str, dict[str, any]] = {} - def expire(self, media_type: str) -> None: - # TODO: Refactor media_type to enum + def get_removed_camera_labels(self) -> list[Event]: + """Get a list of distinct labels for removed cameras.""" + if self.removed_camera_labels is None: + self.removed_camera_labels = list( + Event.select(Event.label) + .where(Event.camera.not_in(self.camera_keys)) + .distinct() + .execute() + ) + + return self.removed_camera_labels + + def get_camera_labels(self, camera: str) -> list[Event]: + """Get a list of distinct labels for each camera, updating once a day.""" + if ( + self.camera_labels.get(camera) is None + or self.camera_labels[camera]["last_update"] + < (datetime.datetime.now() - datetime.timedelta(days=1)).timestamp() + ): + self.camera_labels[camera] = { + "last_update": datetime.datetime.now().timestamp(), + "labels": list( + Event.select(Event.label) + .where(Event.camera == camera) + .distinct() + .execute() + ), + } + + return self.camera_labels[camera]["labels"] + + def expire(self, media_type: EventCleanupType) -> list[str]: ## Expire events from unlisted cameras based on the global config - if media_type == "clips": + if media_type == EventCleanupType.clips: retain_config = self.config.record.events.retain - file_extension = "mp4" + file_extension = None # mp4 clips are no longer stored in /clips update_params = {"has_clip": False} else: retain_config = self.config.snapshots.retain file_extension = "jpg" update_params = {"has_snapshot": False} - distinct_labels = ( - Event.select(Event.label) - .where(Event.camera.not_in(self.camera_keys)) - .distinct() - ) + distinct_labels = self.get_removed_camera_labels() + ## Expire events from cameras no longer in the config # loop over object types in db for event in distinct_labels: # get expiration time for this label @@ -76,16 +111,17 @@ class EventCleanup(threading.Thread): ) update_query.execute() + events_to_update = [] + ## Expire events from cameras based on the camera config for name, camera in self.config.cameras.items(): - if media_type == "clips": + if media_type == EventCleanupType.clips: retain_config = camera.record.events.retain else: retain_config = camera.snapshots.retain + # get distinct objects in database for this camera - distinct_labels = ( - Event.select(Event.label).where(Event.camera == name).distinct() - ) + distinct_labels = self.get_camera_labels(name) # loop over object types in db for event in distinct_labels: @@ -103,26 +139,27 @@ class EventCleanup(threading.Thread): Event.label == event.label, Event.retain_indefinitely == False, ) + # delete the grabbed clips from disk + # only snapshots are stored in /clips + # so no need to delete mp4 files for event in expired_events: - media_name = f"{event.camera}-{event.id}" - media_path = Path( - f"{os.path.join(CLIPS_DIR, media_name)}.{file_extension}" - ) - media_path.unlink(missing_ok=True) - if file_extension == "jpg": + events_to_update.append(event.id) + + if media_type == EventCleanupType.snapshots: + media_name = f"{event.camera}-{event.id}" + media_path = Path( + f"{os.path.join(CLIPS_DIR, media_name)}.{file_extension}" + ) + media_path.unlink(missing_ok=True) media_path = Path( f"{os.path.join(CLIPS_DIR, media_name)}-clean.png" ) media_path.unlink(missing_ok=True) - # update the clips attribute for the db entry - update_query = Event.update(update_params).where( - Event.camera == name, - Event.start_time < expire_after, - Event.label == event.label, - Event.retain_indefinitely == False, - ) - update_query.execute() + + # update the clips attribute for the db entry + Event.update(update_params).where(Event.id << events_to_update).execute() + return events_to_update def purge_duplicates(self) -> None: duplicate_query = """with grouped_events as ( @@ -149,8 +186,6 @@ class EventCleanup(threading.Thread): media_path.unlink(missing_ok=True) media_path = Path(f"{os.path.join(CLIPS_DIR, media_name)}-clean.png") media_path.unlink(missing_ok=True) - media_path = Path(f"{os.path.join(CLIPS_DIR, media_name)}.mp4") - media_path.unlink(missing_ok=True) ( Event.delete() @@ -161,8 +196,14 @@ class EventCleanup(threading.Thread): def run(self) -> None: # only expire events every 5 minutes while not self.stop_event.wait(300): - self.expire("clips") - self.expire("snapshots") + events_with_expired_clips = self.expire(EventCleanupType.clips) + + # delete timeline entries for events that have expired recordings + Timeline.delete().where( + Timeline.source_id << events_with_expired_clips + ).execute() + + self.expire(EventCleanupType.snapshots) self.purge_duplicates() # drop events from db where has_clip and has_snapshot are false diff --git a/frigate/record/cleanup.py b/frigate/record/cleanup.py index 64ec28746..4600ce036 100644 --- a/frigate/record/cleanup.py +++ b/frigate/record/cleanup.py @@ -12,7 +12,7 @@ from peewee import DatabaseError, chunked from frigate.config import FrigateConfig, RetainModeEnum from frigate.const import RECORD_DIR -from frigate.models import Event, Recordings, RecordingsToDelete, Timeline +from frigate.models import Event, Recordings, RecordingsToDelete from frigate.record.util import remove_empty_directories logger = logging.getLogger(__name__) @@ -140,15 +140,6 @@ class RecordingCleanup(threading.Thread): Path(recording.path).unlink(missing_ok=True) deleted_recordings.add(recording.id) - # delete timeline entries relevant to this recording segment - Timeline.delete().where( - Timeline.timestamp.between( - recording.start_time, recording.end_time - ), - Timeline.timestamp < expire_date, - Timeline.camera == camera, - ).execute() - logger.debug(f"Expiring {len(deleted_recordings)} recordings") # delete up to 100,000 at a time max_deletes = 100000 diff --git a/frigate/record/maintainer.py b/frigate/record/maintainer.py index e1dabdf67..372e6a4fd 100644 --- a/frigate/record/maintainer.py +++ b/frigate/record/maintainer.py @@ -125,6 +125,7 @@ class RecordingMaintainer(threading.Thread): self.end_time_cache.pop(cache_path, None) grouped_recordings[camera] = grouped_recordings[camera][-keep_count:] + tasks = [] for camera, recordings in grouped_recordings.items(): # clear out all the object recording info for old frames while ( @@ -155,10 +156,15 @@ class RecordingMaintainer(threading.Thread): .order_by(Event.start_time) ) - await asyncio.gather( - *(self.validate_and_move_segment(camera, events, r) for r in recordings) + tasks.extend( + [self.validate_and_move_segment(camera, events, r) for r in recordings] ) + recordings_to_insert: list[Optional[Recordings]] = await asyncio.gather(*tasks) + Recordings.insert_many( + [r for r in recordings_to_insert if r is not None] + ).execute() + async def validate_and_move_segment( self, camera: str, events: Event, recording: dict[str, any] ) -> None: @@ -225,7 +231,7 @@ class RecordingMaintainer(threading.Thread): if overlaps: record_mode = self.config.cameras[camera].record.events.retain.mode # move from cache to recordings immediately - self.store_segment( + return self.move_segment( camera, start_time, end_time, @@ -247,7 +253,7 @@ class RecordingMaintainer(threading.Thread): # else retain days includes this segment else: record_mode = self.config.cameras[camera].record.retain.mode - self.store_segment( + return self.move_segment( camera, start_time, end_time, duration, cache_path, record_mode ) @@ -290,7 +296,7 @@ class RecordingMaintainer(threading.Thread): return SegmentInfo(motion_count, active_count, round(average_dBFS)) - def store_segment( + def move_segment( self, camera: str, start_time: datetime.datetime, @@ -298,7 +304,7 @@ class RecordingMaintainer(threading.Thread): duration: float, cache_path: str, store_mode: RetainModeEnum, - ) -> None: + ) -> Optional[Recordings]: segment_info = self.segment_stats(camera, start_time, end_time) # check if the segment shouldn't be stored @@ -348,7 +354,7 @@ class RecordingMaintainer(threading.Thread): if p.returncode != 0: logger.error(f"Unable to convert {cache_path} to {file_path}") logger.error(p.stderr) - return + return None else: logger.debug( f"Copied {file_path} in {datetime.datetime.now().timestamp()-start_frame} seconds." @@ -368,19 +374,20 @@ class RecordingMaintainer(threading.Thread): rand_id = "".join( random.choices(string.ascii_lowercase + string.digits, k=6) ) - Recordings.create( - id=f"{start_time.timestamp()}-{rand_id}", - camera=camera, - path=file_path, - start_time=start_time.timestamp(), - end_time=end_time.timestamp(), - duration=duration, - motion=segment_info.motion_box_count, + + return { + Recordings.id: f"{start_time.timestamp()}-{rand_id}", + Recordings.camera: camera, + Recordings.path: file_path, + Recordings.start_time: start_time.timestamp(), + Recordings.end_time: end_time.timestamp(), + Recordings.duration: duration, + Recordings.motion: segment_info.motion_box_count, # TODO: update this to store list of active objects at some point - objects=segment_info.active_object_count, - dBFS=segment_info.average_dBFS, - segment_size=segment_size, - ) + Recordings.objects: segment_info.active_object_count, + Recordings.dBFS: segment_info.average_dBFS, + Recordings.segment_size: segment_size, + } except Exception as e: logger.error(f"Unable to store recording segment {cache_path}") Path(cache_path).unlink(missing_ok=True) @@ -388,10 +395,11 @@ class RecordingMaintainer(threading.Thread): # clear end_time cache self.end_time_cache.pop(cache_path, None) + return None def run(self) -> None: # Check for new files every 5 seconds - wait_time = 5.0 + wait_time = 0.0 while not self.stop_event.wait(wait_time): run_start = datetime.datetime.now().timestamp() diff --git a/frigate/record/record.py b/frigate/record/record.py index 6e40ac01c..0c76f33cb 100644 --- a/frigate/record/record.py +++ b/frigate/record/record.py @@ -45,7 +45,7 @@ def manage_recordings( "cache_size": -512 * 1000, # 512MB of cache "synchronous": "NORMAL", # Safe when using WAL https://www.sqlite.org/pragma.html#pragma_synchronous }, - timeout=10 * len([c for c in config.cameras.values() if c.enabled]), + timeout=max(60, 10 * len([c for c in config.cameras.values() if c.enabled])), ) models = [Event, Recordings, Timeline, RecordingsToDelete] db.bind(models) diff --git a/frigate/storage.py b/frigate/storage.py index 2511c2aa8..2088ea57c 100644 --- a/frigate/storage.py +++ b/frigate/storage.py @@ -42,21 +42,21 @@ class StorageMaintainer(threading.Thread): ) } - # calculate MB/hr - try: - bandwidth = round( - Recordings.select(fn.AVG(bandwidth_equation)) - .where(Recordings.camera == camera, Recordings.segment_size > 0) - .limit(100) - .scalar() - * 3600, - 2, - ) - except TypeError: - bandwidth = 0 + # calculate MB/hr + try: + bandwidth = round( + Recordings.select(fn.AVG(bandwidth_equation)) + .where(Recordings.camera == camera, Recordings.segment_size > 0) + .limit(100) + .scalar() + * 3600, + 2, + ) + except TypeError: + bandwidth = 0 - self.camera_storage_stats[camera]["bandwidth"] = bandwidth - logger.debug(f"{camera} has a bandwidth of {bandwidth} MiB/hr.") + self.camera_storage_stats[camera]["bandwidth"] = bandwidth + logger.debug(f"{camera} has a bandwidth of {bandwidth} MiB/hr.") def calculate_camera_usages(self) -> dict[str, dict]: """Calculate the storage usage of each camera."""