blakeblackshear.frigate/frigate/events.py
2020-12-05 08:48:46 -06:00

190 lines
7.1 KiB
Python

import datetime
import json
import logging
import os
import queue
import subprocess as sp
import threading
import time
from collections import defaultdict
import psutil
from frigate.models import Event
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
class EventProcessor(threading.Thread):
    """Thread that records finished events and assembles their video clips.

    The thread consumes ``(event_type, camera, event_data)`` tuples from
    ``event_queue``. ``start`` events are remembered so the cached video
    segments they may need are not expired. ``end`` events are stored with
    ``Event.create`` and, unless flagged as false positives, turned into an
    ``.mp4`` file in ``clips_dir`` by concatenating cached segments with
    ffmpeg.
    """

    # Seconds create_clip sleeps between cache refreshes while it waits for
    # the segment that covers the end of an event.
    CLIP_WAIT_SECONDS = 5
    # Number of such waits before create_clip gives up, so one missing
    # segment cannot block the event thread forever.
    CLIP_WAIT_ATTEMPTS = 12

    def __init__(self, config, camera_processes, event_queue, stop_event):
        """Store collaborators and initialise empty bookkeeping.

        Args:
            config: object exposing ``save_clips.cache_dir``,
                ``save_clips.clips_dir``, ``save_clips.max_seconds`` and
                ``cameras[name].save_clips``.
            camera_processes: mapping whose values hold an ``'ffmpeg_pid'``
                entry with a ``.value`` attribute (the ffmpeg process id).
            event_queue: queue yielding ``(event_type, camera, event_data)``.
            stop_event: event object; when set, the thread exits.
        """
        super().__init__()
        self.name = 'event_processor'
        self.config = config
        self.cache_dir = self.config.save_clips.cache_dir
        self.clips_dir = self.config.save_clips.clips_dir
        self.camera_processes = camera_processes
        # file name -> {'path', 'camera', 'start_time', 'duration'}
        self.cached_clips = {}
        self.event_queue = event_queue
        # event id -> event_data for events that started but have not ended
        self.events_in_process = {}
        self.stop_event = stop_event

    def _cache_files_in_use(self):
        """Return the names of cache files currently opened by ffmpeg."""
        in_use = set()
        for process_data in self.camera_processes.values():
            try:
                ffmpeg_process = psutil.Process(pid=process_data['ffmpeg_pid'].value)
                for nt in ffmpeg_process.open_files() or []:
                    if nt.path.startswith(self.cache_dir):
                        in_use.add(nt.path.split('/')[-1])
            except Exception:
                # Best effort: the process may have exited or be inaccessible.
                # Unlike a bare ``except``, this lets KeyboardInterrupt and
                # SystemExit propagate.
                continue
        return in_use

    def _probe_duration(self, path):
        """Return the duration in seconds ffprobe reports for ``path``.

        Returns ``None`` when ffprobe exits with an error or prints something
        that is not a number. Raises ``OSError`` if ffprobe cannot be started.
        """
        ffprobe_cmd = [
            'ffprobe',
            '-v',
            'error',
            '-show_entries',
            'format=duration',
            '-of',
            'default=noprint_wrappers=1:nokey=1',
            path,
        ]
        # An argument list (no shell) keeps paths containing spaces or shell
        # metacharacters intact.
        p = sp.run(ffprobe_cmd, stdout=sp.PIPE)
        if p.returncode != 0:
            return None
        try:
            return float(p.stdout.decode('utf-8').strip())
        except ValueError:
            return None

    def _camera_clips(self, camera):
        """Return the cached clips of ``camera`` ordered by start time."""
        return sorted(
            (c for c in self.cached_clips.values() if c['camera'] == camera),
            key=lambda c: c['start_time'],
        )

    def _build_playlist(self, sorted_clips, playlist_start, playlist_end):
        """Build ffmpeg concat-demuxer lines covering the given time window."""
        playlist_lines = []
        for clip in sorted_clips:
            clip_end = clip['start_time'] + clip['duration']
            # clip ends before playlist start time, skip
            if clip_end < playlist_start:
                continue
            # clip starts after playlist ends, finish
            if clip['start_time'] > playlist_end:
                break
            playlist_lines.append(f"file '{os.path.join(self.cache_dir,clip['path'])}'")
            # if this is the starting clip, add an inpoint
            if clip['start_time'] < playlist_start:
                playlist_lines.append(f"inpoint {int(playlist_start-clip['start_time'])}")
            # if this is the ending clip, add an outpoint
            if clip_end > playlist_end:
                playlist_lines.append(f"outpoint {int(playlist_end-clip['start_time'])}")
        return playlist_lines

    def refresh_cache(self):
        """Index new cache files and expire the ones no event can need.

        Files still opened by an ffmpeg process are left alone. Every other
        file not yet indexed is probed for its duration; files ffprobe cannot
        read are deleted. Indexed clips that ended more than 90 seconds
        before the earliest in-progress event (capped at
        ``save_clips.max_seconds`` ago) are removed from the index and from
        disk.
        """
        # List the directory before asking which files are open, so a segment
        # created in between is simply absent from this pass rather than
        # probed while ffmpeg is still writing it.
        cached_files = os.listdir(self.cache_dir)
        files_in_use = self._cache_files_in_use()

        for f in cached_files:
            if f in files_in_use or f in self.cached_clips:
                continue

            # File names are "<camera>-<YYYYmmddHHMMSS>.<ext>"; the camera
            # part may itself contain dashes.
            name_parts = f.split('-')
            camera = '-'.join(name_parts[:-1])
            start_time = datetime.datetime.strptime(name_parts[-1].split('.')[0], '%Y%m%d%H%M%S')

            path = os.path.join(self.cache_dir, f)
            try:
                duration = self._probe_duration(path)
            except OSError as e:
                # ffprobe itself could not be started; that says nothing
                # about the file, so keep it.
                logger.error(f"unable to run ffprobe on {f}: {e}")
                continue

            if duration is None:
                logger.info(f"bad file: {f}")
                os.remove(path)
                continue

            self.cached_clips[f] = {
                'path': f,
                'camera': camera,
                'start_time': start_time.timestamp(),
                'duration': duration
            }

        now = datetime.datetime.now().timestamp()
        if self.events_in_process:
            earliest_event = min(e['start_time'] for e in self.events_in_process.values())
        else:
            earliest_event = now

        # if the earliest event exceeds the max seconds, cap it
        max_seconds = self.config.save_clips.max_seconds
        earliest_event = max(earliest_event, now - max_seconds)

        for f, data in list(self.cached_clips.items()):
            if earliest_event-90 > data['start_time']+data['duration']:
                del self.cached_clips[f]
                os.remove(os.path.join(self.cache_dir, f))

    def create_clip(self, camera, event_data, pre_capture):
        """Concatenate cached segments into ``<camera>-<event id>.mp4``.

        Waits, refreshing the cache between sleeps, until a cached segment of
        ``camera`` reaches ``event_data['end_time']``. Gives up with a
        warning after ``CLIP_WAIT_ATTEMPTS`` waits or when ``stop_event`` is
        set. The clip spans from ``pre_capture`` seconds before the event
        start to 5 seconds after its end.

        Args:
            camera: name of the camera the event belongs to.
            event_data: dict with at least ``'id'``, ``'start_time'`` and
                ``'end_time'``.
            pre_capture: seconds of video to include before the event start.
        """
        sorted_clips = self._camera_clips(camera)
        attempts = 0
        # An empty list means this camera has no cached segment yet; treat
        # that like "still waiting" instead of indexing into it.
        while not sorted_clips or sorted_clips[-1]['start_time'] + sorted_clips[-1]['duration'] < event_data['end_time']:
            if self.stop_event.is_set() or attempts >= self.CLIP_WAIT_ATTEMPTS:
                logger.warning(
                    f"Unable to create clip for {camera} and event {event_data['id']}: "
                    "no cached video covers the end of the event."
                )
                return
            time.sleep(self.CLIP_WAIT_SECONDS)
            self.refresh_cache()
            sorted_clips = self._camera_clips(camera)
            attempts += 1

        playlist_start = event_data['start_time']-pre_capture
        playlist_end = event_data['end_time']+5
        playlist_lines = self._build_playlist(sorted_clips, playlist_start, playlist_end)

        clip_name = f"{camera}-{event_data['id']}"
        ffmpeg_cmd = [
            'ffmpeg',
            '-y',
            '-protocol_whitelist',
            'pipe,file',
            '-f',
            'concat',
            '-safe',
            '0',
            '-i',
            '-',
            '-c',
            'copy',
            f"{os.path.join(self.clips_dir, clip_name)}.mp4"
        ]

        # The playlist is fed to ffmpeg on stdin ("-i -").
        p = sp.run(ffmpeg_cmd, input="\n".join(playlist_lines), encoding='ascii', capture_output=True)
        if p.returncode != 0:
            logger.error(p.stderr)
            return

    def run(self):
        """Process queued events until ``stop_event`` is set."""
        while not self.stop_event.is_set():
            try:
                event_type, camera, event_data = self.event_queue.get(timeout=10)
            except queue.Empty:
                # Idle: keep the cache index fresh unless we are shutting down.
                if not self.stop_event.is_set():
                    self.refresh_cache()
                continue

            self.refresh_cache()

            save_clips_config = self.config.cameras[camera].save_clips

            # if save clips is not enabled for this camera, just continue
            if not save_clips_config.enabled:
                continue

            # if specific objects are listed for this camera, only save clips for them
            # TODO: default to all tracked objects rather than checking for None
            if save_clips_config.objects and event_data['label'] not in save_clips_config.objects:
                continue

            if event_type == 'start':
                self.events_in_process[event_data['id']] = event_data
            elif event_type == 'end':
                Event.create(
                    id=event_data['id'],
                    label=event_data['label'],
                    camera=camera,
                    start_time=event_data['start_time'],
                    end_time=event_data['end_time'],
                    top_score=event_data['top_score'],
                    false_positive=event_data['false_positive'],
                    zones=list(event_data['entered_zones'])
                )

                if self.cached_clips and not event_data['false_positive']:
                    self.create_clip(camera, event_data, save_clips_config.pre_capture)

                # An 'end' may arrive for an event whose 'start' was never
                # recorded; that must not kill the thread.
                self.events_in_process.pop(event_data['id'], None)

        logger.info("Exiting event processor...")