mirror of
https://github.com/blakeblackshear/frigate.git
synced 2026-03-07 02:18:07 +01:00
* use react-jsonschema-form for UI config * don't use properties wrapper when generating config i18n json * configure for full i18n support * section fields * add descriptions to all fields for i18n * motion i18n * fix nullable fields * sanitize internal fields * add switches widgets and use friendly names * fix nullable schema entries * ensure update_topic is added to api calls this needs further backend implementation to work correctly * add global sections, camera config overrides, and reset button * i18n * add reset logic to global config view * tweaks * fix sections and live validation * fix validation for schema objects that can be null * generic and custom per-field validation * improve generic error validation messages * remove show advanced fields switch * tweaks * use shadcn theme * fix array field template * i18n tweaks * remove collapsible around root section * deep merge schema for advanced fields * add array field item template and fix ffmpeg section * add missing i18n keys * tweaks * comment out api call for testing * add config groups as a separate i18n namespace * add descriptions to all pydantic fields * make titles more concise * new titles as i18n * update i18n config generation script to use json schema * tweaks * tweaks * rebase * clean up * form tweaks * add wildcards and fix object filter fields * add field template for additionalproperties schema objects * improve typing * add section description from schema and clarify global vs camera level descriptions * separate and consolidate global and camera i18n namespaces * clean up now obsolete namespaces * tweaks * refactor sections and overrides * add ability to render components before and after fields * fix titles * chore(sections): remove legacy single-section components replaced by template * refactor configs to use individual files with a template * fix review description * apply hidden fields after ui schema * move util * remove unused i18n * clean up error messages * fix fast refresh * add custom validation and use it for ffmpeg input roles * update nav tree * remove unused * re-add override and modified indicators * mark pending changes and add confirmation dialog for resets * fix red unsaved dot * tweaks * add docs links, readonly keys, and restart required per field * add special case and comments for global motion section * add section form special cases * combine review sections * tweaks * add audio labels endpoint * add audio label switches and input to filter list * fix type * remove key from config when resetting to default/global * don't show description for new key/val fields * tweaks * spacing tweaks * add activity indicator and scrollbar tweaks * add docs to filter fields * wording changes * fix global ffmpeg section * add review classification zones to review form * add backend endpoint and frontend widget for ffmpeg presets and manual args * improve wording * hide descriptions for additional properties arrays * add warning log about incorrectly nested model config * spacing and language tweaks * fix i18n keys * networking section docs and description * small wording tweaks * add layout grid field * refactor with shared utilities * field order * add individual detectors to schema add detector titles and descriptions (docstrings in pydantic are used for descriptions) and add i18n keys to globals * clean up detectors section and i18n * don't save model config back to yaml when saving detectors * add full detectors config to api model dump works around the way we use detector plugins so we can have the full detector config for the frontend * add restart button to toast when restart is required * add ui option to remove inner cards * fix buttons * section tweaks * don't zoom into text on mobile * make buttons sticky at bottom of sections * small tweaks * highlight label of changed fields * add null to enum list when unwrapping * refactor to shared utils and add save all button * add undo all button * add RJSF to dictionary * consolidate utils * preserve form data when changing cameras * add mono fonts * add popover to show what fields will be saved * fix mobile menu not re-rendering with unsaved dots * tweaks * fix logger and env vars config section saving use escaped periods in keys to retain them in the config file (eg "frigate.embeddings") * add timezone widget * role map field with validation * fix validation for model section * add another hidden field * add footer message for required restart * use rjsf for notifications view * fix config saving * add replace rules field * default column layout and add field sizing * clean up field template * refactor profile settings to match rjsf forms * tweaks * refactor frigate+ view and make tweaks to sections * show frigate+ model info in detection model settings when using a frigate+ model * update restartRequired for all fields * fix restart fields * tweaks and add ability enable disabled cameras more backend changes required * require restart when enabling camera that is disabled in config * disable save when form is invalid * refactor ffmpeg section for readability * change label * clean up camera inputs fields * misc tweaks to ffmpeg section - add raw paths endpoint to ensure credentials get saved - restart required tooltip * maintenance settings tweaks * don't mutate with lodash * fix description re-rendering for nullable object fields * hide reindex field * update rjsf * add frigate+ description to settings pane * disable save all when any section is invalid * show translated field name in validation error pane * clean up * remove unused * fix genai merge * fix genai
444 lines
14 KiB
Python
444 lines
14 KiB
Python
"""Utilities for builtin types manipulation."""
|
|
|
|
import ast
|
|
import copy
|
|
import datetime
|
|
import logging
|
|
import math
|
|
import multiprocessing.queues
|
|
import queue
|
|
import re
|
|
import shlex
|
|
import struct
|
|
import urllib.parse
|
|
from collections.abc import Mapping
|
|
from multiprocessing.sharedctypes import Synchronized
|
|
from pathlib import Path
|
|
from typing import Any, Dict, Optional, Tuple, Union
|
|
|
|
import numpy as np
|
|
from ruamel.yaml import YAML
|
|
|
|
from frigate.const import REGEX_HTTP_CAMERA_USER_PASS, REGEX_RTSP_CAMERA_USER_PASS
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class EventsPerSecond:
|
|
def __init__(self, max_events=1000, last_n_seconds=10) -> None:
|
|
self._start = None
|
|
self._max_events = max_events
|
|
self._last_n_seconds = last_n_seconds
|
|
self._timestamps = []
|
|
|
|
def start(self) -> None:
|
|
self._start = datetime.datetime.now().timestamp()
|
|
|
|
def update(self) -> None:
|
|
now = datetime.datetime.now().timestamp()
|
|
if self._start is None:
|
|
self._start = now
|
|
self._timestamps.append(now)
|
|
# truncate the list when it goes 100 over the max_size
|
|
if len(self._timestamps) > self._max_events + 100:
|
|
self._timestamps = self._timestamps[(1 - self._max_events) :]
|
|
self.expire_timestamps(now)
|
|
|
|
def eps(self) -> float:
|
|
now = datetime.datetime.now().timestamp()
|
|
if self._start is None:
|
|
self._start = now
|
|
# compute the (approximate) events in the last n seconds
|
|
self.expire_timestamps(now)
|
|
seconds = min(now - self._start, self._last_n_seconds)
|
|
# avoid divide by zero
|
|
if seconds == 0:
|
|
seconds = 1
|
|
return len(self._timestamps) / seconds
|
|
|
|
# remove aged out timestamps
|
|
def expire_timestamps(self, now: float) -> None:
|
|
threshold = now - self._last_n_seconds
|
|
while self._timestamps and self._timestamps[0] < threshold:
|
|
del self._timestamps[0]
|
|
|
|
|
|
class InferenceSpeed:
|
|
def __init__(self, metric: Synchronized) -> None:
|
|
self.__metric = metric
|
|
self.__initialized = False
|
|
|
|
def update(self, inference_time: float) -> None:
|
|
if not self.__initialized:
|
|
self.__metric.value = inference_time
|
|
self.__initialized = True
|
|
return
|
|
|
|
self.__metric.value = (self.__metric.value * 9 + inference_time) / 10
|
|
|
|
def current(self) -> float:
|
|
return self.__metric.value
|
|
|
|
|
|
def deep_merge(dct1: dict, dct2: dict, override=False, merge_lists=False) -> dict:
|
|
"""
|
|
:param dct1: First dict to merge
|
|
:param dct2: Second dict to merge
|
|
:param override: if same key exists in both dictionaries, should override? otherwise ignore. (default=True)
|
|
:return: The merge dictionary
|
|
"""
|
|
merged = copy.deepcopy(dct1)
|
|
for k, v2 in dct2.items():
|
|
if k in merged:
|
|
v1 = merged[k]
|
|
if isinstance(v1, dict) and isinstance(v2, Mapping):
|
|
merged[k] = deep_merge(v1, v2, override)
|
|
elif isinstance(v1, list) and isinstance(v2, list):
|
|
if merge_lists:
|
|
merged[k] = v1 + v2
|
|
else:
|
|
if override:
|
|
merged[k] = copy.deepcopy(v2)
|
|
else:
|
|
merged[k] = copy.deepcopy(v2)
|
|
return merged
|
|
|
|
|
|
def clean_camera_user_pass(line: str) -> str:
|
|
"""Removes user and password from line."""
|
|
rtsp_cleaned = re.sub(REGEX_RTSP_CAMERA_USER_PASS, "://*:*@", line)
|
|
return re.sub(REGEX_HTTP_CAMERA_USER_PASS, "user=*&password=*", rtsp_cleaned)
|
|
|
|
|
|
def escape_special_characters(path: str) -> str:
|
|
"""Cleans reserved characters to encodings for ffmpeg."""
|
|
if len(path) > 1000:
|
|
return ValueError("Input too long to check")
|
|
|
|
try:
|
|
found = re.search(REGEX_RTSP_CAMERA_USER_PASS, path).group(0)[3:-1]
|
|
pw = found[(found.index(":") + 1) :]
|
|
return path.replace(pw, urllib.parse.quote_plus(pw))
|
|
except AttributeError:
|
|
# path does not have user:pass
|
|
return path
|
|
|
|
|
|
def get_ffmpeg_arg_list(arg: Any) -> list:
|
|
"""Use arg if list or convert to list format."""
|
|
return arg if isinstance(arg, list) else shlex.split(arg)
|
|
|
|
|
|
def load_labels(
|
|
path: Optional[str], encoding="utf-8", prefill=91, indexed: bool | None = None
|
|
):
|
|
"""Loads labels from file (with or without index numbers).
|
|
Args:
|
|
path: path to label file.
|
|
encoding: label file encoding.
|
|
Returns:
|
|
Dictionary mapping indices to labels.
|
|
"""
|
|
if path is None:
|
|
return {}
|
|
|
|
with open(path, "r", encoding=encoding) as f:
|
|
labels = {index: "unknown" for index in range(prefill)}
|
|
lines = f.readlines()
|
|
if not lines:
|
|
return {}
|
|
|
|
if indexed != False and lines[0].split(" ", maxsplit=1)[0].isdigit():
|
|
pairs = [line.split(" ", maxsplit=1) for line in lines]
|
|
labels.update({int(index): label.strip() for index, label in pairs})
|
|
else:
|
|
labels.update({index: line.strip() for index, line in enumerate(lines)})
|
|
|
|
return labels
|
|
|
|
|
|
def to_relative_box(
|
|
width: int, height: int, box: Tuple[int, int, int, int]
|
|
) -> Tuple[int | float, int | float, int | float, int | float]:
|
|
return (
|
|
box[0] / width, # x
|
|
box[1] / height, # y
|
|
(box[2] - box[0]) / width, # w
|
|
(box[3] - box[1]) / height, # h
|
|
)
|
|
|
|
|
|
def create_mask(frame_shape, mask):
|
|
mask_img = np.zeros(frame_shape, np.uint8)
|
|
mask_img[:] = 255
|
|
|
|
|
|
def process_config_query_string(query_string: Dict[str, list]) -> Dict[str, Any]:
|
|
updates = {}
|
|
for key_path_str, new_value_list in query_string.items():
|
|
# use the string key as-is for updates dictionary
|
|
if len(new_value_list) > 1:
|
|
updates[key_path_str] = new_value_list
|
|
else:
|
|
value = new_value_list[0]
|
|
try:
|
|
# no need to convert if we have a mask/zone string
|
|
value = ast.literal_eval(value) if "," not in value else value
|
|
except (ValueError, SyntaxError):
|
|
pass
|
|
updates[key_path_str] = value
|
|
return updates
|
|
|
|
|
|
def flatten_config_data(
|
|
config_data: Dict[str, Any], parent_key: str = ""
|
|
) -> Dict[str, Any]:
|
|
items = []
|
|
for key, value in config_data.items():
|
|
escaped_key = escape_config_key_segment(str(key))
|
|
new_key = f"{parent_key}.{escaped_key}" if parent_key else escaped_key
|
|
if isinstance(value, dict):
|
|
items.extend(flatten_config_data(value, new_key).items())
|
|
else:
|
|
items.append((new_key, value))
|
|
return dict(items)
|
|
|
|
|
|
def escape_config_key_segment(segment: str) -> str:
|
|
"""Escape dots and backslashes so they can be treated as literal key chars."""
|
|
return segment.replace("\\", "\\\\").replace(".", "\\.")
|
|
|
|
|
|
def split_config_key_path(key_path_str: str) -> list[str]:
|
|
"""Split a dotted config path, honoring \\. as a literal dot in a key."""
|
|
parts: list[str] = []
|
|
current: list[str] = []
|
|
escaped = False
|
|
|
|
for char in key_path_str:
|
|
if escaped:
|
|
current.append(char)
|
|
escaped = False
|
|
continue
|
|
|
|
if char == "\\":
|
|
escaped = True
|
|
continue
|
|
|
|
if char == ".":
|
|
parts.append("".join(current))
|
|
current = []
|
|
continue
|
|
|
|
current.append(char)
|
|
|
|
if escaped:
|
|
current.append("\\")
|
|
|
|
parts.append("".join(current))
|
|
return parts
|
|
|
|
|
|
def update_yaml_file_bulk(file_path: str, updates: Dict[str, Any]):
|
|
yaml = YAML()
|
|
yaml.indent(mapping=2, sequence=4, offset=2)
|
|
|
|
try:
|
|
with open(file_path, "r") as f:
|
|
data = yaml.load(f)
|
|
except FileNotFoundError:
|
|
logger.error(
|
|
f"Unable to read from Frigate config file {file_path}. Make sure it exists and is readable."
|
|
)
|
|
return
|
|
|
|
# Apply all updates
|
|
for key_path_str, new_value in updates.items():
|
|
key_path = split_config_key_path(key_path_str)
|
|
for i in range(len(key_path)):
|
|
try:
|
|
index = int(key_path[i])
|
|
key_path[i] = (key_path[i - 1], index)
|
|
key_path.pop(i - 1)
|
|
except ValueError:
|
|
pass
|
|
data = update_yaml(data, key_path, new_value)
|
|
|
|
try:
|
|
with open(file_path, "w") as f:
|
|
yaml.dump(data, f)
|
|
except Exception as e:
|
|
logger.error(f"Unable to write to Frigate config file {file_path}: {e}")
|
|
|
|
|
|
def update_yaml(data, key_path, new_value):
|
|
temp = data
|
|
for key in key_path[:-1]:
|
|
if isinstance(key, tuple):
|
|
if key[0] not in temp:
|
|
temp[key[0]] = [{}] * max(1, key[1] + 1)
|
|
elif len(temp[key[0]]) <= key[1]:
|
|
temp[key[0]] += [{}] * (key[1] - len(temp[key[0]]) + 1)
|
|
temp = temp[key[0]][key[1]]
|
|
else:
|
|
if key not in temp or temp[key] is None:
|
|
temp[key] = {}
|
|
temp = temp[key]
|
|
|
|
last_key = key_path[-1]
|
|
if new_value == "":
|
|
if isinstance(last_key, tuple):
|
|
del temp[last_key[0]][last_key[1]]
|
|
else:
|
|
del temp[last_key]
|
|
else:
|
|
if isinstance(last_key, tuple):
|
|
if last_key[0] not in temp:
|
|
temp[last_key[0]] = [{}] * max(1, last_key[1] + 1)
|
|
elif len(temp[last_key[0]]) <= last_key[1]:
|
|
temp[last_key[0]] += [{}] * (last_key[1] - len(temp[last_key[0]]) + 1)
|
|
temp[last_key[0]][last_key[1]] = new_value
|
|
else:
|
|
if (
|
|
last_key in temp
|
|
and isinstance(temp[last_key], dict)
|
|
and isinstance(new_value, dict)
|
|
):
|
|
temp[last_key].update(new_value)
|
|
else:
|
|
temp[last_key] = new_value
|
|
|
|
return data
|
|
|
|
|
|
def find_by_key(dictionary, target_key):
|
|
if target_key in dictionary:
|
|
return dictionary[target_key]
|
|
else:
|
|
for value in dictionary.values():
|
|
if isinstance(value, dict):
|
|
result = find_by_key(value, target_key)
|
|
if result is not None:
|
|
return result
|
|
return None
|
|
|
|
|
|
def clear_and_unlink(file: Path, missing_ok: bool = True) -> None:
|
|
"""clear file then unlink to avoid space retained by file descriptors."""
|
|
if not missing_ok and not file.exists():
|
|
raise FileNotFoundError()
|
|
|
|
# empty contents of file before unlinking https://github.com/blakeblackshear/frigate/issues/4769
|
|
with open(file, "w"):
|
|
pass
|
|
|
|
file.unlink(missing_ok=missing_ok)
|
|
|
|
|
|
def empty_and_close_queue(q):
|
|
while True:
|
|
try:
|
|
q.get(block=True, timeout=0.5)
|
|
except (queue.Empty, EOFError):
|
|
break
|
|
except Exception as e:
|
|
logger.debug(f"Error while emptying queue: {e}")
|
|
break
|
|
|
|
# close the queue if it is a multiprocessing queue
|
|
# manager proxy queues do not have close or join_thread method
|
|
if isinstance(q, multiprocessing.queues.Queue):
|
|
try:
|
|
q.close()
|
|
q.join_thread()
|
|
except Exception:
|
|
pass
|
|
|
|
|
|
def generate_color_palette(n):
|
|
# mimic matplotlib's color scheme
|
|
base_colors = [
|
|
(31, 119, 180), # blue
|
|
(255, 127, 14), # orange
|
|
(44, 160, 44), # green
|
|
(214, 39, 40), # red
|
|
(148, 103, 189), # purple
|
|
(140, 86, 75), # brown
|
|
(227, 119, 194), # pink
|
|
(127, 127, 127), # gray
|
|
(188, 189, 34), # olive
|
|
(23, 190, 207), # cyan
|
|
]
|
|
|
|
def interpolate(color1, color2, factor):
|
|
return tuple(int(c1 + (c2 - c1) * factor) for c1, c2 in zip(color1, color2))
|
|
|
|
if n <= len(base_colors):
|
|
return base_colors[:n]
|
|
|
|
colors = base_colors.copy()
|
|
step = 1 / (n - len(base_colors) + 1)
|
|
extra_colors_needed = n - len(base_colors)
|
|
|
|
# interpolate between the base colors to generate more if needed
|
|
for i in range(extra_colors_needed):
|
|
index = i % (len(base_colors) - 1)
|
|
factor = (i + 1) * step
|
|
color1 = base_colors[index]
|
|
color2 = base_colors[index + 1]
|
|
colors.append(interpolate(color1, color2, factor))
|
|
|
|
return colors
|
|
|
|
|
|
def serialize(
|
|
vector: Union[list[float], np.ndarray, float], pack: bool = True
|
|
) -> bytes:
|
|
"""Serializes a list of floats, numpy array, or single float into a compact "raw bytes" format"""
|
|
if isinstance(vector, np.ndarray):
|
|
# Convert numpy array to list of floats
|
|
vector = vector.flatten().tolist()
|
|
elif isinstance(vector, (float, np.float32, np.float64)):
|
|
# Handle single float values
|
|
vector = [vector]
|
|
elif not isinstance(vector, list):
|
|
raise TypeError(
|
|
f"Input must be a list of floats, a numpy array, or a single float. Got {type(vector)}"
|
|
)
|
|
|
|
try:
|
|
if pack:
|
|
return struct.pack("%sf" % len(vector), *vector)
|
|
else:
|
|
return vector
|
|
except struct.error as e:
|
|
raise ValueError(f"Failed to pack vector: {e}. Vector: {vector}")
|
|
|
|
|
|
def deserialize(bytes_data: bytes) -> list[float]:
|
|
"""Deserializes a compact "raw bytes" format into a list of floats"""
|
|
return list(struct.unpack("%sf" % (len(bytes_data) // 4), bytes_data))
|
|
|
|
|
|
def sanitize_float(value):
|
|
"""Replace NaN or inf with 0.0."""
|
|
if isinstance(value, (int, float)) and not math.isfinite(value):
|
|
return 0.0
|
|
return value
|
|
|
|
|
|
def cosine_similarity(a: np.ndarray, b: np.ndarray) -> float:
|
|
return 1 - cosine_distance(a, b)
|
|
|
|
|
|
def cosine_distance(a: np.ndarray, b: np.ndarray) -> float:
|
|
"""Returns cosine distance to match sqlite-vec's calculation."""
|
|
dot = np.dot(a, b)
|
|
a_mag = np.dot(a, a) # ||a||^2
|
|
b_mag = np.dot(b, b) # ||b||^2
|
|
|
|
if a_mag == 0 or b_mag == 0:
|
|
return 1.0
|
|
|
|
return 1.0 - (dot / (np.sqrt(a_mag) * np.sqrt(b_mag)))
|