Cleanup and organize utils (#7033)

* Force birdseye cameras into standard aspect ratios

* Organize utils

* Update tests

* Formatting

* Isort

* Fix tests

* Cleanup

* isort
This commit is contained in:
Nicolas Mowen 2023-07-06 08:28:50 -06:00 committed by GitHub
parent 606f00867e
commit baf671b764
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
29 changed files with 689 additions and 665 deletions

View File

@ -47,7 +47,7 @@ from frigate.stats import StatsEmitter, stats_init
from frigate.storage import StorageMaintainer
from frigate.timeline import TimelineProcessor
from frigate.types import CameraMetricsTypes, FeatureMetricsTypes
from frigate.util import LimitedQueue as LQueue
from frigate.util.builtin import LimitedQueue as LQueue
from frigate.version import VERSION
from frigate.video import capture_camera, track_camera
from frigate.watchdog import FrigateWatchdog

View File

@ -7,7 +7,7 @@ from typing import Any, Callable
from frigate.config import FrigateConfig
from frigate.ptz import OnvifCommandEnum, OnvifController
from frigate.types import CameraMetricsTypes, FeatureMetricsTypes
from frigate.util import restart_frigate
from frigate.util.services import restart_frigate
logger = logging.getLogger(__name__)

View File

@ -22,13 +22,13 @@ from frigate.ffmpeg_presets import (
parse_preset_output_rtmp,
)
from frigate.plus import PlusApi
from frigate.util import (
create_mask,
from frigate.util.builtin import (
deep_merge,
escape_special_characters,
get_ffmpeg_arg_list,
load_config_with_no_duplicates,
)
from frigate.util.image import create_mask
logger = logging.getLogger(__name__)

View File

@ -11,7 +11,7 @@ from pydantic import BaseModel, Extra, Field
from pydantic.fields import PrivateAttr
from frigate.plus import PlusApi
from frigate.util import load_labels
from frigate.util.builtin import load_labels
logger = logging.getLogger(__name__)

View File

@ -26,7 +26,8 @@ from frigate.ffmpeg_presets import parse_preset_input
from frigate.log import LogPipe
from frigate.object_detection import load_labels
from frigate.types import FeatureMetricsTypes
from frigate.util import get_ffmpeg_arg_list, listen
from frigate.util.builtin import get_ffmpeg_arg_list
from frigate.util.services import listen
from frigate.video import start_or_restart_ffmpeg, stop_ffmpeg
try:

View File

@ -14,7 +14,7 @@ from faster_fifo import Queue
from frigate.config import CameraConfig, FrigateConfig
from frigate.const import CLIPS_DIR
from frigate.events.maintainer import EventTypeEnum
from frigate.util import draw_box_with_label
from frigate.util.image import draw_box_with_label
logger = logging.getLogger(__name__)

View File

@ -11,7 +11,7 @@ from faster_fifo import Queue
from frigate.config import EventsConfig, FrigateConfig
from frigate.models import Event
from frigate.types import CameraMetricsTypes
from frigate.util import to_relative_box
from frigate.util.builtin import to_relative_box
logger = logging.getLogger(__name__)

View File

@ -5,7 +5,7 @@ import os
from enum import Enum
from typing import Any
from frigate.util import vainfo_hwaccel
from frigate.util.services import vainfo_hwaccel
from frigate.version import VERSION
logger = logging.getLogger(__name__)

View File

@ -38,13 +38,8 @@ from frigate.ptz import OnvifController
from frigate.record.export import PlaybackFactorEnum, RecordingExporter
from frigate.stats import stats_snapshot
from frigate.storage import StorageMaintainer
from frigate.util import (
clean_camera_user_pass,
ffprobe_stream,
get_tz_modifiers,
restart_frigate,
vainfo_hwaccel,
)
from frigate.util.builtin import clean_camera_user_pass, get_tz_modifiers
from frigate.util.services import ffprobe_stream, restart_frigate, vainfo_hwaccel
from frigate.version import VERSION
logger = logging.getLogger(__name__)

View File

@ -13,7 +13,7 @@ from typing import Deque, Optional
from faster_fifo import Queue
from setproctitle import setproctitle
from frigate.util import clean_camera_user_pass
from frigate.util.builtin import clean_camera_user_pass
def listener_configurer() -> None:

View File

@ -13,7 +13,9 @@ from setproctitle import setproctitle
from frigate.detectors import create_detector
from frigate.detectors.detector_config import InputTensorEnum
from frigate.util import EventsPerSecond, SharedMemoryFrameManager, listen, load_labels
from frigate.util.builtin import EventsPerSecond, load_labels
from frigate.util.image import SharedMemoryFrameManager
from frigate.util.services import listen
logger = logging.getLogger(__name__)

View File

@ -22,7 +22,7 @@ from frigate.config import (
)
from frigate.const import CLIPS_DIR
from frigate.events.maintainer import EventTypeEnum
from frigate.util import (
from frigate.util.image import (
SharedMemoryFrameManager,
area,
calculate_region,

View File

@ -24,7 +24,11 @@ from ws4py.websocket import WebSocket
from frigate.config import BirdseyeModeEnum, FrigateConfig
from frigate.const import BASE_DIR, BIRDSEYE_PIPE
from frigate.util import SharedMemoryFrameManager, copy_yuv_to_position, get_yuv_crop
from frigate.util.image import (
SharedMemoryFrameManager,
copy_yuv_to_position,
get_yuv_crop,
)
logger = logging.getLogger(__name__)

View File

@ -21,7 +21,8 @@ from frigate.config import FrigateConfig, RetainModeEnum
from frigate.const import CACHE_DIR, MAX_SEGMENT_DURATION, RECORD_DIR
from frigate.models import Event, Recordings
from frigate.types import FeatureMetricsTypes
from frigate.util import area, get_video_properties
from frigate.util.image import area
from frigate.util.services import get_video_properties
logger = logging.getLogger(__name__)

View File

@ -16,7 +16,7 @@ from frigate.models import Event, Recordings, RecordingsToDelete, Timeline
from frigate.record.cleanup import RecordingCleanup
from frigate.record.maintainer import RecordingMaintainer
from frigate.types import FeatureMetricsTypes
from frigate.util import listen
from frigate.util.services import listen
logger = logging.getLogger(__name__)

View File

@ -17,7 +17,7 @@ from frigate.config import FrigateConfig
from frigate.const import CACHE_DIR, CLIPS_DIR, DRIVER_AMD, DRIVER_ENV_VAR, RECORD_DIR
from frigate.object_detection import ObjectDetectProcess
from frigate.types import CameraMetricsTypes, StatsTrackingTypes
from frigate.util import (
from frigate.util.services import (
get_amd_gpu_stats,
get_bandwidth_stats,
get_cpu_stats,

View File

@ -2,7 +2,7 @@
import unittest
from frigate.util import clean_camera_user_pass, escape_special_characters
from frigate.util.builtin import clean_camera_user_pass, escape_special_characters
class TestUserPassCleanup(unittest.TestCase):

View File

@ -9,7 +9,7 @@ from frigate.config import BirdseyeModeEnum, FrigateConfig
from frigate.const import MODEL_CACHE_DIR
from frigate.detectors import DetectorTypeEnum
from frigate.plus import PlusApi
from frigate.util import deep_merge, load_config_with_no_duplicates
from frigate.util.builtin import deep_merge, load_config_with_no_duplicates
class TestConfig(unittest.TestCase):

View File

@ -3,7 +3,7 @@ from unittest import TestCase, main
import cv2
import numpy as np
from frigate.util import copy_yuv_to_position, get_yuv_crop
from frigate.util.image import copy_yuv_to_position, get_yuv_crop
class TestCopyYuvToPosition(TestCase):

View File

@ -1,7 +1,7 @@
import unittest
from unittest.mock import MagicMock, patch
from frigate.util import get_amd_gpu_stats, get_intel_gpu_stats
from frigate.util.services import get_amd_gpu_stats, get_intel_gpu_stats
class TestGpuStats(unittest.TestCase):

View File

@ -5,7 +5,7 @@ import numpy as np
from norfair.drawing.color import Palette
from norfair.drawing.drawer import Drawer
from frigate.util import intersection
from frigate.util.image import intersection
from frigate.video import (
get_cluster_boundary,
get_cluster_candidates,

View File

@ -3,7 +3,7 @@ from unittest import TestCase, main
import cv2
import numpy as np
from frigate.util import yuv_region_2_rgb
from frigate.util.image import yuv_region_2_rgb
class TestYuvRegion2RGB(TestCase):

View File

@ -10,7 +10,7 @@ from faster_fifo import Queue
from frigate.config import FrigateConfig
from frigate.events.maintainer import EventTypeEnum
from frigate.models import Timeline
from frigate.util import to_relative_box
from frigate.util.builtin import to_relative_box
logger = logging.getLogger(__name__)

View File

@ -7,7 +7,7 @@ from norfair.drawing.drawer import Drawer
from frigate.config import DetectConfig
from frigate.track import ObjectTracker
from frigate.util import intersection_over_union
from frigate.util.image import intersection_over_union
# Normalizes distance from estimate relative to object size

226
frigate/util/builtin.py Normal file
View File

@ -0,0 +1,226 @@
"""Utilities for builtin types manipulation."""
import copy
import ctypes
import datetime
import logging
import multiprocessing
import re
import shlex
import time
import urllib.parse
from collections import Counter
from collections.abc import Mapping
from queue import Empty, Full
from typing import Any, Tuple
import pytz
import yaml
from faster_fifo import DEFAULT_CIRCULAR_BUFFER_SIZE, DEFAULT_TIMEOUT
from faster_fifo import Queue as FFQueue
from frigate.const import REGEX_HTTP_CAMERA_USER_PASS, REGEX_RTSP_CAMERA_USER_PASS
logger = logging.getLogger(__name__)
class EventsPerSecond:
def __init__(self, max_events=1000, last_n_seconds=10):
self._start = None
self._max_events = max_events
self._last_n_seconds = last_n_seconds
self._timestamps = []
def start(self):
self._start = datetime.datetime.now().timestamp()
def update(self):
now = datetime.datetime.now().timestamp()
if self._start is None:
self._start = now
self._timestamps.append(now)
# truncate the list when it goes 100 over the max_size
if len(self._timestamps) > self._max_events + 100:
self._timestamps = self._timestamps[(1 - self._max_events) :]
self.expire_timestamps(now)
def eps(self):
now = datetime.datetime.now().timestamp()
if self._start is None:
self._start = now
# compute the (approximate) events in the last n seconds
self.expire_timestamps(now)
seconds = min(now - self._start, self._last_n_seconds)
# avoid divide by zero
if seconds == 0:
seconds = 1
return len(self._timestamps) / seconds
# remove aged out timestamps
def expire_timestamps(self, now):
threshold = now - self._last_n_seconds
while self._timestamps and self._timestamps[0] < threshold:
del self._timestamps[0]
class LimitedQueue(FFQueue):
def __init__(
self,
maxsize=0,
max_size_bytes=DEFAULT_CIRCULAR_BUFFER_SIZE,
loads=None,
dumps=None,
):
super().__init__(max_size_bytes=max_size_bytes, loads=loads, dumps=dumps)
self.maxsize = maxsize
self.size = multiprocessing.RawValue(
ctypes.c_int, 0
) # Add a counter for the number of items in the queue
def put(self, x, block=True, timeout=DEFAULT_TIMEOUT):
if self.maxsize > 0 and self.size.value >= self.maxsize:
if block:
start_time = time.time()
while self.size.value >= self.maxsize:
remaining = timeout - (time.time() - start_time)
if remaining <= 0.0:
raise Full
time.sleep(min(remaining, 0.1))
else:
raise Full
self.size.value += 1
return super().put(x, block=block, timeout=timeout)
def get(self, block=True, timeout=DEFAULT_TIMEOUT):
if self.size.value <= 0 and not block:
raise Empty
self.size.value -= 1
return super().get(block=block, timeout=timeout)
def qsize(self):
return self.size
def empty(self):
return self.qsize() == 0
def full(self):
return self.qsize() == self.maxsize
def deep_merge(dct1: dict, dct2: dict, override=False, merge_lists=False) -> dict:
"""
:param dct1: First dict to merge
:param dct2: Second dict to merge
:param override: if same key exists in both dictionaries, should override? otherwise ignore. (default=True)
:return: The merge dictionary
"""
merged = copy.deepcopy(dct1)
for k, v2 in dct2.items():
if k in merged:
v1 = merged[k]
if isinstance(v1, dict) and isinstance(v2, Mapping):
merged[k] = deep_merge(v1, v2, override)
elif isinstance(v1, list) and isinstance(v2, list):
if merge_lists:
merged[k] = v1 + v2
else:
if override:
merged[k] = copy.deepcopy(v2)
else:
merged[k] = copy.deepcopy(v2)
return merged
def load_config_with_no_duplicates(raw_config) -> dict:
"""Get config ensuring duplicate keys are not allowed."""
# https://stackoverflow.com/a/71751051
class PreserveDuplicatesLoader(yaml.loader.Loader):
pass
def map_constructor(loader, node, deep=False):
keys = [loader.construct_object(node, deep=deep) for node, _ in node.value]
vals = [loader.construct_object(node, deep=deep) for _, node in node.value]
key_count = Counter(keys)
data = {}
for key, val in zip(keys, vals):
if key_count[key] > 1:
raise ValueError(
f"Config input {key} is defined multiple times for the same field, this is not allowed."
)
else:
data[key] = val
return data
PreserveDuplicatesLoader.add_constructor(
yaml.resolver.BaseResolver.DEFAULT_MAPPING_TAG, map_constructor
)
return yaml.load(raw_config, PreserveDuplicatesLoader)
def clean_camera_user_pass(line: str) -> str:
"""Removes user and password from line."""
if "rtsp://" in line:
return re.sub(REGEX_RTSP_CAMERA_USER_PASS, "://*:*@", line)
else:
return re.sub(REGEX_HTTP_CAMERA_USER_PASS, "user=*&password=*", line)
def escape_special_characters(path: str) -> str:
"""Cleans reserved characters to encodings for ffmpeg."""
try:
found = re.search(REGEX_RTSP_CAMERA_USER_PASS, path).group(0)[3:-1]
pw = found[(found.index(":") + 1) :]
return path.replace(pw, urllib.parse.quote_plus(pw))
except AttributeError:
# path does not have user:pass
return path
def get_ffmpeg_arg_list(arg: Any) -> list:
"""Use arg if list or convert to list format."""
return arg if isinstance(arg, list) else shlex.split(arg)
def load_labels(path, encoding="utf-8"):
"""Loads labels from file (with or without index numbers).
Args:
path: path to label file.
encoding: label file encoding.
Returns:
Dictionary mapping indices to labels.
"""
with open(path, "r", encoding=encoding) as f:
labels = {index: "unknown" for index in range(91)}
lines = f.readlines()
if not lines:
return {}
if lines[0].split(" ", maxsplit=1)[0].isdigit():
pairs = [line.split(" ", maxsplit=1) for line in lines]
labels.update({int(index): label.strip() for index, label in pairs})
else:
labels.update({index: line.strip() for index, line in enumerate(lines)})
return labels
def get_tz_modifiers(tz_name: str) -> Tuple[str, str]:
seconds_offset = (
datetime.datetime.now(pytz.timezone(tz_name)).utcoffset().total_seconds()
)
hours_offset = int(seconds_offset / 60 / 60)
minutes_offset = int(seconds_offset / 60 - hours_offset * 60)
hour_modifier = f"{hours_offset} hour"
minute_modifier = f"{minutes_offset} minute"
return hour_modifier, minute_modifier
def to_relative_box(
width: int, height: int, box: Tuple[int, int, int, int]
) -> Tuple[int, int, int, int]:
return (
box[0] / width, # x
box[1] / height, # y
(box[2] - box[0]) / width, # w
(box[3] - box[1]) / height, # h
)

652
frigate/util.py → frigate/util/image.py Executable file → Normal file
View File

@ -1,89 +1,17 @@
import copy
import ctypes
"""Utilities for creating and manipulating image frames."""
import datetime
import json
import logging
import multiprocessing
import os
import re
import shlex
import signal
import subprocess as sp
import time
import traceback
import urllib.parse
from abc import ABC, abstractmethod
from collections import Counter
from collections.abc import Mapping
from multiprocessing import shared_memory
from queue import Empty, Full
from typing import Any, AnyStr, Optional, Tuple
from typing import AnyStr, Optional
import cv2
import numpy as np
import psutil
import py3nvml.py3nvml as nvml
import pytz
import yaml
from faster_fifo import DEFAULT_CIRCULAR_BUFFER_SIZE, DEFAULT_TIMEOUT
from faster_fifo import Queue as FFQueue
from frigate.const import REGEX_HTTP_CAMERA_USER_PASS, REGEX_RTSP_CAMERA_USER_PASS
logger = logging.getLogger(__name__)
def deep_merge(dct1: dict, dct2: dict, override=False, merge_lists=False) -> dict:
"""
:param dct1: First dict to merge
:param dct2: Second dict to merge
:param override: if same key exists in both dictionaries, should override? otherwise ignore. (default=True)
:return: The merge dictionary
"""
merged = copy.deepcopy(dct1)
for k, v2 in dct2.items():
if k in merged:
v1 = merged[k]
if isinstance(v1, dict) and isinstance(v2, Mapping):
merged[k] = deep_merge(v1, v2, override)
elif isinstance(v1, list) and isinstance(v2, list):
if merge_lists:
merged[k] = v1 + v2
else:
if override:
merged[k] = copy.deepcopy(v2)
else:
merged[k] = copy.deepcopy(v2)
return merged
def load_config_with_no_duplicates(raw_config) -> dict:
"""Get config ensuring duplicate keys are not allowed."""
# https://stackoverflow.com/a/71751051
class PreserveDuplicatesLoader(yaml.loader.Loader):
pass
def map_constructor(loader, node, deep=False):
keys = [loader.construct_object(node, deep=deep) for node, _ in node.value]
vals = [loader.construct_object(node, deep=deep) for _, node in node.value]
key_count = Counter(keys)
data = {}
for key, val in zip(keys, vals):
if key_count[key] > 1:
raise ValueError(
f"Config input {key} is defined multiple times for the same field, this is not allowed."
)
else:
data[key] = val
return data
PreserveDuplicatesLoader.add_constructor(
yaml.resolver.BaseResolver.DEFAULT_MAPPING_TAG, map_constructor
)
return yaml.load(raw_config, PreserveDuplicatesLoader)
def draw_timestamp(
frame,
timestamp,
@ -645,432 +573,6 @@ def clipped(obj, frame_shape):
return False
def restart_frigate():
proc = psutil.Process(1)
# if this is running via s6, sigterm pid 1
if proc.name() == "s6-svscan":
proc.terminate()
# otherwise, just try and exit frigate
else:
os.kill(os.getpid(), signal.SIGTERM)
class EventsPerSecond:
def __init__(self, max_events=1000, last_n_seconds=10):
self._start = None
self._max_events = max_events
self._last_n_seconds = last_n_seconds
self._timestamps = []
def start(self):
self._start = datetime.datetime.now().timestamp()
def update(self):
now = datetime.datetime.now().timestamp()
if self._start is None:
self._start = now
self._timestamps.append(now)
# truncate the list when it goes 100 over the max_size
if len(self._timestamps) > self._max_events + 100:
self._timestamps = self._timestamps[(1 - self._max_events) :]
self.expire_timestamps(now)
def eps(self):
now = datetime.datetime.now().timestamp()
if self._start is None:
self._start = now
# compute the (approximate) events in the last n seconds
self.expire_timestamps(now)
seconds = min(now - self._start, self._last_n_seconds)
# avoid divide by zero
if seconds == 0:
seconds = 1
return len(self._timestamps) / seconds
# remove aged out timestamps
def expire_timestamps(self, now):
threshold = now - self._last_n_seconds
while self._timestamps and self._timestamps[0] < threshold:
del self._timestamps[0]
def print_stack(sig, frame):
traceback.print_stack(frame)
def listen():
signal.signal(signal.SIGUSR1, print_stack)
def create_mask(frame_shape, mask):
mask_img = np.zeros(frame_shape, np.uint8)
mask_img[:] = 255
if isinstance(mask, list):
for m in mask:
add_mask(m, mask_img)
elif isinstance(mask, str):
add_mask(mask, mask_img)
return mask_img
def add_mask(mask, mask_img):
points = mask.split(",")
contour = np.array(
[[int(points[i]), int(points[i + 1])] for i in range(0, len(points), 2)]
)
cv2.fillPoly(mask_img, pts=[contour], color=(0))
def load_labels(path, encoding="utf-8"):
"""Loads labels from file (with or without index numbers).
Args:
path: path to label file.
encoding: label file encoding.
Returns:
Dictionary mapping indices to labels.
"""
with open(path, "r", encoding=encoding) as f:
labels = {index: "unknown" for index in range(91)}
lines = f.readlines()
if not lines:
return {}
if lines[0].split(" ", maxsplit=1)[0].isdigit():
pairs = [line.split(" ", maxsplit=1) for line in lines]
labels.update({int(index): label.strip() for index, label in pairs})
else:
labels.update({index: line.strip() for index, line in enumerate(lines)})
return labels
def clean_camera_user_pass(line: str) -> str:
"""Removes user and password from line."""
if "rtsp://" in line:
return re.sub(REGEX_RTSP_CAMERA_USER_PASS, "://*:*@", line)
else:
return re.sub(REGEX_HTTP_CAMERA_USER_PASS, "user=*&password=*", line)
def escape_special_characters(path: str) -> str:
"""Cleans reserved characters to encodings for ffmpeg."""
try:
found = re.search(REGEX_RTSP_CAMERA_USER_PASS, path).group(0)[3:-1]
pw = found[(found.index(":") + 1) :]
return path.replace(pw, urllib.parse.quote_plus(pw))
except AttributeError:
# path does not have user:pass
return path
def get_cgroups_version() -> str:
"""Determine what version of cgroups is enabled."""
cgroup_path = "/sys/fs/cgroup"
if not os.path.ismount(cgroup_path):
logger.debug(f"{cgroup_path} is not a mount point.")
return "unknown"
try:
with open("/proc/mounts", "r") as f:
mounts = f.readlines()
for mount in mounts:
mount_info = mount.split()
if mount_info[1] == cgroup_path:
fs_type = mount_info[2]
if fs_type == "cgroup2fs" or fs_type == "cgroup2":
return "cgroup2"
elif fs_type == "tmpfs":
return "cgroup"
else:
logger.debug(
f"Could not determine cgroups version: unhandled filesystem {fs_type}"
)
break
except Exception as e:
logger.debug(f"Could not determine cgroups version: {e}")
return "unknown"
def get_docker_memlimit_bytes() -> int:
"""Get mem limit in bytes set in docker if present. Returns -1 if no limit detected."""
# check running a supported cgroups version
if get_cgroups_version() == "cgroup2":
memlimit_path = "/sys/fs/cgroup/memory.max"
try:
with open(memlimit_path, "r") as f:
value = f.read().strip()
if value.isnumeric():
return int(value)
elif value.lower() == "max":
return -1
except Exception as e:
logger.debug(f"Unable to get docker memlimit: {e}")
return -1
def get_cpu_stats() -> dict[str, dict]:
"""Get cpu usages for each process id"""
usages = {}
docker_memlimit = get_docker_memlimit_bytes() / 1024
total_mem = os.sysconf("SC_PAGE_SIZE") * os.sysconf("SC_PHYS_PAGES") / 1024
for process in psutil.process_iter(["pid", "name", "cpu_percent", "cmdline"]):
pid = process.info["pid"]
try:
cpu_percent = process.info["cpu_percent"]
cmdline = process.info["cmdline"]
with open(f"/proc/{pid}/stat", "r") as f:
stats = f.readline().split()
utime = int(stats[13])
stime = int(stats[14])
starttime = int(stats[21])
with open("/proc/uptime") as f:
system_uptime_sec = int(float(f.read().split()[0]))
clk_tck = os.sysconf(os.sysconf_names["SC_CLK_TCK"])
process_utime_sec = utime // clk_tck
process_stime_sec = stime // clk_tck
process_starttime_sec = starttime // clk_tck
process_elapsed_sec = system_uptime_sec - process_starttime_sec
process_usage_sec = process_utime_sec + process_stime_sec
cpu_average_usage = process_usage_sec * 100 // process_elapsed_sec
with open(f"/proc/{pid}/statm", "r") as f:
mem_stats = f.readline().split()
mem_res = int(mem_stats[1]) * os.sysconf("SC_PAGE_SIZE") / 1024
if docker_memlimit > 0:
mem_pct = round((mem_res / docker_memlimit) * 100, 1)
else:
mem_pct = round((mem_res / total_mem) * 100, 1)
usages[pid] = {
"cpu": str(cpu_percent),
"cpu_average": str(round(cpu_average_usage, 2)),
"mem": f"{mem_pct}",
"cmdline": " ".join(cmdline),
}
except Exception:
continue
return usages
def get_physical_interfaces(interfaces) -> list:
with open("/proc/net/dev", "r") as file:
lines = file.readlines()
physical_interfaces = []
for line in lines:
if ":" in line:
interface = line.split(":")[0].strip()
for int in interfaces:
if interface.startswith(int):
physical_interfaces.append(interface)
return physical_interfaces
def get_bandwidth_stats(config) -> dict[str, dict]:
"""Get bandwidth usages for each ffmpeg process id"""
usages = {}
top_command = ["nethogs", "-t", "-v0", "-c5", "-d1"] + get_physical_interfaces(
config.telemetry.network_interfaces
)
p = sp.run(
top_command,
encoding="ascii",
capture_output=True,
)
if p.returncode != 0:
return usages
else:
lines = p.stdout.split("\n")
for line in lines:
stats = list(filter(lambda a: a != "", line.strip().split("\t")))
try:
if re.search(
r"(^ffmpeg|\/go2rtc|frigate\.detector\.[a-z]+)/([0-9]+)/", stats[0]
):
process = stats[0].split("/")
usages[process[len(process) - 2]] = {
"bandwidth": round(float(stats[1]) + float(stats[2]), 1),
}
except (IndexError, ValueError):
continue
return usages
def get_amd_gpu_stats() -> dict[str, str]:
"""Get stats using radeontop."""
radeontop_command = ["radeontop", "-d", "-", "-l", "1"]
p = sp.run(
radeontop_command,
encoding="ascii",
capture_output=True,
)
if p.returncode != 0:
logger.error(f"Unable to poll radeon GPU stats: {p.stderr}")
return None
else:
usages = p.stdout.split(",")
results: dict[str, str] = {}
for hw in usages:
if "gpu" in hw:
results["gpu"] = f"{hw.strip().split(' ')[1].replace('%', '')}%"
elif "vram" in hw:
results["mem"] = f"{hw.strip().split(' ')[1].replace('%', '')}%"
return results
def get_intel_gpu_stats() -> dict[str, str]:
"""Get stats using intel_gpu_top."""
intel_gpu_top_command = [
"timeout",
"0.5s",
"intel_gpu_top",
"-J",
"-o",
"-",
"-s",
"1",
]
p = sp.run(
intel_gpu_top_command,
encoding="ascii",
capture_output=True,
)
# timeout has a non-zero returncode when timeout is reached
if p.returncode != 124:
logger.error(f"Unable to poll intel GPU stats: {p.stderr}")
return None
else:
reading = "".join(p.stdout.split())
results: dict[str, str] = {}
# render is used for qsv
render = []
for result in re.findall(r'"Render/3D/0":{[a-z":\d.,%]+}', reading):
packet = json.loads(result[14:])
single = packet.get("busy", 0.0)
render.append(float(single))
if render:
render_avg = sum(render) / len(render)
else:
render_avg = 1
# video is used for vaapi
video = []
for result in re.findall('"Video/\d":{[a-z":\d.,%]+}', reading):
packet = json.loads(result[10:])
single = packet.get("busy", 0.0)
video.append(float(single))
if video:
video_avg = sum(video) / len(video)
else:
video_avg = 1
results["gpu"] = f"{round((video_avg + render_avg) / 2, 2)}%"
results["mem"] = "-%"
return results
def try_get_info(f, h, default="N/A"):
try:
v = f(h)
except nvml.NVMLError_NotSupported:
v = default
return v
def get_nvidia_gpu_stats() -> dict[int, dict]:
results = {}
try:
nvml.nvmlInit()
deviceCount = nvml.nvmlDeviceGetCount()
for i in range(deviceCount):
handle = nvml.nvmlDeviceGetHandleByIndex(i)
meminfo = try_get_info(nvml.nvmlDeviceGetMemoryInfo, handle)
util = try_get_info(nvml.nvmlDeviceGetUtilizationRates, handle)
if util != "N/A":
gpu_util = util.gpu
else:
gpu_util = 0
if meminfo != "N/A":
gpu_mem_util = meminfo.used / meminfo.total * 100
else:
gpu_mem_util = -1
results[i] = {
"name": nvml.nvmlDeviceGetName(handle),
"gpu": gpu_util,
"mem": gpu_mem_util,
}
except Exception:
pass
finally:
return results
def ffprobe_stream(path: str) -> sp.CompletedProcess:
"""Run ffprobe on stream."""
clean_path = escape_special_characters(path)
ffprobe_cmd = [
"ffprobe",
"-timeout",
"1000000",
"-print_format",
"json",
"-show_entries",
"stream=codec_long_name,width,height,bit_rate,duration,display_aspect_ratio,avg_frame_rate",
"-loglevel",
"quiet",
clean_path,
]
return sp.run(ffprobe_cmd, capture_output=True)
def vainfo_hwaccel(device_name: Optional[str] = None) -> sp.CompletedProcess:
"""Run vainfo."""
ffprobe_cmd = (
["vainfo"]
if not device_name
else ["vainfo", "--display", "drm", "--device", f"/dev/dri/{device_name}"]
)
return sp.run(ffprobe_cmd, capture_output=True)
def get_ffmpeg_arg_list(arg: Any) -> list:
"""Use arg if list or convert to list format."""
return arg if isinstance(arg, list) else shlex.split(arg)
class FrameManager(ABC):
@abstractmethod
def create(self, name, size) -> AnyStr:
@ -1138,133 +640,23 @@ class SharedMemoryFrameManager(FrameManager):
del self.shm_store[name]
def get_tz_modifiers(tz_name: str) -> Tuple[str, str]:
seconds_offset = (
datetime.datetime.now(pytz.timezone(tz_name)).utcoffset().total_seconds()
def create_mask(frame_shape, mask):
mask_img = np.zeros(frame_shape, np.uint8)
mask_img[:] = 255
if isinstance(mask, list):
for m in mask:
add_mask(m, mask_img)
elif isinstance(mask, str):
add_mask(mask, mask_img)
return mask_img
def add_mask(mask, mask_img):
points = mask.split(",")
contour = np.array(
[[int(points[i]), int(points[i + 1])] for i in range(0, len(points), 2)]
)
hours_offset = int(seconds_offset / 60 / 60)
minutes_offset = int(seconds_offset / 60 - hours_offset * 60)
hour_modifier = f"{hours_offset} hour"
minute_modifier = f"{minutes_offset} minute"
return hour_modifier, minute_modifier
def to_relative_box(
width: int, height: int, box: Tuple[int, int, int, int]
) -> Tuple[int, int, int, int]:
return (
box[0] / width, # x
box[1] / height, # y
(box[2] - box[0]) / width, # w
(box[3] - box[1]) / height, # h
)
def get_video_properties(url, get_duration=False):
def calculate_duration(video: Optional[any]) -> float:
duration = None
if video is not None:
# Get the frames per second (fps) of the video stream
fps = video.get(cv2.CAP_PROP_FPS)
total_frames = int(video.get(cv2.CAP_PROP_FRAME_COUNT))
if fps and total_frames:
duration = total_frames / fps
# if cv2 failed need to use ffprobe
if duration is None:
ffprobe_cmd = [
"ffprobe",
"-v",
"error",
"-show_entries",
"format=duration",
"-of",
"default=noprint_wrappers=1:nokey=1",
f"{url}",
]
p = sp.run(ffprobe_cmd, capture_output=True)
if p.returncode == 0 and p.stdout.decode():
duration = float(p.stdout.decode().strip())
else:
duration = -1
return duration
width = height = 0
try:
# Open the video stream
video = cv2.VideoCapture(url)
# Check if the video stream was opened successfully
if not video.isOpened():
video = None
except Exception:
video = None
result = {}
if get_duration:
result["duration"] = calculate_duration(video)
if video is not None:
# Get the width of frames in the video stream
width = video.get(cv2.CAP_PROP_FRAME_WIDTH)
# Get the height of frames in the video stream
height = video.get(cv2.CAP_PROP_FRAME_HEIGHT)
# Release the video stream
video.release()
result["width"] = round(width)
result["height"] = round(height)
return result
class LimitedQueue(FFQueue):
def __init__(
self,
maxsize=0,
max_size_bytes=DEFAULT_CIRCULAR_BUFFER_SIZE,
loads=None,
dumps=None,
):
super().__init__(max_size_bytes=max_size_bytes, loads=loads, dumps=dumps)
self.maxsize = maxsize
self.size = multiprocessing.RawValue(
ctypes.c_int, 0
) # Add a counter for the number of items in the queue
def put(self, x, block=True, timeout=DEFAULT_TIMEOUT):
if self.maxsize > 0 and self.size.value >= self.maxsize:
if block:
start_time = time.time()
while self.size.value >= self.maxsize:
remaining = timeout - (time.time() - start_time)
if remaining <= 0.0:
raise Full
time.sleep(min(remaining, 0.1))
else:
raise Full
self.size.value += 1
return super().put(x, block=block, timeout=timeout)
def get(self, block=True, timeout=DEFAULT_TIMEOUT):
if self.size.value <= 0 and not block:
raise Empty
self.size.value -= 1
return super().get(block=block, timeout=timeout)
def qsize(self):
return self.size
def empty(self):
return self.qsize() == 0
def full(self):
return self.qsize() == self.maxsize
cv2.fillPoly(mask_img, pts=[contour], color=(0))

403
frigate/util/services.py Normal file
View File

@ -0,0 +1,403 @@
"""Utilities for services."""
import json
import logging
import os
import re
import signal
import subprocess as sp
import traceback
from typing import Optional
import cv2
import psutil
import py3nvml.py3nvml as nvml
from frigate.util.builtin import escape_special_characters
logger = logging.getLogger(__name__)
def restart_frigate():
proc = psutil.Process(1)
# if this is running via s6, sigterm pid 1
if proc.name() == "s6-svscan":
proc.terminate()
# otherwise, just try and exit frigate
else:
os.kill(os.getpid(), signal.SIGTERM)
def print_stack(sig, frame):
traceback.print_stack(frame)
def listen():
signal.signal(signal.SIGUSR1, print_stack)
def get_cgroups_version() -> str:
"""Determine what version of cgroups is enabled."""
cgroup_path = "/sys/fs/cgroup"
if not os.path.ismount(cgroup_path):
logger.debug(f"{cgroup_path} is not a mount point.")
return "unknown"
try:
with open("/proc/mounts", "r") as f:
mounts = f.readlines()
for mount in mounts:
mount_info = mount.split()
if mount_info[1] == cgroup_path:
fs_type = mount_info[2]
if fs_type == "cgroup2fs" or fs_type == "cgroup2":
return "cgroup2"
elif fs_type == "tmpfs":
return "cgroup"
else:
logger.debug(
f"Could not determine cgroups version: unhandled filesystem {fs_type}"
)
break
except Exception as e:
logger.debug(f"Could not determine cgroups version: {e}")
return "unknown"
def get_docker_memlimit_bytes() -> int:
"""Get mem limit in bytes set in docker if present. Returns -1 if no limit detected."""
# check running a supported cgroups version
if get_cgroups_version() == "cgroup2":
memlimit_path = "/sys/fs/cgroup/memory.max"
try:
with open(memlimit_path, "r") as f:
value = f.read().strip()
if value.isnumeric():
return int(value)
elif value.lower() == "max":
return -1
except Exception as e:
logger.debug(f"Unable to get docker memlimit: {e}")
return -1
def get_cpu_stats() -> dict[str, dict]:
"""Get cpu usages for each process id"""
usages = {}
docker_memlimit = get_docker_memlimit_bytes() / 1024
total_mem = os.sysconf("SC_PAGE_SIZE") * os.sysconf("SC_PHYS_PAGES") / 1024
for process in psutil.process_iter(["pid", "name", "cpu_percent", "cmdline"]):
pid = process.info["pid"]
try:
cpu_percent = process.info["cpu_percent"]
cmdline = process.info["cmdline"]
with open(f"/proc/{pid}/stat", "r") as f:
stats = f.readline().split()
utime = int(stats[13])
stime = int(stats[14])
starttime = int(stats[21])
with open("/proc/uptime") as f:
system_uptime_sec = int(float(f.read().split()[0]))
clk_tck = os.sysconf(os.sysconf_names["SC_CLK_TCK"])
process_utime_sec = utime // clk_tck
process_stime_sec = stime // clk_tck
process_starttime_sec = starttime // clk_tck
process_elapsed_sec = system_uptime_sec - process_starttime_sec
process_usage_sec = process_utime_sec + process_stime_sec
cpu_average_usage = process_usage_sec * 100 // process_elapsed_sec
with open(f"/proc/{pid}/statm", "r") as f:
mem_stats = f.readline().split()
mem_res = int(mem_stats[1]) * os.sysconf("SC_PAGE_SIZE") / 1024
if docker_memlimit > 0:
mem_pct = round((mem_res / docker_memlimit) * 100, 1)
else:
mem_pct = round((mem_res / total_mem) * 100, 1)
usages[pid] = {
"cpu": str(cpu_percent),
"cpu_average": str(round(cpu_average_usage, 2)),
"mem": f"{mem_pct}",
"cmdline": " ".join(cmdline),
}
except Exception:
continue
return usages
def get_physical_interfaces(interfaces) -> list:
with open("/proc/net/dev", "r") as file:
lines = file.readlines()
physical_interfaces = []
for line in lines:
if ":" in line:
interface = line.split(":")[0].strip()
for int in interfaces:
if interface.startswith(int):
physical_interfaces.append(interface)
return physical_interfaces
def get_bandwidth_stats(config) -> dict[str, dict]:
"""Get bandwidth usages for each ffmpeg process id"""
usages = {}
top_command = ["nethogs", "-t", "-v0", "-c5", "-d1"] + get_physical_interfaces(
config.telemetry.network_interfaces
)
p = sp.run(
top_command,
encoding="ascii",
capture_output=True,
)
if p.returncode != 0:
return usages
else:
lines = p.stdout.split("\n")
for line in lines:
stats = list(filter(lambda a: a != "", line.strip().split("\t")))
try:
if re.search(
r"(^ffmpeg|\/go2rtc|frigate\.detector\.[a-z]+)/([0-9]+)/", stats[0]
):
process = stats[0].split("/")
usages[process[len(process) - 2]] = {
"bandwidth": round(float(stats[1]) + float(stats[2]), 1),
}
except (IndexError, ValueError):
continue
return usages
def get_amd_gpu_stats() -> dict[str, str]:
"""Get stats using radeontop."""
radeontop_command = ["radeontop", "-d", "-", "-l", "1"]
p = sp.run(
radeontop_command,
encoding="ascii",
capture_output=True,
)
if p.returncode != 0:
logger.error(f"Unable to poll radeon GPU stats: {p.stderr}")
return None
else:
usages = p.stdout.split(",")
results: dict[str, str] = {}
for hw in usages:
if "gpu" in hw:
results["gpu"] = f"{hw.strip().split(' ')[1].replace('%', '')}%"
elif "vram" in hw:
results["mem"] = f"{hw.strip().split(' ')[1].replace('%', '')}%"
return results
def get_intel_gpu_stats() -> dict[str, str]:
"""Get stats using intel_gpu_top."""
intel_gpu_top_command = [
"timeout",
"0.5s",
"intel_gpu_top",
"-J",
"-o",
"-",
"-s",
"1",
]
p = sp.run(
intel_gpu_top_command,
encoding="ascii",
capture_output=True,
)
# timeout has a non-zero returncode when timeout is reached
if p.returncode != 124:
logger.error(f"Unable to poll intel GPU stats: {p.stderr}")
return None
else:
reading = "".join(p.stdout.split())
results: dict[str, str] = {}
# render is used for qsv
render = []
for result in re.findall(r'"Render/3D/0":{[a-z":\d.,%]+}', reading):
packet = json.loads(result[14:])
single = packet.get("busy", 0.0)
render.append(float(single))
if render:
render_avg = sum(render) / len(render)
else:
render_avg = 1
# video is used for vaapi
video = []
for result in re.findall('"Video/\d":{[a-z":\d.,%]+}', reading):
packet = json.loads(result[10:])
single = packet.get("busy", 0.0)
video.append(float(single))
if video:
video_avg = sum(video) / len(video)
else:
video_avg = 1
results["gpu"] = f"{round((video_avg + render_avg) / 2, 2)}%"
results["mem"] = "-%"
return results
def try_get_info(f, h, default="N/A"):
try:
v = f(h)
except nvml.NVMLError_NotSupported:
v = default
return v
def get_nvidia_gpu_stats() -> dict[int, dict]:
results = {}
try:
nvml.nvmlInit()
deviceCount = nvml.nvmlDeviceGetCount()
for i in range(deviceCount):
handle = nvml.nvmlDeviceGetHandleByIndex(i)
meminfo = try_get_info(nvml.nvmlDeviceGetMemoryInfo, handle)
util = try_get_info(nvml.nvmlDeviceGetUtilizationRates, handle)
if util != "N/A":
gpu_util = util.gpu
else:
gpu_util = 0
if meminfo != "N/A":
gpu_mem_util = meminfo.used / meminfo.total * 100
else:
gpu_mem_util = -1
results[i] = {
"name": nvml.nvmlDeviceGetName(handle),
"gpu": gpu_util,
"mem": gpu_mem_util,
}
except Exception:
pass
finally:
return results
def ffprobe_stream(path: str) -> sp.CompletedProcess:
"""Run ffprobe on stream."""
clean_path = escape_special_characters(path)
ffprobe_cmd = [
"ffprobe",
"-timeout",
"1000000",
"-print_format",
"json",
"-show_entries",
"stream=codec_long_name,width,height,bit_rate,duration,display_aspect_ratio,avg_frame_rate",
"-loglevel",
"quiet",
clean_path,
]
return sp.run(ffprobe_cmd, capture_output=True)
def vainfo_hwaccel(device_name: Optional[str] = None) -> sp.CompletedProcess:
"""Run vainfo."""
ffprobe_cmd = (
["vainfo"]
if not device_name
else ["vainfo", "--display", "drm", "--device", f"/dev/dri/{device_name}"]
)
return sp.run(ffprobe_cmd, capture_output=True)
def get_video_properties(url, get_duration=False):
def calculate_duration(video: Optional[any]) -> float:
duration = None
if video is not None:
# Get the frames per second (fps) of the video stream
fps = video.get(cv2.CAP_PROP_FPS)
total_frames = int(video.get(cv2.CAP_PROP_FRAME_COUNT))
if fps and total_frames:
duration = total_frames / fps
# if cv2 failed need to use ffprobe
if duration is None:
ffprobe_cmd = [
"ffprobe",
"-v",
"error",
"-show_entries",
"format=duration",
"-of",
"default=noprint_wrappers=1:nokey=1",
f"{url}",
]
p = sp.run(ffprobe_cmd, capture_output=True)
if p.returncode == 0 and p.stdout.decode():
duration = float(p.stdout.decode().strip())
else:
duration = -1
return duration
width = height = 0
try:
# Open the video stream
video = cv2.VideoCapture(url)
# Check if the video stream was opened successfully
if not video.isOpened():
video = None
except Exception:
video = None
result = {}
if get_duration:
result["duration"] = calculate_duration(video)
if video is not None:
# Get the width of frames in the video stream
width = video.get(cv2.CAP_PROP_FRAME_WIDTH)
# Get the height of frames in the video stream
height = video.get(cv2.CAP_PROP_FRAME_HEIGHT)
# Release the video stream
video.release()
result["width"] = round(width)
result["height"] = round(height)
return result

View File

@ -24,8 +24,8 @@ from frigate.motion.improved_motion import ImprovedMotionDetector
from frigate.object_detection import RemoteObjectDetector
from frigate.track import ObjectTracker
from frigate.track.norfair_tracker import NorfairTracker
from frigate.util import (
EventsPerSecond,
from frigate.util.builtin import EventsPerSecond
from frigate.util.image import (
FrameManager,
SharedMemoryFrameManager,
area,
@ -33,11 +33,11 @@ from frigate.util import (
draw_box_with_label,
intersection,
intersection_over_union,
listen,
yuv_region_2_bgr,
yuv_region_2_rgb,
yuv_region_2_yuv,
)
from frigate.util.services import listen
logger = logging.getLogger(__name__)

View File

@ -5,7 +5,7 @@ import time
from multiprocessing.synchronize import Event as MpEvent
from frigate.object_detection import ObjectDetectProcess
from frigate.util import restart_frigate
from frigate.util.services import restart_frigate
logger = logging.getLogger(__name__)