blakeblackshear.frigate/frigate/util/config.py

329 lines
12 KiB
Python
Raw Permalink Normal View History

"""configuration utils."""
import asyncio
import logging
import os
import shutil
from typing import Optional, Union
from ruamel.yaml import YAML
from frigate.const import CONFIG_DIR, EXPORT_DIR
from frigate.util.services import get_video_properties
logger = logging.getLogger(__name__)
CURRENT_CONFIG_VERSION = "0.15-0"
def migrate_frigate_config(config_file: str):
"""handle migrating the frigate config."""
logger.info("Checking if frigate config needs migration...")
if not os.access(config_file, mode=os.W_OK):
logger.error("Config file is read-only, unable to migrate config file.")
return
yaml = YAML()
yaml.indent(mapping=2, sequence=4, offset=2)
with open(config_file, "r") as f:
config: dict[str, dict[str, any]] = yaml.load(f)
if config is None:
logger.error(f"Failed to load config at {config_file}")
return
previous_version = str(config.get("version", "0.13"))
if previous_version == CURRENT_CONFIG_VERSION:
logger.info("frigate config does not need migration...")
return
logger.info("copying config as backup...")
shutil.copy(config_file, os.path.join(CONFIG_DIR, "backup_config.yaml"))
if previous_version < "0.14":
logger.info(f"Migrating frigate config from {previous_version} to 0.14...")
new_config = migrate_014(config)
with open(config_file, "w") as f:
yaml.dump(new_config, f)
previous_version = "0.14"
logger.info("Migrating export file names...")
if os.path.isdir(EXPORT_DIR):
for file in os.listdir(EXPORT_DIR):
if "@" not in file:
continue
new_name = file.replace("@", "_")
os.rename(
os.path.join(EXPORT_DIR, file), os.path.join(EXPORT_DIR, new_name)
)
if previous_version < "0.15-0":
logger.info(f"Migrating frigate config from {previous_version} to 0.15-0...")
new_config = migrate_015_0(config)
with open(config_file, "w") as f:
yaml.dump(new_config, f)
previous_version = "0.15-0"
logger.info("Finished frigate config migration...")
def migrate_014(config: dict[str, dict[str, any]]) -> dict[str, dict[str, any]]:
"""Handle migrating frigate config to 0.14"""
# migrate record.events.required_zones to review.alerts.required_zones
new_config = config.copy()
global_required_zones = (
config.get("record", {}).get("events", {}).get("required_zones", [])
)
if global_required_zones:
# migrate to new review config
if not new_config.get("review"):
new_config["review"] = {}
if not new_config["review"].get("alerts"):
new_config["review"]["alerts"] = {}
if not new_config["review"]["alerts"].get("required_zones"):
new_config["review"]["alerts"]["required_zones"] = global_required_zones
# remove record required zones config
del new_config["record"]["events"]["required_zones"]
# remove record altogether if there is not other config
if not new_config["record"]["events"]:
del new_config["record"]["events"]
if not new_config["record"]:
del new_config["record"]
2024-07-16 15:45:11 +02:00
# Remove UI fields
if new_config.get("ui"):
if new_config["ui"].get("use_experimental"):
2024-08-09 23:59:55 +02:00
del new_config["ui"]["use_experimental"]
2024-07-16 15:45:11 +02:00
if new_config["ui"].get("live_mode"):
del new_config["ui"]["live_mode"]
2024-07-16 15:45:11 +02:00
if not new_config["ui"]:
del new_config["ui"]
# remove rtmp
if new_config.get("ffmpeg", {}).get("output_args", {}).get("rtmp"):
del new_config["ffmpeg"]["output_args"]["rtmp"]
if new_config.get("rtmp"):
del new_config["rtmp"]
for name, camera in config.get("cameras", {}).items():
camera_config: dict[str, dict[str, any]] = camera.copy()
required_zones = (
camera_config.get("record", {}).get("events", {}).get("required_zones", [])
)
if required_zones:
# migrate to new review config
if not camera_config.get("review"):
camera_config["review"] = {}
if not camera_config["review"].get("alerts"):
camera_config["review"]["alerts"] = {}
if not camera_config["review"]["alerts"].get("required_zones"):
camera_config["review"]["alerts"]["required_zones"] = required_zones
# remove record required zones config
del camera_config["record"]["events"]["required_zones"]
# remove record altogether if there is not other config
if not camera_config["record"]["events"]:
del camera_config["record"]["events"]
if not camera_config["record"]:
del camera_config["record"]
# remove rtmp
if camera_config.get("ffmpeg", {}).get("output_args", {}).get("rtmp"):
del camera_config["ffmpeg"]["output_args"]["rtmp"]
if camera_config.get("rtmp"):
del camera_config["rtmp"]
new_config["cameras"][name] = camera_config
new_config["version"] = "0.14"
return new_config
def migrate_015_0(config: dict[str, dict[str, any]]) -> dict[str, dict[str, any]]:
"""Handle migrating frigate config to 0.15-0"""
new_config = config.copy()
# migrate record.events to record.alerts and record.detections
global_record_events = config.get("record", {}).get("events")
if global_record_events:
alerts_retention = {"retain": {}}
detections_retention = {"retain": {}}
if global_record_events.get("pre_capture"):
alerts_retention["pre_capture"] = global_record_events["pre_capture"]
if global_record_events.get("post_capture"):
alerts_retention["post_capture"] = global_record_events["post_capture"]
if global_record_events.get("retain", {}).get("default"):
alerts_retention["retain"]["days"] = global_record_events["retain"][
"default"
]
# decide logical detections retention based on current detections config
if not config.get("review", {}).get("alerts", {}).get(
"required_zones"
) or config.get("review", {}).get("detections"):
if global_record_events.get("pre_capture"):
detections_retention["pre_capture"] = global_record_events[
"pre_capture"
]
if global_record_events.get("post_capture"):
detections_retention["post_capture"] = global_record_events[
"post_capture"
]
if global_record_events.get("retain", {}).get("default"):
detections_retention["retain"]["days"] = global_record_events["retain"][
"default"
]
else:
continuous_days = config.get("record", {}).get("retain", {}).get("days")
detections_retention["retain"]["days"] = (
continuous_days if continuous_days else 1
)
new_config["record"]["alerts"] = alerts_retention
new_config["record"]["detections"] = detections_retention
del new_config["record"]["events"]
for name, camera in config.get("cameras", {}).items():
camera_config: dict[str, dict[str, any]] = camera.copy()
record_events: dict[str, any] = camera_config.get("record", {}).get("events")
if record_events:
alerts_retention = {"retain": {}}
detections_retention = {"retain": {}}
if record_events.get("pre_capture"):
alerts_retention["pre_capture"] = record_events["pre_capture"]
if record_events.get("post_capture"):
alerts_retention["post_capture"] = record_events["post_capture"]
if record_events.get("retain", {}).get("default"):
alerts_retention["retain"]["days"] = record_events["retain"]["default"]
# decide logical detections retention based on current detections config
if not camera_config.get("review", {}).get("alerts", {}).get(
"required_zones"
) or camera_config.get("review", {}).get("detections"):
if record_events.get("pre_capture"):
detections_retention["pre_capture"] = record_events["pre_capture"]
if record_events.get("post_capture"):
detections_retention["post_capture"] = record_events["post_capture"]
if record_events.get("retain", {}).get("default"):
detections_retention["retain"]["days"] = record_events["retain"][
"default"
]
else:
continuous_days = (
camera_config.get("record", {}).get("retain", {}).get("days")
)
detections_retention["retain"]["days"] = (
continuous_days if continuous_days else 1
)
camera_config["record"]["alerts"] = alerts_retention
camera_config["record"]["detections"] = detections_retention
del camera_config["record"]["events"]
new_config["cameras"][name] = camera_config
new_config["version"] = "0.15-0"
return new_config
def get_relative_coordinates(
mask: Optional[Union[str, list]], frame_shape: tuple[int, int]
) -> Union[str, list]:
# masks and zones are saved as relative coordinates
# we know if any points are > 1 then it is using the
# old native resolution coordinates
if mask:
if isinstance(mask, list) and any(x > "1.0" for x in mask[0].split(",")):
relative_masks = []
for m in mask:
points = m.split(",")
2024-04-29 17:58:53 +02:00
if any(x > "1.0" for x in points):
rel_points = []
for i in range(0, len(points), 2):
x = int(points[i])
y = int(points[i + 1])
if x > frame_shape[1] or y > frame_shape[0]:
logger.error(
f"Not applying mask due to invalid coordinates. {x},{y} is outside of the detection resolution {frame_shape[1]}x{frame_shape[0]}. Use the editor in the UI to correct the mask."
)
continue
rel_points.append(
f"{round(x / frame_shape[1], 3)},{round(y / frame_shape[0], 3)}"
2024-04-29 17:58:53 +02:00
)
relative_masks.append(",".join(rel_points))
2024-04-29 17:58:53 +02:00
else:
relative_masks.append(m)
mask = relative_masks
elif isinstance(mask, str) and any(x > "1.0" for x in mask.split(",")):
points = mask.split(",")
rel_points = []
for i in range(0, len(points), 2):
x = int(points[i])
y = int(points[i + 1])
if x > frame_shape[1] or y > frame_shape[0]:
logger.error(
f"Not applying mask due to invalid coordinates. {x},{y} is outside of the detection resolution {frame_shape[1]}x{frame_shape[0]}. Use the editor in the UI to correct the mask."
)
return []
rel_points.append(
f"{round(x / frame_shape[1], 3)},{round(y / frame_shape[0], 3)}"
)
mask = ",".join(rel_points)
return mask
return mask
class StreamInfoRetriever:
def __init__(self) -> None:
self.stream_cache: dict[str, tuple[int, int]] = {}
def get_stream_info(self, ffmpeg, path: str) -> str:
if path in self.stream_cache:
return self.stream_cache[path]
info = asyncio.run(get_video_properties(ffmpeg, path))
self.stream_cache[path] = info
return info