retain frame data for recording maintenance

This commit is contained in:
Blake Blackshear 2021-12-10 22:56:29 -06:00
parent 93f418ac0b
commit ef214fb80a
3 changed files with 74 additions and 15 deletions

View File

@ -108,6 +108,9 @@ class FrigateApp:
maxsize=len(self.config.cameras.keys()) * 2 maxsize=len(self.config.cameras.keys()) * 2
) )
# Queue for recordings info
self.recordings_info_queue = mp.Queue()
def init_database(self): def init_database(self):
# Migrate DB location # Migrate DB location
old_db_path = os.path.join(CLIPS_DIR, "frigate.db") old_db_path = os.path.join(CLIPS_DIR, "frigate.db")
@ -206,6 +209,7 @@ class FrigateApp:
self.event_queue, self.event_queue,
self.event_processed_queue, self.event_processed_queue,
self.video_output_queue, self.video_output_queue,
self.recordings_info_queue,
self.stop_event, self.stop_event,
) )
self.detected_frames_processor.start() self.detected_frames_processor.start()
@ -273,7 +277,9 @@ class FrigateApp:
self.event_cleanup.start() self.event_cleanup.start()
def start_recording_maintainer(self): def start_recording_maintainer(self):
self.recording_maintainer = RecordingMaintainer(self.config, self.stop_event) self.recording_maintainer = RecordingMaintainer(
self.config, self.recordings_info_queue, self.stop_event
)
self.recording_maintainer.start() self.recording_maintainer.start()
def start_recording_cleanup(self): def start_recording_cleanup(self):

View File

@ -584,6 +584,7 @@ class TrackedObjectProcessor(threading.Thread):
event_queue, event_queue,
event_processed_queue, event_processed_queue,
video_output_queue, video_output_queue,
recordings_info_queue,
stop_event, stop_event,
): ):
threading.Thread.__init__(self) threading.Thread.__init__(self)
@ -595,6 +596,7 @@ class TrackedObjectProcessor(threading.Thread):
self.event_queue = event_queue self.event_queue = event_queue
self.event_processed_queue = event_processed_queue self.event_processed_queue = event_processed_queue
self.video_output_queue = video_output_queue self.video_output_queue = video_output_queue
self.recordings_info_queue = recordings_info_queue
self.stop_event = stop_event self.stop_event = stop_event
self.camera_states: Dict[str, CameraState] = {} self.camera_states: Dict[str, CameraState] = {}
self.frame_manager = SharedMemoryFrameManager() self.frame_manager = SharedMemoryFrameManager()
@ -823,6 +825,17 @@ class TrackedObjectProcessor(threading.Thread):
) )
) )
# send info on this frame to the recordings maintainer
self.recordings_info_queue.put(
(
camera,
frame_time,
current_tracked_objects,
motion_boxes,
regions,
)
)
# update zone counts for each label # update zone counts for each label
# for each zone in the current camera # for each zone in the current camera
for zone in self.config.cameras[camera].zones.keys(): for zone in self.config.cameras[camera].zones.keys():

View File

@ -1,13 +1,15 @@
import datetime import datetime
import time
import itertools import itertools
import logging import logging
import multiprocessing as mp
import os import os
import queue
import random import random
import shutil import shutil
import string import string
import subprocess as sp import subprocess as sp
import threading import threading
import time
from collections import defaultdict from collections import defaultdict
from pathlib import Path from pathlib import Path
@ -40,22 +42,28 @@ def remove_empty_directories(directory):
class RecordingMaintainer(threading.Thread): class RecordingMaintainer(threading.Thread):
def __init__(self, config: FrigateConfig, stop_event): def __init__(
self, config: FrigateConfig, recordings_info_queue: mp.Queue, stop_event
):
threading.Thread.__init__(self) threading.Thread.__init__(self)
self.name = "recording_maint" self.name = "recording_maint"
self.config = config self.config = config
self.recordings_info_queue = recordings_info_queue
self.stop_event = stop_event self.stop_event = stop_event
self.first_pass = True self.first_pass = True
self.recordings_info = defaultdict(list)
self.end_time_cache = {} self.end_time_cache = {}
def move_files(self): def move_files(self):
cache_files = [ cache_files = sorted(
[
d d
for d in os.listdir(CACHE_DIR) for d in os.listdir(CACHE_DIR)
if os.path.isfile(os.path.join(CACHE_DIR, d)) if os.path.isfile(os.path.join(CACHE_DIR, d))
and d.endswith(".mp4") and d.endswith(".mp4")
and not d.startswith("clip_") and not d.startswith("clip_")
] ]
)
files_in_use = [] files_in_use = []
for process in psutil.process_iter(): for process in psutil.process_iter():
@ -93,16 +101,22 @@ class RecordingMaintainer(threading.Thread):
keep_count = 5 keep_count = 5
for camera in grouped_recordings.keys(): for camera in grouped_recordings.keys():
if len(grouped_recordings[camera]) > keep_count: if len(grouped_recordings[camera]) > keep_count:
sorted_recordings = sorted( to_remove = grouped_recordings[camera][:-keep_count]
grouped_recordings[camera], key=lambda i: i["start_time"]
)
to_remove = sorted_recordings[:-keep_count]
for f in to_remove: for f in to_remove:
Path(f["cache_path"]).unlink(missing_ok=True) Path(f["cache_path"]).unlink(missing_ok=True)
self.end_time_cache.pop(f["cache_path"], None) self.end_time_cache.pop(f["cache_path"], None)
grouped_recordings[camera] = sorted_recordings[-keep_count:] grouped_recordings[camera] = grouped_recordings[camera][-keep_count:]
for camera, recordings in grouped_recordings.items(): for camera, recordings in grouped_recordings.items():
# clear out all the recording info for old frames
while (
len(self.recordings_info[camera]) > 0
and self.recordings_info[camera][0][0]
< recordings[0]["start_time"].timestamp()
):
self.recordings_info[camera].pop(0)
# get all events with the end time after the start of the oldest cache file # get all events with the end time after the start of the oldest cache file
# or with end_time None # or with end_time None
events: Event = ( events: Event = (
@ -167,6 +181,8 @@ class RecordingMaintainer(threading.Thread):
# and remove this segment # and remove this segment
if event.start_time > end_time.timestamp(): if event.start_time > end_time.timestamp():
overlaps = False overlaps = False
Path(cache_path).unlink(missing_ok=True)
self.end_time_cache.pop(cache_path, None)
break break
# if the event is in progress or ends after the recording starts, keep it # if the event is in progress or ends after the recording starts, keep it
@ -235,6 +251,30 @@ class RecordingMaintainer(threading.Thread):
wait_time = 5 wait_time = 5
while not self.stop_event.wait(wait_time): while not self.stop_event.wait(wait_time):
run_start = datetime.datetime.now().timestamp() run_start = datetime.datetime.now().timestamp()
# empty the recordings info queue
while True:
try:
(
camera,
frame_time,
current_tracked_objects,
motion_boxes,
regions,
) = self.recordings_info_queue.get(False)
if self.config.cameras[camera].record.enabled:
self.recordings_info[camera].append(
(
frame_time,
current_tracked_objects,
motion_boxes,
regions,
)
)
except queue.Empty:
break
try: try:
self.move_files() self.move_files()
except Exception as e: except Exception as e: