mirror of
https://github.com/blakeblackshear/frigate.git
synced 2025-01-12 00:06:43 +01:00
761daf46ea
* Run ffmpeg sub process & video_properties as async * Run recording cleanup in the main process * More cleanup * Use inter process communication to write recordings into the DB * Formatting
427 lines
12 KiB
Python
427 lines
12 KiB
Python
"""Utilities for services."""
|
|
|
|
import asyncio
|
|
import json
|
|
import logging
|
|
import os
|
|
import re
|
|
import signal
|
|
import subprocess as sp
|
|
import traceback
|
|
from typing import Optional
|
|
|
|
import cv2
|
|
import psutil
|
|
import py3nvml.py3nvml as nvml
|
|
|
|
from frigate.util.builtin import escape_special_characters
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def restart_frigate():
    """Trigger a restart by terminating the process that owns Frigate.

    If PID 1 is the s6 supervisor (``s6-svscan``), PID 1 is terminated;
    otherwise SIGTERM is sent to the current process.
    """
    init_process = psutil.Process(1)

    if init_process.name() != "s6-svscan":
        # not supervised by s6: just try and exit frigate itself
        os.kill(os.getpid(), signal.SIGTERM)
        return

    # running via s6: sigterm pid 1
    init_process.terminate()
|
|
|
|
|
|
def print_stack(sig, frame):
    """Signal-handler style callback that prints the stack of ``frame``.

    ``sig`` is accepted so the function matches the ``(signum, frame)``
    handler signature; it is not used.
    """
    traceback.print_stack(f=frame)
|
|
|
|
|
|
def listen():
    """Register ``print_stack`` as the handler for SIGUSR1."""
    dump_signal = signal.SIGUSR1
    signal.signal(dump_signal, print_stack)
|
|
|
|
|
|
def get_cgroups_version() -> str:
    """Determine what version of cgroups is enabled.

    Looks up the filesystem type mounted at ``/sys/fs/cgroup`` in
    ``/proc/mounts`` and returns ``"cgroup2"``, ``"cgroup"`` or
    ``"unknown"`` (not a mount point, unhandled filesystem, or any error).
    """
    cgroup_path = "/sys/fs/cgroup"

    if not os.path.ismount(cgroup_path):
        logger.debug(f"{cgroup_path} is not a mount point.")
        return "unknown"

    try:
        with open("/proc/mounts", "r") as mounts_file:
            mount_lines = mounts_file.readlines()

        for entry in mount_lines:
            fields = entry.split()
            if fields[1] != cgroup_path:
                continue

            fs_type = fields[2]
            if fs_type in ("cgroup2fs", "cgroup2"):
                return "cgroup2"
            if fs_type == "tmpfs":
                return "cgroup"

            logger.debug(
                f"Could not determine cgroups version: unhandled filesystem {fs_type}"
            )
            # only the first entry for the cgroup path is considered
            break
    except Exception as e:
        logger.debug(f"Could not determine cgroups version: {e}")

    return "unknown"
|
|
|
|
|
|
def get_docker_memlimit_bytes() -> int:
    """Get mem limit in bytes set in docker if present. Returns -1 if no limit detected.

    Only cgroup2 is consulted (``/sys/fs/cgroup/memory.max``); any other
    cgroups version, a value of ``max``, an unparsable value, or a read
    error all yield -1.
    """
    no_limit = -1

    # check running a supported cgroups version
    if get_cgroups_version() != "cgroup2":
        return no_limit

    try:
        with open("/sys/fs/cgroup/memory.max", "r") as limit_file:
            raw_limit = limit_file.read().strip()

        if raw_limit.isnumeric():
            return int(raw_limit)
        # "max" (no limit) and anything else fall through to -1
    except Exception as e:
        logger.debug(f"Unable to get docker memlimit: {e}")

    return no_limit
|
|
|
|
|
|
def get_cpu_stats() -> dict[str, dict]:
    """Get cpu and memory usages for each process id.

    Returns:
        A dict keyed by the pid reported by psutil. Each value contains:
        ``"cpu"`` (psutil's ``cpu_percent`` as a string), ``"cpu_average"``
        (cpu seconds used per elapsed second since process start, in
        percent), ``"mem"`` (resident memory as a percentage of the docker
        memory limit when one is set, otherwise of physical memory) and
        ``"cmdline"`` (arguments joined with spaces).
        Processes whose ``/proc`` entries cannot be read or parsed are
        skipped.

    NOTE(review): the annotation says ``str`` keys, but the key is whatever
    ``process.info["pid"]`` holds - confirm against callers.
    """
    usages = {}

    # loop-invariant system constants
    page_size = os.sysconf("SC_PAGE_SIZE")
    clk_tck = os.sysconf("SC_CLK_TCK")

    docker_memlimit = get_docker_memlimit_bytes() / 1024
    total_mem = page_size * os.sysconf("SC_PHYS_PAGES") / 1024
    # memory percentages are relative to the container limit when one is set
    mem_limit = docker_memlimit if docker_memlimit > 0 else total_mem

    for process in psutil.process_iter(["pid", "name", "cpu_percent", "cmdline"]):
        pid = process.info["pid"]
        try:
            cpu_percent = process.info["cpu_percent"]
            cmdline = process.info["cmdline"]

            with open(f"/proc/{pid}/stat", "r") as f:
                stat_line = f.readline()

            # The second field (comm) is wrapped in parentheses and may itself
            # contain spaces or ")" characters, so only the text after the
            # LAST ")" is split. Index 0 is then field 3 (state), which puts
            # utime (field 14), stime (field 15) and starttime (field 22) at
            # indexes 11, 12 and 19.
            stats = stat_line[stat_line.rindex(")") + 1 :].split()
            utime = int(stats[11])
            stime = int(stats[12])
            starttime = int(stats[19])

            with open("/proc/uptime") as f:
                system_uptime_sec = int(float(f.read().split()[0]))

            process_utime_sec = utime // clk_tck
            process_stime_sec = stime // clk_tck
            process_starttime_sec = starttime // clk_tck

            # a process younger than one second would otherwise divide by
            # zero and be dropped from the stats
            process_elapsed_sec = max(system_uptime_sec - process_starttime_sec, 1)
            process_usage_sec = process_utime_sec + process_stime_sec
            cpu_average_usage = process_usage_sec * 100 // process_elapsed_sec

            with open(f"/proc/{pid}/statm", "r") as f:
                mem_stats = f.readline().split()
            # second statm field is the resident set size in pages
            mem_res = int(mem_stats[1]) * page_size / 1024

            mem_pct = round((mem_res / mem_limit) * 100, 1)

            usages[pid] = {
                "cpu": str(cpu_percent),
                "cpu_average": str(round(cpu_average_usage, 2)),
                "mem": f"{mem_pct}",
                "cmdline": " ".join(cmdline),
            }
        except Exception:
            # best effort: the process may have exited or be inaccessible
            continue

    return usages
|
|
|
|
|
|
def get_physical_interfaces(interfaces) -> list:
    """Return the interfaces in /proc/net/dev whose name starts with a given prefix.

    Args:
        interfaces: iterable of interface-name prefixes to match.

    Returns:
        Matching interface names in the order they appear in
        ``/proc/net/dev``. Each interface is listed once, even when it
        matches several prefixes.

    Raises:
        OSError: if ``/proc/net/dev`` cannot be read.
    """
    # str.startswith accepts a tuple, which matches any prefix in one call
    # and cannot add the same interface twice
    prefixes = tuple(interfaces)
    physical_interfaces = []

    with open("/proc/net/dev", "r") as file:
        for line in file:
            name, separator, _ = line.partition(":")
            if not separator:
                # header lines carry no "name:" column
                continue

            interface = name.strip()
            if interface.startswith(prefixes):
                physical_interfaces.append(interface)

    return physical_interfaces
|
|
|
|
|
|
def get_bandwidth_stats(config) -> dict[str, dict]:
    """Get bandwidth usages for each ffmpeg process id

    Runs ``nethogs`` on the interfaces selected by
    ``config.telemetry.network_interfaces`` and, for every output line whose
    first tab-separated column matches an ffmpeg / go2rtc / frigate detector
    process, records the sum of the next two columns under ``"bandwidth"``.
    The dict key is the second-to-last "/"-separated part of that first
    column (presumably the pid - confirm against nethogs output).
    An empty dict is returned when nethogs exits with a non-zero code.
    """
    usages = {}
    monitored = get_physical_interfaces(config.telemetry.network_interfaces)
    top_command = ["nethogs", "-t", "-v0", "-c5", "-d1"] + monitored

    p = sp.run(top_command, encoding="ascii", capture_output=True)

    if p.returncode != 0:
        return usages

    for raw_line in p.stdout.split("\n"):
        columns = [col for col in raw_line.strip().split("\t") if col != ""]
        try:
            matched = re.search(
                r"(^ffmpeg|\/go2rtc|frigate\.detector\.[a-z]+)/([0-9]+)/", columns[0]
            )
            if matched:
                key = columns[0].split("/")[-2]
                total = float(columns[1]) + float(columns[2])
                usages[key] = {
                    "bandwidth": round(total, 1),
                }
        except (IndexError, ValueError):
            # blank or malformed line
            continue

    return usages
|
|
|
|
|
|
def get_amd_gpu_stats() -> dict[str, str]:
    """Get stats using radeontop.

    Returns a dict with ``"gpu"`` and/or ``"mem"`` percentage strings taken
    from the comma-separated radeontop output, or ``None`` (after logging an
    error) when radeontop exits with a non-zero code.
    """
    p = sp.run(
        ["radeontop", "-d", "-", "-l", "1"],
        encoding="ascii",
        capture_output=True,
    )

    if p.returncode != 0:
        logger.error(f"Unable to poll radeon GPU stats: {p.stderr}")
        return None

    def as_percent(section: str) -> str:
        # second space-separated token is the value; normalise to one "%"
        value = section.strip().split(" ")[1]
        return value.replace("%", "") + "%"

    results: dict[str, str] = {}
    for section in p.stdout.split(","):
        if "gpu" in section:
            results["gpu"] = as_percent(section)
        elif "vram" in section:
            results["mem"] = as_percent(section)

    return results
|
|
|
|
|
|
def get_intel_gpu_stats() -> dict[str, str]:
    """Get stats using intel_gpu_top.

    Runs ``intel_gpu_top`` under ``timeout`` and averages the ``busy``
    values of the ``Render/3D/0`` and ``Video/N`` engines found in its
    output.

    Returns:
        ``{"gpu": "<pct>%", "mem": "-%"}``, or ``None`` (after logging an
        error) when the command did not end with exit code 124.
    """
    intel_gpu_top_command = [
        "timeout",
        "0.5s",
        "intel_gpu_top",
        "-J",
        "-o",
        "-",
        "-s",
        "1",
    ]

    p = sp.run(
        intel_gpu_top_command,
        encoding="ascii",
        capture_output=True,
    )

    # timeout has a non-zero returncode when timeout is reached
    if p.returncode != 124:
        logger.error(f"Unable to poll intel GPU stats: {p.stderr}")
        return None

    # drop all whitespace so the engine objects can be matched with a regex
    reading = "".join(p.stdout.split())

    def average_busy(pattern: str, key_length: int) -> float:
        """Average the "busy" value of every engine object matching pattern.

        ``key_length`` is the number of leading characters (the quoted engine
        name plus the colon) to strip before the JSON object starts.
        Falls back to 1 when nothing matched.
        """
        samples = [
            float(json.loads(match[key_length:]).get("busy", 0.0))
            for match in re.findall(pattern, reading)
        ]
        return sum(samples) / len(samples) if samples else 1

    # render is used for qsv
    render_avg = average_busy(r'"Render/3D/0":{[a-z":\d.,%]+}', 14)

    # video is used for vaapi
    # (raw string: "\d" in a normal string literal is an invalid escape)
    video_avg = average_busy(r'"Video/\d":{[a-z":\d.,%]+}', 10)

    results: dict[str, str] = {}
    results["gpu"] = f"{round((video_avg + render_avg) / 2, 2)}%"
    results["mem"] = "-%"
    return results
|
|
|
|
|
|
def try_get_info(f, h, default="N/A"):
    """Return ``f(h)``, or ``default`` when NVML raises NotSupported.

    Any other exception raised by ``f`` propagates to the caller.
    """
    try:
        return f(h)
    except nvml.NVMLError_NotSupported:
        return default
|
|
|
|
|
|
def get_nvidia_gpu_stats() -> dict[int, dict]:
    """Get name, utilization and memory usage for each NVIDIA GPU via NVML.

    Returns:
        A dict keyed by device index. Each value holds ``"name"``, ``"gpu"``
        (utilization, 0 when not supported) and ``"mem"`` (used memory in
        percent, -1 when not supported). If NVML fails part-way, the
        devices collected so far are returned; on an immediate failure the
        dict is empty.
    """
    results = {}
    try:
        nvml.nvmlInit()
        device_count = nvml.nvmlDeviceGetCount()
        for i in range(device_count):
            handle = nvml.nvmlDeviceGetHandleByIndex(i)
            meminfo = try_get_info(nvml.nvmlDeviceGetMemoryInfo, handle)
            util = try_get_info(nvml.nvmlDeviceGetUtilizationRates, handle)

            # "N/A" is try_get_info's default for unsupported queries
            gpu_util = util.gpu if util != "N/A" else 0

            if meminfo != "N/A":
                gpu_mem_util = meminfo.used / meminfo.total * 100
            else:
                gpu_mem_util = -1

            results[i] = {
                "name": nvml.nvmlDeviceGetName(handle),
                "gpu": gpu_util,
                "mem": gpu_mem_util,
            }
    except Exception as e:
        # best effort: keep whatever was gathered before the failure
        logger.debug(f"Unable to poll nvidia GPU stats: {e}")

    # Returned after the try block instead of from a `finally` clause, which
    # would also swallow KeyboardInterrupt / SystemExit.
    return results
|
|
|
|
|
|
def get_jetson_stats() -> dict[int, dict]:
    """Get GPU load from ``/sys/devices/gpu.0/load``.

    Returns ``{"mem": "-", "gpu": "<load>%"}`` where the load is the value
    read from the file divided by 10, or ``None`` if the file cannot be
    read or parsed.
    """
    try:
        with open("/sys/devices/gpu.0/load", "r") as load_file:
            gpu_load = float(load_file.readline()) / 10
    except Exception:
        return None

    return {
        "mem": "-",  # no discrete gpu memory
        "gpu": f"{gpu_load}%",
    }
|
|
|
|
|
|
def ffprobe_stream(path: str) -> sp.CompletedProcess:
    """Run ffprobe on stream.

    The path is passed through ``escape_special_characters`` first. ffprobe
    is asked for JSON output of selected stream entries, and the completed
    process (with captured stdout/stderr) is returned as-is.
    """
    stream_entries = "stream=codec_long_name,width,height,bit_rate,duration,display_aspect_ratio,avg_frame_rate"

    ffprobe_cmd = ["ffprobe"]
    ffprobe_cmd += ["-timeout", "1000000"]
    ffprobe_cmd += ["-print_format", "json"]
    ffprobe_cmd += ["-show_entries", stream_entries]
    ffprobe_cmd += ["-loglevel", "quiet"]
    ffprobe_cmd.append(escape_special_characters(path))

    return sp.run(ffprobe_cmd, capture_output=True)
|
|
|
|
|
|
def vainfo_hwaccel(device_name: Optional[str] = None) -> sp.CompletedProcess:
    """Run vainfo.

    With a ``device_name`` the DRM display and ``/dev/dri/<device_name>``
    are selected explicitly; otherwise vainfo runs with no arguments.
    Returns the completed process with captured stdout/stderr.
    """
    vainfo_cmd = ["vainfo"]
    if device_name:
        vainfo_cmd += ["--display", "drm", "--device", f"/dev/dri/{device_name}"]

    return sp.run(vainfo_cmd, capture_output=True)
|
|
|
|
|
|
async def get_video_properties(url, get_duration=False):
    """Read the frame size (and optionally the duration) of a video.

    Args:
        url: path or URL handed to ``cv2.VideoCapture`` and, as a fallback
            for the duration, to ``ffprobe``.
        get_duration: when true, a ``"duration"`` entry is included.

    Returns:
        A dict with ``"width"`` and ``"height"`` (both 0 when the video
        could not be opened) and, if requested, ``"duration"`` in seconds
        (-1 when it could not be determined).
    """

    async def calculate_duration(video: Optional["cv2.VideoCapture"]) -> float:
        """Compute the duration from cv2 metadata, falling back to ffprobe."""
        duration = None

        if video is not None:
            # Get the frames per second (fps) of the video stream
            fps = video.get(cv2.CAP_PROP_FPS)
            total_frames = int(video.get(cv2.CAP_PROP_FRAME_COUNT))

            if fps and total_frames:
                duration = total_frames / fps

        # if cv2 failed need to use ffprobe
        if duration is None:
            p = await asyncio.create_subprocess_exec(
                "ffprobe",
                "-v",
                "error",
                "-show_entries",
                "format=duration",
                "-of",
                "default=noprint_wrappers=1:nokey=1",
                f"{url}",
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE,
            )
            # communicate() drains both pipes while waiting; wait() alone can
            # deadlock once the child fills a pipe buffer
            stdout, _ = await p.communicate()

            duration = -1
            if p.returncode == 0:
                try:
                    duration = float(stdout.decode().strip())
                except ValueError:
                    # empty or non-numeric output (e.g. "N/A")
                    duration = -1

        return duration

    width = height = 0

    try:
        # Open the video stream
        video = cv2.VideoCapture(url)

        # Check if the video stream was opened successfully
        if not video.isOpened():
            video = None
    except Exception:
        video = None

    result = {}

    if get_duration:
        result["duration"] = await calculate_duration(video)

    if video is not None:
        # Get the width and height of frames in the video stream
        width = video.get(cv2.CAP_PROP_FRAME_WIDTH)
        height = video.get(cv2.CAP_PROP_FRAME_HEIGHT)

        # Release the video stream
        video.release()

    result["width"] = round(width)
    result["height"] = round(height)

    return result
|