From 4e0cf3681e16ff3cc6299d72c76696088afe69a2 Mon Sep 17 00:00:00 2001 From: Blake Blackshear Date: Sun, 29 Nov 2020 15:55:53 -0600 Subject: [PATCH] add multiple streams per camera --- frigate/config.py | 205 +++++++++++++++++++++--------------- frigate/events.py | 7 +- frigate/test/test_config.py | 62 +++++++++-- frigate/video.py | 52 ++++++--- 4 files changed, 211 insertions(+), 115 deletions(-) diff --git a/frigate/config.py b/frigate/config.py index dc5809b41..b686cf080 100644 --- a/frigate/config.py +++ b/frigate/config.py @@ -56,22 +56,30 @@ SAVE_CLIPS_SCHEMA = vol.Schema( FFMPEG_GLOBAL_ARGS_DEFAULT = ['-hide_banner','-loglevel','panic'] FFMPEG_INPUT_ARGS_DEFAULT = ['-avoid_negative_ts', 'make_zero', - '-fflags', 'nobuffer', - '-flags', 'low_delay', - '-strict', 'experimental', - '-fflags', '+genpts+discardcorrupt', '-rtsp_transport', 'tcp', '-stimeout', '5000000', '-use_wallclock_as_timestamps', '1'] -FFMPEG_OUTPUT_ARGS_DEFAULT = ['-f', 'rawvideo', +DETECT_FFMPEG_OUTPUT_ARGS_DEFAULT = ['-f', 'rawvideo', '-pix_fmt', 'yuv420p'] +RTMP_FFMPEG_OUTPUT_ARGS_DEFAULT = ["-c", "copy", "-f", "flv"] +SAVE_CLIPS_FFMPEG_OUTPUT_ARGS_DEFAULT = ["-f", "segment", "-segment_time", + "10", "-segment_format", "mp4", "-reset_timestamps", "1", "-strftime", + "1", "-c", "copy", "-an"] +RECORD_FFMPEG_OUTPUT_ARGS_DEFAULT = ["-f", "segment", "-segment_time", + "60", "-segment_format", "mp4", "-reset_timestamps", "1", "-strftime", + "1", "-c", "copy", "-an"] GLOBAL_FFMPEG_SCHEMA = vol.Schema( { - vol.Optional('global_args', default=FFMPEG_GLOBAL_ARGS_DEFAULT): [str], - vol.Optional('hwaccel_args', default=[]): [str], - vol.Optional('input_args', default=FFMPEG_INPUT_ARGS_DEFAULT): [str], - vol.Optional('output_args', default=FFMPEG_OUTPUT_ARGS_DEFAULT): [str] + vol.Optional('global_args', default=FFMPEG_GLOBAL_ARGS_DEFAULT): vol.Any(str, [str]), + vol.Optional('hwaccel_args', default=[]): vol.Any(str, [str]), + vol.Optional('input_args', default=FFMPEG_INPUT_ARGS_DEFAULT): vol.Any(str, [str]), + vol.Optional('output_args', default={}): { + vol.Optional('detect', default=DETECT_FFMPEG_OUTPUT_ARGS_DEFAULT): vol.Any(str, [str]), + vol.Optional('record', default=RECORD_FFMPEG_OUTPUT_ARGS_DEFAULT): vol.Any(str, [str]), + vol.Optional('clips', default=SAVE_CLIPS_FFMPEG_OUTPUT_ARGS_DEFAULT): vol.Any(str, [str]), + vol.Optional('rtmp', default=RTMP_FFMPEG_OUTPUT_ARGS_DEFAULT): vol.Any(str, [str]), + } } ) @@ -111,13 +119,28 @@ DEFAULT_CAMERA_SNAPSHOTS = { 'crop_to_region': True } +def each_role_used_once(inputs): + roles = [role for i in inputs for role in i['roles']] + roles_set = set(roles) + if len(roles) > len(roles_set): + raise ValueError + return inputs + CAMERA_FFMPEG_SCHEMA = vol.Schema( { - vol.Required('input'): str, - 'global_args': [str], - 'hwaccel_args': [str], - 'input_args': [str], - 'output_args': [str] + vol.Required('inputs'): vol.All([{ + vol.Required('path'): str, + vol.Required('roles'): ['detect', 'clips', 'record', 'rtmp'], + 'global_args': vol.Any(str, [str]), + 'hwaccel_args': vol.Any(str, [str]), + 'input_args': vol.Any(str, [str]), + }], vol.Msg(each_role_used_once, msg="Each input role may only be used once")), + 'output_args': { + vol.Optional('detect', default=DETECT_FFMPEG_OUTPUT_ARGS_DEFAULT): vol.Any(str, [str]), + vol.Optional('record', default=RECORD_FFMPEG_OUTPUT_ARGS_DEFAULT): vol.Any(str, [str]), + vol.Optional('clips', default=SAVE_CLIPS_FFMPEG_OUTPUT_ARGS_DEFAULT): vol.Any(str, [str]), + vol.Optional('rtmp', default=RTMP_FFMPEG_OUTPUT_ARGS_DEFAULT): vol.Any(str, [str]), + } } ) @@ -140,10 +163,10 @@ CAMERAS_SCHEMA = vol.Schema( vol.Optional('enabled', default=False): bool, vol.Optional('pre_capture', default=30): int, 'objects': [str], - vol.Optional('retain', default={}): SAVE_CLIPS_RETAIN_SCHEMA + vol.Optional('retain', default={}): SAVE_CLIPS_RETAIN_SCHEMA, }, vol.Optional('rtmp', default={}): { - vol.Required('enabled', default=True): bool + vol.Required('enabled', default=True): bool, }, vol.Optional('snapshots', default=DEFAULT_CAMERA_SNAPSHOTS): { vol.Optional('show_timestamp', default=True): bool, @@ -230,6 +253,47 @@ class MqttConfig(): 'user': self.user } +class CameraInput(): + def __init__(self, global_config, ffmpeg_input): + self._path = ffmpeg_input['path'] + self._roles = ffmpeg_input['roles'] + self._global_args = ffmpeg_input.get('global_args', global_config['global_args']) + self._hwaccel_args = ffmpeg_input.get('hwaccel_args', global_config['hwaccel_args']) + self._input_args = ffmpeg_input.get('input_args', global_config['input_args']) + + @property + def path(self): + return self._path + + @property + def roles(self): + return self._roles + + @property + def global_args(self): + return self._global_args if isinstance(self._global_args, list) else self._global_args.split(' ') + + @property + def hwaccel_args(self): + return self._hwaccel_args if isinstance(self._hwaccel_args, list) else self._hwaccel_args.split(' ') + + @property + def input_args(self): + return self._input_args if isinstance(self._input_args, list) else self._input_args.split(' ') + +class CameraFfmpegConfig(): + def __init__(self, global_config, config): + self._inputs = [CameraInput(global_config, i) for i in config['inputs']] + self._output_args = config.get('output_args', global_config['output_args']) + + @property + def inputs(self): + return self._inputs + + @property + def output_args(self): + return {k: v if isinstance(v, list) else v.split(' ') for k, v in self._output_args.items()} + class SaveClipsRetainConfig(): def __init__(self, global_config, config): self._default = config.get('default', global_config.get('default')) @@ -280,34 +344,6 @@ class SaveClipsConfig(): 'retain': self.retain.to_dict() } -class FfmpegConfig(): - def __init__(self, global_config, config): - self._input = config.get('input') - self._global_args = config.get('global_args', global_config['global_args']) - self._hwaccel_args = config.get('hwaccel_args', global_config['hwaccel_args']) - self._input_args = config.get('input_args', global_config['input_args']) - self._output_args = config.get('output_args', global_config['output_args']) - - @property - def input(self): - return self._input - - @property - def global_args(self): - return self._global_args - - @property - def hwaccel_args(self): - return self._hwaccel_args - - @property - def input_args(self): - return self._input_args - - @property - def output_args(self): - return self._output_args - class FilterConfig(): def __init__(self, config): self._min_area = config['min_area'] @@ -403,7 +439,7 @@ class CameraSaveClipsConfig(): self._enabled = config['enabled'] self._pre_capture = config['pre_capture'] self._objects = config.get('objects') - self._retain = SaveClipsRetainConfig(global_config['retain'], config['retain']) + self._retain = SaveClipsRetainConfig(global_config['save_clips']['retain'], config['retain']) @property def enabled(self): @@ -428,8 +464,9 @@ class CameraSaveClipsConfig(): 'objects': self.objects, 'retain': self.retain.to_dict() } + class CameraRtmpConfig(): - def __init__(self, config): + def __init__(self, global_config, config): self._enabled = config['enabled'] @property @@ -438,7 +475,7 @@ class CameraRtmpConfig(): def to_dict(self): return { - 'enabled': self.enabled + 'enabled': self.enabled, } class ZoneConfig(): @@ -489,7 +526,7 @@ class ZoneConfig(): class CameraConfig(): def __init__(self, name, config, cache_dir, global_config): self._name = name - self._ffmpeg = FfmpegConfig(global_config['ffmpeg'], config['ffmpeg']) + self._ffmpeg = CameraFfmpegConfig(global_config['ffmpeg'], config['ffmpeg']) self._height = config.get('height') self._width = config.get('width') self._frame_shape = (self._height, self._width) @@ -498,12 +535,18 @@ class CameraConfig(): self._mask = self._create_mask(config.get('mask')) self._best_image_timeout = config['best_image_timeout'] self._zones = { name: ZoneConfig(name, z) for name, z in config['zones'].items() } - self._save_clips = CameraSaveClipsConfig(global_config['save_clips'], config['save_clips']) - self._rtmp = CameraRtmpConfig(config['rtmp']) + self._save_clips = CameraSaveClipsConfig(global_config, config['save_clips']) + self._rtmp = CameraRtmpConfig(global_config, config['rtmp']) self._snapshots = CameraSnapshotsConfig(config['snapshots']) self._objects = ObjectConfig(global_config['objects'], config.get('objects', {})) - self._ffmpeg_cmd = self._get_ffmpeg_cmd(cache_dir) + self._ffmpeg_cmds = [] + for ffmpeg_input in self._ffmpeg.inputs: + self._ffmpeg_cmds.append({ + 'roles': ffmpeg_input.roles, + 'cmd': self._get_ffmpeg_cmd(ffmpeg_input, cache_dir) + }) + self._set_zone_colors(self._zones) @@ -530,42 +573,31 @@ class CameraConfig(): return mask_img - def _get_ffmpeg_cmd(self, cache_dir): - ffmpeg_output_args = self.ffmpeg.output_args - if self.fps: - ffmpeg_output_args = ["-r", str(self.fps)] + ffmpeg_output_args - if self.rtmp.enabled: - ffmpeg_output_args = [ - "-c", - "copy", - "-f", - "flv", + def _get_ffmpeg_cmd(self, ffmpeg_input, cache_dir): + ffmpeg_output_args = [] + # TODO: ensure output args exist for each role and each role is only used once + if 'detect' in ffmpeg_input.roles: + ffmpeg_output_args = self.ffmpeg.output_args['detect'] + ffmpeg_output_args + ['pipe:'] + if self.fps: + ffmpeg_output_args = ["-r", str(self.fps)] + ffmpeg_output_args + if 'rtmp' in ffmpeg_input.roles and self.rtmp.enabled: + ffmpeg_output_args = self.ffmpeg.output_args['rtmp'] + [ f"rtmp://127.0.0.1/live/{self.name}" ] + ffmpeg_output_args - if self.save_clips.enabled: - ffmpeg_output_args = [ - "-f", - "segment", - "-segment_time", - "10", - "-segment_format", - "mp4", - "-reset_timestamps", - "1", - "-strftime", - "1", - "-c", - "copy", - "-an", + if 'clips' in ffmpeg_input.roles and self.save_clips.enabled: + ffmpeg_output_args = self.ffmpeg.output_args['clips'] + [ f"{os.path.join(cache_dir, self.name)}-%Y%m%d%H%M%S.mp4" ] + ffmpeg_output_args + # if 'record' in ffmpeg_input.roles and self.save_clips.enabled: + # ffmpeg_output_args = self.ffmpeg.output_args['record'] + [ + # f"{os.path.join(cache_dir, self.name)}-%Y%m%d%H%M%S.mp4" + # ] + ffmpeg_output_args return (['ffmpeg'] + - self.ffmpeg.global_args + - self.ffmpeg.hwaccel_args + - self.ffmpeg.input_args + - ['-i', self.ffmpeg.input] + - ffmpeg_output_args + - ['pipe:']) + ffmpeg_input.global_args + + ffmpeg_input.hwaccel_args + + ffmpeg_input.input_args + + ['-i', ffmpeg_input.path] + + ffmpeg_output_args) def _set_zone_colors(self, zones: Dict[str, ZoneConfig]): # set colors for zones @@ -635,8 +667,8 @@ class CameraConfig(): return self._frame_shape_yuv @property - def ffmpeg_cmd(self): - return self._ffmpeg_cmd + def ffmpeg_cmds(self): + return self._ffmpeg_cmds def to_dict(self): return { @@ -651,7 +683,7 @@ class CameraConfig(): 'snapshots': self.snapshots.to_dict(), 'objects': self.objects.to_dict(), 'frame_shape': self.frame_shape, - 'ffmpeg_cmd': " ".join(self.ffmpeg_cmd), + 'ffmpeg_cmds': [{'roles': c['roles'], 'cmd': ' '.join(c['cmd'])} for c in self.ffmpeg_cmds], } @@ -680,7 +712,8 @@ class FrigateConfig(): config['mqtt']['password'] = config['mqtt']['password'].format(**frigate_env_vars) for camera in config['cameras'].values(): - camera['ffmpeg']['input'] = camera['ffmpeg']['input'].format(**frigate_env_vars) + for i in camera['ffmpeg']['inputs']: + i['path'] = i['path'].format(**frigate_env_vars) return config diff --git a/frigate/events.py b/frigate/events.py index 68b47bf5d..eaa7dcd4c 100644 --- a/frigate/events.py +++ b/frigate/events.py @@ -36,10 +36,11 @@ class EventProcessor(threading.Thread): cached_files = os.listdir(self.cache_dir) files_in_use = [] - for process_data in self.camera_processes.values(): + for process in psutil.process_iter(): + if process.name() != 'ffmpeg': + continue try: - ffmpeg_process = psutil.Process(pid=process_data['ffmpeg_pid'].value) - flist = ffmpeg_process.open_files() + flist = process.open_files() if flist: for nt in flist: if nt.path.startswith(self.cache_dir): diff --git a/frigate/test/test_config.py b/frigate/test/test_config.py index 6f33741c7..c7b94bf5f 100644 --- a/frigate/test/test_config.py +++ b/frigate/test/test_config.py @@ -12,7 +12,9 @@ class TestConfig(TestCase): 'cameras': { 'back': { 'ffmpeg': { - 'input': 'rtsp://10.0.0.1:554/video' + 'inputs': [ + { 'path': 'rtsp://10.0.0.1:554/video', 'roles': ['detect'] } + ] }, 'height': 1080, 'width': 1920 @@ -39,7 +41,9 @@ class TestConfig(TestCase): 'cameras': { 'back': { 'ffmpeg': { - 'input': 'rtsp://10.0.0.1:554/video' + 'inputs': [ + { 'path': 'rtsp://10.0.0.1:554/video', 'roles': ['detect'] } + ] }, 'height': 1080, 'width': 1920 @@ -60,7 +64,9 @@ class TestConfig(TestCase): 'cameras': { 'back': { 'ffmpeg': { - 'input': 'rtsp://10.0.0.1:554/video' + 'inputs': [ + { 'path': 'rtsp://10.0.0.1:554/video', 'roles': ['detect'] } + ] }, 'height': 1080, 'width': 1920, @@ -84,7 +90,9 @@ class TestConfig(TestCase): 'cameras': { 'back': { 'ffmpeg': { - 'input': 'rtsp://10.0.0.1:554/video' + 'inputs': [ + { 'path': 'rtsp://10.0.0.1:554/video', 'roles': ['detect'] } + ] }, 'height': 1080, 'width': 1920 @@ -110,7 +118,9 @@ class TestConfig(TestCase): 'cameras': { 'back': { 'ffmpeg': { - 'input': 'rtsp://10.0.0.1:554/video' + 'inputs': [ + { 'path': 'rtsp://10.0.0.1:554/video', 'roles': ['detect'] } + ] }, 'height': 1080, 'width': 1920 @@ -129,7 +139,9 @@ class TestConfig(TestCase): 'cameras': { 'back': { 'ffmpeg': { - 'input': 'rtsp://10.0.0.1:554/video' + 'inputs': [ + { 'path': 'rtsp://10.0.0.1:554/video', 'roles': ['detect'] } + ] }, 'height': 1080, 'width': 1920, @@ -159,7 +171,9 @@ class TestConfig(TestCase): 'cameras': { 'back': { 'ffmpeg': { - 'input': 'rtsp://10.0.0.1:554/video' + 'inputs': [ + { 'path': 'rtsp://10.0.0.1:554/video', 'roles': ['detect'] } + ] }, 'height': 1080, 'width': 1920, @@ -175,7 +189,7 @@ class TestConfig(TestCase): } } frigate_config = FrigateConfig(config=config) - assert('-re' in frigate_config.cameras['back'].ffmpeg_cmd) + assert('-re' in frigate_config.cameras['back'].ffmpeg_cmds[0]['cmd']) def test_inherit_save_clips_retention(self): config = { @@ -193,7 +207,9 @@ class TestConfig(TestCase): 'cameras': { 'back': { 'ffmpeg': { - 'input': 'rtsp://10.0.0.1:554/video' + 'inputs': [ + { 'path': 'rtsp://10.0.0.1:554/video', 'roles': ['detect'] } + ] }, 'height': 1080, 'width': 1920 @@ -202,6 +218,34 @@ class TestConfig(TestCase): } frigate_config = FrigateConfig(config=config) assert(frigate_config.cameras['back'].save_clips.retain.objects['person'] == 30) + + def test_roles_listed_twice_throws_error(self): + config = { + 'mqtt': { + 'host': 'mqtt' + }, + 'save_clips': { + 'retain': { + 'default': 20, + 'objects': { + 'person': 30 + } + } + }, + 'cameras': { + 'back': { + 'ffmpeg': { + 'inputs': [ + { 'path': 'rtsp://10.0.0.1:554/video', 'roles': ['detect'] }, + { 'path': 'rtsp://10.0.0.1:554/video2', 'roles': ['detect'] } + ] + }, + 'height': 1080, + 'width': 1920 + } + } + } + self.assertRaises(vol.MultipleInvalid, lambda: FrigateConfig(config=config)) if __name__ == '__main__': main(verbosity=2) diff --git a/frigate/video.py b/frigate/video.py index cf1589072..2a53d97d5 100755 --- a/frigate/video.py +++ b/frigate/video.py @@ -72,7 +72,7 @@ def create_tensor_input(frame, region): # Expand dimensions since the model expects images to have shape: [1, 300, 300, 3] return np.expand_dims(cropped_frame, axis=0) -def start_or_restart_ffmpeg(ffmpeg_cmd, frame_size, ffmpeg_process=None): +def start_or_restart_ffmpeg(ffmpeg_cmd, frame_size=None, ffmpeg_process=None): if not ffmpeg_process is None: logger.info("Terminating the existing ffmpeg process...") ffmpeg_process.terminate() @@ -85,9 +85,10 @@ def start_or_restart_ffmpeg(ffmpeg_cmd, frame_size, ffmpeg_process=None): ffmpeg_process.communicate() ffmpeg_process = None - logger.info("Creating ffmpeg process...") - logger.info(" ".join(ffmpeg_cmd)) - process = sp.Popen(ffmpeg_cmd, stdout = sp.PIPE, stdin = sp.DEVNULL, bufsize=frame_size*10, start_new_session=True) + if frame_size is None: + process = sp.Popen(ffmpeg_cmd, stdout = sp.DEVNULL, stdin = sp.DEVNULL, start_new_session=True) + else: + process = sp.Popen(ffmpeg_cmd, stdout = sp.PIPE, stdin = sp.DEVNULL, bufsize=frame_size*10, start_new_session=True) return process def capture_frames(ffmpeg_process, camera_name, frame_shape, frame_manager: FrameManager, @@ -138,7 +139,8 @@ class CameraWatchdog(threading.Thread): self.camera_name = camera_name self.config = config self.capture_thread = None - self.ffmpeg_process = None + self.ffmpeg_detect_process = None + self.ffmpeg_other_processes = [] self.camera_fps = camera_fps self.ffmpeg_pid = ffmpeg_pid self.frame_queue = frame_queue @@ -146,31 +148,47 @@ class CameraWatchdog(threading.Thread): self.frame_size = self.frame_shape[0] * self.frame_shape[1] def run(self): - self.start_ffmpeg() + self.start_ffmpeg_detect() + + for c in self.config.ffmpeg_cmds: + if 'detect' in c['roles']: + continue + self.ffmpeg_other_processes.append({ + 'cmd': c['cmd'], + 'process': start_or_restart_ffmpeg(c['cmd']) + }) + time.sleep(10) while True: now = datetime.datetime.now().timestamp() if not self.capture_thread.is_alive(): - self.start_ffmpeg() - elif now - self.capture_thread.current_frame.value > 5: - logger.info(f"No frames received from {self.camera_name} in 5 seconds. Exiting ffmpeg...") - self.ffmpeg_process.terminate() + self.start_ffmpeg_detect() + elif now - self.capture_thread.current_frame.value > 20: + logger.info(f"No frames received from {self.camera_name} in 20 seconds. Exiting ffmpeg...") + self.ffmpeg_detect_process.terminate() try: logger.info("Waiting for ffmpeg to exit gracefully...") - self.ffmpeg_process.communicate(timeout=30) + self.ffmpeg_detect_process.communicate(timeout=30) except sp.TimeoutExpired: logger.info("FFmpeg didnt exit. Force killing...") - self.ffmpeg_process.kill() - self.ffmpeg_process.communicate() + self.ffmpeg_detect_process.kill() + self.ffmpeg_detect_process.communicate() + + for p in self.ffmpeg_other_processes: + poll = p['process'].poll() + if poll == None: + continue + p['process'] = start_or_restart_ffmpeg(p['cmd'], ffmpeg_process=p['process']) # wait a bit before checking again time.sleep(10) - def start_ffmpeg(self): - self.ffmpeg_process = start_or_restart_ffmpeg(self.config.ffmpeg_cmd, self.frame_size) - self.ffmpeg_pid.value = self.ffmpeg_process.pid - self.capture_thread = CameraCapture(self.camera_name, self.ffmpeg_process, self.frame_shape, self.frame_queue, + def start_ffmpeg_detect(self): + ffmpeg_cmd = [c['cmd'] for c in self.config.ffmpeg_cmds if 'detect' in c['roles']][0] + self.ffmpeg_detect_process = start_or_restart_ffmpeg(ffmpeg_cmd, self.frame_size) + self.ffmpeg_pid.value = self.ffmpeg_detect_process.pid + self.capture_thread = CameraCapture(self.camera_name, self.ffmpeg_detect_process, self.frame_shape, self.frame_queue, self.camera_fps) self.capture_thread.start()