mirror of
https://github.com/blakeblackshear/frigate.git
synced 2025-08-04 13:47:37 +02:00
* Fix the `Any` typing hint treewide There has been confusion between the Any type[1] and the any function[2] in typing hints. [1] https://docs.python.org/3/library/typing.html#typing.Any [2] https://docs.python.org/3/library/functions.html#any * Fix typing for various frame_shape members Frame shapes are most likely defined by height and width, so a single int cannot express that. * Wrap gpu stats functions in Optional[] These can return `None`, so they need to be `Type | None`, which is what `Optional` expresses very nicely. * Fix return type in get_latest_segment_datetime Returns a datetime object, not an integer. * Make the return type of FrameManager.write optional This is necessary since the SharedMemoryFrameManager.write function can return None. * Fix total_seconds() return type in get_tz_modifiers The function returns a float, not an int. https://docs.python.org/3/library/datetime.html#datetime.timedelta.total_seconds * Account for floating point results in to_relative_box Because the function uses division the return types may either be int or float. * Resolve ruff deprecation warning The config has been split into formatter and linter, and the global options are deprecated.
402 lines
13 KiB
Python
402 lines
13 KiB
Python
"""Utilities for builtin types manipulation."""
|
|
|
|
import ast
|
|
import copy
|
|
import datetime
|
|
import logging
|
|
import multiprocessing as mp
|
|
import queue
|
|
import re
|
|
import shlex
|
|
import struct
|
|
import urllib.parse
|
|
from collections.abc import Mapping
|
|
from multiprocessing.sharedctypes import Synchronized
|
|
from pathlib import Path
|
|
from typing import Any, Optional, Tuple, Union
|
|
from zoneinfo import ZoneInfoNotFoundError
|
|
|
|
import numpy as np
|
|
import pytz
|
|
from ruamel.yaml import YAML
|
|
from tzlocal import get_localzone
|
|
|
|
from frigate.const import REGEX_HTTP_CAMERA_USER_PASS, REGEX_RTSP_CAMERA_USER_PASS
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class EventsPerSecond:
|
|
def __init__(self, max_events=1000, last_n_seconds=10) -> None:
|
|
self._start = None
|
|
self._max_events = max_events
|
|
self._last_n_seconds = last_n_seconds
|
|
self._timestamps = []
|
|
|
|
def start(self) -> None:
|
|
self._start = datetime.datetime.now().timestamp()
|
|
|
|
def update(self) -> None:
|
|
now = datetime.datetime.now().timestamp()
|
|
if self._start is None:
|
|
self._start = now
|
|
self._timestamps.append(now)
|
|
# truncate the list when it goes 100 over the max_size
|
|
if len(self._timestamps) > self._max_events + 100:
|
|
self._timestamps = self._timestamps[(1 - self._max_events) :]
|
|
self.expire_timestamps(now)
|
|
|
|
def eps(self) -> float:
|
|
now = datetime.datetime.now().timestamp()
|
|
if self._start is None:
|
|
self._start = now
|
|
# compute the (approximate) events in the last n seconds
|
|
self.expire_timestamps(now)
|
|
seconds = min(now - self._start, self._last_n_seconds)
|
|
# avoid divide by zero
|
|
if seconds == 0:
|
|
seconds = 1
|
|
return len(self._timestamps) / seconds
|
|
|
|
# remove aged out timestamps
|
|
def expire_timestamps(self, now: float) -> None:
|
|
threshold = now - self._last_n_seconds
|
|
while self._timestamps and self._timestamps[0] < threshold:
|
|
del self._timestamps[0]
|
|
|
|
|
|
class InferenceSpeed:
|
|
def __init__(self, metric: Synchronized) -> None:
|
|
self.__metric = metric
|
|
self.__initialized = False
|
|
|
|
def update(self, inference_time: float) -> None:
|
|
if not self.__initialized:
|
|
self.__metric.value = inference_time
|
|
self.__initialized = True
|
|
return
|
|
|
|
self.__metric.value = (self.__metric.value * 9 + inference_time) / 10
|
|
|
|
def current(self) -> float:
|
|
return self.__metric.value
|
|
|
|
|
|
def deep_merge(dct1: dict, dct2: dict, override=False, merge_lists=False) -> dict:
|
|
"""
|
|
:param dct1: First dict to merge
|
|
:param dct2: Second dict to merge
|
|
:param override: if same key exists in both dictionaries, should override? otherwise ignore. (default=True)
|
|
:return: The merge dictionary
|
|
"""
|
|
merged = copy.deepcopy(dct1)
|
|
for k, v2 in dct2.items():
|
|
if k in merged:
|
|
v1 = merged[k]
|
|
if isinstance(v1, dict) and isinstance(v2, Mapping):
|
|
merged[k] = deep_merge(v1, v2, override)
|
|
elif isinstance(v1, list) and isinstance(v2, list):
|
|
if merge_lists:
|
|
merged[k] = v1 + v2
|
|
else:
|
|
if override:
|
|
merged[k] = copy.deepcopy(v2)
|
|
else:
|
|
merged[k] = copy.deepcopy(v2)
|
|
return merged
|
|
|
|
|
|
def clean_camera_user_pass(line: str) -> str:
|
|
"""Removes user and password from line."""
|
|
rtsp_cleaned = re.sub(REGEX_RTSP_CAMERA_USER_PASS, "://*:*@", line)
|
|
return re.sub(REGEX_HTTP_CAMERA_USER_PASS, "user=*&password=*", rtsp_cleaned)
|
|
|
|
|
|
def escape_special_characters(path: str) -> str:
|
|
"""Cleans reserved characters to encodings for ffmpeg."""
|
|
if len(path) > 1000:
|
|
return ValueError("Input too long to check")
|
|
|
|
try:
|
|
found = re.search(REGEX_RTSP_CAMERA_USER_PASS, path).group(0)[3:-1]
|
|
pw = found[(found.index(":") + 1) :]
|
|
return path.replace(pw, urllib.parse.quote_plus(pw))
|
|
except AttributeError:
|
|
# path does not have user:pass
|
|
return path
|
|
|
|
|
|
def get_ffmpeg_arg_list(arg: Any) -> list:
|
|
"""Use arg if list or convert to list format."""
|
|
return arg if isinstance(arg, list) else shlex.split(arg)
|
|
|
|
|
|
def load_labels(path: Optional[str], encoding="utf-8", prefill=91):
|
|
"""Loads labels from file (with or without index numbers).
|
|
Args:
|
|
path: path to label file.
|
|
encoding: label file encoding.
|
|
Returns:
|
|
Dictionary mapping indices to labels.
|
|
"""
|
|
if path is None:
|
|
return {}
|
|
|
|
with open(path, "r", encoding=encoding) as f:
|
|
labels = {index: "unknown" for index in range(prefill)}
|
|
lines = f.readlines()
|
|
if not lines:
|
|
return {}
|
|
|
|
if lines[0].split(" ", maxsplit=1)[0].isdigit():
|
|
pairs = [line.split(" ", maxsplit=1) for line in lines]
|
|
labels.update({int(index): label.strip() for index, label in pairs})
|
|
else:
|
|
labels.update({index: line.strip() for index, line in enumerate(lines)})
|
|
return labels
|
|
|
|
|
|
def get_tz_modifiers(tz_name: str) -> Tuple[str, str, float]:
|
|
seconds_offset = (
|
|
datetime.datetime.now(pytz.timezone(tz_name)).utcoffset().total_seconds()
|
|
)
|
|
hours_offset = int(seconds_offset / 60 / 60)
|
|
minutes_offset = int(seconds_offset / 60 - hours_offset * 60)
|
|
hour_modifier = f"{hours_offset} hour"
|
|
minute_modifier = f"{minutes_offset} minute"
|
|
return hour_modifier, minute_modifier, seconds_offset
|
|
|
|
|
|
def to_relative_box(
|
|
width: int, height: int, box: Tuple[int, int, int, int]
|
|
) -> Tuple[int | float, int | float, int | float, int | float]:
|
|
return (
|
|
box[0] / width, # x
|
|
box[1] / height, # y
|
|
(box[2] - box[0]) / width, # w
|
|
(box[3] - box[1]) / height, # h
|
|
)
|
|
|
|
|
|
def create_mask(frame_shape, mask):
|
|
mask_img = np.zeros(frame_shape, np.uint8)
|
|
mask_img[:] = 255
|
|
|
|
|
|
def update_yaml_from_url(file_path, url):
|
|
parsed_url = urllib.parse.urlparse(url)
|
|
query_string = urllib.parse.parse_qs(parsed_url.query, keep_blank_values=True)
|
|
|
|
# Filter out empty keys but keep blank values for non-empty keys
|
|
query_string = {k: v for k, v in query_string.items() if k}
|
|
|
|
for key_path_str, new_value_list in query_string.items():
|
|
key_path = key_path_str.split(".")
|
|
for i in range(len(key_path)):
|
|
try:
|
|
index = int(key_path[i])
|
|
key_path[i] = (key_path[i - 1], index)
|
|
key_path.pop(i - 1)
|
|
except ValueError:
|
|
pass
|
|
|
|
if len(new_value_list) > 1:
|
|
update_yaml_file(file_path, key_path, new_value_list)
|
|
else:
|
|
value = new_value_list[0]
|
|
try:
|
|
# no need to convert if we have a mask/zone string
|
|
value = ast.literal_eval(value) if "," not in value else value
|
|
except (ValueError, SyntaxError):
|
|
pass
|
|
update_yaml_file(file_path, key_path, value)
|
|
|
|
|
|
def update_yaml_file(file_path, key_path, new_value):
|
|
yaml = YAML()
|
|
yaml.indent(mapping=2, sequence=4, offset=2)
|
|
|
|
try:
|
|
with open(file_path, "r") as f:
|
|
data = yaml.load(f)
|
|
except FileNotFoundError:
|
|
logger.error(
|
|
f"Unable to read from Frigate config file {file_path}. Make sure it exists and is readable."
|
|
)
|
|
return
|
|
|
|
data = update_yaml(data, key_path, new_value)
|
|
|
|
try:
|
|
with open(file_path, "w") as f:
|
|
yaml.dump(data, f)
|
|
except Exception as e:
|
|
logger.error(f"Unable to write to Frigate config file {file_path}: {e}")
|
|
|
|
|
|
def update_yaml(data, key_path, new_value):
|
|
temp = data
|
|
for key in key_path[:-1]:
|
|
if isinstance(key, tuple):
|
|
if key[0] not in temp:
|
|
temp[key[0]] = [{}] * max(1, key[1] + 1)
|
|
elif len(temp[key[0]]) <= key[1]:
|
|
temp[key[0]] += [{}] * (key[1] - len(temp[key[0]]) + 1)
|
|
temp = temp[key[0]][key[1]]
|
|
else:
|
|
if key not in temp or temp[key] is None:
|
|
temp[key] = {}
|
|
temp = temp[key]
|
|
|
|
last_key = key_path[-1]
|
|
if new_value == "":
|
|
if isinstance(last_key, tuple):
|
|
del temp[last_key[0]][last_key[1]]
|
|
else:
|
|
del temp[last_key]
|
|
else:
|
|
if isinstance(last_key, tuple):
|
|
if last_key[0] not in temp:
|
|
temp[last_key[0]] = [{}] * max(1, last_key[1] + 1)
|
|
elif len(temp[last_key[0]]) <= last_key[1]:
|
|
temp[last_key[0]] += [{}] * (last_key[1] - len(temp[last_key[0]]) + 1)
|
|
temp[last_key[0]][last_key[1]] = new_value
|
|
else:
|
|
if (
|
|
last_key in temp
|
|
and isinstance(temp[last_key], dict)
|
|
and isinstance(new_value, dict)
|
|
):
|
|
temp[last_key].update(new_value)
|
|
else:
|
|
temp[last_key] = new_value
|
|
|
|
return data
|
|
|
|
|
|
def find_by_key(dictionary, target_key):
|
|
if target_key in dictionary:
|
|
return dictionary[target_key]
|
|
else:
|
|
for value in dictionary.values():
|
|
if isinstance(value, dict):
|
|
result = find_by_key(value, target_key)
|
|
if result is not None:
|
|
return result
|
|
return None
|
|
|
|
|
|
def get_tomorrow_at_time(hour: int) -> datetime.datetime:
|
|
"""Returns the datetime of the following day at 2am."""
|
|
try:
|
|
tomorrow = datetime.datetime.now(get_localzone()) + datetime.timedelta(days=1)
|
|
except ZoneInfoNotFoundError:
|
|
tomorrow = datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(
|
|
days=1
|
|
)
|
|
logger.warning(
|
|
"Using utc for maintenance due to missing or incorrect timezone set"
|
|
)
|
|
|
|
return tomorrow.replace(hour=hour, minute=0, second=0).astimezone(
|
|
datetime.timezone.utc
|
|
)
|
|
|
|
|
|
def is_current_hour(timestamp: int) -> bool:
|
|
"""Returns if timestamp is in the current UTC hour."""
|
|
start_of_next_hour = (
|
|
datetime.datetime.now(datetime.timezone.utc).replace(
|
|
minute=0, second=0, microsecond=0
|
|
)
|
|
+ datetime.timedelta(hours=1)
|
|
).timestamp()
|
|
return timestamp < start_of_next_hour
|
|
|
|
|
|
def clear_and_unlink(file: Path, missing_ok: bool = True) -> None:
|
|
"""clear file then unlink to avoid space retained by file descriptors."""
|
|
if not missing_ok and not file.exists():
|
|
raise FileNotFoundError()
|
|
|
|
# empty contents of file before unlinking https://github.com/blakeblackshear/frigate/issues/4769
|
|
with open(file, "w"):
|
|
pass
|
|
|
|
file.unlink(missing_ok=missing_ok)
|
|
|
|
|
|
def empty_and_close_queue(q: mp.Queue):
|
|
while True:
|
|
try:
|
|
q.get(block=True, timeout=0.5)
|
|
except queue.Empty:
|
|
q.close()
|
|
q.join_thread()
|
|
return
|
|
|
|
|
|
def generate_color_palette(n):
|
|
# mimic matplotlib's color scheme
|
|
base_colors = [
|
|
(31, 119, 180), # blue
|
|
(255, 127, 14), # orange
|
|
(44, 160, 44), # green
|
|
(214, 39, 40), # red
|
|
(148, 103, 189), # purple
|
|
(140, 86, 75), # brown
|
|
(227, 119, 194), # pink
|
|
(127, 127, 127), # gray
|
|
(188, 189, 34), # olive
|
|
(23, 190, 207), # cyan
|
|
]
|
|
|
|
def interpolate(color1, color2, factor):
|
|
return tuple(int(c1 + (c2 - c1) * factor) for c1, c2 in zip(color1, color2))
|
|
|
|
if n <= len(base_colors):
|
|
return base_colors[:n]
|
|
|
|
colors = base_colors.copy()
|
|
step = 1 / (n - len(base_colors) + 1)
|
|
extra_colors_needed = n - len(base_colors)
|
|
|
|
# interpolate between the base colors to generate more if needed
|
|
for i in range(extra_colors_needed):
|
|
index = i % (len(base_colors) - 1)
|
|
factor = (i + 1) * step
|
|
color1 = base_colors[index]
|
|
color2 = base_colors[index + 1]
|
|
colors.append(interpolate(color1, color2, factor))
|
|
|
|
return colors
|
|
|
|
|
|
def serialize(
|
|
vector: Union[list[float], np.ndarray, float], pack: bool = True
|
|
) -> bytes:
|
|
"""Serializes a list of floats, numpy array, or single float into a compact "raw bytes" format"""
|
|
if isinstance(vector, np.ndarray):
|
|
# Convert numpy array to list of floats
|
|
vector = vector.flatten().tolist()
|
|
elif isinstance(vector, (float, np.float32, np.float64)):
|
|
# Handle single float values
|
|
vector = [vector]
|
|
elif not isinstance(vector, list):
|
|
raise TypeError(
|
|
f"Input must be a list of floats, a numpy array, or a single float. Got {type(vector)}"
|
|
)
|
|
|
|
try:
|
|
if pack:
|
|
return struct.pack("%sf" % len(vector), *vector)
|
|
else:
|
|
return vector
|
|
except struct.error as e:
|
|
raise ValueError(f"Failed to pack vector: {e}. Vector: {vector}")
|
|
|
|
|
|
def deserialize(bytes_data: bytes) -> list[float]:
|
|
"""Deserializes a compact "raw bytes" format into a list of floats"""
|
|
return list(struct.unpack("%sf" % (len(bytes_data) // 4), bytes_data))
|