diff --git a/frigate/app.py b/frigate/app.py index 5833d5fe9..9be814b91 100644 --- a/frigate/app.py +++ b/frigate/app.py @@ -108,6 +108,9 @@ class FrigateApp: maxsize=len(self.config.cameras.keys()) * 2 ) + # Queue for recordings info + self.recordings_info_queue = mp.Queue() + def init_database(self): # Migrate DB location old_db_path = os.path.join(CLIPS_DIR, "frigate.db") @@ -206,6 +209,7 @@ class FrigateApp: self.event_queue, self.event_processed_queue, self.video_output_queue, + self.recordings_info_queue, self.stop_event, ) self.detected_frames_processor.start() @@ -273,7 +277,9 @@ class FrigateApp: self.event_cleanup.start() def start_recording_maintainer(self): - self.recording_maintainer = RecordingMaintainer(self.config, self.stop_event) + self.recording_maintainer = RecordingMaintainer( + self.config, self.recordings_info_queue, self.stop_event + ) self.recording_maintainer.start() def start_recording_cleanup(self): diff --git a/frigate/object_processing.py b/frigate/object_processing.py index 9a8ad8cc6..f618f9af8 100644 --- a/frigate/object_processing.py +++ b/frigate/object_processing.py @@ -584,6 +584,7 @@ class TrackedObjectProcessor(threading.Thread): event_queue, event_processed_queue, video_output_queue, + recordings_info_queue, stop_event, ): threading.Thread.__init__(self) @@ -595,6 +596,7 @@ class TrackedObjectProcessor(threading.Thread): self.event_queue = event_queue self.event_processed_queue = event_processed_queue self.video_output_queue = video_output_queue + self.recordings_info_queue = recordings_info_queue self.stop_event = stop_event self.camera_states: Dict[str, CameraState] = {} self.frame_manager = SharedMemoryFrameManager() @@ -823,6 +825,17 @@ class TrackedObjectProcessor(threading.Thread): ) ) + # send info on this frame to the recordings maintainer + self.recordings_info_queue.put( + ( + camera, + frame_time, + current_tracked_objects, + motion_boxes, + regions, + ) + ) + # update zone counts for each label # for each zone in the current camera for zone in self.config.cameras[camera].zones.keys(): diff --git a/frigate/record.py b/frigate/record.py index f20dfb5dc..41ed0cc7e 100644 --- a/frigate/record.py +++ b/frigate/record.py @@ -1,13 +1,15 @@ import datetime -import time import itertools import logging +import multiprocessing as mp import os +import queue import random import shutil import string import subprocess as sp import threading +import time from collections import defaultdict from pathlib import Path @@ -40,22 +42,28 @@ def remove_empty_directories(directory): class RecordingMaintainer(threading.Thread): - def __init__(self, config: FrigateConfig, stop_event): + def __init__( + self, config: FrigateConfig, recordings_info_queue: mp.Queue, stop_event + ): threading.Thread.__init__(self) self.name = "recording_maint" self.config = config + self.recordings_info_queue = recordings_info_queue self.stop_event = stop_event self.first_pass = True + self.recordings_info = defaultdict(list) self.end_time_cache = {} def move_files(self): - cache_files = [ - d - for d in os.listdir(CACHE_DIR) - if os.path.isfile(os.path.join(CACHE_DIR, d)) - and d.endswith(".mp4") - and not d.startswith("clip_") - ] + cache_files = sorted( + [ + d + for d in os.listdir(CACHE_DIR) + if os.path.isfile(os.path.join(CACHE_DIR, d)) + and d.endswith(".mp4") + and not d.startswith("clip_") + ] + ) files_in_use = [] for process in psutil.process_iter(): @@ -93,16 +101,22 @@ class RecordingMaintainer(threading.Thread): keep_count = 5 for camera in grouped_recordings.keys(): if len(grouped_recordings[camera]) > keep_count: - sorted_recordings = sorted( - grouped_recordings[camera], key=lambda i: i["start_time"] - ) - to_remove = sorted_recordings[:-keep_count] + to_remove = grouped_recordings[camera][:-keep_count] for f in to_remove: Path(f["cache_path"]).unlink(missing_ok=True) self.end_time_cache.pop(f["cache_path"], None) - grouped_recordings[camera] = sorted_recordings[-keep_count:] + grouped_recordings[camera] = grouped_recordings[camera][-keep_count:] for camera, recordings in grouped_recordings.items(): + + # clear out all the recording info for old frames + while ( + len(self.recordings_info[camera]) > 0 + and self.recordings_info[camera][0][0] + < recordings[0]["start_time"].timestamp() + ): + self.recordings_info[camera].pop(0) + # get all events with the end time after the start of the oldest cache file # or with end_time None events: Event = ( @@ -167,6 +181,8 @@ class RecordingMaintainer(threading.Thread): # and remove this segment if event.start_time > end_time.timestamp(): overlaps = False + Path(cache_path).unlink(missing_ok=True) + self.end_time_cache.pop(cache_path, None) break # if the event is in progress or ends after the recording starts, keep it @@ -235,6 +251,30 @@ class RecordingMaintainer(threading.Thread): wait_time = 5 while not self.stop_event.wait(wait_time): run_start = datetime.datetime.now().timestamp() + + # empty the recordings info queue + while True: + try: + ( + camera, + frame_time, + current_tracked_objects, + motion_boxes, + regions, + ) = self.recordings_info_queue.get(False) + + if self.config.cameras[camera].record.enabled: + self.recordings_info[camera].append( + ( + frame_time, + current_tracked_objects, + motion_boxes, + regions, + ) + ) + except queue.Empty: + break + try: self.move_files() except Exception as e: