2023-04-26 15:25:26 +02:00
""" Maintain recording segments in cache. """
2023-06-16 15:09:13 +02:00
import asyncio
2020-11-30 04:31:02 +01:00
import datetime
import logging
2023-07-16 14:42:56 +02:00
import multiprocessing as mp
2020-11-30 04:31:02 +01:00
import os
2021-12-11 05:56:29 +01:00
import queue
2021-06-07 03:24:36 +02:00
import random
import string
2020-11-30 04:31:02 +01:00
import threading
2021-10-23 23:18:13 +02:00
from collections import defaultdict
2023-04-26 15:25:26 +02:00
from multiprocessing . synchronize import Event as MpEvent
2020-11-30 04:31:02 +01:00
from pathlib import Path
2023-07-15 02:05:14 +02:00
from typing import Any , Optional , Tuple
2021-07-09 22:14:16 +02:00
2023-07-15 02:05:14 +02:00
import numpy as np
2023-05-29 12:31:17 +02:00
import psutil
from frigate . config import FrigateConfig , RetainModeEnum
2023-07-26 12:55:08 +02:00
from frigate . const import (
CACHE_DIR ,
INSERT_MANY_RECORDINGS ,
MAX_SEGMENT_DURATION ,
RECORD_DIR ,
)
2021-07-09 22:14:16 +02:00
from frigate . models import Event , Recordings
2023-07-01 15:18:33 +02:00
from frigate . types import FeatureMetricsTypes
2023-07-06 16:28:50 +02:00
from frigate . util . image import area
from frigate . util . services import get_video_properties
2020-11-30 04:31:02 +01:00
logger = logging . getLogger ( __name__ )
2020-12-01 04:08:47 +01:00
2023-07-16 20:07:15 +02:00
class SegmentInfo :
def __init__ (
self , motion_box_count : int , active_object_count : int , average_dBFS : int
) - > None :
self . motion_box_count = motion_box_count
self . active_object_count = active_object_count
self . average_dBFS = average_dBFS
def should_discard_segment ( self , retain_mode : RetainModeEnum ) - > bool :
return (
retain_mode == RetainModeEnum . motion
and self . motion_box_count == 0
and self . average_dBFS == 0
) or (
retain_mode == RetainModeEnum . active_objects
and self . active_object_count == 0
)
2020-11-30 04:31:02 +01:00
class RecordingMaintainer ( threading . Thread ) :
2021-12-11 05:56:29 +01:00
def __init__ (
2023-04-26 15:25:26 +02:00
self ,
config : FrigateConfig ,
2023-07-26 12:55:08 +02:00
inter_process_queue : mp . Queue ,
2023-07-16 14:42:56 +02:00
object_recordings_info_queue : mp . Queue ,
audio_recordings_info_queue : Optional [ mp . Queue ] ,
2023-07-01 15:18:33 +02:00
process_info : dict [ str , FeatureMetricsTypes ] ,
2023-04-26 15:25:26 +02:00
stop_event : MpEvent ,
2021-12-11 05:56:29 +01:00
) :
2020-11-30 04:31:02 +01:00
threading . Thread . __init__ ( self )
2023-04-26 15:25:26 +02:00
self . name = " recording_maintainer "
2020-11-30 04:31:02 +01:00
self . config = config
2023-07-26 12:55:08 +02:00
self . inter_process_queue = inter_process_queue
2023-07-15 02:05:14 +02:00
self . object_recordings_info_queue = object_recordings_info_queue
self . audio_recordings_info_queue = audio_recordings_info_queue
2023-04-26 15:25:26 +02:00
self . process_info = process_info
2020-11-30 04:31:02 +01:00
self . stop_event = stop_event
2023-07-15 02:05:14 +02:00
self . object_recordings_info : dict [ str , list ] = defaultdict ( list )
self . audio_recordings_info : dict [ str , list ] = defaultdict ( list )
2023-04-26 15:25:26 +02:00
self . end_time_cache : dict [ str , Tuple [ datetime . datetime , float ] ] = { }
2020-11-30 04:31:02 +01:00
2023-06-16 15:09:13 +02:00
async def move_files ( self ) - > None :
2021-12-11 05:56:29 +01:00
cache_files = sorted (
[
d
for d in os . listdir ( CACHE_DIR )
if os . path . isfile ( os . path . join ( CACHE_DIR , d ) )
and d . endswith ( " .mp4 " )
and not d . startswith ( " clip_ " )
]
)
2020-11-30 04:31:02 +01:00
files_in_use = [ ]
for process in psutil . process_iter ( ) :
try :
2021-02-17 14:23:32 +01:00
if process . name ( ) != " ffmpeg " :
2020-12-24 21:23:59 +01:00
continue
2020-11-30 04:31:02 +01:00
flist = process . open_files ( )
if flist :
for nt in flist :
2021-07-09 22:14:16 +02:00
if nt . path . startswith ( CACHE_DIR ) :
2021-02-17 14:23:32 +01:00
files_in_use . append ( nt . path . split ( " / " ) [ - 1 ] )
2023-05-29 12:31:17 +02:00
except psutil . Error :
2020-11-30 04:31:02 +01:00
continue
2021-10-23 23:18:13 +02:00
# group recordings by camera
2023-04-26 15:25:26 +02:00
grouped_recordings : defaultdict [ str , list [ dict [ str , Any ] ] ] = defaultdict ( list )
for cache in cache_files :
2021-07-11 21:34:48 +02:00
# Skip files currently in use
2023-04-26 15:25:26 +02:00
if cache in files_in_use :
2020-11-30 04:31:02 +01:00
continue
2023-04-26 15:25:26 +02:00
cache_path = os . path . join ( CACHE_DIR , cache )
basename = os . path . splitext ( cache ) [ 0 ]
2021-05-22 05:35:25 +02:00
camera , date = basename . rsplit ( " - " , maxsplit = 1 )
start_time = datetime . datetime . strptime ( date , " % Y % m %d % H % M % S " )
2021-10-23 23:18:13 +02:00
grouped_recordings [ camera ] . append (
{
" cache_path " : cache_path ,
" start_time " : start_time ,
}
2021-02-17 14:23:32 +01:00
)
2020-11-30 04:31:02 +01:00
2021-11-19 14:16:29 +01:00
# delete all cached files past the most recent 5
2021-11-19 14:19:45 +01:00
keep_count = 5
2021-11-17 15:57:57 +01:00
for camera in grouped_recordings . keys ( ) :
2022-07-19 14:24:44 +02:00
segment_count = len ( grouped_recordings [ camera ] )
if segment_count > keep_count :
2023-01-31 00:42:53 +01:00
logger . warning (
f " Unable to keep up with recording segments in cache for { camera } . Keeping the { keep_count } most recent segments out of { segment_count } and discarding the rest... "
)
2021-12-11 05:56:29 +01:00
to_remove = grouped_recordings [ camera ] [ : - keep_count ]
2023-04-26 15:25:26 +02:00
for rec in to_remove :
cache_path = rec [ " cache_path " ]
2022-07-19 14:24:44 +02:00
Path ( cache_path ) . unlink ( missing_ok = True )
self . end_time_cache . pop ( cache_path , None )
2021-12-11 05:56:29 +01:00
grouped_recordings [ camera ] = grouped_recordings [ camera ] [ - keep_count : ]
2021-11-11 04:12:41 +01:00
2023-07-21 14:29:50 +02:00
tasks = [ ]
2021-10-23 23:18:13 +02:00
for camera , recordings in grouped_recordings . items ( ) :
2023-07-15 02:05:14 +02:00
# clear out all the object recording info for old frames
2021-12-11 05:56:29 +01:00
while (
2023-07-15 02:05:14 +02:00
len ( self . object_recordings_info [ camera ] ) > 0
and self . object_recordings_info [ camera ] [ 0 ] [ 0 ]
2021-12-11 05:56:29 +01:00
< recordings [ 0 ] [ " start_time " ] . timestamp ( )
) :
2023-07-15 02:05:14 +02:00
self . object_recordings_info [ camera ] . pop ( 0 )
# clear out all the audio recording info for old frames
while (
len ( self . audio_recordings_info [ camera ] ) > 0
and self . audio_recordings_info [ camera ] [ 0 ] [ 0 ]
< recordings [ 0 ] [ " start_time " ] . timestamp ( )
) :
self . audio_recordings_info [ camera ] . pop ( 0 )
2021-12-11 05:56:29 +01:00
2021-10-23 23:18:13 +02:00
# get all events with the end time after the start of the oldest cache file
# or with end_time None
events : Event = (
Event . select ( )
. where (
Event . camera == camera ,
2023-06-11 14:18:47 +02:00
( Event . end_time == None )
2021-11-21 16:43:37 +01:00
| ( Event . end_time > = recordings [ 0 ] [ " start_time " ] . timestamp ( ) ) ,
2021-10-23 23:18:13 +02:00
Event . has_clip ,
)
. order_by ( Event . start_time )
)
2020-11-30 04:31:02 +01:00
2023-07-21 14:29:50 +02:00
tasks . extend (
[ self . validate_and_move_segment ( camera , events , r ) for r in recordings ]
2023-06-16 15:09:13 +02:00
)
2022-11-02 12:37:27 +01:00
2023-07-21 14:29:50 +02:00
recordings_to_insert : list [ Optional [ Recordings ] ] = await asyncio . gather ( * tasks )
2023-07-26 12:55:08 +02:00
# fire and forget recordings entries
self . inter_process_queue . put (
( INSERT_MANY_RECORDINGS , [ r for r in recordings_to_insert if r is not None ] )
)
2023-07-21 14:29:50 +02:00
2023-06-16 15:09:13 +02:00
async def validate_and_move_segment (
self , camera : str , events : Event , recording : dict [ str , any ]
) - > None :
cache_path = recording [ " cache_path " ]
start_time = recording [ " start_time " ]
2021-10-22 14:23:18 +02:00
2023-06-16 15:09:13 +02:00
# Just delete files if recordings are turned off
if (
camera not in self . config . cameras
or not self . process_info [ camera ] [ " record_enabled " ] . value
) :
Path ( cache_path ) . unlink ( missing_ok = True )
self . end_time_cache . pop ( cache_path , None )
return
if cache_path in self . end_time_cache :
end_time , duration = self . end_time_cache [ cache_path ]
else :
2023-07-26 12:55:08 +02:00
segment_info = await get_video_properties ( cache_path , get_duration = True )
2023-06-16 15:09:13 +02:00
if segment_info [ " duration " ] :
duration = float ( segment_info [ " duration " ] )
else :
duration = - 1
# ensure duration is within expected length
if 0 < duration < MAX_SEGMENT_DURATION :
end_time = start_time + datetime . timedelta ( seconds = duration )
self . end_time_cache [ cache_path ] = ( end_time , duration )
else :
if duration == - 1 :
logger . warning ( f " Failed to probe corrupt segment { cache_path } " )
logger . warning ( f " Discarding a corrupt recording segment: { cache_path } " )
Path ( cache_path ) . unlink ( missing_ok = True )
return
# if cached file's start_time is earlier than the retain days for the camera
if start_time < = (
(
datetime . datetime . now ( )
- datetime . timedelta (
days = self . config . cameras [ camera ] . record . retain . days
)
)
) :
# if the cached segment overlaps with the events:
overlaps = False
for event in events :
# if the event starts in the future, stop checking events
# and remove this segment
if event . start_time > end_time . timestamp ( ) :
2021-10-23 23:18:13 +02:00
overlaps = False
2023-06-16 15:09:13 +02:00
Path ( cache_path ) . unlink ( missing_ok = True )
self . end_time_cache . pop ( cache_path , None )
break
# if the event is in progress or ends after the recording starts, keep it
# and stop looking at events
if event . end_time is None or event . end_time > = start_time . timestamp ( ) :
overlaps = True
break
if overlaps :
record_mode = self . config . cameras [ camera ] . record . events . retain . mode
# move from cache to recordings immediately
2023-07-26 12:55:08 +02:00
return await self . move_segment (
2023-06-16 15:09:13 +02:00
camera ,
start_time ,
end_time ,
duration ,
cache_path ,
record_mode ,
)
# if it doesn't overlap with an event, go ahead and drop the segment
# if it ends more than the configured pre_capture for the camera
else :
pre_capture = self . config . cameras [ camera ] . record . events . pre_capture
2023-07-15 02:05:14 +02:00
most_recently_processed_frame_time = self . object_recordings_info [
camera
] [ - 1 ] [ 0 ]
2023-06-16 15:09:13 +02:00
retain_cutoff = most_recently_processed_frame_time - pre_capture
if end_time . timestamp ( ) < retain_cutoff :
Path ( cache_path ) . unlink ( missing_ok = True )
self . end_time_cache . pop ( cache_path , None )
# else retain days includes this segment
else :
record_mode = self . config . cameras [ camera ] . record . retain . mode
2023-07-26 12:55:08 +02:00
return await self . move_segment (
2023-06-16 15:09:13 +02:00
camera , start_time , end_time , duration , cache_path , record_mode
)
2021-10-23 23:18:13 +02:00
2023-04-26 15:25:26 +02:00
def segment_stats (
self , camera : str , start_time : datetime . datetime , end_time : datetime . datetime
2023-07-16 20:07:15 +02:00
) - > SegmentInfo :
2021-12-11 20:11:39 +01:00
active_count = 0
motion_count = 0
2023-07-15 02:05:14 +02:00
for frame in self . object_recordings_info [ camera ] :
2021-12-11 20:11:39 +01:00
# frame is after end time of segment
if frame [ 0 ] > end_time . timestamp ( ) :
break
# frame is before start time of segment
if frame [ 0 ] < start_time . timestamp ( ) :
continue
active_count + = len (
[
o
for o in frame [ 1 ]
2022-02-06 16:56:06 +01:00
if not o [ " false_positive " ] and o [ " motionless_count " ] == 0
2021-12-11 20:11:39 +01:00
]
)
motion_count + = sum ( [ area ( box ) for box in frame [ 2 ] ] )
2023-07-15 02:05:14 +02:00
audio_values = [ ]
for frame in self . audio_recordings_info [ camera ] :
# frame is after end time of segment
if frame [ 0 ] > end_time . timestamp ( ) :
break
# frame is before start time of segment
if frame [ 0 ] < start_time . timestamp ( ) :
continue
audio_values . append ( frame [ 1 ] )
average_dBFS = 0 if not audio_values else np . average ( audio_values )
2023-07-16 20:07:15 +02:00
return SegmentInfo ( motion_count , active_count , round ( average_dBFS ) )
2021-12-11 20:11:39 +01:00
2023-07-26 12:55:08 +02:00
async def move_segment (
2021-12-11 20:11:39 +01:00
self ,
2023-04-26 15:25:26 +02:00
camera : str ,
2022-12-11 14:45:32 +01:00
start_time : datetime . datetime ,
end_time : datetime . datetime ,
2023-04-26 15:25:26 +02:00
duration : float ,
cache_path : str ,
2021-12-11 20:11:39 +01:00
store_mode : RetainModeEnum ,
2023-07-21 14:29:50 +02:00
) - > Optional [ Recordings ] :
2023-07-16 20:07:15 +02:00
segment_info = self . segment_stats ( camera , start_time , end_time )
2021-12-11 20:11:39 +01:00
# check if the segment shouldn't be stored
2023-07-16 20:07:15 +02:00
if segment_info . should_discard_segment ( store_mode ) :
2021-12-11 20:11:39 +01:00
Path ( cache_path ) . unlink ( missing_ok = True )
self . end_time_cache . pop ( cache_path , None )
return
2022-12-11 14:45:32 +01:00
directory = os . path . join (
RECORD_DIR ,
2023-01-12 12:53:38 +01:00
start_time . astimezone ( tz = datetime . timezone . utc ) . strftime ( " % Y- % m- %d / % H " ) ,
2022-12-11 14:45:32 +01:00
camera ,
)
2021-10-23 23:18:13 +02:00
if not os . path . exists ( directory ) :
os . makedirs ( directory )
2022-12-11 14:45:32 +01:00
file_name = (
f " { start_time . replace ( tzinfo = datetime . timezone . utc ) . strftime ( ' % M. % S.mp4 ' ) } "
)
2021-10-23 23:18:13 +02:00
file_path = os . path . join ( directory , file_name )
2021-11-09 14:05:21 +01:00
try :
2022-10-02 01:11:29 +02:00
if not os . path . exists ( file_path ) :
start_frame = datetime . datetime . now ( ) . timestamp ( )
2022-12-18 00:53:34 +01:00
# add faststart to kept segments to improve metadata reading
2023-07-26 12:55:08 +02:00
p = await asyncio . create_subprocess_exec (
2022-12-18 00:53:34 +01:00
" ffmpeg " ,
2023-06-01 12:46:34 +02:00
" -hide_banner " ,
2022-12-18 00:53:34 +01:00
" -y " ,
" -i " ,
cache_path ,
" -c " ,
" copy " ,
" -movflags " ,
" +faststart " ,
file_path ,
2023-07-26 12:55:08 +02:00
stderr = asyncio . subprocess . PIPE ,
2022-10-02 01:11:29 +02:00
)
2023-07-26 12:55:08 +02:00
await p . wait ( )
2021-11-09 14:05:21 +01:00
2022-12-18 00:53:34 +01:00
if p . returncode != 0 :
logger . error ( f " Unable to convert { cache_path } to { file_path } " )
2023-07-26 12:55:08 +02:00
logger . error ( ( await p . stderr . read ( ) ) . decode ( " ascii " ) )
2023-07-21 14:29:50 +02:00
return None
2022-12-18 00:53:34 +01:00
else :
logger . debug (
f " Copied { file_path } in { datetime . datetime . now ( ) . timestamp ( ) - start_frame } seconds. "
)
2022-10-09 13:28:26 +02:00
try :
2022-12-18 00:53:34 +01:00
# get the segment size of the cache file
# file without faststart is same size
2022-10-09 13:28:26 +02:00
segment_size = round (
2023-06-11 21:49:13 +02:00
float ( os . path . getsize ( cache_path ) ) / pow ( 2 , 20 ) , 1
2022-10-09 13:28:26 +02:00
)
except OSError :
segment_size = 0
os . remove ( cache_path )
2022-10-02 01:11:29 +02:00
rand_id = " " . join (
random . choices ( string . ascii_lowercase + string . digits , k = 6 )
)
2023-07-21 14:29:50 +02:00
return {
Recordings . id : f " { start_time . timestamp ( ) } - { rand_id } " ,
Recordings . camera : camera ,
Recordings . path : file_path ,
Recordings . start_time : start_time . timestamp ( ) ,
Recordings . end_time : end_time . timestamp ( ) ,
Recordings . duration : duration ,
Recordings . motion : segment_info . motion_box_count ,
2022-10-02 01:11:29 +02:00
# TODO: update this to store list of active objects at some point
2023-07-21 14:29:50 +02:00
Recordings . objects : segment_info . active_object_count ,
Recordings . dBFS : segment_info . average_dBFS ,
Recordings . segment_size : segment_size ,
}
2021-11-09 14:05:21 +01:00
except Exception as e :
logger . error ( f " Unable to store recording segment { cache_path } " )
Path ( cache_path ) . unlink ( missing_ok = True )
logger . error ( e )
2020-11-30 04:31:02 +01:00
2021-11-19 14:19:45 +01:00
# clear end_time cache
self . end_time_cache . pop ( cache_path , None )
2023-07-21 14:29:50 +02:00
return None
2021-11-19 14:19:45 +01:00
2023-04-26 15:25:26 +02:00
def run ( self ) - > None :
2021-07-11 21:34:48 +02:00
# Check for new files every 5 seconds
2023-07-21 14:29:50 +02:00
wait_time = 0.0
2021-10-22 14:23:18 +02:00
while not self . stop_event . wait ( wait_time ) :
run_start = datetime . datetime . now ( ) . timestamp ( )
2021-12-11 05:56:29 +01:00
2023-07-15 02:05:14 +02:00
# empty the object recordings info queue
2021-12-11 05:56:29 +01:00
while True :
try :
(
camera ,
frame_time ,
current_tracked_objects ,
motion_boxes ,
regions ,
2023-07-15 02:05:14 +02:00
) = self . object_recordings_info_queue . get ( False )
2021-12-11 05:56:29 +01:00
2023-04-26 15:25:26 +02:00
if self . process_info [ camera ] [ " record_enabled " ] . value :
2023-07-15 02:05:14 +02:00
self . object_recordings_info [ camera ] . append (
2021-12-11 05:56:29 +01:00
(
frame_time ,
current_tracked_objects ,
motion_boxes ,
regions ,
)
)
except queue . Empty :
break
2023-07-15 02:05:14 +02:00
# empty the audio recordings info queue if audio is enabled
if self . audio_recordings_info_queue :
while True :
try :
(
camera ,
frame_time ,
dBFS ,
) = self . audio_recordings_info_queue . get ( False )
if self . process_info [ camera ] [ " record_enabled " ] . value :
self . audio_recordings_info [ camera ] . append (
(
frame_time ,
dBFS ,
)
)
except queue . Empty :
break
2021-11-09 14:05:21 +01:00
try :
2023-06-16 15:09:13 +02:00
asyncio . run ( self . move_files ( ) )
2021-11-09 14:05:21 +01:00
except Exception as e :
logger . error (
" Error occurred when attempting to maintain recording cache "
)
logger . error ( e )
2021-11-17 15:57:57 +01:00
duration = datetime . datetime . now ( ) . timestamp ( ) - run_start
wait_time = max ( 0 , 5 - duration )
2021-07-11 21:34:48 +02:00
2023-05-29 12:31:17 +02:00
logger . info ( " Exiting recording maintenance... " )