blakeblackshear.frigate/frigate/record.py
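"""Maintain recordings: move finished segments out of the cache and expire old ones."""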

import datetime
import itertools
import logging
import multiprocessing as mp
import os
import queue
import random
import shutil
import string
import subprocess as sp
import threading
from collections import defaultdict
from pathlib import Path
import psutil
from peewee import JOIN, DoesNotExist
from frigate.config import RetainModeEnum, FrigateConfig
from frigate.const import CACHE_DIR, RECORD_DIR
from frigate.models import Event, Recordings
from frigate.util import area
logger = logging.getLogger(__name__)
SECONDS_IN_DAY = 60 * 60 * 24
def remove_empty_directories(directory):
# list all directories recursively and sort them by path,
# longest first
paths = sorted(
        [x[0] for x in os.walk(directory)],
key=lambda p: len(str(p)),
reverse=True,
)
for path in paths:
# don't delete the parent
        if path == directory:
continue
if len(os.listdir(path)) == 0:
os.rmdir(path)
class RecordingMaintainer(threading.Thread):
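    """Move finished recording segments from CACHE_DIR into RECORD_DIR.

    ffmpeg writes segments into the cache; this thread probes their duration,
    checks retention settings and event overlap, then stores or discards them.

    Illustrative wiring (a sketch; the real setup lives elsewhere in the app):

        maintainer = RecordingMaintainer(config, recordings_info_queue, stop_event)
        maintainer.start()
    """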
def __init__(
self, config: FrigateConfig, recordings_info_queue: mp.Queue, stop_event
):
threading.Thread.__init__(self)
self.name = "recording_maint"
self.config = config
self.recordings_info_queue = recordings_info_queue
self.stop_event = stop_event
        self.recordings_info = defaultdict(list)  # per-camera (frame_time, objects, motion_boxes, regions) tuples
        self.end_time_cache = {}  # cache_path -> (end_time, duration) from ffprobe
def move_files(self):
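        """Move eligible cache segments into permanent storage or discard them."""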
cache_files = sorted(
[
d
for d in os.listdir(CACHE_DIR)
if os.path.isfile(os.path.join(CACHE_DIR, d))
and d.endswith(".mp4")
and not d.startswith("clip_")
]
)
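
        # find cache files that ffmpeg still has open so they are skipped below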
files_in_use = []
for process in psutil.process_iter():
try:
if process.name() != "ffmpeg":
continue
flist = process.open_files()
if flist:
for nt in flist:
if nt.path.startswith(CACHE_DIR):
files_in_use.append(nt.path.split("/")[-1])
            except psutil.Error:
                # the process exited or cannot be inspected; skip it
                continue
# group recordings by camera
grouped_recordings = defaultdict(list)
for f in cache_files:
# Skip files currently in use
if f in files_in_use:
continue
cache_path = os.path.join(CACHE_DIR, f)
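            # cache filenames are expected to look like "<camera>-<YYYYmmddHHMMSS>.mp4"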
basename = os.path.splitext(f)[0]
camera, date = basename.rsplit("-", maxsplit=1)
start_time = datetime.datetime.strptime(date, "%Y%m%d%H%M%S")
grouped_recordings[camera].append(
{
"cache_path": cache_path,
"start_time": start_time,
}
)
# delete all cached files past the most recent 5
keep_count = 5
for camera in grouped_recordings.keys():
segment_count = len(grouped_recordings[camera])
if segment_count > keep_count:
####
# Need to find a way to tell if these are aging out based on retention settings or if the system is overloaded.
####
# logger.warning(
# f"Too many recording segments in cache for {camera}. Keeping the {keep_count} most recent segments out of {segment_count}, discarding the rest..."
# )
to_remove = grouped_recordings[camera][:-keep_count]
for f in to_remove:
cache_path = f["cache_path"]
####
# Need to find a way to tell if these are aging out based on retention settings or if the system is overloaded.
####
# logger.warning(f"Discarding a recording segment: {cache_path}")
Path(cache_path).unlink(missing_ok=True)
self.end_time_cache.pop(cache_path, None)
grouped_recordings[camera] = grouped_recordings[camera][-keep_count:]
for camera, recordings in grouped_recordings.items():
# clear out all the recording info for old frames
while (
len(self.recordings_info[camera]) > 0
and self.recordings_info[camera][0][0]
< recordings[0]["start_time"].timestamp()
):
self.recordings_info[camera].pop(0)
# get all events with the end time after the start of the oldest cache file
# or with end_time None
events: Event = (
Event.select()
.where(
Event.camera == camera,
(Event.end_time == None)
| (Event.end_time >= recordings[0]["start_time"].timestamp()),
Event.has_clip,
)
.order_by(Event.start_time)
)
for r in recordings:
cache_path = r["cache_path"]
start_time = r["start_time"]
# Just delete files if recordings are turned off
if (
                    camera not in self.config.cameras
or not self.config.cameras[camera].record.enabled
):
Path(cache_path).unlink(missing_ok=True)
self.end_time_cache.pop(cache_path, None)
continue
if cache_path in self.end_time_cache:
end_time, duration = self.end_time_cache[cache_path]
else:
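                    # probe the segment for its real duration; a failed probe means
                    # the file is corrupt/incomplete and it is discarded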
ffprobe_cmd = [
"ffprobe",
"-v",
"error",
"-show_entries",
"format=duration",
"-of",
"default=noprint_wrappers=1:nokey=1",
f"{cache_path}",
]
p = sp.run(ffprobe_cmd, capture_output=True)
if p.returncode == 0:
duration = float(p.stdout.decode().strip())
end_time = start_time + datetime.timedelta(seconds=duration)
self.end_time_cache[cache_path] = (end_time, duration)
else:
logger.warning(f"Discarding a corrupt recording segment: {f}")
Path(cache_path).unlink(missing_ok=True)
continue
# if cached file's start_time is earlier than the retain days for the camera
                if start_time <= (
                    datetime.datetime.now()
                    - datetime.timedelta(
                        days=self.config.cameras[camera].record.retain.days
                    )
                ):
# if the cached segment overlaps with the events:
overlaps = False
for event in events:
# if the event starts in the future, stop checking events
# and remove this segment
if event.start_time > end_time.timestamp():
overlaps = False
Path(cache_path).unlink(missing_ok=True)
self.end_time_cache.pop(cache_path, None)
break
# if the event is in progress or ends after the recording starts, keep it
# and stop looking at events
if (
event.end_time is None
or event.end_time >= start_time.timestamp()
):
overlaps = True
break
if overlaps:
record_mode = self.config.cameras[
camera
].record.events.retain.mode
# move from cache to recordings immediately
self.store_segment(
camera,
start_time,
end_time,
duration,
cache_path,
record_mode,
)
# else retain days includes this segment
else:
record_mode = self.config.cameras[camera].record.retain.mode
self.store_segment(
camera, start_time, end_time, duration, cache_path, record_mode
)
def segment_stats(self, camera, start_time, end_time):
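        """Return (motion area, active object count) observed during the segment's time range."""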
active_count = 0
motion_count = 0
for frame in self.recordings_info[camera]:
# frame is after end time of segment
if frame[0] > end_time.timestamp():
break
# frame is before start time of segment
if frame[0] < start_time.timestamp():
continue
active_count += len(
[
o
for o in frame[1]
if not o["false_positive"] and o["motionless_count"] == 0
]
)
motion_count += sum([area(box) for box in frame[2]])
return (motion_count, active_count)
def store_segment(
self,
camera,
start_time,
end_time,
duration,
cache_path,
store_mode: RetainModeEnum,
):
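        """Copy a cache segment into RECORD_DIR and insert a Recordings row.

        The segment is deleted instead when the retain mode requires motion or
        active objects and the segment recorded none.
        """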
motion_count, active_count = self.segment_stats(camera, start_time, end_time)
# check if the segment shouldn't be stored
if (store_mode == RetainModeEnum.motion and motion_count == 0) or (
store_mode == RetainModeEnum.active_objects and active_count == 0
):
Path(cache_path).unlink(missing_ok=True)
self.end_time_cache.pop(cache_path, None)
return
directory = os.path.join(RECORD_DIR, start_time.strftime("%Y-%m/%d/%H"), camera)
        os.makedirs(directory, exist_ok=True)
file_name = f"{start_time.strftime('%M.%S.mp4')}"
file_path = os.path.join(directory, file_name)
try:
start_frame = datetime.datetime.now().timestamp()
# copy then delete is required when recordings are stored on some network drives
shutil.copyfile(cache_path, file_path)
logger.debug(
f"Copied {file_path} in {datetime.datetime.now().timestamp()-start_frame} seconds."
)
os.remove(cache_path)
rand_id = "".join(
random.choices(string.ascii_lowercase + string.digits, k=6)
)
Recordings.create(
id=f"{start_time.timestamp()}-{rand_id}",
camera=camera,
path=file_path,
start_time=start_time.timestamp(),
end_time=end_time.timestamp(),
duration=duration,
motion=motion_count,
# TODO: update this to store list of active objects at some point
objects=active_count,
)
except Exception as e:
logger.error(f"Unable to store recording segment {cache_path}")
Path(cache_path).unlink(missing_ok=True)
logger.error(e)
# clear end_time cache
self.end_time_cache.pop(cache_path, None)
def run(self):
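        """Drain the recordings info queue and move cache files roughly every 5 seconds."""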
# Check for new files every 5 seconds
wait_time = 5
while not self.stop_event.wait(wait_time):
run_start = datetime.datetime.now().timestamp()
# empty the recordings info queue
while True:
try:
(
camera,
frame_time,
current_tracked_objects,
motion_boxes,
regions,
) = self.recordings_info_queue.get(False)
if self.config.cameras[camera].record.enabled:
self.recordings_info[camera].append(
(
frame_time,
current_tracked_objects,
motion_boxes,
regions,
)
)
except queue.Empty:
break
try:
self.move_files()
except Exception as e:
logger.error(
"Error occurred when attempting to maintain recording cache"
)
logger.error(e)
duration = datetime.datetime.now().timestamp() - run_start
wait_time = max(0, 5 - duration)
logger.info(f"Exiting recording maintenance...")
class RecordingCleanup(threading.Thread):
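    """Expire recordings, tmp clips, and database rows that fall outside retention."""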
def __init__(self, config: FrigateConfig, stop_event):
threading.Thread.__init__(self)
self.name = "recording_cleanup"
self.config = config
self.stop_event = stop_event
def clean_tmp_clips(self):
        # delete any tmp clips more than a minute old
for p in Path("/tmp/cache").rglob("clip_*.mp4"):
logger.debug(f"Checking tmp clip {p}.")
if p.stat().st_mtime < (datetime.datetime.now().timestamp() - 60 * 1):
logger.debug("Deleting tmp clip.")
p.unlink(missing_ok=True)
def expire_recordings(self):
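        """Delete recordings past retention unless they overlap an event that should be kept."""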
logger.debug("Start expire recordings (new).")
logger.debug("Start deleted cameras.")
# Handle deleted cameras
expire_days = self.config.record.retain.days
expire_before = (
datetime.datetime.now() - datetime.timedelta(days=expire_days)
).timestamp()
no_camera_recordings: Recordings = Recordings.select().where(
Recordings.camera.not_in(list(self.config.cameras.keys())),
Recordings.end_time < expire_before,
)
deleted_recordings = set()
for recording in no_camera_recordings:
Path(recording.path).unlink(missing_ok=True)
deleted_recordings.add(recording.id)
logger.debug(f"Expiring {len(deleted_recordings)} recordings")
Recordings.delete().where(Recordings.id << deleted_recordings).execute()
logger.debug("End deleted cameras.")
logger.debug("Start all cameras.")
for camera, config in self.config.cameras.items():
logger.debug(f"Start camera: {camera}.")
# Get the timestamp for cutoff of retained days
expire_days = config.record.retain.days
expire_date = (
datetime.datetime.now() - datetime.timedelta(days=expire_days)
).timestamp()
# Get recordings to check for expiration
recordings: Recordings = (
Recordings.select()
.where(
Recordings.camera == camera,
Recordings.end_time < expire_date,
)
.order_by(Recordings.start_time)
)
# Get all the events to check against
events: Event = (
Event.select()
.where(
Event.camera == camera,
# need to ensure segments for all events starting
# before the expire date are included
Event.start_time < expire_date,
Event.has_clip,
)
.order_by(Event.start_time)
.objects()
)
# loop over recordings and see if they overlap with any non-expired events
# TODO: expire segments based on segment stats according to config
event_start = 0
deleted_recordings = set()
for recording in recordings.objects().iterator():
keep = False
# Now look for a reason to keep this recording segment
for idx in range(event_start, len(events)):
event = events[idx]
# if the event starts in the future, stop checking events
# and let this recording segment expire
if event.start_time > recording.end_time:
keep = False
break
# if the event is in progress or ends after the recording starts, keep it
# and stop looking at events
if event.end_time is None or event.end_time >= recording.start_time:
keep = True
break
# if the event ends before this recording segment starts, skip
# this event and check the next event for an overlap.
# since the events and recordings are sorted, we can skip events
# that end before the previous recording segment started on future segments
if event.end_time < recording.start_time:
event_start = idx
# Delete recordings outside of the retention window or based on the retention mode
if (
not keep
or (
config.record.events.retain.mode == RetainModeEnum.motion
and recording.motion == 0
)
or (
config.record.events.retain.mode
== RetainModeEnum.active_objects
and recording.objects == 0
)
):
Path(recording.path).unlink(missing_ok=True)
deleted_recordings.add(recording.id)
logger.debug(f"Expiring {len(deleted_recordings)} recordings")
# delete up to 100,000 at a time
max_deletes = 100000
deleted_recordings_list = list(deleted_recordings)
for i in range(0, len(deleted_recordings_list), max_deletes):
Recordings.delete().where(
Recordings.id << deleted_recordings_list[i : i + max_deletes]
).execute()
logger.debug(f"End camera: {camera}.")
logger.debug("End all cameras.")
logger.debug("End expire recordings (new).")
def expire_files(self):
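        """Legacy cleanup: delete files older than the oldest db recording once they exceed their camera's retention."""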
logger.debug("Start expire files (legacy).")
default_expire = (
datetime.datetime.now().timestamp()
- SECONDS_IN_DAY * self.config.record.retain.days
)
delete_before = {}
for name, camera in self.config.cameras.items():
delete_before[name] = (
datetime.datetime.now().timestamp()
- SECONDS_IN_DAY * camera.record.retain.days
)
        # find all files on disk older than the oldest recording in the db
try:
oldest_recording = Recordings.select().order_by(Recordings.start_time).get()
p = Path(oldest_recording.path)
oldest_timestamp = p.stat().st_mtime - 1
except DoesNotExist:
oldest_timestamp = datetime.datetime.now().timestamp()
except FileNotFoundError:
logger.warning(f"Unable to find file from recordings database: {p}")
Recordings.delete().where(Recordings.id == oldest_recording.id).execute()
return
logger.debug(f"Oldest recording in the db: {oldest_timestamp}")
process = sp.run(
["find", RECORD_DIR, "-type", "f", "!", "-newermt", f"@{oldest_timestamp}"],
capture_output=True,
text=True,
)
files_to_check = process.stdout.splitlines()
for f in files_to_check:
p = Path(f)
try:
if p.stat().st_mtime < delete_before.get(p.parent.name, default_expire):
p.unlink(missing_ok=True)
except FileNotFoundError:
logger.warning(f"Attempted to expire missing file: {f}")
logger.debug("End expire files (legacy).")
def sync_recordings(self):
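        """Delete database rows whose recording files no longer exist on disk."""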
logger.debug("Start sync recordings.")
# get all recordings in the db
recordings: Recordings = Recordings.select()
# get all recordings files on disk
process = sp.run(
["find", RECORD_DIR, "-type", "f"],
capture_output=True,
text=True,
)
        files_on_disk = set(process.stdout.splitlines())  # set for fast membership tests
recordings_to_delete = []
for recording in recordings.objects().iterator():
            if recording.path not in files_on_disk:
recordings_to_delete.append(recording.id)
logger.debug(
f"Deleting {len(recordings_to_delete)} recordings with missing files"
)
2022-04-27 13:49:59 +02:00
# delete up to 100,000 at a time
max_deletes = 100000
for i in range(0, len(recordings_to_delete), max_deletes):
Recordings.delete().where(
Recordings.id << recordings_to_delete[i : i + max_deletes]
).execute()
logger.debug("End sync recordings.")
def run(self):
# on startup sync recordings with disk (disabled due to too much CPU usage)
# self.sync_recordings()
# Expire tmp clips every minute, recordings and clean directories every hour.
for counter in itertools.cycle(range(self.config.record.expire_interval)):
if self.stop_event.wait(60):
logger.info(f"Exiting recording cleanup...")
break
self.clean_tmp_clips()
if counter == 0:
self.expire_recordings()
self.expire_files()
remove_empty_directories(RECORD_DIR)