blakeblackshear.frigate/frigate/config.py

598 lines
17 KiB
Python
Raw Normal View History

2020-11-03 15:15:58 +01:00
import base64
import json
import os
import yaml
from typing import Dict
import cv2
import matplotlib.pyplot as plt
import numpy as np
2020-11-01 13:17:44 +01:00
import voluptuous as vol
2020-11-03 15:15:58 +01:00
from frigate.util import get_frame_shape
2020-11-01 13:17:44 +01:00
DETECTORS_SCHEMA = vol.Schema(
{
vol.Required(str): {
vol.Required('type', default='edgetpu'): vol.In(['cpu', 'edgetpu']),
vol.Optional('device', default='usb'): str
}
}
)
DEFAULT_DETECTORS = {
'coral': {
'type': 'edgetpu',
'device': 'usb'
}
}
MQTT_SCHEMA = vol.Schema(
{
vol.Required('host'): str,
vol.Optional('port', default=1883): int,
vol.Optional('topic_prefix', default='frigate'): str,
vol.Optional('client_id', default='frigate'): str,
'user': str,
'password': str
}
)
SAVE_CLIPS_SCHEMA = vol.Schema(
{
vol.Optional('max_seconds', default=300): int,
vol.Optional('clips_dir', default='/media/frigate/clips'): str,
vol.Optional('cache_dir', default='/tmp/cache'): str
2020-11-01 13:17:44 +01:00
}
)
FFMPEG_GLOBAL_ARGS_DEFAULT = ['-hide_banner','-loglevel','panic']
FFMPEG_INPUT_ARGS_DEFAULT = ['-avoid_negative_ts', 'make_zero',
'-fflags', 'nobuffer',
'-flags', 'low_delay',
'-strict', 'experimental',
'-fflags', '+genpts+discardcorrupt',
'-rtsp_transport', 'tcp',
'-stimeout', '5000000',
'-use_wallclock_as_timestamps', '1']
FFMPEG_OUTPUT_ARGS_DEFAULT = ['-f', 'rawvideo',
'-pix_fmt', 'yuv420p']
GLOBAL_FFMPEG_SCHEMA = vol.Schema(
{
vol.Optional('global_args', default=FFMPEG_GLOBAL_ARGS_DEFAULT): [str],
vol.Optional('hwaccel_args', default=[]): [str],
vol.Optional('input_args', default=FFMPEG_INPUT_ARGS_DEFAULT): [str],
vol.Optional('output_args', default=FFMPEG_OUTPUT_ARGS_DEFAULT): [str]
}
)
FILTER_SCHEMA = vol.Schema(
{
str: {
vol.Optional('min_area', default=0): int,
vol.Optional('max_area', default=24000000): int,
vol.Optional('threshold', default=0.85): float
}
}
)
2020-11-03 15:15:58 +01:00
def filters_for_all_tracked_objects(object_config):
for tracked_object in object_config.get('track', ['person']):
if not 'filters' in object_config:
object_config['filters'] = {}
if not tracked_object in object_config['filters']:
object_config['filters'][tracked_object] = {}
return object_config
OBJECTS_SCHEMA = vol.Schema(vol.All(filters_for_all_tracked_objects,
2020-11-01 13:17:44 +01:00
{
vol.Optional('track', default=['person']): [str],
2020-11-03 15:15:58 +01:00
# TODO: this should populate filters for all tracked objects
vol.Optional('filters', default = {}): FILTER_SCHEMA.extend({ str: {vol.Optional('min_score', default=0.5): float}})
2020-11-01 13:17:44 +01:00
}
2020-11-03 15:15:58 +01:00
))
2020-11-01 13:17:44 +01:00
DEFAULT_CAMERA_MQTT = {
'crop_to_region': True
}
DEFAULT_CAMERA_SAVE_CLIPS = {
'enabled': False
}
DEFAULT_CAMERA_SNAPSHOTS = {
'show_timestamp': True,
'draw_zones': False,
'draw_bounding_boxes': True
}
CAMERA_FFMPEG_SCHEMA = vol.Schema(
{
vol.Required('input'): str,
'global_args': [str],
'hwaccel_args': [str],
'input_args': [str],
'output_args': [str]
}
)
CAMERAS_SCHEMA = vol.Schema(
{
str: {
vol.Required('ffmpeg'): CAMERA_FFMPEG_SCHEMA,
2020-11-03 15:15:58 +01:00
vol.Required('height'): int,
vol.Required('width'): int,
2020-11-01 13:17:44 +01:00
'fps': int,
'mask': str,
vol.Optional('best_image_timeout', default=60): int,
vol.Optional('mqtt', default=DEFAULT_CAMERA_MQTT): {
vol.Optional('crop_to_region', default=True): bool,
'snapshot_height': int
},
vol.Optional('zones', default={}): {
str: {
vol.Required('coordinates'): vol.Any(str, [str]),
'filters': FILTER_SCHEMA
}
},
vol.Optional('save_clips', default=DEFAULT_CAMERA_SAVE_CLIPS): {
vol.Optional('enabled', default=False): bool,
vol.Optional('pre_capture', default=30): int,
'objects': [str],
},
vol.Optional('snapshots', default=DEFAULT_CAMERA_SNAPSHOTS): {
vol.Optional('show_timestamp', default=True): bool,
vol.Optional('draw_zones', default=False): bool,
vol.Optional('draw_bounding_boxes', default=True): bool
},
'objects': OBJECTS_SCHEMA
}
}
)
FRIGATE_CONFIG_SCHEMA = vol.Schema(
{
vol.Optional('web_port', default=5000): int,
vol.Optional('detectors', default=DEFAULT_DETECTORS): DETECTORS_SCHEMA,
'mqtt': MQTT_SCHEMA,
vol.Optional('save_clips', default={}): SAVE_CLIPS_SCHEMA,
vol.Optional('ffmpeg', default={}): GLOBAL_FFMPEG_SCHEMA,
vol.Optional('objects', default={}): OBJECTS_SCHEMA,
vol.Required('cameras', default={}): CAMERAS_SCHEMA
}
)
2020-11-03 15:15:58 +01:00
class DetectorConfig():
def __init__(self, config):
self._type = config['type']
self._device = config['device']
@property
def type(self):
return self._type
@property
def device(self):
return self._device
class MqttConfig():
def __init__(self, config):
self._host = config['host']
self._port = config['port']
self._topic_prefix = config['topic_prefix']
self._client_id = config['client_id']
self._user = config.get('user')
self._password = config.get('password')
@property
def host(self):
return self._host
@property
def port(self):
return self._port
@property
def topic_prefix(self):
return self._topic_prefix
@property
def client_id(self):
return self._client_id
@property
def user(self):
return self._user
@property
def password(self):
return self._password
class SaveClipsConfig():
def __init__(self, config):
self._max_seconds = config['max_seconds']
self._clips_dir = config['clips_dir']
self._cache_dir = config['cache_dir']
@property
def max_seconds(self):
return self._max_seconds
@property
def clips_dir(self):
return self._clips_dir
@property
def cache_dir(self):
return self._cache_dir
class FfmpegConfig():
def __init__(self, global_config, config):
self._input = config.get('input')
self._global_args = config.get('global_args', global_config['global_args'])
self._hwaccel_args = config.get('hwaccel_args', global_config['hwaccel_args'])
self._input_args = config.get('input_args', global_config['input_args'])
self._output_args = config.get('output_args', global_config['output_args'])
@property
def input(self):
return self._input
@property
def global_args(self):
return self._global_args
@property
def hwaccel_args(self):
return self._hwaccel_args
@property
def input_args(self):
return self._input_args
@property
def output_args(self):
return self._output_args
class FilterConfig():
def __init__(self, config):
self._min_area = config['min_area']
self._max_area = config['max_area']
self._threshold = config['threshold']
self._min_score = config.get('min_score')
@property
def min_area(self):
return self._min_area
@property
def max_area(self):
return self._max_area
@property
def threshold(self):
return self._threshold
@property
def min_score(self):
return self._min_score
class ObjectConfig():
def __init__(self, global_config, config):
self._track = config.get('track', global_config['track'])
if 'filters' in config:
self._filters = { name: FilterConfig(c) for name, c in config['filters'].items() }
else:
self._filters = { name: FilterConfig(c) for name, c in global_config['filters'].items() }
@property
def track(self):
return self._track
@property
def filters(self) -> Dict[str, FilterConfig]:
return self._filters
class CameraSnapshotsConfig():
def __init__(self, config):
self._show_timestamp = config['show_timestamp']
self._draw_zones = config['draw_zones']
self._draw_bounding_boxes = config['draw_bounding_boxes']
@property
def show_timestamp(self):
return self._show_timestamp
@property
def draw_zones(self):
return self._draw_zones
@property
def draw_bounding_boxes(self):
return self._draw_bounding_boxes
class CameraSaveClipsConfig():
def __init__(self, config):
self._enabled = config['enabled']
self._pre_capture = config['pre_capture']
self._objects = config.get('objects')
@property
def enabled(self):
return self._enabled
@property
def pre_capture(self):
return self._pre_capture
@property
def objects(self):
return self._objects
class CameraMqttConfig():
def __init__(self, config):
self._crop_to_region = config['crop_to_region']
self._snapshot_height = config.get('snapshot_height')
@property
def crop_to_region(self):
return self._crop_to_region
@property
def snapshot_height(self):
return self._snapshot_height
class ZoneConfig():
def __init__(self, name, config):
self._coordinates = config['coordinates']
self._filters = { name: FilterConfig(c) for name, c in config['filters'].items() }
if isinstance(self._coordinates, list):
self._contour = np.array([[int(p.split(',')[0]), int(p.split(',')[1])] for p in self._coordinates])
elif isinstance(self._coordinates, str):
points = self._coordinates.split(',')
self._contour = np.array([[int(points[i]), int(points[i+1])] for i in range(0, len(points), 2)])
else:
print(f"Unable to parse zone coordinates for {name}")
self._contour = np.array([])
self._color = (0,0,0)
@property
def coordinates(self):
return self._coordinates
@property
def contour(self):
return self._contour
@contour.setter
def contour(self, val):
self._contour = val
@property
def color(self):
return self._color
@color.setter
def color(self, val):
self._color = val
@property
def filters(self):
return self._filters
class CameraConfig():
def __init__(self, name, config, cache_dir, global_ffmpeg, global_objects):
self._name = name
self._ffmpeg = FfmpegConfig(global_ffmpeg, config['ffmpeg'])
self._height = config.get('height')
self._width = config.get('width')
self._frame_shape = (self._height, self._width)
self._frame_shape_yuv = (self._frame_shape[0]*3//2, self._frame_shape[1])
self._fps = config.get('fps')
self._mask = self._create_mask(config.get('mask'))
self._best_image_timeout = config['best_image_timeout']
self._mqtt = CameraMqttConfig(config['mqtt'])
self._zones = { name: ZoneConfig(name, z) for name, z in config['zones'].items() }
self._save_clips = CameraSaveClipsConfig(config['save_clips'])
self._snapshots = CameraSnapshotsConfig(config['snapshots'])
self._objects = ObjectConfig(global_objects, config.get('objects', {}))
self._ffmpeg_cmd = self._get_ffmpeg_cmd(cache_dir)
self._set_zone_colors(self._zones)
def _create_mask(self, mask):
if mask:
if mask.startswith('base64,'):
img = base64.b64decode(mask[7:])
np_img = np.fromstring(img, dtype=np.uint8)
mask_img = cv2.imdecode(np_img, cv2.IMREAD_GRAYSCALE)
elif mask.startswith('poly,'):
points = mask.split(',')[1:]
contour = np.array([[int(points[i]), int(points[i+1])] for i in range(0, len(points), 2)])
mask_img = np.zeros(self.frame_shape, np.uint8)
mask_img[:] = 255
cv2.fillPoly(mask_img, pts=[contour], color=(0))
else:
mask_img = cv2.imread(f"/config/{mask}", cv2.IMREAD_GRAYSCALE)
else:
mask_img = None
if mask_img is None or mask_img.size == 0:
mask_img = np.zeros(self.frame_shape, np.uint8)
mask_img[:] = 255
return mask_img
def _get_ffmpeg_cmd(self, cache_dir):
ffmpeg_output_args = self.ffmpeg.output_args
if self.fps:
ffmpeg_output_args = ["-r", str(self.fps)] + ffmpeg_output_args
if self.save_clips.enabled:
ffmpeg_output_args = [
"-f",
"segment",
"-segment_time",
"10",
"-segment_format",
"mp4",
"-reset_timestamps",
"1",
"-strftime",
"1",
"-c",
"copy",
"-an",
f"{os.path.join(cache_dir, self.name)}-%Y%m%d%H%M%S.mp4"
] + ffmpeg_output_args
return (['ffmpeg'] +
self.ffmpeg.global_args +
self.ffmpeg.hwaccel_args +
self.ffmpeg.input_args +
['-i', self.ffmpeg.input] +
ffmpeg_output_args +
['pipe:'])
def _set_zone_colors(self, zones: Dict[str, ZoneConfig]):
# set colors for zones
all_zone_names = zones.keys()
zone_colors = {}
colors = plt.cm.get_cmap('tab10', len(all_zone_names))
for i, zone in enumerate(all_zone_names):
zone_colors[zone] = tuple(int(round(255 * c)) for c in colors(i)[:3])
for name, zone in zones.items():
zone.color = zone_colors[name]
@property
def name(self):
return self._name
@property
def ffmpeg(self):
return self._ffmpeg
@property
def height(self):
return self._height
@property
def width(self):
return self._width
@property
def fps(self):
return self._fps
@property
def mask(self):
return self._mask
@property
def best_image_timeout(self):
return self._best_image_timeout
@property
def mqtt(self):
return self._mqtt
@property
def zones(self)-> Dict[str, ZoneConfig]:
return self._zones
@property
def save_clips(self):
return self._save_clips
@property
def snapshots(self):
return self._snapshots
@property
def objects(self):
return self._objects
@property
def frame_shape(self):
return self._frame_shape
@property
def frame_shape_yuv(self):
return self._frame_shape_yuv
@property
def ffmpeg_cmd(self):
return self._ffmpeg_cmd
class FrigateConfig():
def __init__(self, config_file=None, config=None):
if config is None and config_file is None:
raise ValueError('config or config_file must be defined')
elif not config_file is None:
config = self._load_file(config_file)
config = FRIGATE_CONFIG_SCHEMA(config)
config = self._sub_env_vars(config)
self._web_port = config['web_port']
self._detectors = { name: DetectorConfig(d) for name, d in config['detectors'].items() }
self._mqtt = MqttConfig(config['mqtt'])
self._save_clips = SaveClipsConfig(config['save_clips'])
self._cameras = { name: CameraConfig(name, c, self._save_clips.cache_dir, config['ffmpeg'], config['objects']) for name, c in config['cameras'].items() }
self._ensure_dirs()
def _sub_env_vars(self, config):
frigate_env_vars = {k: v for k, v in os.environ.items() if k.startswith('FRIGATE_')}
if 'password' in config['mqtt']:
config['mqtt']['password'] = config['mqtt']['password'].format(**frigate_env_vars)
for camera in config['cameras'].values():
camera['ffmpeg']['input'] = camera['ffmpeg']['input'].format(**frigate_env_vars)
return config
def _ensure_dirs(self):
cache_dir = self.save_clips.cache_dir
clips_dir = self.save_clips.clips_dir
if not os.path.exists(cache_dir) and not os.path.islink(cache_dir):
os.makedirs(cache_dir)
if not os.path.exists(clips_dir) and not os.path.islink(clips_dir):
os.makedirs(clips_dir)
def _load_file(self, config_file):
with open(config_file) as f:
raw_config = f.read()
if config_file.endswith(".yml"):
config = yaml.safe_load(raw_config)
elif config_file.endswith(".json"):
config = json.loads(raw_config)
return config
@property
def web_port(self):
return self._web_port
@property
def detectors(self) -> Dict[str, DetectorConfig]:
return self._detectors
@property
def mqtt(self):
return self._mqtt
@property
def save_clips(self):
return self._save_clips
@property
def cameras(self) -> Dict[str, CameraConfig]:
return self._cameras