add multiple streams per camera

This commit is contained in:
Blake Blackshear 2020-11-29 15:55:53 -06:00
parent d98751102a
commit 4e0cf3681e
4 changed files with 211 additions and 115 deletions

View File

@ -56,22 +56,30 @@ SAVE_CLIPS_SCHEMA = vol.Schema(
FFMPEG_GLOBAL_ARGS_DEFAULT = ['-hide_banner','-loglevel','panic']
FFMPEG_INPUT_ARGS_DEFAULT = ['-avoid_negative_ts', 'make_zero',
'-fflags', 'nobuffer',
'-flags', 'low_delay',
'-strict', 'experimental',
'-fflags', '+genpts+discardcorrupt',
'-rtsp_transport', 'tcp',
'-stimeout', '5000000',
'-use_wallclock_as_timestamps', '1']
FFMPEG_OUTPUT_ARGS_DEFAULT = ['-f', 'rawvideo',
DETECT_FFMPEG_OUTPUT_ARGS_DEFAULT = ['-f', 'rawvideo',
'-pix_fmt', 'yuv420p']
RTMP_FFMPEG_OUTPUT_ARGS_DEFAULT = ["-c", "copy", "-f", "flv"]
SAVE_CLIPS_FFMPEG_OUTPUT_ARGS_DEFAULT = ["-f", "segment", "-segment_time",
"10", "-segment_format", "mp4", "-reset_timestamps", "1", "-strftime",
"1", "-c", "copy", "-an"]
RECORD_FFMPEG_OUTPUT_ARGS_DEFAULT = ["-f", "segment", "-segment_time",
"60", "-segment_format", "mp4", "-reset_timestamps", "1", "-strftime",
"1", "-c", "copy", "-an"]
GLOBAL_FFMPEG_SCHEMA = vol.Schema(
{
vol.Optional('global_args', default=FFMPEG_GLOBAL_ARGS_DEFAULT): [str],
vol.Optional('hwaccel_args', default=[]): [str],
vol.Optional('input_args', default=FFMPEG_INPUT_ARGS_DEFAULT): [str],
vol.Optional('output_args', default=FFMPEG_OUTPUT_ARGS_DEFAULT): [str]
vol.Optional('global_args', default=FFMPEG_GLOBAL_ARGS_DEFAULT): vol.Any(str, [str]),
vol.Optional('hwaccel_args', default=[]): vol.Any(str, [str]),
vol.Optional('input_args', default=FFMPEG_INPUT_ARGS_DEFAULT): vol.Any(str, [str]),
vol.Optional('output_args', default={}): {
vol.Optional('detect', default=DETECT_FFMPEG_OUTPUT_ARGS_DEFAULT): vol.Any(str, [str]),
vol.Optional('record', default=RECORD_FFMPEG_OUTPUT_ARGS_DEFAULT): vol.Any(str, [str]),
vol.Optional('clips', default=SAVE_CLIPS_FFMPEG_OUTPUT_ARGS_DEFAULT): vol.Any(str, [str]),
vol.Optional('rtmp', default=RTMP_FFMPEG_OUTPUT_ARGS_DEFAULT): vol.Any(str, [str]),
}
}
)
@ -111,13 +119,28 @@ DEFAULT_CAMERA_SNAPSHOTS = {
'crop_to_region': True
}
def each_role_used_once(inputs):
roles = [role for i in inputs for role in i['roles']]
roles_set = set(roles)
if len(roles) > len(roles_set):
raise ValueError
return inputs
CAMERA_FFMPEG_SCHEMA = vol.Schema(
{
vol.Required('input'): str,
'global_args': [str],
'hwaccel_args': [str],
'input_args': [str],
'output_args': [str]
vol.Required('inputs'): vol.All([{
vol.Required('path'): str,
vol.Required('roles'): ['detect', 'clips', 'record', 'rtmp'],
'global_args': vol.Any(str, [str]),
'hwaccel_args': vol.Any(str, [str]),
'input_args': vol.Any(str, [str]),
}], vol.Msg(each_role_used_once, msg="Each input role may only be used once")),
'output_args': {
vol.Optional('detect', default=DETECT_FFMPEG_OUTPUT_ARGS_DEFAULT): vol.Any(str, [str]),
vol.Optional('record', default=RECORD_FFMPEG_OUTPUT_ARGS_DEFAULT): vol.Any(str, [str]),
vol.Optional('clips', default=SAVE_CLIPS_FFMPEG_OUTPUT_ARGS_DEFAULT): vol.Any(str, [str]),
vol.Optional('rtmp', default=RTMP_FFMPEG_OUTPUT_ARGS_DEFAULT): vol.Any(str, [str]),
}
}
)
@ -140,10 +163,10 @@ CAMERAS_SCHEMA = vol.Schema(
vol.Optional('enabled', default=False): bool,
vol.Optional('pre_capture', default=30): int,
'objects': [str],
vol.Optional('retain', default={}): SAVE_CLIPS_RETAIN_SCHEMA
vol.Optional('retain', default={}): SAVE_CLIPS_RETAIN_SCHEMA,
},
vol.Optional('rtmp', default={}): {
vol.Required('enabled', default=True): bool
vol.Required('enabled', default=True): bool,
},
vol.Optional('snapshots', default=DEFAULT_CAMERA_SNAPSHOTS): {
vol.Optional('show_timestamp', default=True): bool,
@ -230,6 +253,47 @@ class MqttConfig():
'user': self.user
}
class CameraInput():
def __init__(self, global_config, ffmpeg_input):
self._path = ffmpeg_input['path']
self._roles = ffmpeg_input['roles']
self._global_args = ffmpeg_input.get('global_args', global_config['global_args'])
self._hwaccel_args = ffmpeg_input.get('hwaccel_args', global_config['hwaccel_args'])
self._input_args = ffmpeg_input.get('input_args', global_config['input_args'])
@property
def path(self):
return self._path
@property
def roles(self):
return self._roles
@property
def global_args(self):
return self._global_args if isinstance(self._global_args, list) else self._global_args.split(' ')
@property
def hwaccel_args(self):
return self._hwaccel_args if isinstance(self._hwaccel_args, list) else self._hwaccel_args.split(' ')
@property
def input_args(self):
return self._input_args if isinstance(self._input_args, list) else self._input_args.split(' ')
class CameraFfmpegConfig():
def __init__(self, global_config, config):
self._inputs = [CameraInput(global_config, i) for i in config['inputs']]
self._output_args = config.get('output_args', global_config['output_args'])
@property
def inputs(self):
return self._inputs
@property
def output_args(self):
return {k: v if isinstance(v, list) else v.split(' ') for k, v in self._output_args.items()}
class SaveClipsRetainConfig():
def __init__(self, global_config, config):
self._default = config.get('default', global_config.get('default'))
@ -280,34 +344,6 @@ class SaveClipsConfig():
'retain': self.retain.to_dict()
}
class FfmpegConfig():
def __init__(self, global_config, config):
self._input = config.get('input')
self._global_args = config.get('global_args', global_config['global_args'])
self._hwaccel_args = config.get('hwaccel_args', global_config['hwaccel_args'])
self._input_args = config.get('input_args', global_config['input_args'])
self._output_args = config.get('output_args', global_config['output_args'])
@property
def input(self):
return self._input
@property
def global_args(self):
return self._global_args
@property
def hwaccel_args(self):
return self._hwaccel_args
@property
def input_args(self):
return self._input_args
@property
def output_args(self):
return self._output_args
class FilterConfig():
def __init__(self, config):
self._min_area = config['min_area']
@ -403,7 +439,7 @@ class CameraSaveClipsConfig():
self._enabled = config['enabled']
self._pre_capture = config['pre_capture']
self._objects = config.get('objects')
self._retain = SaveClipsRetainConfig(global_config['retain'], config['retain'])
self._retain = SaveClipsRetainConfig(global_config['save_clips']['retain'], config['retain'])
@property
def enabled(self):
@ -428,8 +464,9 @@ class CameraSaveClipsConfig():
'objects': self.objects,
'retain': self.retain.to_dict()
}
class CameraRtmpConfig():
def __init__(self, config):
def __init__(self, global_config, config):
self._enabled = config['enabled']
@property
@ -438,7 +475,7 @@ class CameraRtmpConfig():
def to_dict(self):
return {
'enabled': self.enabled
'enabled': self.enabled,
}
class ZoneConfig():
@ -489,7 +526,7 @@ class ZoneConfig():
class CameraConfig():
def __init__(self, name, config, cache_dir, global_config):
self._name = name
self._ffmpeg = FfmpegConfig(global_config['ffmpeg'], config['ffmpeg'])
self._ffmpeg = CameraFfmpegConfig(global_config['ffmpeg'], config['ffmpeg'])
self._height = config.get('height')
self._width = config.get('width')
self._frame_shape = (self._height, self._width)
@ -498,12 +535,18 @@ class CameraConfig():
self._mask = self._create_mask(config.get('mask'))
self._best_image_timeout = config['best_image_timeout']
self._zones = { name: ZoneConfig(name, z) for name, z in config['zones'].items() }
self._save_clips = CameraSaveClipsConfig(global_config['save_clips'], config['save_clips'])
self._rtmp = CameraRtmpConfig(config['rtmp'])
self._save_clips = CameraSaveClipsConfig(global_config, config['save_clips'])
self._rtmp = CameraRtmpConfig(global_config, config['rtmp'])
self._snapshots = CameraSnapshotsConfig(config['snapshots'])
self._objects = ObjectConfig(global_config['objects'], config.get('objects', {}))
self._ffmpeg_cmd = self._get_ffmpeg_cmd(cache_dir)
self._ffmpeg_cmds = []
for ffmpeg_input in self._ffmpeg.inputs:
self._ffmpeg_cmds.append({
'roles': ffmpeg_input.roles,
'cmd': self._get_ffmpeg_cmd(ffmpeg_input, cache_dir)
})
self._set_zone_colors(self._zones)
@ -530,42 +573,31 @@ class CameraConfig():
return mask_img
def _get_ffmpeg_cmd(self, cache_dir):
ffmpeg_output_args = self.ffmpeg.output_args
if self.fps:
ffmpeg_output_args = ["-r", str(self.fps)] + ffmpeg_output_args
if self.rtmp.enabled:
ffmpeg_output_args = [
"-c",
"copy",
"-f",
"flv",
def _get_ffmpeg_cmd(self, ffmpeg_input, cache_dir):
ffmpeg_output_args = []
# TODO: ensure output args exist for each role and each role is only used once
if 'detect' in ffmpeg_input.roles:
ffmpeg_output_args = self.ffmpeg.output_args['detect'] + ffmpeg_output_args + ['pipe:']
if self.fps:
ffmpeg_output_args = ["-r", str(self.fps)] + ffmpeg_output_args
if 'rtmp' in ffmpeg_input.roles and self.rtmp.enabled:
ffmpeg_output_args = self.ffmpeg.output_args['rtmp'] + [
f"rtmp://127.0.0.1/live/{self.name}"
] + ffmpeg_output_args
if self.save_clips.enabled:
ffmpeg_output_args = [
"-f",
"segment",
"-segment_time",
"10",
"-segment_format",
"mp4",
"-reset_timestamps",
"1",
"-strftime",
"1",
"-c",
"copy",
"-an",
if 'clips' in ffmpeg_input.roles and self.save_clips.enabled:
ffmpeg_output_args = self.ffmpeg.output_args['clips'] + [
f"{os.path.join(cache_dir, self.name)}-%Y%m%d%H%M%S.mp4"
] + ffmpeg_output_args
# if 'record' in ffmpeg_input.roles and self.save_clips.enabled:
# ffmpeg_output_args = self.ffmpeg.output_args['record'] + [
# f"{os.path.join(cache_dir, self.name)}-%Y%m%d%H%M%S.mp4"
# ] + ffmpeg_output_args
return (['ffmpeg'] +
self.ffmpeg.global_args +
self.ffmpeg.hwaccel_args +
self.ffmpeg.input_args +
['-i', self.ffmpeg.input] +
ffmpeg_output_args +
['pipe:'])
ffmpeg_input.global_args +
ffmpeg_input.hwaccel_args +
ffmpeg_input.input_args +
['-i', ffmpeg_input.path] +
ffmpeg_output_args)
def _set_zone_colors(self, zones: Dict[str, ZoneConfig]):
# set colors for zones
@ -635,8 +667,8 @@ class CameraConfig():
return self._frame_shape_yuv
@property
def ffmpeg_cmd(self):
return self._ffmpeg_cmd
def ffmpeg_cmds(self):
return self._ffmpeg_cmds
def to_dict(self):
return {
@ -651,7 +683,7 @@ class CameraConfig():
'snapshots': self.snapshots.to_dict(),
'objects': self.objects.to_dict(),
'frame_shape': self.frame_shape,
'ffmpeg_cmd': " ".join(self.ffmpeg_cmd),
'ffmpeg_cmds': [{'roles': c['roles'], 'cmd': ' '.join(c['cmd'])} for c in self.ffmpeg_cmds],
}
@ -680,7 +712,8 @@ class FrigateConfig():
config['mqtt']['password'] = config['mqtt']['password'].format(**frigate_env_vars)
for camera in config['cameras'].values():
camera['ffmpeg']['input'] = camera['ffmpeg']['input'].format(**frigate_env_vars)
for i in camera['ffmpeg']['inputs']:
i['path'] = i['path'].format(**frigate_env_vars)
return config

View File

@ -36,10 +36,11 @@ class EventProcessor(threading.Thread):
cached_files = os.listdir(self.cache_dir)
files_in_use = []
for process_data in self.camera_processes.values():
for process in psutil.process_iter():
if process.name() != 'ffmpeg':
continue
try:
ffmpeg_process = psutil.Process(pid=process_data['ffmpeg_pid'].value)
flist = ffmpeg_process.open_files()
flist = process.open_files()
if flist:
for nt in flist:
if nt.path.startswith(self.cache_dir):

View File

@ -12,7 +12,9 @@ class TestConfig(TestCase):
'cameras': {
'back': {
'ffmpeg': {
'input': 'rtsp://10.0.0.1:554/video'
'inputs': [
{ 'path': 'rtsp://10.0.0.1:554/video', 'roles': ['detect'] }
]
},
'height': 1080,
'width': 1920
@ -39,7 +41,9 @@ class TestConfig(TestCase):
'cameras': {
'back': {
'ffmpeg': {
'input': 'rtsp://10.0.0.1:554/video'
'inputs': [
{ 'path': 'rtsp://10.0.0.1:554/video', 'roles': ['detect'] }
]
},
'height': 1080,
'width': 1920
@ -60,7 +64,9 @@ class TestConfig(TestCase):
'cameras': {
'back': {
'ffmpeg': {
'input': 'rtsp://10.0.0.1:554/video'
'inputs': [
{ 'path': 'rtsp://10.0.0.1:554/video', 'roles': ['detect'] }
]
},
'height': 1080,
'width': 1920,
@ -84,7 +90,9 @@ class TestConfig(TestCase):
'cameras': {
'back': {
'ffmpeg': {
'input': 'rtsp://10.0.0.1:554/video'
'inputs': [
{ 'path': 'rtsp://10.0.0.1:554/video', 'roles': ['detect'] }
]
},
'height': 1080,
'width': 1920
@ -110,7 +118,9 @@ class TestConfig(TestCase):
'cameras': {
'back': {
'ffmpeg': {
'input': 'rtsp://10.0.0.1:554/video'
'inputs': [
{ 'path': 'rtsp://10.0.0.1:554/video', 'roles': ['detect'] }
]
},
'height': 1080,
'width': 1920
@ -129,7 +139,9 @@ class TestConfig(TestCase):
'cameras': {
'back': {
'ffmpeg': {
'input': 'rtsp://10.0.0.1:554/video'
'inputs': [
{ 'path': 'rtsp://10.0.0.1:554/video', 'roles': ['detect'] }
]
},
'height': 1080,
'width': 1920,
@ -159,7 +171,9 @@ class TestConfig(TestCase):
'cameras': {
'back': {
'ffmpeg': {
'input': 'rtsp://10.0.0.1:554/video'
'inputs': [
{ 'path': 'rtsp://10.0.0.1:554/video', 'roles': ['detect'] }
]
},
'height': 1080,
'width': 1920,
@ -175,7 +189,7 @@ class TestConfig(TestCase):
}
}
frigate_config = FrigateConfig(config=config)
assert('-re' in frigate_config.cameras['back'].ffmpeg_cmd)
assert('-re' in frigate_config.cameras['back'].ffmpeg_cmds[0]['cmd'])
def test_inherit_save_clips_retention(self):
config = {
@ -193,7 +207,9 @@ class TestConfig(TestCase):
'cameras': {
'back': {
'ffmpeg': {
'input': 'rtsp://10.0.0.1:554/video'
'inputs': [
{ 'path': 'rtsp://10.0.0.1:554/video', 'roles': ['detect'] }
]
},
'height': 1080,
'width': 1920
@ -202,6 +218,34 @@ class TestConfig(TestCase):
}
frigate_config = FrigateConfig(config=config)
assert(frigate_config.cameras['back'].save_clips.retain.objects['person'] == 30)
def test_roles_listed_twice_throws_error(self):
config = {
'mqtt': {
'host': 'mqtt'
},
'save_clips': {
'retain': {
'default': 20,
'objects': {
'person': 30
}
}
},
'cameras': {
'back': {
'ffmpeg': {
'inputs': [
{ 'path': 'rtsp://10.0.0.1:554/video', 'roles': ['detect'] },
{ 'path': 'rtsp://10.0.0.1:554/video2', 'roles': ['detect'] }
]
},
'height': 1080,
'width': 1920
}
}
}
self.assertRaises(vol.MultipleInvalid, lambda: FrigateConfig(config=config))
if __name__ == '__main__':
main(verbosity=2)

View File

@ -72,7 +72,7 @@ def create_tensor_input(frame, region):
# Expand dimensions since the model expects images to have shape: [1, 300, 300, 3]
return np.expand_dims(cropped_frame, axis=0)
def start_or_restart_ffmpeg(ffmpeg_cmd, frame_size, ffmpeg_process=None):
def start_or_restart_ffmpeg(ffmpeg_cmd, frame_size=None, ffmpeg_process=None):
if not ffmpeg_process is None:
logger.info("Terminating the existing ffmpeg process...")
ffmpeg_process.terminate()
@ -85,9 +85,10 @@ def start_or_restart_ffmpeg(ffmpeg_cmd, frame_size, ffmpeg_process=None):
ffmpeg_process.communicate()
ffmpeg_process = None
logger.info("Creating ffmpeg process...")
logger.info(" ".join(ffmpeg_cmd))
process = sp.Popen(ffmpeg_cmd, stdout = sp.PIPE, stdin = sp.DEVNULL, bufsize=frame_size*10, start_new_session=True)
if frame_size is None:
process = sp.Popen(ffmpeg_cmd, stdout = sp.DEVNULL, stdin = sp.DEVNULL, start_new_session=True)
else:
process = sp.Popen(ffmpeg_cmd, stdout = sp.PIPE, stdin = sp.DEVNULL, bufsize=frame_size*10, start_new_session=True)
return process
def capture_frames(ffmpeg_process, camera_name, frame_shape, frame_manager: FrameManager,
@ -138,7 +139,8 @@ class CameraWatchdog(threading.Thread):
self.camera_name = camera_name
self.config = config
self.capture_thread = None
self.ffmpeg_process = None
self.ffmpeg_detect_process = None
self.ffmpeg_other_processes = []
self.camera_fps = camera_fps
self.ffmpeg_pid = ffmpeg_pid
self.frame_queue = frame_queue
@ -146,31 +148,47 @@ class CameraWatchdog(threading.Thread):
self.frame_size = self.frame_shape[0] * self.frame_shape[1]
def run(self):
self.start_ffmpeg()
self.start_ffmpeg_detect()
for c in self.config.ffmpeg_cmds:
if 'detect' in c['roles']:
continue
self.ffmpeg_other_processes.append({
'cmd': c['cmd'],
'process': start_or_restart_ffmpeg(c['cmd'])
})
time.sleep(10)
while True:
now = datetime.datetime.now().timestamp()
if not self.capture_thread.is_alive():
self.start_ffmpeg()
elif now - self.capture_thread.current_frame.value > 5:
logger.info(f"No frames received from {self.camera_name} in 5 seconds. Exiting ffmpeg...")
self.ffmpeg_process.terminate()
self.start_ffmpeg_detect()
elif now - self.capture_thread.current_frame.value > 20:
logger.info(f"No frames received from {self.camera_name} in 20 seconds. Exiting ffmpeg...")
self.ffmpeg_detect_process.terminate()
try:
logger.info("Waiting for ffmpeg to exit gracefully...")
self.ffmpeg_process.communicate(timeout=30)
self.ffmpeg_detect_process.communicate(timeout=30)
except sp.TimeoutExpired:
logger.info("FFmpeg didnt exit. Force killing...")
self.ffmpeg_process.kill()
self.ffmpeg_process.communicate()
self.ffmpeg_detect_process.kill()
self.ffmpeg_detect_process.communicate()
for p in self.ffmpeg_other_processes:
poll = p['process'].poll()
if poll == None:
continue
p['process'] = start_or_restart_ffmpeg(p['cmd'], ffmpeg_process=p['process'])
# wait a bit before checking again
time.sleep(10)
def start_ffmpeg(self):
self.ffmpeg_process = start_or_restart_ffmpeg(self.config.ffmpeg_cmd, self.frame_size)
self.ffmpeg_pid.value = self.ffmpeg_process.pid
self.capture_thread = CameraCapture(self.camera_name, self.ffmpeg_process, self.frame_shape, self.frame_queue,
def start_ffmpeg_detect(self):
ffmpeg_cmd = [c['cmd'] for c in self.config.ffmpeg_cmds if 'detect' in c['roles']][0]
self.ffmpeg_detect_process = start_or_restart_ffmpeg(ffmpeg_cmd, self.frame_size)
self.ffmpeg_pid.value = self.ffmpeg_detect_process.pid
self.capture_thread = CameraCapture(self.camera_name, self.ffmpeg_detect_process, self.frame_shape, self.frame_queue,
self.camera_fps)
self.capture_thread.start()