import datetime import json import logging import os import queue import subprocess as sp import threading import time from collections import defaultdict from pathlib import Path import psutil import shutil from frigate.config import FrigateConfig from frigate.const import RECORD_DIR, CLIPS_DIR, CACHE_DIR from frigate.models import Event from peewee import fn logger = logging.getLogger(__name__) class EventProcessor(threading.Thread): def __init__(self, config, camera_processes, event_queue, event_processed_queue, stop_event): threading.Thread.__init__(self) self.name = 'event_processor' self.config = config self.camera_processes = camera_processes self.cached_clips = {} self.event_queue = event_queue self.event_processed_queue = event_processed_queue self.events_in_process = {} self.stop_event = stop_event def should_create_clip(self, camera, event_data): if event_data['false_positive']: return False # if there are required zones and there is no overlap required_zones = self.config.cameras[camera].clips.required_zones if len(required_zones) > 0 and not set(event_data['entered_zones']) & set(required_zones): logger.debug(f"Not creating clip for {event_data['id']} because it did not enter required zones") return False return True def refresh_cache(self): cached_files = os.listdir(CACHE_DIR) files_in_use = [] for process in psutil.process_iter(): try: if process.name() != 'ffmpeg': continue flist = process.open_files() if flist: for nt in flist: if nt.path.startswith(CACHE_DIR): files_in_use.append(nt.path.split('/')[-1]) except: continue for f in cached_files: if f in files_in_use or f in self.cached_clips: continue camera = '-'.join(f.split('-')[:-1]) start_time = datetime.datetime.strptime(f.split('-')[-1].split('.')[0], '%Y%m%d%H%M%S') ffprobe_cmd = " ".join([ 'ffprobe', '-v', 'error', '-show_entries', 'format=duration', '-of', 'default=noprint_wrappers=1:nokey=1', f"{os.path.join(CACHE_DIR,f)}" ]) p = sp.Popen(ffprobe_cmd, stdout=sp.PIPE, shell=True) (output, err) = p.communicate() p_status = p.wait() if p_status == 0: duration = float(output.decode('utf-8').strip()) else: logger.info(f"bad file: {f}") os.remove(os.path.join(CACHE_DIR,f)) continue self.cached_clips[f] = { 'path': f, 'camera': camera, 'start_time': start_time.timestamp(), 'duration': duration } if len(self.events_in_process) > 0: earliest_event = min(self.events_in_process.values(), key=lambda x:x['start_time'])['start_time'] else: earliest_event = datetime.datetime.now().timestamp() # if the earliest event exceeds the max seconds, cap it max_seconds = self.config.clips.max_seconds if datetime.datetime.now().timestamp()-earliest_event > max_seconds: earliest_event = datetime.datetime.now().timestamp()-max_seconds for f, data in list(self.cached_clips.items()): if earliest_event-90 > data['start_time']+data['duration']: del self.cached_clips[f] logger.debug(f"Cleaning up cached file {f}") os.remove(os.path.join(CACHE_DIR,f)) # if we are still using more than 90% of the cache, proactively cleanup cache_usage = shutil.disk_usage("/tmp/cache") if cache_usage.used/cache_usage.total > .9: logger.warning("More than 90% of the cache is used.") logger.warning("Consider increasing space available at /tmp/cache or reducing max_seconds in your clips config.") logger.warning("Proactively cleaning up the cache...") while cache_usage.used/cache_usage.total > .9: oldest_clip = min(self.cached_clips.values(), key=lambda x:x['start_time']) del self.cached_clips[oldest_clip['path']] os.remove(os.path.join(CACHE_DIR,oldest_clip['path'])) cache_usage = shutil.disk_usage("/tmp/cache") def create_clip(self, camera, event_data, pre_capture, post_capture): # get all clips from the camera with the event sorted sorted_clips = sorted([c for c in self.cached_clips.values() if c['camera'] == camera], key = lambda i: i['start_time']) # if there are no clips in the cache or we are still waiting on a needed file check every 5 seconds wait_count = 0 while len(sorted_clips) == 0 or sorted_clips[-1]['start_time'] + sorted_clips[-1]['duration'] < event_data['end_time']+post_capture: if wait_count > 4: logger.warning(f"Unable to create clip for {camera} and event {event_data['id']}. There were no cache files for this event.") return False logger.debug(f"No cache clips for {camera}. Waiting...") time.sleep(5) self.refresh_cache() # get all clips from the camera with the event sorted sorted_clips = sorted([c for c in self.cached_clips.values() if c['camera'] == camera], key = lambda i: i['start_time']) wait_count += 1 playlist_start = event_data['start_time']-pre_capture playlist_end = event_data['end_time']+post_capture playlist_lines = [] for clip in sorted_clips: # clip ends before playlist start time, skip if clip['start_time']+clip['duration'] < playlist_start: continue # clip starts after playlist ends, finish if clip['start_time'] > playlist_end: break playlist_lines.append(f"file '{os.path.join(CACHE_DIR,clip['path'])}'") # if this is the starting clip, add an inpoint if clip['start_time'] < playlist_start: playlist_lines.append(f"inpoint {int(playlist_start-clip['start_time'])}") # if this is the ending clip, add an outpoint if clip['start_time']+clip['duration'] > playlist_end: playlist_lines.append(f"outpoint {int(playlist_end-clip['start_time'])}") clip_name = f"{camera}-{event_data['id']}" ffmpeg_cmd = [ 'ffmpeg', '-y', '-protocol_whitelist', 'pipe,file', '-f', 'concat', '-safe', '0', '-i', '-', '-c', 'copy', '-movflags', '+faststart', f"{os.path.join(CLIPS_DIR, clip_name)}.mp4" ] p = sp.run(ffmpeg_cmd, input="\n".join(playlist_lines), encoding='ascii', capture_output=True) if p.returncode != 0: logger.error(p.stderr) return False return True def run(self): while True: if self.stop_event.is_set(): logger.info(f"Exiting event processor...") break try: event_type, camera, event_data = self.event_queue.get(timeout=10) except queue.Empty: if not self.stop_event.is_set(): self.refresh_cache() continue logger.debug(f"Event received: {event_type} {camera} {event_data['id']}") self.refresh_cache() if event_type == 'start': self.events_in_process[event_data['id']] = event_data if event_type == 'end': clips_config = self.config.cameras[camera].clips if self.should_create_clip(camera, event_data): clip_created = False if clips_config.enabled and (clips_config.objects is None or event_data['label'] in clips_config.objects): clip_created = self.create_clip(camera, event_data, clips_config.pre_capture, clips_config.post_capture) Event.create( id=event_data['id'], label=event_data['label'], camera=camera, start_time=event_data['start_time'], end_time=event_data['end_time'], top_score=event_data['top_score'], false_positive=event_data['false_positive'], zones=list(event_data['entered_zones']), thumbnail=event_data['thumbnail'], has_clip=clip_created, has_snapshot=event_data['has_snapshot'], ) del self.events_in_process[event_data['id']] self.event_processed_queue.put((event_data['id'], camera)) class EventCleanup(threading.Thread): def __init__(self, config: FrigateConfig, stop_event): threading.Thread.__init__(self) self.name = 'event_cleanup' self.config = config self.stop_event = stop_event self.camera_keys = list(self.config.cameras.keys()) def expire(self, media): ## Expire events from unlisted cameras based on the global config if media == 'clips': retain_config = self.config.clips.retain file_extension = 'mp4' update_params = {'has_clip': False} else: retain_config = self.config.snapshots.retain file_extension = 'jpg' update_params = {'has_snapshot': False} distinct_labels = (Event.select(Event.label) .where(Event.camera.not_in(self.camera_keys)) .distinct()) # loop over object types in db for l in distinct_labels: # get expiration time for this label expire_days = retain_config.objects.get(l.label, retain_config.default) expire_after = (datetime.datetime.now() - datetime.timedelta(days=expire_days)).timestamp() # grab all events after specific time expired_events = ( Event.select() .where(Event.camera.not_in(self.camera_keys), Event.start_time < expire_after, Event.label == l.label) ) # delete the media from disk for event in expired_events: media_name = f"{event.camera}-{event.id}" media = Path(f"{os.path.join(CLIPS_DIR, media_name)}.{file_extension}") media.unlink(missing_ok=True) # update the clips attribute for the db entry update_query = ( Event.update(update_params) .where(Event.camera.not_in(self.camera_keys), Event.start_time < expire_after, Event.label == l.label) ) update_query.execute() ## Expire events from cameras based on the camera config for name, camera in self.config.cameras.items(): if media == 'clips': retain_config = camera.clips.retain else: retain_config = camera.snapshots.retain # get distinct objects in database for this camera distinct_labels = (Event.select(Event.label) .where(Event.camera == name) .distinct()) # loop over object types in db for l in distinct_labels: # get expiration time for this label expire_days = retain_config.objects.get(l.label, retain_config.default) expire_after = (datetime.datetime.now() - datetime.timedelta(days=expire_days)).timestamp() # grab all events after specific time expired_events = ( Event.select() .where(Event.camera == name, Event.start_time < expire_after, Event.label == l.label) ) # delete the grabbed clips from disk for event in expired_events: media_name = f"{event.camera}-{event.id}" media = Path(f"{os.path.join(CLIPS_DIR, media_name)}.{file_extension}") media.unlink(missing_ok=True) # update the clips attribute for the db entry update_query = ( Event.update(update_params) .where( Event.camera == name, Event.start_time < expire_after, Event.label == l.label) ) update_query.execute() def purge_duplicates(self): duplicate_query = """with grouped_events as ( select id, label, camera, has_snapshot, has_clip, row_number() over ( partition by label, camera, round(start_time/5,0)*5 order by end_time-start_time desc ) as copy_number from event ) select distinct id, camera, has_snapshot, has_clip from grouped_events where copy_number > 1;""" duplicate_events = Event.raw(duplicate_query) for event in duplicate_events: logger.debug(f"Removing duplicate: {event.id}") media_name = f"{event.camera}-{event.id}" if event.has_snapshot: media = Path(f"{os.path.join(CLIPS_DIR, media_name)}.jpg") media.unlink(missing_ok=True) if event.has_clip: media = Path(f"{os.path.join(CLIPS_DIR, media_name)}.mp4") media.unlink(missing_ok=True) (Event.delete() .where( Event.id << [event.id for event in duplicate_events] ) .execute()) def run(self): counter = 0 while(True): if self.stop_event.is_set(): logger.info(f"Exiting event cleanup...") break # only expire events every 5 minutes, but check for stop events every 10 seconds time.sleep(10) counter = counter + 1 if counter < 30: continue counter = 0 self.expire('clips') self.expire('snapshots') self.purge_duplicates() # drop events from db where has_clip and has_snapshot are false delete_query = ( Event.delete() .where( Event.has_clip == False, Event.has_snapshot == False) ) delete_query.execute()