blakeblackshear.frigate/frigate/config.py
2021-01-26 21:40:33 -06:00

807 lines
25 KiB
Python

import base64
import json
import os
from typing import Dict
import cv2
import matplotlib.pyplot as plt
import numpy as np
import voluptuous as vol
import yaml
from frigate.const import RECORD_DIR, CLIPS_DIR, CACHE_DIR
DETECTORS_SCHEMA = vol.Schema(
{
vol.Required(str): {
vol.Required('type', default='edgetpu'): vol.In(['cpu', 'edgetpu']),
vol.Optional('device', default='usb'): str
}
}
)
DEFAULT_DETECTORS = {
'coral': {
'type': 'edgetpu',
'device': 'usb'
}
}
MQTT_SCHEMA = vol.Schema(
{
vol.Required('host'): str,
vol.Optional('port', default=1883): int,
vol.Optional('topic_prefix', default='frigate'): str,
vol.Optional('client_id', default='frigate'): str,
'user': str,
'password': str
}
)
SAVE_CLIPS_RETAIN_SCHEMA = vol.Schema(
{
vol.Required('default',default=10): int,
'objects': {
str: int
}
}
)
SAVE_CLIPS_SCHEMA = vol.Schema(
{
vol.Optional('max_seconds', default=300): int,
vol.Optional('retain', default={}): SAVE_CLIPS_RETAIN_SCHEMA
}
)
FFMPEG_GLOBAL_ARGS_DEFAULT = ['-hide_banner','-loglevel','fatal']
FFMPEG_INPUT_ARGS_DEFAULT = ['-avoid_negative_ts', 'make_zero',
'-fflags', '+genpts+discardcorrupt'
'-rtsp_transport', 'tcp',
'-stimeout', '5000000',
'-use_wallclock_as_timestamps', '1']
DETECT_FFMPEG_OUTPUT_ARGS_DEFAULT = ['-f', 'rawvideo',
'-pix_fmt', 'yuv420p']
RTMP_FFMPEG_OUTPUT_ARGS_DEFAULT = ["-c", "copy", "-f", "flv"]
SAVE_CLIPS_FFMPEG_OUTPUT_ARGS_DEFAULT = ["-f", "segment", "-segment_time",
"10", "-segment_format", "mp4", "-reset_timestamps", "1", "-strftime",
"1", "-c", "copy", "-an"]
RECORD_FFMPEG_OUTPUT_ARGS_DEFAULT = ["-f", "segment", "-segment_time",
"60", "-segment_format", "mp4", "-reset_timestamps", "1", "-strftime",
"1", "-c", "copy", "-an"]
GLOBAL_FFMPEG_SCHEMA = vol.Schema(
{
vol.Optional('global_args', default=FFMPEG_GLOBAL_ARGS_DEFAULT): vol.Any(str, [str]),
vol.Optional('hwaccel_args', default=[]): vol.Any(str, [str]),
vol.Optional('input_args', default=FFMPEG_INPUT_ARGS_DEFAULT): vol.Any(str, [str]),
vol.Optional('output_args', default={}): {
vol.Optional('detect', default=DETECT_FFMPEG_OUTPUT_ARGS_DEFAULT): vol.Any(str, [str]),
vol.Optional('record', default=RECORD_FFMPEG_OUTPUT_ARGS_DEFAULT): vol.Any(str, [str]),
vol.Optional('clips', default=SAVE_CLIPS_FFMPEG_OUTPUT_ARGS_DEFAULT): vol.Any(str, [str]),
vol.Optional('rtmp', default=RTMP_FFMPEG_OUTPUT_ARGS_DEFAULT): vol.Any(str, [str]),
}
}
)
FILTER_SCHEMA = vol.Schema(
{
str: {
vol.Optional('min_area', default=0): int,
vol.Optional('max_area', default=24000000): int,
vol.Optional('threshold', default=0.85): float
}
}
)
def filters_for_all_tracked_objects(object_config):
for tracked_object in object_config.get('track', ['person']):
if not 'filters' in object_config:
object_config['filters'] = {}
if not tracked_object in object_config['filters']:
object_config['filters'][tracked_object] = {}
return object_config
OBJECTS_SCHEMA = vol.Schema(vol.All(filters_for_all_tracked_objects,
{
vol.Optional('track', default=['person']): [str],
vol.Optional('filters', default = {}): FILTER_SCHEMA.extend({ str: {vol.Optional('min_score', default=0.5): float}})
}
))
DEFAULT_CAMERA_SAVE_CLIPS = {
'enabled': False
}
DEFAULT_CAMERA_SNAPSHOTS = {
'show_timestamp': True,
'draw_zones': False,
'draw_bounding_boxes': True,
'crop_to_region': True
}
def each_role_used_once(inputs):
roles = [role for i in inputs for role in i['roles']]
roles_set = set(roles)
if len(roles) > len(roles_set):
raise ValueError
return inputs
CAMERA_FFMPEG_SCHEMA = vol.Schema(
{
vol.Required('inputs'): vol.All([{
vol.Required('path'): str,
vol.Required('roles'): ['detect', 'clips', 'record', 'rtmp'],
'global_args': vol.Any(str, [str]),
'hwaccel_args': vol.Any(str, [str]),
'input_args': vol.Any(str, [str]),
}], vol.Msg(each_role_used_once, msg="Each input role may only be used once")),
'output_args': {
vol.Optional('detect', default=DETECT_FFMPEG_OUTPUT_ARGS_DEFAULT): vol.Any(str, [str]),
vol.Optional('record', default=RECORD_FFMPEG_OUTPUT_ARGS_DEFAULT): vol.Any(str, [str]),
vol.Optional('clips', default=SAVE_CLIPS_FFMPEG_OUTPUT_ARGS_DEFAULT): vol.Any(str, [str]),
vol.Optional('rtmp', default=RTMP_FFMPEG_OUTPUT_ARGS_DEFAULT): vol.Any(str, [str]),
}
}
)
def ensure_zones_and_cameras_have_different_names(cameras):
zones = [zone for camera in cameras.values() for zone in camera['zones'].keys()]
for zone in zones:
if zone in cameras.keys():
raise ValueError
return cameras
CAMERAS_SCHEMA = vol.Schema(vol.All(
{
str: {
vol.Required('ffmpeg'): CAMERA_FFMPEG_SCHEMA,
vol.Required('height'): int,
vol.Required('width'): int,
'fps': int,
'mask': str,
vol.Optional('best_image_timeout', default=60): int,
vol.Optional('zones', default={}): {
str: {
vol.Required('coordinates'): vol.Any(str, [str]),
vol.Optional('filters', default={}): FILTER_SCHEMA
}
},
vol.Optional('save_clips', default=DEFAULT_CAMERA_SAVE_CLIPS): {
vol.Optional('enabled', default=False): bool,
vol.Optional('pre_capture', default=30): int,
'objects': [str],
vol.Optional('retain', default={}): SAVE_CLIPS_RETAIN_SCHEMA,
},
vol.Optional('record', default={}): {
'enabled': bool,
'retain_days': int,
},
vol.Optional('rtmp', default={}): {
vol.Required('enabled', default=True): bool,
},
vol.Optional('snapshots', default=DEFAULT_CAMERA_SNAPSHOTS): {
vol.Optional('show_timestamp', default=True): bool,
vol.Optional('draw_zones', default=False): bool,
vol.Optional('draw_bounding_boxes', default=True): bool,
vol.Optional('crop_to_region', default=True): bool,
vol.Optional('height', default=175): int
},
'objects': OBJECTS_SCHEMA
}
}, vol.Msg(ensure_zones_and_cameras_have_different_names, msg='Zones cannot share names with cameras'))
)
FRIGATE_CONFIG_SCHEMA = vol.Schema(
{
vol.Optional('detectors', default=DEFAULT_DETECTORS): DETECTORS_SCHEMA,
'mqtt': MQTT_SCHEMA,
vol.Optional('logger', default={'default': 'info', 'logs': {}}): {
vol.Optional('default', default='info'): vol.In(['info', 'debug', 'warning', 'error', 'critical']),
vol.Optional('logs', default={}): {str: vol.In(['info', 'debug', 'warning', 'error', 'critical']) }
},
vol.Optional('save_clips', default={}): SAVE_CLIPS_SCHEMA,
vol.Optional('record', default={}): {
vol.Optional('enabled', default=False): bool,
vol.Optional('retain_days', default=30): int,
},
vol.Optional('ffmpeg', default={}): GLOBAL_FFMPEG_SCHEMA,
vol.Optional('objects', default={}): OBJECTS_SCHEMA,
vol.Required('cameras', default={}): CAMERAS_SCHEMA
}
)
class DetectorConfig():
def __init__(self, config):
self._type = config['type']
self._device = config['device']
@property
def type(self):
return self._type
@property
def device(self):
return self._device
def to_dict(self):
return {
'type': self.type,
'device': self.device
}
class LoggerConfig():
def __init__(self, config):
self._default = config['default'].upper()
self._logs = {k: v.upper() for k, v in config['logs'].items()}
@property
def default(self):
return self._default
@property
def logs(self):
return self._logs
def to_dict(self):
return {
'default': self.default,
'logs': self.logs
}
class MqttConfig():
def __init__(self, config):
self._host = config['host']
self._port = config['port']
self._topic_prefix = config['topic_prefix']
self._client_id = config['client_id']
self._user = config.get('user')
self._password = config.get('password')
@property
def host(self):
return self._host
@property
def port(self):
return self._port
@property
def topic_prefix(self):
return self._topic_prefix
@property
def client_id(self):
return self._client_id
@property
def user(self):
return self._user
@property
def password(self):
return self._password
def to_dict(self):
return {
'host': self.host,
'port': self.port,
'topic_prefix': self.topic_prefix,
'client_id': self.client_id,
'user': self.user
}
class CameraInput():
def __init__(self, global_config, ffmpeg_input):
self._path = ffmpeg_input['path']
self._roles = ffmpeg_input['roles']
self._global_args = ffmpeg_input.get('global_args', global_config['global_args'])
self._hwaccel_args = ffmpeg_input.get('hwaccel_args', global_config['hwaccel_args'])
self._input_args = ffmpeg_input.get('input_args', global_config['input_args'])
@property
def path(self):
return self._path
@property
def roles(self):
return self._roles
@property
def global_args(self):
return self._global_args if isinstance(self._global_args, list) else self._global_args.split(' ')
@property
def hwaccel_args(self):
return self._hwaccel_args if isinstance(self._hwaccel_args, list) else self._hwaccel_args.split(' ')
@property
def input_args(self):
return self._input_args if isinstance(self._input_args, list) else self._input_args.split(' ')
class CameraFfmpegConfig():
def __init__(self, global_config, config):
self._inputs = [CameraInput(global_config, i) for i in config['inputs']]
self._output_args = config.get('output_args', global_config['output_args'])
@property
def inputs(self):
return self._inputs
@property
def output_args(self):
return {k: v if isinstance(v, list) else v.split(' ') for k, v in self._output_args.items()}
class SaveClipsRetainConfig():
def __init__(self, global_config, config):
self._default = config.get('default', global_config.get('default'))
self._objects = config.get('objects', global_config.get('objects', {}))
@property
def default(self):
return self._default
@property
def objects(self):
return self._objects
def to_dict(self):
return {
'default': self.default,
'objects': self.objects
}
class SaveClipsConfig():
def __init__(self, config):
self._max_seconds = config['max_seconds']
self._retain = SaveClipsRetainConfig(config['retain'], config['retain'])
@property
def max_seconds(self):
return self._max_seconds
@property
def retain(self):
return self._retain
def to_dict(self):
return {
'max_seconds': self.max_seconds,
'retain': self.retain.to_dict()
}
class RecordConfig():
def __init__(self, global_config, config):
self._enabled = config.get('enabled', global_config['enabled'])
self._retain_days = config.get('retain_days', global_config['retain_days'])
@property
def enabled(self):
return self._enabled
@property
def retain_days(self):
return self._retain_days
def to_dict(self):
return {
'enabled': self.enabled,
'retain_days': self.retain_days,
}
class FilterConfig():
def __init__(self, config):
self._min_area = config['min_area']
self._max_area = config['max_area']
self._threshold = config['threshold']
self._min_score = config.get('min_score')
@property
def min_area(self):
return self._min_area
@property
def max_area(self):
return self._max_area
@property
def threshold(self):
return self._threshold
@property
def min_score(self):
return self._min_score
def to_dict(self):
return {
'min_area': self.min_area,
'max_area': self.max_area,
'threshold': self.threshold,
'min_score': self.min_score
}
class ObjectConfig():
def __init__(self, global_config, config):
self._track = config.get('track', global_config['track'])
if 'filters' in config:
self._filters = { name: FilterConfig(c) for name, c in config['filters'].items() }
else:
self._filters = { name: FilterConfig(c) for name, c in global_config['filters'].items() }
@property
def track(self):
return self._track
@property
def filters(self) -> Dict[str, FilterConfig]:
return self._filters
def to_dict(self):
return {
'track': self.track,
'filters': { k: f.to_dict() for k, f in self.filters.items() }
}
class CameraSnapshotsConfig():
def __init__(self, config):
self._show_timestamp = config['show_timestamp']
self._draw_zones = config['draw_zones']
self._draw_bounding_boxes = config['draw_bounding_boxes']
self._crop_to_region = config['crop_to_region']
self._height = config.get('height')
@property
def show_timestamp(self):
return self._show_timestamp
@property
def draw_zones(self):
return self._draw_zones
@property
def draw_bounding_boxes(self):
return self._draw_bounding_boxes
@property
def crop_to_region(self):
return self._crop_to_region
@property
def height(self):
return self._height
def to_dict(self):
return {
'show_timestamp': self.show_timestamp,
'draw_zones': self.draw_zones,
'draw_bounding_boxes': self.draw_bounding_boxes,
'crop_to_region': self.crop_to_region,
'height': self.height
}
class CameraSaveClipsConfig():
def __init__(self, global_config, config):
self._enabled = config['enabled']
self._pre_capture = config['pre_capture']
self._objects = config.get('objects', global_config['objects']['track'])
self._retain = SaveClipsRetainConfig(global_config['save_clips']['retain'], config['retain'])
@property
def enabled(self):
return self._enabled
@property
def pre_capture(self):
return self._pre_capture
@property
def objects(self):
return self._objects
@property
def retain(self):
return self._retain
def to_dict(self):
return {
'enabled': self.enabled,
'pre_capture': self.pre_capture,
'objects': self.objects,
'retain': self.retain.to_dict()
}
class CameraRtmpConfig():
def __init__(self, global_config, config):
self._enabled = config['enabled']
@property
def enabled(self):
return self._enabled
def to_dict(self):
return {
'enabled': self.enabled,
}
class ZoneConfig():
def __init__(self, name, config):
self._coordinates = config['coordinates']
self._filters = { name: FilterConfig(c) for name, c in config['filters'].items() }
if isinstance(self._coordinates, list):
self._contour = np.array([[int(p.split(',')[0]), int(p.split(',')[1])] for p in self._coordinates])
elif isinstance(self._coordinates, str):
points = self._coordinates.split(',')
self._contour = np.array([[int(points[i]), int(points[i+1])] for i in range(0, len(points), 2)])
else:
print(f"Unable to parse zone coordinates for {name}")
self._contour = np.array([])
self._color = (0,0,0)
@property
def coordinates(self):
return self._coordinates
@property
def contour(self):
return self._contour
@contour.setter
def contour(self, val):
self._contour = val
@property
def color(self):
return self._color
@color.setter
def color(self, val):
self._color = val
@property
def filters(self):
return self._filters
def to_dict(self):
return {
'filters': {k: f.to_dict() for k, f in self.filters.items()}
}
class CameraConfig():
def __init__(self, name, config, global_config):
self._name = name
self._ffmpeg = CameraFfmpegConfig(global_config['ffmpeg'], config['ffmpeg'])
self._height = config.get('height')
self._width = config.get('width')
self._frame_shape = (self._height, self._width)
self._frame_shape_yuv = (self._frame_shape[0]*3//2, self._frame_shape[1])
self._fps = config.get('fps')
self._mask = self._create_mask(config.get('mask'))
self._best_image_timeout = config['best_image_timeout']
self._zones = { name: ZoneConfig(name, z) for name, z in config['zones'].items() }
self._save_clips = CameraSaveClipsConfig(global_config, config['save_clips'])
self._record = RecordConfig(global_config['record'], config['record'])
self._rtmp = CameraRtmpConfig(global_config, config['rtmp'])
self._snapshots = CameraSnapshotsConfig(config['snapshots'])
self._objects = ObjectConfig(global_config['objects'], config.get('objects', {}))
self._ffmpeg_cmds = []
for ffmpeg_input in self._ffmpeg.inputs:
self._ffmpeg_cmds.append({
'roles': ffmpeg_input.roles,
'cmd': self._get_ffmpeg_cmd(ffmpeg_input)
})
self._set_zone_colors(self._zones)
def _create_mask(self, mask):
if mask:
if mask.startswith('base64,'):
img = base64.b64decode(mask[7:])
np_img = np.fromstring(img, dtype=np.uint8)
mask_img = cv2.imdecode(np_img, cv2.IMREAD_GRAYSCALE)
elif mask.startswith('poly,'):
points = mask.split(',')[1:]
contour = np.array([[int(points[i]), int(points[i+1])] for i in range(0, len(points), 2)])
mask_img = np.zeros(self.frame_shape, np.uint8)
mask_img[:] = 255
cv2.fillPoly(mask_img, pts=[contour], color=(0))
else:
mask_img = cv2.imread(f"/config/{mask}", cv2.IMREAD_GRAYSCALE)
else:
mask_img = None
if mask_img is None or mask_img.size == 0:
mask_img = np.zeros(self.frame_shape, np.uint8)
mask_img[:] = 255
return mask_img
def _get_ffmpeg_cmd(self, ffmpeg_input):
ffmpeg_output_args = []
if 'detect' in ffmpeg_input.roles:
ffmpeg_output_args = self.ffmpeg.output_args['detect'] + ffmpeg_output_args + ['pipe:']
if self.fps:
ffmpeg_output_args = ["-r", str(self.fps)] + ffmpeg_output_args
if 'rtmp' in ffmpeg_input.roles and self.rtmp.enabled:
ffmpeg_output_args = self.ffmpeg.output_args['rtmp'] + [
f"rtmp://127.0.0.1/live/{self.name}"
] + ffmpeg_output_args
if 'clips' in ffmpeg_input.roles and self.save_clips.enabled:
ffmpeg_output_args = self.ffmpeg.output_args['clips'] + [
f"{os.path.join(CACHE_DIR, self.name)}-%Y%m%d%H%M%S.mp4"
] + ffmpeg_output_args
if 'record' in ffmpeg_input.roles and self.record.enabled:
ffmpeg_output_args = self.ffmpeg.output_args['record'] + [
f"{os.path.join(RECORD_DIR, self.name)}-%Y%m%d%H%M%S.mp4"
] + ffmpeg_output_args
return (['ffmpeg'] +
ffmpeg_input.global_args +
ffmpeg_input.hwaccel_args +
ffmpeg_input.input_args +
['-i', ffmpeg_input.path] +
ffmpeg_output_args)
def _set_zone_colors(self, zones: Dict[str, ZoneConfig]):
# set colors for zones
all_zone_names = zones.keys()
zone_colors = {}
colors = plt.cm.get_cmap('tab10', len(all_zone_names))
for i, zone in enumerate(all_zone_names):
zone_colors[zone] = tuple(int(round(255 * c)) for c in colors(i)[:3])
for name, zone in zones.items():
zone.color = zone_colors[name]
@property
def name(self):
return self._name
@property
def ffmpeg(self):
return self._ffmpeg
@property
def height(self):
return self._height
@property
def width(self):
return self._width
@property
def fps(self):
return self._fps
@property
def mask(self):
return self._mask
@property
def best_image_timeout(self):
return self._best_image_timeout
@property
def zones(self)-> Dict[str, ZoneConfig]:
return self._zones
@property
def save_clips(self):
return self._save_clips
@property
def record(self):
return self._record
@property
def rtmp(self):
return self._rtmp
@property
def snapshots(self):
return self._snapshots
@property
def objects(self):
return self._objects
@property
def frame_shape(self):
return self._frame_shape
@property
def frame_shape_yuv(self):
return self._frame_shape_yuv
@property
def ffmpeg_cmds(self):
return self._ffmpeg_cmds
def to_dict(self):
return {
'name': self.name,
'height': self.height,
'width': self.width,
'fps': self.fps,
'best_image_timeout': self.best_image_timeout,
'zones': {k: z.to_dict() for k, z in self.zones.items()},
'save_clips': self.save_clips.to_dict(),
'record': self.record.to_dict(),
'rtmp': self.rtmp.to_dict(),
'snapshots': self.snapshots.to_dict(),
'objects': self.objects.to_dict(),
'frame_shape': self.frame_shape,
'ffmpeg_cmds': [{'roles': c['roles'], 'cmd': ' '.join(c['cmd'])} for c in self.ffmpeg_cmds],
}
class FrigateConfig():
def __init__(self, config_file=None, config=None):
if config is None and config_file is None:
raise ValueError('config or config_file must be defined')
elif not config_file is None:
config = self._load_file(config_file)
config = FRIGATE_CONFIG_SCHEMA(config)
config = self._sub_env_vars(config)
self._detectors = { name: DetectorConfig(d) for name, d in config['detectors'].items() }
self._mqtt = MqttConfig(config['mqtt'])
self._save_clips = SaveClipsConfig(config['save_clips'])
self._cameras = { name: CameraConfig(name, c, config) for name, c in config['cameras'].items() }
self._logger = LoggerConfig(config['logger'])
def _sub_env_vars(self, config):
frigate_env_vars = {k: v for k, v in os.environ.items() if k.startswith('FRIGATE_')}
if 'password' in config['mqtt']:
config['mqtt']['password'] = config['mqtt']['password'].format(**frigate_env_vars)
for camera in config['cameras'].values():
for i in camera['ffmpeg']['inputs']:
i['path'] = i['path'].format(**frigate_env_vars)
return config
def _load_file(self, config_file):
with open(config_file) as f:
raw_config = f.read()
if config_file.endswith(".yml"):
config = yaml.safe_load(raw_config)
elif config_file.endswith(".json"):
config = json.loads(raw_config)
return config
def to_dict(self):
return {
'detectors': {k: d.to_dict() for k, d in self.detectors.items()},
'mqtt': self.mqtt.to_dict(),
'save_clips': self.save_clips.to_dict(),
'cameras': {k: c.to_dict() for k, c in self.cameras.items()},
'logger': self.logger.to_dict()
}
@property
def detectors(self) -> Dict[str, DetectorConfig]:
return self._detectors
@property
def logger(self):
return self._logger
@property
def mqtt(self):
return self._mqtt
@property
def save_clips(self):
return self._save_clips
@property
def cameras(self) -> Dict[str, CameraConfig]:
return self._cameras