db improvements (#7227)

* Store camera labels in dict and other optimizations

* Add max on timeout so it is at least 60

* Ensure db timeout is at least 60

* Update list once a day to ensure new labels are cleaned up

* Formatting

* Insert recordings as bulk instead of individually.

* Fix

* Refactor event and timeline cleanup

* Remove unused
This commit is contained in:
Nicolas Mowen 2023-07-21 06:29:50 -06:00 committed by GitHub
parent b655eca152
commit bfa7a5cc60
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 120 additions and 78 deletions

View File

@ -313,7 +313,9 @@ class FrigateApp:
"cache_size": -512 * 1000, # 512MB of cache, "cache_size": -512 * 1000, # 512MB of cache,
"synchronous": "NORMAL", # Safe when using WAL https://www.sqlite.org/pragma.html#pragma_synchronous "synchronous": "NORMAL", # Safe when using WAL https://www.sqlite.org/pragma.html#pragma_synchronous
}, },
timeout=10 * len([c for c in self.config.cameras.values() if c.enabled]), timeout=max(
60, 10 * len([c for c in self.config.cameras.values() if c.enabled])
),
) )
models = [Event, Recordings, Timeline] models = [Event, Recordings, Timeline]
self.db.bind(models) self.db.bind(models)

View File

@ -4,16 +4,22 @@ import datetime
import logging import logging
import os import os
import threading import threading
from enum import Enum
from multiprocessing.synchronize import Event as MpEvent from multiprocessing.synchronize import Event as MpEvent
from pathlib import Path from pathlib import Path
from frigate.config import FrigateConfig from frigate.config import FrigateConfig
from frigate.const import CLIPS_DIR from frigate.const import CLIPS_DIR
from frigate.models import Event from frigate.models import Event, Timeline
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
class EventCleanupType(str, Enum):
clips = "clips"
snapshots = "snapshots"
class EventCleanup(threading.Thread): class EventCleanup(threading.Thread):
def __init__(self, config: FrigateConfig, stop_event: MpEvent): def __init__(self, config: FrigateConfig, stop_event: MpEvent):
threading.Thread.__init__(self) threading.Thread.__init__(self)
@ -21,25 +27,54 @@ class EventCleanup(threading.Thread):
self.config = config self.config = config
self.stop_event = stop_event self.stop_event = stop_event
self.camera_keys = list(self.config.cameras.keys()) self.camera_keys = list(self.config.cameras.keys())
self.removed_camera_labels: list[str] = None
self.camera_labels: dict[str, dict[str, any]] = {}
def expire(self, media_type: str) -> None: def get_removed_camera_labels(self) -> list[Event]:
# TODO: Refactor media_type to enum """Get a list of distinct labels for removed cameras."""
if self.removed_camera_labels is None:
self.removed_camera_labels = list(
Event.select(Event.label)
.where(Event.camera.not_in(self.camera_keys))
.distinct()
.execute()
)
return self.removed_camera_labels
def get_camera_labels(self, camera: str) -> list[Event]:
"""Get a list of distinct labels for each camera, updating once a day."""
if (
self.camera_labels.get(camera) is None
or self.camera_labels[camera]["last_update"]
< (datetime.datetime.now() - datetime.timedelta(days=1)).timestamp()
):
self.camera_labels[camera] = {
"last_update": datetime.datetime.now().timestamp(),
"labels": list(
Event.select(Event.label)
.where(Event.camera == camera)
.distinct()
.execute()
),
}
return self.camera_labels[camera]["labels"]
def expire(self, media_type: EventCleanupType) -> list[str]:
## Expire events from unlisted cameras based on the global config ## Expire events from unlisted cameras based on the global config
if media_type == "clips": if media_type == EventCleanupType.clips:
retain_config = self.config.record.events.retain retain_config = self.config.record.events.retain
file_extension = "mp4" file_extension = None # mp4 clips are no longer stored in /clips
update_params = {"has_clip": False} update_params = {"has_clip": False}
else: else:
retain_config = self.config.snapshots.retain retain_config = self.config.snapshots.retain
file_extension = "jpg" file_extension = "jpg"
update_params = {"has_snapshot": False} update_params = {"has_snapshot": False}
distinct_labels = ( distinct_labels = self.get_removed_camera_labels()
Event.select(Event.label)
.where(Event.camera.not_in(self.camera_keys))
.distinct()
)
## Expire events from cameras no longer in the config
# loop over object types in db # loop over object types in db
for event in distinct_labels: for event in distinct_labels:
# get expiration time for this label # get expiration time for this label
@ -76,16 +111,17 @@ class EventCleanup(threading.Thread):
) )
update_query.execute() update_query.execute()
events_to_update = []
## Expire events from cameras based on the camera config ## Expire events from cameras based on the camera config
for name, camera in self.config.cameras.items(): for name, camera in self.config.cameras.items():
if media_type == "clips": if media_type == EventCleanupType.clips:
retain_config = camera.record.events.retain retain_config = camera.record.events.retain
else: else:
retain_config = camera.snapshots.retain retain_config = camera.snapshots.retain
# get distinct objects in database for this camera # get distinct objects in database for this camera
distinct_labels = ( distinct_labels = self.get_camera_labels(name)
Event.select(Event.label).where(Event.camera == name).distinct()
)
# loop over object types in db # loop over object types in db
for event in distinct_labels: for event in distinct_labels:
@ -103,26 +139,27 @@ class EventCleanup(threading.Thread):
Event.label == event.label, Event.label == event.label,
Event.retain_indefinitely == False, Event.retain_indefinitely == False,
) )
# delete the grabbed clips from disk # delete the grabbed clips from disk
# only snapshots are stored in /clips
# so no need to delete mp4 files
for event in expired_events: for event in expired_events:
events_to_update.append(event.id)
if media_type == EventCleanupType.snapshots:
media_name = f"{event.camera}-{event.id}" media_name = f"{event.camera}-{event.id}"
media_path = Path( media_path = Path(
f"{os.path.join(CLIPS_DIR, media_name)}.{file_extension}" f"{os.path.join(CLIPS_DIR, media_name)}.{file_extension}"
) )
media_path.unlink(missing_ok=True) media_path.unlink(missing_ok=True)
if file_extension == "jpg":
media_path = Path( media_path = Path(
f"{os.path.join(CLIPS_DIR, media_name)}-clean.png" f"{os.path.join(CLIPS_DIR, media_name)}-clean.png"
) )
media_path.unlink(missing_ok=True) media_path.unlink(missing_ok=True)
# update the clips attribute for the db entry # update the clips attribute for the db entry
update_query = Event.update(update_params).where( Event.update(update_params).where(Event.id << events_to_update).execute()
Event.camera == name, return events_to_update
Event.start_time < expire_after,
Event.label == event.label,
Event.retain_indefinitely == False,
)
update_query.execute()
def purge_duplicates(self) -> None: def purge_duplicates(self) -> None:
duplicate_query = """with grouped_events as ( duplicate_query = """with grouped_events as (
@ -149,8 +186,6 @@ class EventCleanup(threading.Thread):
media_path.unlink(missing_ok=True) media_path.unlink(missing_ok=True)
media_path = Path(f"{os.path.join(CLIPS_DIR, media_name)}-clean.png") media_path = Path(f"{os.path.join(CLIPS_DIR, media_name)}-clean.png")
media_path.unlink(missing_ok=True) media_path.unlink(missing_ok=True)
media_path = Path(f"{os.path.join(CLIPS_DIR, media_name)}.mp4")
media_path.unlink(missing_ok=True)
( (
Event.delete() Event.delete()
@ -161,8 +196,14 @@ class EventCleanup(threading.Thread):
def run(self) -> None: def run(self) -> None:
# only expire events every 5 minutes # only expire events every 5 minutes
while not self.stop_event.wait(300): while not self.stop_event.wait(300):
self.expire("clips") events_with_expired_clips = self.expire(EventCleanupType.clips)
self.expire("snapshots")
# delete timeline entries for events that have expired recordings
Timeline.delete().where(
Timeline.source_id << events_with_expired_clips
).execute()
self.expire(EventCleanupType.snapshots)
self.purge_duplicates() self.purge_duplicates()
# drop events from db where has_clip and has_snapshot are false # drop events from db where has_clip and has_snapshot are false

View File

@ -12,7 +12,7 @@ from peewee import DatabaseError, chunked
from frigate.config import FrigateConfig, RetainModeEnum from frigate.config import FrigateConfig, RetainModeEnum
from frigate.const import RECORD_DIR from frigate.const import RECORD_DIR
from frigate.models import Event, Recordings, RecordingsToDelete, Timeline from frigate.models import Event, Recordings, RecordingsToDelete
from frigate.record.util import remove_empty_directories from frigate.record.util import remove_empty_directories
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -140,15 +140,6 @@ class RecordingCleanup(threading.Thread):
Path(recording.path).unlink(missing_ok=True) Path(recording.path).unlink(missing_ok=True)
deleted_recordings.add(recording.id) deleted_recordings.add(recording.id)
# delete timeline entries relevant to this recording segment
Timeline.delete().where(
Timeline.timestamp.between(
recording.start_time, recording.end_time
),
Timeline.timestamp < expire_date,
Timeline.camera == camera,
).execute()
logger.debug(f"Expiring {len(deleted_recordings)} recordings") logger.debug(f"Expiring {len(deleted_recordings)} recordings")
# delete up to 100,000 at a time # delete up to 100,000 at a time
max_deletes = 100000 max_deletes = 100000

View File

@ -125,6 +125,7 @@ class RecordingMaintainer(threading.Thread):
self.end_time_cache.pop(cache_path, None) self.end_time_cache.pop(cache_path, None)
grouped_recordings[camera] = grouped_recordings[camera][-keep_count:] grouped_recordings[camera] = grouped_recordings[camera][-keep_count:]
tasks = []
for camera, recordings in grouped_recordings.items(): for camera, recordings in grouped_recordings.items():
# clear out all the object recording info for old frames # clear out all the object recording info for old frames
while ( while (
@ -155,10 +156,15 @@ class RecordingMaintainer(threading.Thread):
.order_by(Event.start_time) .order_by(Event.start_time)
) )
await asyncio.gather( tasks.extend(
*(self.validate_and_move_segment(camera, events, r) for r in recordings) [self.validate_and_move_segment(camera, events, r) for r in recordings]
) )
recordings_to_insert: list[Optional[Recordings]] = await asyncio.gather(*tasks)
Recordings.insert_many(
[r for r in recordings_to_insert if r is not None]
).execute()
async def validate_and_move_segment( async def validate_and_move_segment(
self, camera: str, events: Event, recording: dict[str, any] self, camera: str, events: Event, recording: dict[str, any]
) -> None: ) -> None:
@ -225,7 +231,7 @@ class RecordingMaintainer(threading.Thread):
if overlaps: if overlaps:
record_mode = self.config.cameras[camera].record.events.retain.mode record_mode = self.config.cameras[camera].record.events.retain.mode
# move from cache to recordings immediately # move from cache to recordings immediately
self.store_segment( return self.move_segment(
camera, camera,
start_time, start_time,
end_time, end_time,
@ -247,7 +253,7 @@ class RecordingMaintainer(threading.Thread):
# else retain days includes this segment # else retain days includes this segment
else: else:
record_mode = self.config.cameras[camera].record.retain.mode record_mode = self.config.cameras[camera].record.retain.mode
self.store_segment( return self.move_segment(
camera, start_time, end_time, duration, cache_path, record_mode camera, start_time, end_time, duration, cache_path, record_mode
) )
@ -290,7 +296,7 @@ class RecordingMaintainer(threading.Thread):
return SegmentInfo(motion_count, active_count, round(average_dBFS)) return SegmentInfo(motion_count, active_count, round(average_dBFS))
def store_segment( def move_segment(
self, self,
camera: str, camera: str,
start_time: datetime.datetime, start_time: datetime.datetime,
@ -298,7 +304,7 @@ class RecordingMaintainer(threading.Thread):
duration: float, duration: float,
cache_path: str, cache_path: str,
store_mode: RetainModeEnum, store_mode: RetainModeEnum,
) -> None: ) -> Optional[Recordings]:
segment_info = self.segment_stats(camera, start_time, end_time) segment_info = self.segment_stats(camera, start_time, end_time)
# check if the segment shouldn't be stored # check if the segment shouldn't be stored
@ -348,7 +354,7 @@ class RecordingMaintainer(threading.Thread):
if p.returncode != 0: if p.returncode != 0:
logger.error(f"Unable to convert {cache_path} to {file_path}") logger.error(f"Unable to convert {cache_path} to {file_path}")
logger.error(p.stderr) logger.error(p.stderr)
return return None
else: else:
logger.debug( logger.debug(
f"Copied {file_path} in {datetime.datetime.now().timestamp()-start_frame} seconds." f"Copied {file_path} in {datetime.datetime.now().timestamp()-start_frame} seconds."
@ -368,19 +374,20 @@ class RecordingMaintainer(threading.Thread):
rand_id = "".join( rand_id = "".join(
random.choices(string.ascii_lowercase + string.digits, k=6) random.choices(string.ascii_lowercase + string.digits, k=6)
) )
Recordings.create(
id=f"{start_time.timestamp()}-{rand_id}", return {
camera=camera, Recordings.id: f"{start_time.timestamp()}-{rand_id}",
path=file_path, Recordings.camera: camera,
start_time=start_time.timestamp(), Recordings.path: file_path,
end_time=end_time.timestamp(), Recordings.start_time: start_time.timestamp(),
duration=duration, Recordings.end_time: end_time.timestamp(),
motion=segment_info.motion_box_count, Recordings.duration: duration,
Recordings.motion: segment_info.motion_box_count,
# TODO: update this to store list of active objects at some point # TODO: update this to store list of active objects at some point
objects=segment_info.active_object_count, Recordings.objects: segment_info.active_object_count,
dBFS=segment_info.average_dBFS, Recordings.dBFS: segment_info.average_dBFS,
segment_size=segment_size, Recordings.segment_size: segment_size,
) }
except Exception as e: except Exception as e:
logger.error(f"Unable to store recording segment {cache_path}") logger.error(f"Unable to store recording segment {cache_path}")
Path(cache_path).unlink(missing_ok=True) Path(cache_path).unlink(missing_ok=True)
@ -388,10 +395,11 @@ class RecordingMaintainer(threading.Thread):
# clear end_time cache # clear end_time cache
self.end_time_cache.pop(cache_path, None) self.end_time_cache.pop(cache_path, None)
return None
def run(self) -> None: def run(self) -> None:
# Check for new files every 5 seconds # Check for new files every 5 seconds
wait_time = 5.0 wait_time = 0.0
while not self.stop_event.wait(wait_time): while not self.stop_event.wait(wait_time):
run_start = datetime.datetime.now().timestamp() run_start = datetime.datetime.now().timestamp()

View File

@ -45,7 +45,7 @@ def manage_recordings(
"cache_size": -512 * 1000, # 512MB of cache "cache_size": -512 * 1000, # 512MB of cache
"synchronous": "NORMAL", # Safe when using WAL https://www.sqlite.org/pragma.html#pragma_synchronous "synchronous": "NORMAL", # Safe when using WAL https://www.sqlite.org/pragma.html#pragma_synchronous
}, },
timeout=10 * len([c for c in config.cameras.values() if c.enabled]), timeout=max(60, 10 * len([c for c in config.cameras.values() if c.enabled])),
) )
models = [Event, Recordings, Timeline, RecordingsToDelete] models = [Event, Recordings, Timeline, RecordingsToDelete]
db.bind(models) db.bind(models)