mirror of
https://github.com/blakeblackshear/frigate.git
synced 2024-11-21 19:07:46 +01:00
initial conversion to pydantic
This commit is contained in:
parent
dada764d2c
commit
c664bd63f6
@ -36,8 +36,8 @@ RUN apt-get -qq update \
|
|||||||
|
|
||||||
RUN pip3 install \
|
RUN pip3 install \
|
||||||
peewee_migrate \
|
peewee_migrate \
|
||||||
|
pydantic \
|
||||||
zeroconf \
|
zeroconf \
|
||||||
voluptuous\
|
|
||||||
ws4py
|
ws4py
|
||||||
|
|
||||||
COPY --from=nginx /usr/local/nginx/ /usr/local/nginx/
|
COPY --from=nginx /usr/local/nginx/ /usr/local/nginx/
|
||||||
|
@ -13,7 +13,7 @@ from peewee_migrate import Router
|
|||||||
from playhouse.sqlite_ext import SqliteExtDatabase
|
from playhouse.sqlite_ext import SqliteExtDatabase
|
||||||
from playhouse.sqliteq import SqliteQueueDatabase
|
from playhouse.sqliteq import SqliteQueueDatabase
|
||||||
|
|
||||||
from frigate.config import FrigateConfig
|
from frigate.config import DetectorTypeEnum, FrigateConfig
|
||||||
from frigate.const import CACHE_DIR, CLIPS_DIR, RECORD_DIR
|
from frigate.const import CACHE_DIR, CLIPS_DIR, RECORD_DIR
|
||||||
from frigate.edgetpu import EdgeTPUProcess
|
from frigate.edgetpu import EdgeTPUProcess
|
||||||
from frigate.events import EventCleanup, EventProcessor
|
from frigate.events import EventCleanup, EventProcessor
|
||||||
@ -35,6 +35,7 @@ logger = logging.getLogger(__name__)
|
|||||||
class FrigateApp:
|
class FrigateApp:
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
self.stop_event = mp.Event()
|
self.stop_event = mp.Event()
|
||||||
|
self.base_config: FrigateConfig = None
|
||||||
self.config: FrigateConfig = None
|
self.config: FrigateConfig = None
|
||||||
self.detection_queue = mp.Queue()
|
self.detection_queue = mp.Queue()
|
||||||
self.detectors: Dict[str, EdgeTPUProcess] = {}
|
self.detectors: Dict[str, EdgeTPUProcess] = {}
|
||||||
@ -65,7 +66,8 @@ class FrigateApp:
|
|||||||
|
|
||||||
def init_config(self):
|
def init_config(self):
|
||||||
config_file = os.environ.get("CONFIG_FILE", "/config/config.yml")
|
config_file = os.environ.get("CONFIG_FILE", "/config/config.yml")
|
||||||
self.config = FrigateConfig(config_file=config_file)
|
user_config = FrigateConfig.parse_file(config_file)
|
||||||
|
self.config = user_config.runtime_config
|
||||||
|
|
||||||
for camera_name in self.config.cameras.keys():
|
for camera_name in self.config.cameras.keys():
|
||||||
# create camera_metrics
|
# create camera_metrics
|
||||||
@ -116,9 +118,9 @@ class FrigateApp:
|
|||||||
)
|
)
|
||||||
|
|
||||||
def set_log_levels(self):
|
def set_log_levels(self):
|
||||||
logging.getLogger().setLevel(self.config.logger.default)
|
logging.getLogger().setLevel(self.config.logger.default.value.upper())
|
||||||
for log, level in self.config.logger.logs.items():
|
for log, level in self.config.logger.logs.items():
|
||||||
logging.getLogger(log).setLevel(level)
|
logging.getLogger(log).setLevel(level.value.upper())
|
||||||
|
|
||||||
if not "werkzeug" in self.config.logger.logs:
|
if not "werkzeug" in self.config.logger.logs:
|
||||||
logging.getLogger("werkzeug").setLevel("ERROR")
|
logging.getLogger("werkzeug").setLevel("ERROR")
|
||||||
@ -183,9 +185,9 @@ class FrigateApp:
|
|||||||
|
|
||||||
try:
|
try:
|
||||||
shm_in = mp.shared_memory.SharedMemory(
|
shm_in = mp.shared_memory.SharedMemory(
|
||||||
name=name,
|
name=name,
|
||||||
create=True,
|
create=True,
|
||||||
size=self.config.model.height*self.config.model.width * 3,
|
size=self.config.model.height * self.config.model.width * 3,
|
||||||
)
|
)
|
||||||
except FileExistsError:
|
except FileExistsError:
|
||||||
shm_in = mp.shared_memory.SharedMemory(name=name)
|
shm_in = mp.shared_memory.SharedMemory(name=name)
|
||||||
@ -201,7 +203,7 @@ class FrigateApp:
|
|||||||
self.detection_shms.append(shm_out)
|
self.detection_shms.append(shm_out)
|
||||||
|
|
||||||
for name, detector in self.config.detectors.items():
|
for name, detector in self.config.detectors.items():
|
||||||
if detector.type == "cpu":
|
if detector.type == DetectorTypeEnum.cpu:
|
||||||
self.detectors[name] = EdgeTPUProcess(
|
self.detectors[name] = EdgeTPUProcess(
|
||||||
name,
|
name,
|
||||||
self.detection_queue,
|
self.detection_queue,
|
||||||
@ -210,7 +212,7 @@ class FrigateApp:
|
|||||||
"cpu",
|
"cpu",
|
||||||
detector.num_threads,
|
detector.num_threads,
|
||||||
)
|
)
|
||||||
if detector.type == "edgetpu":
|
if detector.type == DetectorTypeEnum.edgetpu:
|
||||||
self.detectors[name] = EdgeTPUProcess(
|
self.detectors[name] = EdgeTPUProcess(
|
||||||
name,
|
name,
|
||||||
self.detection_queue,
|
self.detection_queue,
|
||||||
|
1517
frigate/config.py
1517
frigate/config.py
File diff suppressed because it is too large
Load Diff
@ -272,7 +272,14 @@ def events():
|
|||||||
|
|
||||||
@bp.route("/config")
|
@bp.route("/config")
|
||||||
def config():
|
def config():
|
||||||
return jsonify(current_app.frigate_config.to_dict())
|
return jsonify(current_app.frigate_config.dict())
|
||||||
|
|
||||||
|
|
||||||
|
@bp.route("/config/schema")
|
||||||
|
def config_schema():
|
||||||
|
return current_app.response_class(
|
||||||
|
current_app.frigate_config.schema_json(), mimetype="application/json"
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
@bp.route("/version")
|
@bp.route("/version")
|
||||||
|
@ -2,6 +2,7 @@ import cv2
|
|||||||
import imutils
|
import imutils
|
||||||
import numpy as np
|
import numpy as np
|
||||||
from frigate.config import MotionConfig
|
from frigate.config import MotionConfig
|
||||||
|
from frigate.util import create_mask
|
||||||
|
|
||||||
|
|
||||||
class MotionDetector:
|
class MotionDetector:
|
||||||
@ -18,7 +19,7 @@ class MotionDetector:
|
|||||||
self.motion_frame_count = 0
|
self.motion_frame_count = 0
|
||||||
self.frame_counter = 0
|
self.frame_counter = 0
|
||||||
resized_mask = cv2.resize(
|
resized_mask = cv2.resize(
|
||||||
config.mask,
|
create_mask(frame_shape, config.mask),
|
||||||
dsize=(self.motion_frame_size[1], self.motion_frame_size[0]),
|
dsize=(self.motion_frame_size[1], self.motion_frame_size[0]),
|
||||||
interpolation=cv2.INTER_LINEAR,
|
interpolation=cv2.INTER_LINEAR,
|
||||||
)
|
)
|
||||||
|
@ -272,6 +272,7 @@ class TrackedObject:
|
|||||||
best_frame, dsize=(width, height), interpolation=cv2.INTER_AREA
|
best_frame, dsize=(width, height), interpolation=cv2.INTER_AREA
|
||||||
)
|
)
|
||||||
if timestamp:
|
if timestamp:
|
||||||
|
color = self.camera_config.timestamp_style.color
|
||||||
draw_timestamp(
|
draw_timestamp(
|
||||||
best_frame,
|
best_frame,
|
||||||
self.thumbnail_data["frame_time"],
|
self.thumbnail_data["frame_time"],
|
||||||
@ -279,7 +280,7 @@ class TrackedObject:
|
|||||||
font_effect=self.camera_config.timestamp_style.effect,
|
font_effect=self.camera_config.timestamp_style.effect,
|
||||||
font_scale=self.camera_config.timestamp_style.scale,
|
font_scale=self.camera_config.timestamp_style.scale,
|
||||||
font_thickness=self.camera_config.timestamp_style.thickness,
|
font_thickness=self.camera_config.timestamp_style.thickness,
|
||||||
font_color=self.camera_config.timestamp_style.color,
|
font_color=(color.red, color.green, color.blue),
|
||||||
position=self.camera_config.timestamp_style.position,
|
position=self.camera_config.timestamp_style.position,
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -318,7 +319,7 @@ class CameraState:
|
|||||||
def __init__(self, name, config, frame_manager):
|
def __init__(self, name, config, frame_manager):
|
||||||
self.name = name
|
self.name = name
|
||||||
self.config = config
|
self.config = config
|
||||||
self.camera_config = config.cameras[name]
|
self.camera_config: CameraConfig = config.cameras[name]
|
||||||
self.frame_manager = frame_manager
|
self.frame_manager = frame_manager
|
||||||
self.best_objects: Dict[str, TrackedObject] = {}
|
self.best_objects: Dict[str, TrackedObject] = {}
|
||||||
self.object_counts = defaultdict(int)
|
self.object_counts = defaultdict(int)
|
||||||
@ -328,6 +329,7 @@ class CameraState:
|
|||||||
self._current_frame = np.zeros(self.camera_config.frame_shape_yuv, np.uint8)
|
self._current_frame = np.zeros(self.camera_config.frame_shape_yuv, np.uint8)
|
||||||
self.current_frame_lock = threading.Lock()
|
self.current_frame_lock = threading.Lock()
|
||||||
self.current_frame_time = 0.0
|
self.current_frame_time = 0.0
|
||||||
|
self.motion_mask = self.camera_config.motion.mask
|
||||||
self.motion_boxes = []
|
self.motion_boxes = []
|
||||||
self.regions = []
|
self.regions = []
|
||||||
self.previous_frame_id = None
|
self.previous_frame_id = None
|
||||||
@ -389,7 +391,7 @@ class CameraState:
|
|||||||
cv2.drawContours(frame_copy, [zone.contour], -1, zone.color, thickness)
|
cv2.drawContours(frame_copy, [zone.contour], -1, zone.color, thickness)
|
||||||
|
|
||||||
if draw_options.get("mask"):
|
if draw_options.get("mask"):
|
||||||
mask_overlay = np.where(self.camera_config.motion.mask == [0])
|
mask_overlay = np.where(self.motion_mask == [0])
|
||||||
frame_copy[mask_overlay] = [0, 0, 0]
|
frame_copy[mask_overlay] = [0, 0, 0]
|
||||||
|
|
||||||
if draw_options.get("motion_boxes"):
|
if draw_options.get("motion_boxes"):
|
||||||
|
@ -21,7 +21,7 @@ from ws4py.server.wsgirefserver import (
|
|||||||
from ws4py.server.wsgiutils import WebSocketWSGIApplication
|
from ws4py.server.wsgiutils import WebSocketWSGIApplication
|
||||||
from ws4py.websocket import WebSocket
|
from ws4py.websocket import WebSocket
|
||||||
|
|
||||||
from frigate.config import FrigateConfig
|
from frigate.config import BirdseyeModeEnum, FrigateConfig
|
||||||
from frigate.util import SharedMemoryFrameManager, copy_yuv_to_position, get_yuv_crop
|
from frigate.util import SharedMemoryFrameManager, copy_yuv_to_position, get_yuv_crop
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
@ -173,13 +173,16 @@ class BirdsEyeFrameManager:
|
|||||||
)
|
)
|
||||||
|
|
||||||
def camera_active(self, object_box_count, motion_box_count):
|
def camera_active(self, object_box_count, motion_box_count):
|
||||||
if self.mode == "continuous":
|
if self.mode == BirdseyeModeEnum.continuous:
|
||||||
return True
|
return True
|
||||||
|
|
||||||
if self.mode == "motion" and object_box_count + motion_box_count > 0:
|
if (
|
||||||
|
self.mode == BirdseyeModeEnum.motion
|
||||||
|
and object_box_count + motion_box_count > 0
|
||||||
|
):
|
||||||
return True
|
return True
|
||||||
|
|
||||||
if self.mode == "objects" and object_box_count > 0:
|
if self.mode == BirdseyeModeEnum.objects and object_box_count > 0:
|
||||||
return True
|
return True
|
||||||
|
|
||||||
def update_frame(self):
|
def update_frame(self):
|
||||||
|
@ -1,10 +1,13 @@
|
|||||||
import json
|
import unittest
|
||||||
from unittest import TestCase, main
|
import numpy as np
|
||||||
import voluptuous as vol
|
from pydantic import ValidationError
|
||||||
from frigate.config import FRIGATE_CONFIG_SCHEMA, FrigateConfig
|
from frigate.config import (
|
||||||
|
FrigateConfig,
|
||||||
|
DetectorTypeEnum,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
class TestConfig(TestCase):
|
class TestConfig(unittest.TestCase):
|
||||||
def setUp(self):
|
def setUp(self):
|
||||||
self.minimal = {
|
self.minimal = {
|
||||||
"mqtt": {"host": "mqtt"},
|
"mqtt": {"host": "mqtt"},
|
||||||
@ -21,14 +24,30 @@ class TestConfig(TestCase):
|
|||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
def test_empty(self):
|
|
||||||
FRIGATE_CONFIG_SCHEMA({})
|
|
||||||
|
|
||||||
def test_minimal(self):
|
|
||||||
FRIGATE_CONFIG_SCHEMA(self.minimal)
|
|
||||||
|
|
||||||
def test_config_class(self):
|
def test_config_class(self):
|
||||||
FrigateConfig(config=self.minimal)
|
frigate_config = FrigateConfig(**self.minimal)
|
||||||
|
assert self.minimal == frigate_config.dict(exclude_unset=True)
|
||||||
|
|
||||||
|
runtime_config = frigate_config.runtime_config
|
||||||
|
assert "coral" in runtime_config.detectors.keys()
|
||||||
|
assert runtime_config.detectors["coral"].type == DetectorTypeEnum.edgetpu
|
||||||
|
|
||||||
|
def test_invalid_mqtt_config(self):
|
||||||
|
config = {
|
||||||
|
"mqtt": {"host": "mqtt", "user": "test"},
|
||||||
|
"cameras": {
|
||||||
|
"back": {
|
||||||
|
"ffmpeg": {
|
||||||
|
"inputs": [
|
||||||
|
{"path": "rtsp://10.0.0.1:554/video", "roles": ["detect"]}
|
||||||
|
]
|
||||||
|
},
|
||||||
|
"height": 1080,
|
||||||
|
"width": 1920,
|
||||||
|
}
|
||||||
|
},
|
||||||
|
}
|
||||||
|
self.assertRaises(ValidationError, lambda: FrigateConfig(**config))
|
||||||
|
|
||||||
def test_inherit_tracked_objects(self):
|
def test_inherit_tracked_objects(self):
|
||||||
config = {
|
config = {
|
||||||
@ -46,8 +65,11 @@ class TestConfig(TestCase):
|
|||||||
}
|
}
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
frigate_config = FrigateConfig(config=config)
|
frigate_config = FrigateConfig(**config)
|
||||||
assert "dog" in frigate_config.cameras["back"].objects.track
|
assert config == frigate_config.dict(exclude_unset=True)
|
||||||
|
|
||||||
|
runtime_config = frigate_config.runtime_config
|
||||||
|
assert "dog" in runtime_config.cameras["back"].objects.track
|
||||||
|
|
||||||
def test_override_tracked_objects(self):
|
def test_override_tracked_objects(self):
|
||||||
config = {
|
config = {
|
||||||
@ -66,8 +88,11 @@ class TestConfig(TestCase):
|
|||||||
}
|
}
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
frigate_config = FrigateConfig(config=config)
|
frigate_config = FrigateConfig(**config)
|
||||||
assert "cat" in frigate_config.cameras["back"].objects.track
|
assert config == frigate_config.dict(exclude_unset=True)
|
||||||
|
|
||||||
|
runtime_config = frigate_config.runtime_config
|
||||||
|
assert "cat" in runtime_config.cameras["back"].objects.track
|
||||||
|
|
||||||
def test_default_object_filters(self):
|
def test_default_object_filters(self):
|
||||||
config = {
|
config = {
|
||||||
@ -85,8 +110,11 @@ class TestConfig(TestCase):
|
|||||||
}
|
}
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
frigate_config = FrigateConfig(config=config)
|
frigate_config = FrigateConfig(**config)
|
||||||
assert "dog" in frigate_config.cameras["back"].objects.filters
|
assert config == frigate_config.dict(exclude_unset=True)
|
||||||
|
|
||||||
|
runtime_config = frigate_config.runtime_config
|
||||||
|
assert "dog" in runtime_config.cameras["back"].objects.filters
|
||||||
|
|
||||||
def test_inherit_object_filters(self):
|
def test_inherit_object_filters(self):
|
||||||
config = {
|
config = {
|
||||||
@ -107,9 +135,12 @@ class TestConfig(TestCase):
|
|||||||
}
|
}
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
frigate_config = FrigateConfig(config=config)
|
frigate_config = FrigateConfig(**config)
|
||||||
assert "dog" in frigate_config.cameras["back"].objects.filters
|
assert config == frigate_config.dict(exclude_unset=True)
|
||||||
assert frigate_config.cameras["back"].objects.filters["dog"].threshold == 0.7
|
|
||||||
|
runtime_config = frigate_config.runtime_config
|
||||||
|
assert "dog" in runtime_config.cameras["back"].objects.filters
|
||||||
|
assert runtime_config.cameras["back"].objects.filters["dog"].threshold == 0.7
|
||||||
|
|
||||||
def test_override_object_filters(self):
|
def test_override_object_filters(self):
|
||||||
config = {
|
config = {
|
||||||
@ -130,9 +161,12 @@ class TestConfig(TestCase):
|
|||||||
}
|
}
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
frigate_config = FrigateConfig(config=config)
|
frigate_config = FrigateConfig(**config)
|
||||||
assert "dog" in frigate_config.cameras["back"].objects.filters
|
assert config == frigate_config.dict(exclude_unset=True)
|
||||||
assert frigate_config.cameras["back"].objects.filters["dog"].threshold == 0.7
|
|
||||||
|
runtime_config = frigate_config.runtime_config
|
||||||
|
assert "dog" in runtime_config.cameras["back"].objects.filters
|
||||||
|
assert runtime_config.cameras["back"].objects.filters["dog"].threshold == 0.7
|
||||||
|
|
||||||
def test_global_object_mask(self):
|
def test_global_object_mask(self):
|
||||||
config = {
|
config = {
|
||||||
@ -154,12 +188,14 @@ class TestConfig(TestCase):
|
|||||||
}
|
}
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
frigate_config = FrigateConfig(config=config)
|
frigate_config = FrigateConfig(**config)
|
||||||
assert "dog" in frigate_config.cameras["back"].objects.filters
|
assert config == frigate_config.dict(exclude_unset=True)
|
||||||
assert len(frigate_config.cameras["back"].objects.filters["dog"].raw_mask) == 2
|
|
||||||
assert (
|
runtime_config = frigate_config.runtime_config
|
||||||
len(frigate_config.cameras["back"].objects.filters["person"].raw_mask) == 1
|
back_camera = runtime_config.cameras["back"]
|
||||||
)
|
assert "dog" in back_camera.objects.filters
|
||||||
|
assert len(back_camera.objects.filters["dog"].raw_mask) == 2
|
||||||
|
assert len(back_camera.objects.filters["person"].raw_mask) == 1
|
||||||
|
|
||||||
def test_ffmpeg_params_global(self):
|
def test_ffmpeg_params_global(self):
|
||||||
config = {
|
config = {
|
||||||
@ -181,8 +217,11 @@ class TestConfig(TestCase):
|
|||||||
}
|
}
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
frigate_config = FrigateConfig(config=config)
|
frigate_config = FrigateConfig(**config)
|
||||||
assert "-re" in frigate_config.cameras["back"].ffmpeg_cmds[0]["cmd"]
|
assert config == frigate_config.dict(exclude_unset=True)
|
||||||
|
|
||||||
|
runtime_config = frigate_config.runtime_config
|
||||||
|
assert "-re" in runtime_config.cameras["back"].ffmpeg_cmds[0]["cmd"]
|
||||||
|
|
||||||
def test_ffmpeg_params_camera(self):
|
def test_ffmpeg_params_camera(self):
|
||||||
config = {
|
config = {
|
||||||
@ -204,8 +243,11 @@ class TestConfig(TestCase):
|
|||||||
}
|
}
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
frigate_config = FrigateConfig(config=config)
|
frigate_config = FrigateConfig(**config)
|
||||||
assert "-re" in frigate_config.cameras["back"].ffmpeg_cmds[0]["cmd"]
|
assert config == frigate_config.dict(exclude_unset=True)
|
||||||
|
|
||||||
|
runtime_config = frigate_config.runtime_config
|
||||||
|
assert "-re" in runtime_config.cameras["back"].ffmpeg_cmds[0]["cmd"]
|
||||||
|
|
||||||
def test_ffmpeg_params_input(self):
|
def test_ffmpeg_params_input(self):
|
||||||
config = {
|
config = {
|
||||||
@ -230,8 +272,11 @@ class TestConfig(TestCase):
|
|||||||
}
|
}
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
frigate_config = FrigateConfig(config=config)
|
frigate_config = FrigateConfig(**config)
|
||||||
assert "-re" in frigate_config.cameras["back"].ffmpeg_cmds[0]["cmd"]
|
assert config == frigate_config.dict(exclude_unset=True)
|
||||||
|
|
||||||
|
runtime_config = frigate_config.runtime_config
|
||||||
|
assert "-re" in runtime_config.cameras["back"].ffmpeg_cmds[0]["cmd"]
|
||||||
|
|
||||||
def test_inherit_clips_retention(self):
|
def test_inherit_clips_retention(self):
|
||||||
config = {
|
config = {
|
||||||
@ -249,8 +294,11 @@ class TestConfig(TestCase):
|
|||||||
}
|
}
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
frigate_config = FrigateConfig(config=config)
|
frigate_config = FrigateConfig(**config)
|
||||||
assert frigate_config.cameras["back"].clips.retain.objects["person"] == 30
|
assert config == frigate_config.dict(exclude_unset=True)
|
||||||
|
|
||||||
|
runtime_config = frigate_config.runtime_config
|
||||||
|
assert runtime_config.cameras["back"].clips.retain.objects["person"] == 30
|
||||||
|
|
||||||
def test_roles_listed_twice_throws_error(self):
|
def test_roles_listed_twice_throws_error(self):
|
||||||
config = {
|
config = {
|
||||||
@ -269,7 +317,7 @@ class TestConfig(TestCase):
|
|||||||
}
|
}
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
self.assertRaises(vol.MultipleInvalid, lambda: FrigateConfig(config=config))
|
self.assertRaises(ValidationError, lambda: FrigateConfig(**config))
|
||||||
|
|
||||||
def test_zone_matching_camera_name_throws_error(self):
|
def test_zone_matching_camera_name_throws_error(self):
|
||||||
config = {
|
config = {
|
||||||
@ -288,7 +336,33 @@ class TestConfig(TestCase):
|
|||||||
}
|
}
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
self.assertRaises(vol.MultipleInvalid, lambda: FrigateConfig(config=config))
|
self.assertRaises(ValidationError, lambda: FrigateConfig(**config))
|
||||||
|
|
||||||
|
def test_zone_assigns_color_and_contour(self):
|
||||||
|
config = {
|
||||||
|
"mqtt": {"host": "mqtt"},
|
||||||
|
"clips": {"retain": {"default": 20, "objects": {"person": 30}}},
|
||||||
|
"cameras": {
|
||||||
|
"back": {
|
||||||
|
"ffmpeg": {
|
||||||
|
"inputs": [
|
||||||
|
{"path": "rtsp://10.0.0.1:554/video", "roles": ["detect"]}
|
||||||
|
]
|
||||||
|
},
|
||||||
|
"height": 1080,
|
||||||
|
"width": 1920,
|
||||||
|
"zones": {"test": {"coordinates": "1,1,1,1,1,1"}},
|
||||||
|
}
|
||||||
|
},
|
||||||
|
}
|
||||||
|
frigate_config = FrigateConfig(**config)
|
||||||
|
assert config == frigate_config.dict(exclude_unset=True)
|
||||||
|
|
||||||
|
runtime_config = frigate_config.runtime_config
|
||||||
|
assert isinstance(
|
||||||
|
runtime_config.cameras["back"].zones["test"].contour, np.ndarray
|
||||||
|
)
|
||||||
|
assert runtime_config.cameras["back"].zones["test"].color != (0, 0, 0)
|
||||||
|
|
||||||
def test_clips_should_default_to_global_objects(self):
|
def test_clips_should_default_to_global_objects(self):
|
||||||
config = {
|
config = {
|
||||||
@ -308,11 +382,16 @@ class TestConfig(TestCase):
|
|||||||
}
|
}
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
config = FrigateConfig(config=config)
|
frigate_config = FrigateConfig(**config)
|
||||||
assert config.cameras["back"].clips.objects is None
|
assert config == frigate_config.dict(exclude_unset=True)
|
||||||
|
|
||||||
|
runtime_config = frigate_config.runtime_config
|
||||||
|
back_camera = runtime_config.cameras["back"]
|
||||||
|
assert back_camera.clips.objects is None
|
||||||
|
assert back_camera.clips.retain.objects["person"] == 30
|
||||||
|
|
||||||
def test_role_assigned_but_not_enabled(self):
|
def test_role_assigned_but_not_enabled(self):
|
||||||
json_config = {
|
config = {
|
||||||
"mqtt": {"host": "mqtt"},
|
"mqtt": {"host": "mqtt"},
|
||||||
"cameras": {
|
"cameras": {
|
||||||
"back": {
|
"back": {
|
||||||
@ -331,11 +410,14 @@ class TestConfig(TestCase):
|
|||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
config = FrigateConfig(config=json_config)
|
frigate_config = FrigateConfig(**config)
|
||||||
ffmpeg_cmds = config.cameras["back"].ffmpeg_cmds
|
assert config == frigate_config.dict(exclude_unset=True)
|
||||||
|
|
||||||
|
runtime_config = frigate_config.runtime_config
|
||||||
|
ffmpeg_cmds = runtime_config.cameras["back"].ffmpeg_cmds
|
||||||
assert len(ffmpeg_cmds) == 1
|
assert len(ffmpeg_cmds) == 1
|
||||||
assert not "clips" in ffmpeg_cmds[0]["roles"]
|
assert not "clips" in ffmpeg_cmds[0]["roles"]
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
main(verbosity=2)
|
unittest.main(verbosity=2)
|
||||||
|
@ -1,4 +1,5 @@
|
|||||||
import collections
|
import collections
|
||||||
|
import copy
|
||||||
import datetime
|
import datetime
|
||||||
import hashlib
|
import hashlib
|
||||||
import json
|
import json
|
||||||
@ -20,6 +21,29 @@ import numpy as np
|
|||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
def deep_merge(dct1: dict, dct2: dict, override=False) -> dict:
|
||||||
|
"""
|
||||||
|
:param dct1: First dict to merge
|
||||||
|
:param dct2: Second dict to merge
|
||||||
|
:param override: if same key exists in both dictionaries, should override? otherwise ignore. (default=True)
|
||||||
|
:return: The merge dictionary
|
||||||
|
"""
|
||||||
|
merged = copy.deepcopy(dct1)
|
||||||
|
for k, v2 in dct2.items():
|
||||||
|
if k in merged:
|
||||||
|
v1 = merged[k]
|
||||||
|
if isinstance(v1, dict) and isinstance(v2, collections.Mapping):
|
||||||
|
merged[k] = deep_merge(v1, v2, override)
|
||||||
|
elif isinstance(v1, list) and isinstance(v2, list):
|
||||||
|
merged[k] = v1 + v2
|
||||||
|
else:
|
||||||
|
if override:
|
||||||
|
merged[k] = copy.deepcopy(v2)
|
||||||
|
else:
|
||||||
|
merged[k] = copy.deepcopy(v2)
|
||||||
|
return merged
|
||||||
|
|
||||||
|
|
||||||
def draw_timestamp(
|
def draw_timestamp(
|
||||||
frame,
|
frame,
|
||||||
timestamp,
|
timestamp,
|
||||||
|
Loading…
Reference in New Issue
Block a user