Move recording management to separate process (#6248)

* Move recordings management to its own process and ensure multiprocess db access

* Remove references to old threads

* Cleanup directory remover

* Mypy fixes

* Fix mypy

* Add support back for setting record via MQTT and WS

* Formatting

* Fix rebase issue
Nicolas Mowen, 2023-04-26 07:25:26 -06:00, committed by GitHub
parent 6dc82b6cef
commit e451f44ced
8 changed files with 402 additions and 294 deletions
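
At a high level, this commit replaces the RecordingMaintainer and RecordingCleanup threads that used to run inside the main Frigate process with a dedicated recording-manager process, fed by the existing recordings-info queue and controlled through per-camera shared flags. A minimal, self-contained sketch of that pattern (not Frigate code; the camera name and queue payload are made up):

import multiprocessing as mp

def manage(queue: mp.Queue, flags: dict) -> None:
    while True:
        item = queue.get()
        if item is None:  # shutdown sentinel
            break
        camera, frame_info = item
        if flags[camera].value:  # only do work while recording is enabled
            pass  # move/expire segments here

if __name__ == "__main__":
    q: mp.Queue = mp.Queue()
    flags = {"front_door": mp.Value("i", 1)}  # hypothetical camera
    proc = mp.Process(target=manage, args=(q, flags), daemon=True)
    proc.start()
    q.put(("front_door", {"frame_time": 0.0}))  # parent feeds recordings info
    flags["front_door"].value = 0               # and toggles recording at runtime
    q.put(None)
    proc.join()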

frigate/app.py

@@ -28,14 +28,14 @@ from frigate.object_processing import TrackedObjectProcessor
from frigate.output import output_frames
from frigate.plus import PlusApi
from frigate.ptz import OnvifController
-from frigate.record import RecordingCleanup, RecordingMaintainer
+from frigate.record.record import manage_recordings
from frigate.stats import StatsEmitter, stats_init
from frigate.storage import StorageMaintainer
from frigate.timeline import TimelineProcessor
from frigate.version import VERSION
from frigate.video import capture_camera, track_camera
from frigate.watchdog import FrigateWatchdog
-from frigate.types import CameraMetricsTypes
+from frigate.types import CameraMetricsTypes, RecordMetricsTypes
logger = logging.getLogger(__name__)
@@ -50,6 +50,7 @@ class FrigateApp:
self.log_queue: Queue = mp.Queue()
self.plus_api = PlusApi()
self.camera_metrics: dict[str, CameraMetricsTypes] = {}
+self.record_metrics: dict[str, RecordMetricsTypes] = {}
def set_environment_vars(self) -> None:
for key, value in self.config.environment_vars.items():
@@ -109,6 +110,11 @@ class FrigateApp:
"capture_process": None,
"process": None,
}
+self.record_metrics[camera_name] = {
+"record_enabled": mp.Value(
+"i", self.config.cameras[camera_name].record.enabled
+)
+}
def set_log_levels(self) -> None:
logging.getLogger().setLevel(self.config.logger.default.value.upper())
@@ -158,6 +164,20 @@ class FrigateApp:
migrate_db.close()
def init_recording_manager(self) -> None:
recording_process = mp.Process(
target=manage_recordings,
name="recording_manager",
args=(self.config, self.recordings_info_queue, self.record_metrics),
)
recording_process.daemon = True
self.recording_process = recording_process
recording_process.start()
logger.info(f"Recording process started: {recording_process.pid}")
def bind_database(self) -> None:
"""Bind db to the main process."""
# NOTE: all db accessing processes need to be created before the db can be bound to the main process
self.db = SqliteQueueDatabase(self.config.database.path)
models = [Event, Recordings, Timeline]
self.db.bind(models)
@@ -189,7 +209,11 @@
comms.append(WebSocketClient(self.config))
self.dispatcher = Dispatcher(
-self.config, self.onvif_controller, self.camera_metrics, comms
+self.config,
+self.onvif_controller,
+self.camera_metrics,
+self.record_metrics,
+comms,
)
def start_detectors(self) -> None:
@@ -318,16 +342,6 @@
self.event_cleanup = EventCleanup(self.config, self.stop_event)
self.event_cleanup.start()
-def start_recording_maintainer(self) -> None:
-self.recording_maintainer = RecordingMaintainer(
-self.config, self.recordings_info_queue, self.stop_event
-)
-self.recording_maintainer.start()
-def start_recording_cleanup(self) -> None:
-self.recording_cleanup = RecordingCleanup(self.config, self.stop_event)
-self.recording_cleanup.start()
def start_storage_maintainer(self) -> None:
self.storage_maintainer = StorageMaintainer(self.config, self.stop_event)
self.storage_maintainer.start()
@@ -390,6 +404,8 @@ class FrigateApp:
self.init_queues()
self.init_database()
self.init_onvif()
+self.init_recording_manager()
+self.bind_database()
self.init_dispatcher()
except Exception as e:
print(e)
@@ -406,8 +422,6 @@
self.start_timeline_processor()
self.start_event_processor()
self.start_event_cleanup()
-self.start_recording_maintainer()
-self.start_recording_cleanup()
self.start_stats_emitter()
self.start_watchdog()
self.check_shm()
@@ -443,8 +457,6 @@
self.detected_frames_processor.join()
self.event_processor.join()
self.event_cleanup.join()
-self.recording_maintainer.join()
-self.recording_cleanup.join()
self.stats_emitter.join()
self.frigate_watchdog.join()
self.db.stop()
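
The new bind_database() step and its NOTE comment are the heart of the "multiprocess db access" part of this commit: the recording manager is created before the main process opens its SqliteQueueDatabase, so the child never inherits the parent's connection or its queue-writer thread. A hedged sketch of that ordering (the path and rationale-as-stated are illustrative assumptions):

import multiprocessing as mp
from playhouse.sqliteq import SqliteQueueDatabase

DB_PATH = "/config/frigate.db"  # illustrative path

def recording_process() -> None:
    # the child opens its own handle instead of inheriting the parent's
    db = SqliteQueueDatabase(DB_PATH)
    # db.bind([Event, Recordings, Timeline]) would follow, as record.py does below

if __name__ == "__main__":
    proc = mp.Process(target=recording_process)
    proc.start()  # 1) create every db-accessing process first...
    db = SqliteQueueDatabase(DB_PATH)  # 2) ...then bind in the main process
    # db.bind([Event, Recordings, Timeline])
    proc.join()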

frigate/comms/dispatcher.py

@@ -8,7 +8,7 @@ from abc import ABC, abstractmethod
from frigate.config import FrigateConfig
from frigate.ptz import OnvifController, OnvifCommandEnum
-from frigate.types import CameraMetricsTypes
+from frigate.types import CameraMetricsTypes, RecordMetricsTypes
from frigate.util import restart_frigate
@@ -42,11 +42,13 @@ class Dispatcher:
config: FrigateConfig,
onvif: OnvifController,
camera_metrics: dict[str, CameraMetricsTypes],
+record_metrics: dict[str, RecordMetricsTypes],
communicators: list[Communicator],
) -> None:
self.config = config
self.onvif = onvif
self.camera_metrics = camera_metrics
+self.record_metrics = record_metrics
self.comms = communicators
for comm in self.comms:
@@ -192,13 +194,15 @@ class Dispatcher:
record_settings = self.config.cameras[camera_name].record
if payload == "ON":
-if not record_settings.enabled:
+if not self.record_metrics[camera_name]["record_enabled"].value:
logger.info(f"Turning on recordings for {camera_name}")
-record_settings.enabled = True
+self.record_metrics[camera_name]["record_enabled"].value = True
elif payload == "OFF":
-if record_settings.enabled:
+self.record_metrics[camera_name]["record_enabled"].value:
logger.info(f"Turning off recordings for {camera_name}")
-record_settings.enabled = False
+self.record_metrics[camera_name]["record_enabled"].value = False
self.publish(f"{camera_name}/recordings/state", payload, retain=True)
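
The handler above no longer flips record_settings.enabled, because that attribute now lives on this process's private copy of the config; a write there would be invisible to the recording process. Writing the shared mp.Value is what makes the MQTT/WS toggle take effect across the process boundary. A small sketch of that visibility difference (names are illustrative, not Frigate's):

import multiprocessing as mp
import time

def recorder(flag) -> None:
    # polls the flag the way the maintainer checks record_enabled
    for _ in range(3):
        print("recording enabled:", bool(flag.value))
        time.sleep(0.1)

if __name__ == "__main__":
    flag = mp.Value("i", 1)
    proc = mp.Process(target=recorder, args=(flag,))
    proc.start()
    time.sleep(0.15)
    flag.value = 0  # the change is observed in the child's next poll
    proc.join()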

frigate/const.py

@@ -8,7 +8,6 @@ CACHE_DIR = "/tmp/cache"
YAML_EXT = (".yaml", ".yml")
PLUS_ENV_VAR = "PLUS_API_KEY"
PLUS_API_HOST = "https://api.frigate.video"
-MAX_SEGMENT_DURATION = 600
BTBN_PATH = "/usr/lib/btbn-ffmpeg"
# Regex Consts
@@ -23,3 +22,8 @@ DRIVER_ENV_VAR = "LIBVA_DRIVER_NAME"
DRIVER_AMD = "radeonsi"
DRIVER_INTEL_i965 = "i965"
DRIVER_INTEL_iHD = "iHD"
+# Record Values
+MAX_SEGMENT_DURATION = 600
+SECONDS_IN_DAY = 60 * 60 * 24

frigate/record/cleanup.py (new file, 248 lines)

@@ -0,0 +1,248 @@
"""Cleanup recordings that are expired based on retention config."""
import datetime
import itertools
import logging
import subprocess as sp
import threading
from pathlib import Path
from peewee import DoesNotExist
from multiprocessing.synchronize import Event as MpEvent
from frigate.config import RetainModeEnum, FrigateConfig
from frigate.const import RECORD_DIR, SECONDS_IN_DAY
from frigate.models import Event, Recordings
from frigate.record.util import remove_empty_directories
logger = logging.getLogger(__name__)
class RecordingCleanup(threading.Thread):
"""Cleanup existing recordings based on retention config."""
def __init__(self, config: FrigateConfig, stop_event: MpEvent) -> None:
threading.Thread.__init__(self)
self.name = "recording_cleanup"
self.config = config
self.stop_event = stop_event
def clean_tmp_clips(self) -> None:
# delete any tmp clips older than 60 seconds
for p in Path("/tmp/cache").rglob("clip_*.mp4"):
logger.debug(f"Checking tmp clip {p}.")
if p.stat().st_mtime < (datetime.datetime.now().timestamp() - 60 * 1):
logger.debug("Deleting tmp clip.")
# empty contents of file before unlinking https://github.com/blakeblackshear/frigate/issues/4769
with open(p, "w"):
pass
p.unlink(missing_ok=True)
def expire_recordings(self) -> None:
logger.debug("Start expire recordings (new).")
logger.debug("Start deleted cameras.")
# Handle deleted cameras
expire_days = self.config.record.retain.days
expire_before = (
datetime.datetime.now() - datetime.timedelta(days=expire_days)
).timestamp()
no_camera_recordings: Recordings = Recordings.select().where(
Recordings.camera.not_in(list(self.config.cameras.keys())),
Recordings.end_time < expire_before,
)
deleted_recordings = set()
for recording in no_camera_recordings:
Path(recording.path).unlink(missing_ok=True)
deleted_recordings.add(recording.id)
logger.debug(f"Expiring {len(deleted_recordings)} recordings")
Recordings.delete().where(Recordings.id << deleted_recordings).execute()
logger.debug("End deleted cameras.")
logger.debug("Start all cameras.")
for camera, config in self.config.cameras.items():
logger.debug(f"Start camera: {camera}.")
# Get the timestamp for cutoff of retained days
expire_days = config.record.retain.days
expire_date = (
datetime.datetime.now() - datetime.timedelta(days=expire_days)
).timestamp()
# Get recordings to check for expiration
recordings: Recordings = (
Recordings.select()
.where(
Recordings.camera == camera,
Recordings.end_time < expire_date,
)
.order_by(Recordings.start_time)
)
# Get all the events to check against
events: Event = (
Event.select()
.where(
Event.camera == camera,
# need to ensure segments for all events starting
# before the expire date are included
Event.start_time < expire_date,
Event.has_clip,
)
.order_by(Event.start_time)
.objects()
)
# loop over recordings and see if they overlap with any non-expired events
# TODO: expire segments based on segment stats according to config
event_start = 0
deleted_recordings = set()
for recording in recordings.objects().iterator():
keep = False
# Now look for a reason to keep this recording segment
for idx in range(event_start, len(events)):
event = events[idx]
# if the event starts in the future, stop checking events
# and let this recording segment expire
if event.start_time > recording.end_time:
keep = False
break
# if the event is in progress or ends after the recording starts, keep it
# and stop looking at events
if event.end_time is None or event.end_time >= recording.start_time:
keep = True
break
# if the event ends before this recording segment starts, skip
# this event and check the next event for an overlap.
# since the events and recordings are sorted, we can skip events
# that end before the previous recording segment started on future segments
if event.end_time < recording.start_time:
event_start = idx
# Delete recordings outside of the retention window or based on the retention mode
if (
not keep
or (
config.record.events.retain.mode == RetainModeEnum.motion
and recording.motion == 0
)
or (
config.record.events.retain.mode
== RetainModeEnum.active_objects
and recording.objects == 0
)
):
Path(recording.path).unlink(missing_ok=True)
deleted_recordings.add(recording.id)
logger.debug(f"Expiring {len(deleted_recordings)} recordings")
# delete up to 100,000 at a time
max_deletes = 100000
deleted_recordings_list = list(deleted_recordings)
for i in range(0, len(deleted_recordings_list), max_deletes):
Recordings.delete().where(
Recordings.id << deleted_recordings_list[i : i + max_deletes]
).execute()
logger.debug(f"End camera: {camera}.")
logger.debug("End all cameras.")
logger.debug("End expire recordings (new).")
def expire_files(self) -> None:
logger.debug("Start expire files (legacy).")
default_expire = (
datetime.datetime.now().timestamp()
- SECONDS_IN_DAY * self.config.record.retain.days
)
delete_before = {}
for name, camera in self.config.cameras.items():
delete_before[name] = (
datetime.datetime.now().timestamp()
- SECONDS_IN_DAY * camera.record.retain.days
)
# find all the recordings older than the oldest recording in the db
try:
oldest_recording = Recordings.select().order_by(Recordings.start_time).get()
p = Path(oldest_recording.path)
oldest_timestamp = p.stat().st_mtime - 1
except DoesNotExist:
oldest_timestamp = datetime.datetime.now().timestamp()
except FileNotFoundError:
logger.warning(f"Unable to find file from recordings database: {p}")
Recordings.delete().where(Recordings.id == oldest_recording.id).execute()
return
logger.debug(f"Oldest recording in the db: {oldest_timestamp}")
process = sp.run(
["find", RECORD_DIR, "-type", "f", "!", "-newermt", f"@{oldest_timestamp}"],
capture_output=True,
text=True,
)
files_to_check = process.stdout.splitlines()
for f in files_to_check:
p = Path(f)
try:
if p.stat().st_mtime < delete_before.get(p.parent.name, default_expire):
p.unlink(missing_ok=True)
except FileNotFoundError:
logger.warning(f"Attempted to expire missing file: {f}")
logger.debug("End expire files (legacy).")
def sync_recordings(self) -> None:
logger.debug("Start sync recordings.")
# get all recordings in the db
recordings: Recordings = Recordings.select()
# get all recordings files on disk
process = sp.run(
["find", RECORD_DIR, "-type", "f"],
capture_output=True,
text=True,
)
files_on_disk = process.stdout.splitlines()
recordings_to_delete = []
for recording in recordings.objects().iterator():
if not recording.path in files_on_disk:
recordings_to_delete.append(recording.id)
logger.debug(
f"Deleting {len(recordings_to_delete)} recordings with missing files"
)
# delete up to 100,000 at a time
max_deletes = 100000
for i in range(0, len(recordings_to_delete), max_deletes):
Recordings.delete().where(
Recordings.id << recordings_to_delete[i : i + max_deletes]
).execute()
logger.debug("End sync recordings.")
def run(self) -> None:
# on startup sync recordings with disk (disabled due to too much CPU usage)
# self.sync_recordings()
# Expire tmp clips every minute, recordings and clean directories every hour.
for counter in itertools.cycle(range(self.config.record.expire_interval)):
if self.stop_event.wait(60):
logger.info(f"Exiting recording cleanup...")
break
self.clean_tmp_clips()
if counter == 0:
self.expire_recordings()
self.expire_files()
remove_empty_directories(RECORD_DIR)
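
The core of expire_recordings() is the sweep in the middle: recordings and events are both ordered by start time, and event_start only ever moves forward, so each camera's pass is roughly O(R + E) rather than O(R × E). A self-contained sketch of just that overlap test (the intervals are made-up numbers; None marks an in-progress event):

recordings = [(0, 10), (10, 20), (20, 30)]  # (start, end), hypothetical segments
events = [(12, 18), (25, None)]             # sorted by start; None = in progress

event_start = 0
for r_start, r_end in recordings:
    keep = False
    for idx in range(event_start, len(events)):
        e_start, e_end = events[idx]
        if e_start > r_end:  # event begins after this segment: let it expire
            break
        if e_end is None or e_end >= r_start:  # overlap found: keep segment
            keep = True
            break
        event_start = idx  # event ended before this segment: skip it next time
    print((r_start, r_end), "keep" if keep else "expire")
# -> (0, 10) expire, (10, 20) keep, (20, 30) keep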

frigate/record.py → frigate/record/maintainer.py

@@ -1,5 +1,6 @@
"""Maintain recording segments in cache."""
import datetime
import itertools
import logging
import multiprocessing as mp
import os
@@ -8,51 +9,40 @@
import random
import string
import subprocess as sp
import threading
-from collections import defaultdict
-from pathlib import Path
import psutil
from peewee import JOIN, DoesNotExist
+from collections import defaultdict
+from multiprocessing.synchronize import Event as MpEvent
+from pathlib import Path
+from typing import Any, Tuple
from frigate.config import RetainModeEnum, FrigateConfig
from frigate.const import CACHE_DIR, MAX_SEGMENT_DURATION, RECORD_DIR
from frigate.models import Event, Recordings
+from frigate.types import RecordMetricsTypes
from frigate.util import area
logger = logging.getLogger(__name__)
-SECONDS_IN_DAY = 60 * 60 * 24
-def remove_empty_directories(directory):
-# list all directories recursively and sort them by path,
-# longest first
-paths = sorted(
-[x[0] for x in os.walk(RECORD_DIR)],
-key=lambda p: len(str(p)),
-reverse=True,
-)
-for path in paths:
-# don't delete the parent
-if path == RECORD_DIR:
-continue
-if len(os.listdir(path)) == 0:
-os.rmdir(path)
class RecordingMaintainer(threading.Thread):
def __init__(
-self, config: FrigateConfig, recordings_info_queue: mp.Queue, stop_event
+self,
+config: FrigateConfig,
+recordings_info_queue: mp.Queue,
+process_info: dict[str, RecordMetricsTypes],
+stop_event: MpEvent,
):
threading.Thread.__init__(self)
self.name = "recording_maint"
self.name = "recording_maintainer"
self.config = config
self.recordings_info_queue = recordings_info_queue
+self.process_info = process_info
self.stop_event = stop_event
-self.recordings_info = defaultdict(list)
-self.end_time_cache = {}
+self.recordings_info: dict[str, Any] = defaultdict(list)
+self.end_time_cache: dict[str, Tuple[datetime.datetime, float]] = {}
-def move_files(self):
+def move_files(self) -> None:
cache_files = sorted(
[
d
@@ -77,14 +67,14 @@ class RecordingMaintainer(threading.Thread):
continue
# group recordings by camera
-grouped_recordings = defaultdict(list)
-for f in cache_files:
+grouped_recordings: defaultdict[str, list[dict[str, Any]]] = defaultdict(list)
+for cache in cache_files:
# Skip files currently in use
-if f in files_in_use:
+if cache in files_in_use:
continue
-cache_path = os.path.join(CACHE_DIR, f)
-basename = os.path.splitext(f)[0]
+cache_path = os.path.join(CACHE_DIR, cache)
+basename = os.path.splitext(cache)[0]
camera, date = basename.rsplit("-", maxsplit=1)
start_time = datetime.datetime.strptime(date, "%Y%m%d%H%M%S")
@@ -104,8 +94,8 @@
f"Unable to keep up with recording segments in cache for {camera}. Keeping the {keep_count} most recent segments out of {segment_count} and discarding the rest..."
)
to_remove = grouped_recordings[camera][:-keep_count]
-for f in to_remove:
-cache_path = f["cache_path"]
+for rec in to_remove:
+cache_path = rec["cache_path"]
Path(cache_path).unlink(missing_ok=True)
self.end_time_cache.pop(cache_path, None)
grouped_recordings[camera] = grouped_recordings[camera][-keep_count:]
@@ -138,7 +128,7 @@
# Just delete files if recordings are turned off
if (
not camera in self.config.cameras
-or not self.config.cameras[camera].record.enabled
+or not self.process_info[camera]["record_enabled"].value
):
Path(cache_path).unlink(missing_ok=True)
self.end_time_cache.pop(cache_path, None)
@@ -170,7 +160,7 @@
else:
if duration == -1:
logger.warning(
f"Failed to probe corrupt segment {cache_path}: {p.returncode} - {p.stderr}"
f"Failed to probe corrupt segment {cache_path} : {p.returncode} - {str(p.stderr)}"
)
logger.warning(
@@ -241,7 +231,9 @@
camera, start_time, end_time, duration, cache_path, record_mode
)
-def segment_stats(self, camera, start_time, end_time):
+def segment_stats(
+self, camera: str, start_time: datetime.datetime, end_time: datetime.datetime
+) -> Tuple[int, int]:
active_count = 0
motion_count = 0
for frame in self.recordings_info[camera]:
@@ -266,13 +258,13 @@
def store_segment(
self,
-camera,
+camera: str,
start_time: datetime.datetime,
end_time: datetime.datetime,
-duration,
-cache_path,
+duration: float,
+cache_path: str,
store_mode: RetainModeEnum,
-):
+) -> None:
motion_count, active_count = self.segment_stats(camera, start_time, end_time)
# check if the segment shouldn't be stored
@@ -363,9 +355,9 @@
# clear end_time cache
self.end_time_cache.pop(cache_path, None)
-def run(self):
+def run(self) -> None:
# Check for new files every 5 seconds
-wait_time = 5
+wait_time = 5.0
while not self.stop_event.wait(wait_time):
run_start = datetime.datetime.now().timestamp()
@@ -380,7 +372,7 @@
regions,
) = self.recordings_info_queue.get(False)
-if self.config.cameras[camera].record.enabled:
+if self.process_info[camera]["record_enabled"].value:
self.recordings_info[camera].append(
(
frame_time,
@@ -403,231 +395,3 @@
wait_time = max(0, 5 - duration)
logger.info(f"Exiting recording maintenance...")
class RecordingCleanup(threading.Thread):
def __init__(self, config: FrigateConfig, stop_event):
threading.Thread.__init__(self)
self.name = "recording_cleanup"
self.config = config
self.stop_event = stop_event
def clean_tmp_clips(self):
# delete any clips more than 5 minutes old
for p in Path("/tmp/cache").rglob("clip_*.mp4"):
logger.debug(f"Checking tmp clip {p}.")
if p.stat().st_mtime < (datetime.datetime.now().timestamp() - 60 * 1):
logger.debug("Deleting tmp clip.")
# empty contents of file before unlinking https://github.com/blakeblackshear/frigate/issues/4769
with open(p, "w"):
pass
p.unlink(missing_ok=True)
def expire_recordings(self):
logger.debug("Start expire recordings (new).")
logger.debug("Start deleted cameras.")
# Handle deleted cameras
expire_days = self.config.record.retain.days
expire_before = (
datetime.datetime.now() - datetime.timedelta(days=expire_days)
).timestamp()
no_camera_recordings: Recordings = Recordings.select().where(
Recordings.camera.not_in(list(self.config.cameras.keys())),
Recordings.end_time < expire_before,
)
deleted_recordings = set()
for recording in no_camera_recordings:
Path(recording.path).unlink(missing_ok=True)
deleted_recordings.add(recording.id)
logger.debug(f"Expiring {len(deleted_recordings)} recordings")
Recordings.delete().where(Recordings.id << deleted_recordings).execute()
logger.debug("End deleted cameras.")
logger.debug("Start all cameras.")
for camera, config in self.config.cameras.items():
logger.debug(f"Start camera: {camera}.")
# Get the timestamp for cutoff of retained days
expire_days = config.record.retain.days
expire_date = (
datetime.datetime.now() - datetime.timedelta(days=expire_days)
).timestamp()
# Get recordings to check for expiration
recordings: Recordings = (
Recordings.select()
.where(
Recordings.camera == camera,
Recordings.end_time < expire_date,
)
.order_by(Recordings.start_time)
)
# Get all the events to check against
events: Event = (
Event.select()
.where(
Event.camera == camera,
# need to ensure segments for all events starting
# before the expire date are included
Event.start_time < expire_date,
Event.has_clip,
)
.order_by(Event.start_time)
.objects()
)
# loop over recordings and see if they overlap with any non-expired events
# TODO: expire segments based on segment stats according to config
event_start = 0
deleted_recordings = set()
for recording in recordings.objects().iterator():
keep = False
# Now look for a reason to keep this recording segment
for idx in range(event_start, len(events)):
event = events[idx]
# if the event starts in the future, stop checking events
# and let this recording segment expire
if event.start_time > recording.end_time:
keep = False
break
# if the event is in progress or ends after the recording starts, keep it
# and stop looking at events
if event.end_time is None or event.end_time >= recording.start_time:
keep = True
break
# if the event ends before this recording segment starts, skip
# this event and check the next event for an overlap.
# since the events and recordings are sorted, we can skip events
# that end before the previous recording segment started on future segments
if event.end_time < recording.start_time:
event_start = idx
# Delete recordings outside of the retention window or based on the retention mode
if (
not keep
or (
config.record.events.retain.mode == RetainModeEnum.motion
and recording.motion == 0
)
or (
config.record.events.retain.mode
== RetainModeEnum.active_objects
and recording.objects == 0
)
):
Path(recording.path).unlink(missing_ok=True)
deleted_recordings.add(recording.id)
logger.debug(f"Expiring {len(deleted_recordings)} recordings")
# delete up to 100,000 at a time
max_deletes = 100000
deleted_recordings_list = list(deleted_recordings)
for i in range(0, len(deleted_recordings_list), max_deletes):
Recordings.delete().where(
Recordings.id << deleted_recordings_list[i : i + max_deletes]
).execute()
logger.debug(f"End camera: {camera}.")
logger.debug("End all cameras.")
logger.debug("End expire recordings (new).")
def expire_files(self):
logger.debug("Start expire files (legacy).")
default_expire = (
datetime.datetime.now().timestamp()
- SECONDS_IN_DAY * self.config.record.retain.days
)
delete_before = {}
for name, camera in self.config.cameras.items():
delete_before[name] = (
datetime.datetime.now().timestamp()
- SECONDS_IN_DAY * camera.record.retain.days
)
# find all the recordings older than the oldest recording in the db
try:
oldest_recording = Recordings.select().order_by(Recordings.start_time).get()
p = Path(oldest_recording.path)
oldest_timestamp = p.stat().st_mtime - 1
except DoesNotExist:
oldest_timestamp = datetime.datetime.now().timestamp()
except FileNotFoundError:
logger.warning(f"Unable to find file from recordings database: {p}")
Recordings.delete().where(Recordings.id == oldest_recording.id).execute()
return
logger.debug(f"Oldest recording in the db: {oldest_timestamp}")
process = sp.run(
["find", RECORD_DIR, "-type", "f", "!", "-newermt", f"@{oldest_timestamp}"],
capture_output=True,
text=True,
)
files_to_check = process.stdout.splitlines()
for f in files_to_check:
p = Path(f)
try:
if p.stat().st_mtime < delete_before.get(p.parent.name, default_expire):
p.unlink(missing_ok=True)
except FileNotFoundError:
logger.warning(f"Attempted to expire missing file: {f}")
logger.debug("End expire files (legacy).")
def sync_recordings(self):
logger.debug("Start sync recordings.")
# get all recordings in the db
recordings: Recordings = Recordings.select()
# get all recordings files on disk
process = sp.run(
["find", RECORD_DIR, "-type", "f"],
capture_output=True,
text=True,
)
files_on_disk = process.stdout.splitlines()
recordings_to_delete = []
for recording in recordings.objects().iterator():
if not recording.path in files_on_disk:
recordings_to_delete.append(recording.id)
logger.debug(
f"Deleting {len(recordings_to_delete)} recordings with missing files"
)
# delete up to 100,000 at a time
max_deletes = 100000
for i in range(0, len(recordings_to_delete), max_deletes):
Recordings.delete().where(
Recordings.id << recordings_to_delete[i : i + max_deletes]
).execute()
logger.debug("End sync recordings.")
def run(self):
# on startup sync recordings with disk (disabled due to too much CPU usage)
# self.sync_recordings()
# Expire tmp clips every minute, recordings and clean directories every hour.
for counter in itertools.cycle(range(self.config.record.expire_interval)):
if self.stop_event.wait(60):
logger.info(f"Exiting recording cleanup...")
break
self.clean_tmp_clips()
if counter == 0:
self.expire_recordings()
self.expire_files()
remove_empty_directories(RECORD_DIR)
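
One detail worth calling out from the maintainer above: cached segments are validated by probing their duration, and a failed probe is logged as a corrupt segment (the duration == -1 branch and the "Failed to probe" warning). A hedged sketch of that kind of check; the ffprobe flags below are standard ffprobe options chosen for illustration, not copied from Frigate:

import subprocess as sp

def probe_duration(path: str) -> float:
    p = sp.run(
        ["ffprobe", "-v", "error",
         "-show_entries", "format=duration",
         "-of", "default=noprint_wrappers=1:nokey=1", path],
        capture_output=True, text=True,
    )
    if p.returncode != 0:
        return -1.0  # mirrors the maintainer's corrupt-segment signal
    try:
        return float(p.stdout.strip())
    except ValueError:
        return -1.0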

frigate/record/record.py (new file, 53 lines)

@@ -0,0 +1,53 @@
"""Run recording maintainer and cleanup."""
import logging
import multiprocessing as mp
import signal
import threading
from setproctitle import setproctitle
from types import FrameType
from typing import Optional
from playhouse.sqliteq import SqliteQueueDatabase
from frigate.config import FrigateConfig
from frigate.models import Event, Recordings, Timeline
from frigate.record.cleanup import RecordingCleanup
from frigate.record.maintainer import RecordingMaintainer
from frigate.types import RecordMetricsTypes
from frigate.util import listen
logger = logging.getLogger(__name__)
def manage_recordings(
config: FrigateConfig,
recordings_info_queue: mp.Queue,
process_info: dict[str, RecordMetricsTypes],
) -> None:
stop_event = mp.Event()
def receiveSignal(signalNumber: int, frame: Optional[FrameType]) -> None:
stop_event.set()
signal.signal(signal.SIGTERM, receiveSignal)
signal.signal(signal.SIGINT, receiveSignal)
threading.current_thread().name = "process:recording_manager"
setproctitle("frigate.recording_manager")
listen()
db = SqliteQueueDatabase(config.database.path)
models = [Event, Recordings, Timeline]
db.bind(models)
maintainer = RecordingMaintainer(
config, recordings_info_queue, process_info, stop_event
)
maintainer.start()
cleanup = RecordingCleanup(config, stop_event)
cleanup.start()
logger.info("recording_manager: exiting subprocess")

frigate/record/util.py (new file, 19 lines)

@@ -0,0 +1,19 @@
"""Recordings Utilities."""
import os
def remove_empty_directories(directory: str) -> None:
# list all directories recursively and sort them by path,
# longest first
paths = sorted(
[x[0] for x in os.walk(directory)],
key=lambda p: len(str(p)),
reverse=True,
)
for path in paths:
# don't delete the parent
if path == directory:
continue
if len(os.listdir(path)) == 0:
os.rmdir(path)
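
Note that this move also fixes a latent bug: the copy deleted from the old record.py accepted a directory argument but walked the hard-coded RECORD_DIR, while this version actually uses its parameter. Sorting paths longest-first guarantees children are visited before their parents, so nested empty directories collapse in one pass. A usage sketch (assumes Frigate's package layout is importable):

import os
import tempfile

from frigate.record.util import remove_empty_directories

root = tempfile.mkdtemp()
os.makedirs(os.path.join(root, "a", "b", "c"))
remove_empty_directories(root)
print(os.listdir(root))  # [] -- a/b/c, then a/b, then a removed; root itself kept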

frigate/types.py

@@ -24,6 +24,10 @@ class CameraMetricsTypes(TypedDict):
skipped_fps: Synchronized
+class RecordMetricsTypes(TypedDict):
+record_enabled: Synchronized
class StatsTrackingTypes(TypedDict):
camera_metrics: dict[str, CameraMetricsTypes]
detectors: dict[str, ObjectDetectProcess]
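
RecordMetricsTypes is what lets mypy check the shared-flag plumbing end to end (the "Mypy fixes" bullets above): each camera maps to a TypedDict whose record_enabled field is a multiprocessing Synchronized value. A small sketch of how such a dict is built and mutated (the camera name is made up; the import assumes Frigate's package layout):

import multiprocessing as mp

from frigate.types import RecordMetricsTypes

record_metrics: dict[str, RecordMetricsTypes] = {
    "front_door": {"record_enabled": mp.Value("i", 1)},  # hypothetical camera
}
record_metrics["front_door"]["record_enabled"].value = 0  # toggles recording off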