blakeblackshear.frigate/frigate/util/config.py
Nicolas Mowen 5ff476c6f9
Configurable ffmpeg (#13722)
* Install multiple ffmpeg versions and add config to make it configurable

* Update docs

* Run ffprobe too

* Cleanup

* Apply config to go2rtc as well

* Fix ffmpeg bin

* Docs

* Restore path

* Cleanup env var

* Fix ffmpeg path for encoding

* Fix export

* Formatting
2024-09-13 15:14:51 -05:00

324 lines
12 KiB
Python

"""configuration utils."""
import asyncio
import logging
import os
import shutil
from typing import Optional, Union
from ruamel.yaml import YAML
from frigate.const import CONFIG_DIR, EXPORT_DIR
from frigate.util.services import get_video_properties
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Config version that migrate_frigate_config migrates to; a config whose
# "version" already equals this value is left untouched.
CURRENT_CONFIG_VERSION = "0.15-0"
def migrate_frigate_config(config_file: str):
    """Migrate the frigate config file in place to CURRENT_CONFIG_VERSION.

    The file is loaded with ruamel.yaml and every migration step newer than
    the config's ``version`` (treated as "0.13" when the key is missing) is
    applied in order. Each step works on the result of the previous one and
    the file is rewritten after every step. Before anything is written, the
    original file is copied to ``backup_config.yaml`` inside CONFIG_DIR.

    Args:
        config_file: Path of the YAML config file to migrate.

    Returns:
        None. Returns early, without touching the file, when the file is not
        writable or is already at CURRENT_CONFIG_VERSION.
    """
    logger.info("Checking if frigate config needs migration...")

    if not os.access(config_file, mode=os.W_OK):
        logger.error("Config file is read-only, unable to migrate config file.")
        return

    yaml = YAML()
    yaml.indent(mapping=2, sequence=4, offset=2)
    with open(config_file, "r") as f:
        config: dict[str, dict[str, any]] = yaml.load(f)

    # NOTE: versions are compared as plain strings below, which orders the
    # versions handled here ("0.13" < "0.14" < "0.15-0") correctly.
    previous_version = str(config.get("version", "0.13"))

    if previous_version == CURRENT_CONFIG_VERSION:
        logger.info("frigate config does not need migration...")
        return

    logger.info("copying config as backup...")
    shutil.copy(config_file, os.path.join(CONFIG_DIR, "backup_config.yaml"))

    if previous_version < "0.14":
        logger.info(f"Migrating frigate config from {previous_version} to 0.14...")
        # Keep the migrated result in `config` so the next step builds on it;
        # otherwise top-level changes made here would be lost when a later
        # migration starts again from the originally loaded config.
        config = migrate_014(config)
        with open(config_file, "w") as f:
            yaml.dump(config, f)
        previous_version = "0.14"

        logger.info("Migrating export file names...")
        for file in os.listdir(EXPORT_DIR):
            if "@" not in file:
                continue

            new_name = file.replace("@", "_")
            os.rename(
                os.path.join(EXPORT_DIR, file), os.path.join(EXPORT_DIR, new_name)
            )

    if previous_version < "0.15-0":
        logger.info(f"Migrating frigate config from {previous_version} to 0.15-0...")
        config = migrate_015_0(config)
        with open(config_file, "w") as f:
            yaml.dump(config, f)
        previous_version = "0.15-0"

    logger.info("Finished frigate config migration...")
def _migrate_014_section(section: dict[str, any]) -> None:
    """Apply, in place, the 0.14 changes shared by the global config and cameras.

    Moves ``record.events.required_zones`` to ``review.alerts.required_zones``
    (unless the latter is already set) and removes the ``rtmp`` options.
    Options are removed whenever the key is present, regardless of its value.
    """
    record = section.get("record") or {}
    events = record.get("events") or {}

    if "required_zones" in events:
        required_zones = events["required_zones"]

        if required_zones:
            # migrate to new review config, keeping zones already defined there
            if not section.get("review"):
                section["review"] = {}

            if not section["review"].get("alerts"):
                section["review"]["alerts"] = {}

            if not section["review"]["alerts"].get("required_zones"):
                section["review"]["alerts"]["required_zones"] = required_zones

        # remove record required zones config (also when it was left empty)
        del events["required_zones"]

        # remove record altogether if there is no other config
        if not events:
            del record["events"]

        if not record:
            del section["record"]

    # remove rtmp
    output_args = (section.get("ffmpeg") or {}).get("output_args")
    if isinstance(output_args, dict) and "rtmp" in output_args:
        del output_args["rtmp"]

    if "rtmp" in section:
        del section["rtmp"]


def migrate_014(config: dict[str, dict[str, any]]) -> dict[str, dict[str, any]]:
    """Handle migrating frigate config to 0.14

    Changes applied to the global config and to every camera:
      * record.events.required_zones is moved to review.alerts.required_zones
      * rtmp and ffmpeg.output_args.rtmp are removed

    Additionally, ui.use_experimental and ui.live_mode are removed from the
    global config, and "version" is set to "0.14".

    Args:
        config: The loaded config mapping.

    Returns:
        The migrated config. Only the top level (and each camera mapping) is
        copied; deeper nested mappings are shared with *config* and are
        modified in place.
    """
    new_config = config.copy()
    _migrate_014_section(new_config)

    # Remove UI fields; checked by key so that falsy values such as
    # `use_experimental: false` are removed as well
    ui = new_config.get("ui")
    if ui:
        for removed_field in ("use_experimental", "live_mode"):
            if removed_field in ui:
                del ui[removed_field]

        if not ui:
            del new_config["ui"]

    # snapshot the items: the cameras mapping is reassigned while looping
    for name, camera in list((config.get("cameras") or {}).items()):
        camera_config: dict[str, dict[str, any]] = camera.copy()
        _migrate_014_section(camera_config)
        new_config["cameras"][name] = camera_config

    new_config["version"] = "0.14"
    return new_config
def _migrate_015_0_record(section: dict[str, any]) -> None:
    """Replace ``record.events`` with ``record.alerts`` / ``record.detections``.

    Works in place on *section*, which is either the global config or a single
    camera config. Does nothing when ``record`` has no ``events`` key.
    """
    record = section.get("record") or {}

    if "events" not in record:
        return

    events = record["events"]

    if events:
        alerts_retention = {"retain": {}}

        # compare against None so explicit zeros (e.g. pre_capture: 0) are kept
        for key in ("pre_capture", "post_capture"):
            if events.get(key) is not None:
                alerts_retention[key] = events[key]

        default_days = (events.get("retain") or {}).get("default")
        if default_days is not None:
            alerts_retention["retain"]["days"] = default_days

        # decide logical detections retention based on current detections config
        review = section.get("review") or {}
        alert_zones = (review.get("alerts") or {}).get("required_zones")

        if not alert_zones or review.get("detections"):
            # detections mirror the alerts settings; build separate dicts so
            # the two entries are not the same object when dumped
            detections_retention = {
                **alerts_retention,
                "retain": dict(alerts_retention["retain"]),
            }
        else:
            continuous_days = (record.get("retain") or {}).get("days")
            detections_retention = {
                "retain": {"days": continuous_days if continuous_days else 1}
            }

        record["alerts"] = alerts_retention
        record["detections"] = detections_retention

    # the events key is dropped even when it was empty
    del record["events"]


def migrate_015_0(config: dict[str, dict[str, any]]) -> dict[str, dict[str, any]]:
    """Handle migrating frigate config to 0.15-0

    For the global config and for every camera, record.events is migrated to
    record.alerts and record.detections:
      * pre_capture, post_capture and retain.default (as retain.days) are
        copied to record.alerts
      * record.detections gets the same values when review.alerts has no
        required_zones or review.detections is configured; otherwise its
        retain.days is taken from record.retain.days, falling back to 1

    Args:
        config: The loaded config mapping.

    Returns:
        The migrated config with "version" set to "0.15-0". Only the top level
        (and each camera mapping) is copied; the nested ``record`` mappings
        are shared with *config* and are modified in place.
    """
    new_config = config.copy()

    # migrate record.events to record.alerts and record.detections
    _migrate_015_0_record(new_config)

    # snapshot the items: the cameras mapping is reassigned while looping
    for name, camera in list((config.get("cameras") or {}).items()):
        camera_config: dict[str, dict[str, any]] = camera.copy()
        _migrate_015_0_record(camera_config)
        new_config["cameras"][name] = camera_config

    new_config["version"] = "0.15-0"
    return new_config
def _is_absolute_coordinate(value: str) -> bool:
    """Return True when *value* parses as a number greater than 1."""
    try:
        return float(value) > 1.0
    except ValueError:
        return False


def _to_relative_points(
    points: list[str], frame_shape: tuple[int, int]
) -> Optional[str]:
    """Convert absolute "x,y" pixel points to relative ones.

    Returns the comma-joined relative coordinates (rounded to 3 decimals), or
    None (after logging an error) when a point lies outside of frame_shape.
    """
    rel_points = []

    for i in range(0, len(points), 2):
        x = int(points[i])
        y = int(points[i + 1])

        if x > frame_shape[1] or y > frame_shape[0]:
            logger.error(
                f"Not applying mask due to invalid coordinates. {x},{y} is outside of the detection resolution {frame_shape[1]}x{frame_shape[0]}. Use the editor in the UI to correct the mask."
            )
            return None

        rel_points.append(
            f"{round(x / frame_shape[1], 3)},{round(y / frame_shape[0], 3)}"
        )

    return ",".join(rel_points)


def get_relative_coordinates(
    mask: Optional[Union[str, list]], frame_shape: tuple[int, int]
) -> Union[str, list]:
    """Return *mask* expressed in relative coordinates.

    Masks and zones are saved as relative coordinates. A mask containing any
    value greater than 1 is taken to use the old native resolution
    coordinates and is converted by dividing x by frame_shape[1] and y by
    frame_shape[0]. Values are compared numerically, not as strings.

    Args:
        mask: A single "x1,y1,x2,y2,..." string, a list of such strings, or
            an empty/None value.
        frame_shape: Detection resolution, indexed as [0] for the y limit and
            [1] for the x limit.

    Returns:
        * an empty/None mask unchanged
        * for a string: the relative string, or [] when a point lies outside
          of the detection resolution
        * for a list: a list where every absolute mask is converted, relative
          masks are kept as-is, and masks with a point outside of the
          detection resolution are left out
    """
    if not mask:
        return mask

    if isinstance(mask, list):
        relative_masks = []

        # every mask is checked on its own rather than relying on the first one
        for m in mask:
            points = m.split(",")

            if not any(_is_absolute_coordinate(p) for p in points):
                relative_masks.append(m)
                continue

            converted = _to_relative_points(points, frame_shape)

            # an invalid mask is not applied at all instead of being
            # applied with some of its points missing
            if converted is not None:
                relative_masks.append(converted)

        return relative_masks

    if isinstance(mask, str):
        points = mask.split(",")

        if any(_is_absolute_coordinate(p) for p in points):
            converted = _to_relative_points(points, frame_shape)
            return [] if converted is None else converted

    return mask
class StreamInfoRetriever:
    """Look up stream information once per path and remember the result."""

    def __init__(self) -> None:
        # path -> value returned by get_video_properties for that path
        # NOTE(review): this annotation and the `-> str` below may not match
        # what get_video_properties actually returns - confirm.
        self.stream_cache: dict[str, tuple[int, int]] = {}

    def get_stream_info(self, ffmpeg, path: str) -> str:
        """Return the stream info for *path*, probing it only on a cache miss.

        On a miss, get_video_properties(ffmpeg, path) is run to completion
        with asyncio.run and its result is stored in stream_cache.
        """
        try:
            return self.stream_cache[path]
        except KeyError:
            pass

        properties = asyncio.run(get_video_properties(ffmpeg, path))
        self.stream_cache[path] = properties
        return properties