import datetime import json import logging import os import queue import subprocess as sp import threading import time from collections import defaultdict from pathlib import Path import psutil from frigate.config import FrigateConfig from frigate.models import Event from peewee import fn logger = logging.getLogger(__name__) class EventProcessor(threading.Thread): def __init__(self, config, camera_processes, event_queue, stop_event): threading.Thread.__init__(self) self.name = 'event_processor' self.config = config self.cache_dir = self.config.save_clips.cache_dir self.clips_dir = self.config.save_clips.clips_dir self.camera_processes = camera_processes self.cached_clips = {} self.event_queue = event_queue self.events_in_process = {} self.stop_event = stop_event def refresh_cache(self): cached_files = os.listdir(self.cache_dir) files_in_use = [] for process_data in self.camera_processes.values(): try: ffmpeg_process = psutil.Process(pid=process_data['ffmpeg_pid'].value) flist = ffmpeg_process.open_files() if flist: for nt in flist: if nt.path.startswith(self.cache_dir): files_in_use.append(nt.path.split('/')[-1]) except: continue for f in cached_files: if f in files_in_use or f in self.cached_clips: continue camera = '-'.join(f.split('-')[:-1]) start_time = datetime.datetime.strptime(f.split('-')[-1].split('.')[0], '%Y%m%d%H%M%S') ffprobe_cmd = " ".join([ 'ffprobe', '-v', 'error', '-show_entries', 'format=duration', '-of', 'default=noprint_wrappers=1:nokey=1', f"{os.path.join(self.cache_dir,f)}" ]) p = sp.Popen(ffprobe_cmd, stdout=sp.PIPE, shell=True) (output, err) = p.communicate() p_status = p.wait() if p_status == 0: duration = float(output.decode('utf-8').strip()) else: logger.info(f"bad file: {f}") os.remove(os.path.join(self.cache_dir,f)) continue self.cached_clips[f] = { 'path': f, 'camera': camera, 'start_time': start_time.timestamp(), 'duration': duration } if len(self.events_in_process) > 0: earliest_event = min(self.events_in_process.values(), key=lambda x:x['start_time'])['start_time'] else: earliest_event = datetime.datetime.now().timestamp() # if the earliest event exceeds the max seconds, cap it max_seconds = self.config.save_clips.max_seconds if datetime.datetime.now().timestamp()-earliest_event > max_seconds: earliest_event = datetime.datetime.now().timestamp()-max_seconds for f, data in list(self.cached_clips.items()): if earliest_event-90 > data['start_time']+data['duration']: del self.cached_clips[f] os.remove(os.path.join(self.cache_dir,f)) def create_clip(self, camera, event_data, pre_capture): # get all clips from the camera with the event sorted sorted_clips = sorted([c for c in self.cached_clips.values() if c['camera'] == camera], key = lambda i: i['start_time']) while sorted_clips[-1]['start_time'] + sorted_clips[-1]['duration'] < event_data['end_time']: time.sleep(5) self.refresh_cache() # get all clips from the camera with the event sorted sorted_clips = sorted([c for c in self.cached_clips.values() if c['camera'] == camera], key = lambda i: i['start_time']) playlist_start = event_data['start_time']-pre_capture playlist_end = event_data['end_time']+5 playlist_lines = [] for clip in sorted_clips: # clip ends before playlist start time, skip if clip['start_time']+clip['duration'] < playlist_start: continue # clip starts after playlist ends, finish if clip['start_time'] > playlist_end: break playlist_lines.append(f"file '{os.path.join(self.cache_dir,clip['path'])}'") # if this is the starting clip, add an inpoint if clip['start_time'] < playlist_start: playlist_lines.append(f"inpoint {int(playlist_start-clip['start_time'])}") # if this is the ending clip, add an outpoint if clip['start_time']+clip['duration'] > playlist_end: playlist_lines.append(f"outpoint {int(playlist_end-clip['start_time'])}") clip_name = f"{camera}-{event_data['id']}" ffmpeg_cmd = [ 'ffmpeg', '-y', '-protocol_whitelist', 'pipe,file', '-f', 'concat', '-safe', '0', '-i', '-', '-c', 'copy', f"{os.path.join(self.clips_dir, clip_name)}.mp4" ] p = sp.run(ffmpeg_cmd, input="\n".join(playlist_lines), encoding='ascii', capture_output=True) if p.returncode != 0: logger.error(p.stderr) return def run(self): while True: if self.stop_event.is_set(): logger.info(f"Exiting event processor...") break try: event_type, camera, event_data = self.event_queue.get(timeout=10) except queue.Empty: if not self.stop_event.is_set(): self.refresh_cache() continue self.refresh_cache() save_clips_config = self.config.cameras[camera].save_clips # if save clips is not enabled for this camera, just continue if not save_clips_config.enabled: continue # if specific objects are listed for this camera, only save clips for them # TODO: default to all tracked objects rather than checking for None if save_clips_config.objects: if not event_data['label'] in save_clips_config.objects: continue if event_type == 'start': self.events_in_process[event_data['id']] = event_data if event_type == 'end': if len(self.cached_clips) > 0 and not event_data['false_positive']: self.create_clip(camera, event_data, save_clips_config.pre_capture) Event.create( id=event_data['id'], label=event_data['label'], camera=camera, start_time=event_data['start_time'], end_time=event_data['end_time'], top_score=event_data['top_score'], false_positive=event_data['false_positive'], zones=list(event_data['entered_zones']), thumbnail=event_data['thumbnail'] ) del self.events_in_process[event_data['id']] class EventCleanup(threading.Thread): def __init__(self, config: FrigateConfig, stop_event): threading.Thread.__init__(self) self.name = 'event_cleanup' self.config = config self.clips_dir = self.config.save_clips.clips_dir self.stop_event = stop_event def run(self): counter = 0 while(True): if self.stop_event.is_set(): logger.info(f"Exiting event cleanup...") break # only expire events every 10 minutes, but check for stop events every 10 seconds counter = counter + 1 if counter < 60: continue counter = 0 camera_keys = list(self.config.cameras.keys()) # Expire events from unlisted cameras based on the global config retain_config = self.config.save_clips.retain distinct_labels = (Event.select(Event.label) .where(Event.camera.not_in(camera_keys)) .distinct()) # loop over object types in db for l in distinct_labels: # get expiration time for this label expire_days = retain_config.objects.get(l.label, retain_config.default) expire_after = (datetime.datetime.now() - datetime.timedelta(days=expire_days)).timestamp() # grab all events after specific time expired_events = ( Event.select() .where(Event.camera.not_in(camera_keys), Event.start_time < expire_after, Event.label == l.label) ) # delete the grabbed clips from disk for event in expired_events: clip_name = f"{event.camera}-{event.id}" clip = Path(f"{os.path.join(self.clips_dir, clip_name)}.mp4") clip.unlink(missing_ok=True) # delete the event for this type from the db delete_query = ( Event.delete() .where(Event.camera.not_in(camera_keys), Event.start_time < expire_after, Event.label == l.label) ) delete_query.execute() # Expire events from cameras based on the camera config for name, camera in self.config.cameras.items(): retain_config = camera.save_clips.retain # get distinct objects in database for this camera distinct_labels = (Event.select(Event.label) .where(Event.camera == name) .distinct()) # loop over object types in db for l in distinct_labels: # get expiration time for this label expire_days = retain_config.objects.get(l.label, retain_config.default) expire_after = (datetime.datetime.now() - datetime.timedelta(days=expire_days)).timestamp() # grab all events after specific time expired_events = ( Event.select() .where(Event.camera == name, Event.start_time < expire_after, Event.label == l.label) ) # delete the grabbed clips from disk for event in expired_events: clip_name = f"{event.camera}-{event.id}" clip = Path(f"{os.path.join(self.clips_dir, clip_name)}.mp4") clip.unlink(missing_ok=True) # delete the event for this type from the db delete_query = ( Event.delete() .where( Event.camera == name, Event.start_time < expire_after, Event.label == l.label) ) delete_query.execute() time.sleep(10)