From 7383db60b0c0e5f1353a95d00521cde16d7b9eee Mon Sep 17 00:00:00 2001 From: Blake Blackshear Date: Thu, 9 Jul 2020 06:57:16 -0500 Subject: [PATCH] save clips for tracked objects --- Dockerfile | 3 + config/config.example.yml | 9 +++ detect_objects.py | 30 ++++++- frigate/events.py | 152 +++++++++++++++++++++++++++++++++++ frigate/object_processing.py | 30 ++++++- frigate/objects.py | 4 + 6 files changed, 223 insertions(+), 5 deletions(-) create mode 100644 frigate/events.py diff --git a/Dockerfile b/Dockerfile index d35087435..c28e86a0e 100755 --- a/Dockerfile +++ b/Dockerfile @@ -24,6 +24,7 @@ RUN apt -qq update && apt -qq install --no-install-recommends -y \ numpy \ imutils \ scipy \ + psutil \ && python3.7 -m pip install -U \ Flask \ paho-mqtt \ @@ -49,6 +50,8 @@ RUN wget -q https://dl.google.com/coral/canned_models/coco_labels.txt -O /labelm RUN wget -q https://github.com/google-coral/edgetpu/raw/master/test_data/ssd_mobilenet_v2_coco_quant_postprocess.tflite -O /cpu_model.tflite +RUN mkdir /cache && mkdir /clips + WORKDIR /opt/frigate/ ADD frigate frigate/ COPY detect_objects.py . diff --git a/config/config.example.yml b/config/config.example.yml index 0cac3afba..0975ce2df 100644 --- a/config/config.example.yml +++ b/config/config.example.yml @@ -110,6 +110,15 @@ cameras: ################ take_frame: 1 + ################ + # This will save a clip for each tracked object by frigate along with a json file that contains + # data related to the tracked object. This works by telling ffmpeg to write video segments to /cache + # from the video stream without re-encoding. Clips are them created by using ffmpeg to merge segments + # without re-encoding. The segements saved are unaltered from what frigate receives to avoid re-encoding. + # They do not contain bounding boxes. 30 seconds of video is added to the start of the clip. + ################ + save_clips: False + ################ # Configuration for the snapshots in the debug view and mqtt ################ diff --git a/detect_objects.py b/detect_objects.py index 9c516aec7..bf4291604 100644 --- a/detect_objects.py +++ b/detect_objects.py @@ -17,6 +17,7 @@ import paho.mqtt.client as mqtt from frigate.video import track_camera, get_ffmpeg_input, get_frame_shape, CameraCapture, start_or_restart_ffmpeg from frigate.object_processing import TrackedObjectProcessor +from frigate.events import EventProcessor from frigate.util import EventsPerSecond from frigate.edgetpu import EdgeTPUProcess @@ -176,6 +177,9 @@ def main(): # Queue for cameras to push tracked objects to tracked_objects_queue = mp.SimpleQueue() + + # Queue for clip processing + event_queue = mp.Queue() # Start the shared tflite process tflite_process = EdgeTPUProcess() @@ -190,6 +194,25 @@ def main(): ffmpeg_hwaccel_args = ffmpeg.get('hwaccel_args', FFMPEG_DEFAULT_CONFIG['hwaccel_args']) ffmpeg_input_args = ffmpeg.get('input_args', FFMPEG_DEFAULT_CONFIG['input_args']) ffmpeg_output_args = ffmpeg.get('output_args', FFMPEG_DEFAULT_CONFIG['output_args']) + if config.get('save_clips', False): + ffmpeg_output_args = [ + "-f", + "segment", + "-segment_time", + "10", + "-segment_format", + "mp4", + "-reset_timestamps", + "1", + "-strftime", + "1", + "-c", + "copy", + "-an", + "-map", + "0", + f"/cache/{name}-%Y%m%d%H%M%S.mp4" + ] + ffmpeg_output_args ffmpeg_cmd = (['ffmpeg'] + ffmpeg_global_args + ffmpeg_hwaccel_args + @@ -239,8 +262,11 @@ def main(): for name, camera_process in camera_processes.items(): camera_process['process'].start() print(f"Camera_process started for {name}: {camera_process['process'].pid}") - - object_processor = TrackedObjectProcessor(CONFIG['cameras'], client, MQTT_TOPIC_PREFIX, tracked_objects_queue) + + event_processor = EventProcessor(CONFIG['cameras'], camera_processes, '/cache', '/clips', event_queue) + event_processor.start() + + object_processor = TrackedObjectProcessor(CONFIG['cameras'], client, MQTT_TOPIC_PREFIX, tracked_objects_queue, event_queue) object_processor.start() camera_watchdog = CameraWatchdog(camera_processes, CONFIG['cameras'], tflite_process, tracked_objects_queue, plasma_process) diff --git a/frigate/events.py b/frigate/events.py new file mode 100644 index 000000000..3a1c892ac --- /dev/null +++ b/frigate/events.py @@ -0,0 +1,152 @@ +import os +import time +import psutil +import threading +from collections import defaultdict +import json +import datetime +import subprocess as sp +import queue + +class EventProcessor(threading.Thread): + def __init__(self, config, camera_processes, cache_dir, clip_dir, event_queue): + threading.Thread.__init__(self) + self.config = config + self.camera_processes = camera_processes + self.cache_dir = cache_dir + self.clip_dir = clip_dir + self.cached_clips = {} + self.event_queue = event_queue + self.events_in_process = {} + + def refresh_cache(self): + cached_files = os.listdir(self.cache_dir) + + files_in_use = [] + for process_data in self.camera_processes.values(): + try: + ffmpeg_process = psutil.Process(pid=process_data['ffmpeg_process'].pid) + flist = ffmpeg_process.open_files() + if flist: + for nt in flist: + if nt.path.startswith(self.cache_dir): + files_in_use.append(nt.path.split('/')[-1]) + except: + continue + + for f in cached_files: + if f in files_in_use or f in self.cached_clips: + continue + + camera = f.split('-')[0] + start_time = datetime.datetime.strptime(f.split('-')[1].split('.')[0], '%Y%m%d%H%M%S') + + ffprobe_cmd = " ".join([ + 'ffprobe', + '-v', + 'error', + '-show_entries', + 'format=duration', + '-of', + 'default=noprint_wrappers=1:nokey=1', + f"{os.path.join(self.cache_dir,f)}" + ]) + p = sp.Popen(ffprobe_cmd, stdout=sp.PIPE, shell=True) + (output, err) = p.communicate() + p_status = p.wait() + if p_status == 0: + duration = float(output.decode('utf-8').strip()) + else: + print(f"bad file: {f}") + os.remove(os.path.join(self.cache_dir,f)) + continue + + self.cached_clips[f] = { + 'path': f, + 'camera': camera, + 'start_time': start_time.timestamp(), + 'duration': duration + } + + if len(self.events_in_process) > 0: + earliest_event = min(self.events_in_process.values(), key=lambda x:x['start_time'])['start_time'] + else: + earliest_event = datetime.datetime.now().timestamp() + + for f, data in list(self.cached_clips.items()): + if earliest_event-90 > data['start_time']+data['duration']: + del self.cached_clips[f] + os.remove(os.path.join(self.cache_dir,f)) + + def create_clip(self, camera, event_data): + # get all clips from the camera with the event sorted + sorted_clips = sorted([c for c in self.cached_clips.values() if c['camera'] == camera], key = lambda i: i['start_time']) + + while sorted_clips[-1]['start_time'] + sorted_clips[-1]['duration'] < event_data['end_time']: + time.sleep(5) + self.refresh_cache() + # get all clips from the camera with the event sorted + sorted_clips = sorted([c for c in self.cached_clips.values() if c['camera'] == camera], key = lambda i: i['start_time']) + + playlist_start = event_data['start_time']-30 + playlist_end = event_data['end_time']+5 + playlist_lines = [] + for clip in sorted_clips: + # clip ends before playlist start time, skip + if clip['start_time']+clip['duration'] < playlist_start: + continue + # clip starts after playlist ends, finish + if clip['start_time'] > playlist_end: + break + playlist_lines.append(f"file '{os.path.join(self.cache_dir,clip['path'])}'") + # if this is the starting clip, add an inpoint + if clip['start_time'] < playlist_start: + playlist_lines.append(f"inpoint {int(playlist_start-clip['start_time'])}") + # if this is the ending clip, add an outpoint + if clip['start_time']+clip['duration'] > playlist_end: + playlist_lines.append(f"outpoint {int(playlist_end-clip['start_time'])}") + + clip_name = f"{camera}-{event_data['id']}" + ffmpeg_cmd = [ + 'ffmpeg', + '-y', + '-protocol_whitelist', + 'pipe,file', + '-f', + 'concat', + '-safe', + '0', + '-i', + '-', + '-c', + 'copy', + f"{os.path.join(self.clip_dir, clip_name)}.mp4" + ] + + p = sp.run(ffmpeg_cmd, input="\n".join(playlist_lines), encoding='ascii', capture_output=True) + if p.returncode != 0: + print(p.stderr) + return + + with open(f"{os.path.join(self.clip_dir, clip_name)}.json", 'w') as outfile: + json.dump(event_data, outfile) + + def run(self): + while True: + try: + event_type, camera, event_data = self.event_queue.get(timeout=10) + except queue.Empty: + self.refresh_cache() + continue + + self.refresh_cache() + + if event_type == 'start': + self.events_in_process[event_data['id']] = event_data + + if event_type == 'end': + if self.config[camera].get('save_clips', False) and len(self.cached_clips) > 0: + self.create_clip(camera, event_data) + del self.events_in_process[event_data['id']] + + \ No newline at end of file diff --git a/frigate/object_processing.py b/frigate/object_processing.py index 68bb20d4a..ec5a02154 100644 --- a/frigate/object_processing.py +++ b/frigate/object_processing.py @@ -23,12 +23,13 @@ for key, val in LABELS.items(): COLOR_MAP[val] = tuple(int(round(255 * c)) for c in cmap(key)[:3]) class TrackedObjectProcessor(threading.Thread): - def __init__(self, config, client, topic_prefix, tracked_objects_queue): + def __init__(self, config, client, topic_prefix, tracked_objects_queue, event_queue): threading.Thread.__init__(self) self.config = config self.client = client self.topic_prefix = topic_prefix self.tracked_objects_queue = tracked_objects_queue + self.event_queue = event_queue self.camera_data = defaultdict(lambda: { 'best_objects': {}, 'object_status': defaultdict(lambda: defaultdict(lambda: 'OFF')), @@ -50,12 +51,35 @@ class TrackedObjectProcessor(threading.Thread): def run(self): while True: - camera, frame_time, tracked_objects = self.tracked_objects_queue.get() + camera, frame_time, current_tracked_objects = self.tracked_objects_queue.get() config = self.config[camera] best_objects = self.camera_data[camera]['best_objects'] current_object_status = self.camera_data[camera]['object_status'] - self.camera_data[camera]['tracked_objects'] = tracked_objects + tracked_objects = self.camera_data[camera]['tracked_objects'] + + current_ids = current_tracked_objects.keys() + previous_ids = tracked_objects.keys() + removed_ids = list(set(previous_ids).difference(current_ids)) + new_ids = list(set(current_ids).difference(previous_ids)) + updated_ids = list(set(current_ids).intersection(previous_ids)) + + for id in new_ids: + tracked_objects[id] = current_tracked_objects[id] + # publish events to mqtt + self.client.publish(f"{self.topic_prefix}/{camera}/events/start", json.dumps(tracked_objects[id]), retain=False) + self.event_queue.put(('start', camera, tracked_objects[id])) + + for id in updated_ids: + tracked_objects[id] = current_tracked_objects[id] + + for id in removed_ids: + # publish events to mqtt + tracked_objects[id]['end_time'] = frame_time + self.client.publish(f"{self.topic_prefix}/{camera}/events/end", json.dumps(tracked_objects[id]), retain=False) + self.event_queue.put(('end', camera, tracked_objects[id])) + del tracked_objects[id] + self.camera_data[camera]['current_frame_time'] = frame_time ### diff --git a/frigate/objects.py b/frigate/objects.py index 411835d2b..f95b0b021 100644 --- a/frigate/objects.py +++ b/frigate/objects.py @@ -19,6 +19,7 @@ class ObjectTracker(): def register(self, index, obj): id = f"{obj['frame_time']}-{index}" obj['id'] = id + obj['start_time'] = obj['frame_time'] obj['top_score'] = obj['score'] self.add_history(obj) self.tracked_objects[id] = obj @@ -45,6 +46,9 @@ class ObjectTracker(): } if 'history' in obj: obj['history'].append(entry) + # only maintain the last 20 in history + if len(obj['history']) > 20: + obj['history'] = obj['history'][-20:] else: obj['history'] = [entry]