mirror of
				https://github.com/blakeblackshear/frigate.git
				synced 2025-10-27 10:52:11 +01:00 
			
		
		
		
	
		
			
				
	
	
		
			771 lines
		
	
	
		
			23 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			771 lines
		
	
	
		
			23 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
"""Utilities for services."""
 | 
						|
 | 
						|
import asyncio
 | 
						|
import json
 | 
						|
import logging
 | 
						|
import os
 | 
						|
import re
 | 
						|
import resource
 | 
						|
import signal
 | 
						|
import subprocess as sp
 | 
						|
import traceback
 | 
						|
from datetime import datetime
 | 
						|
from typing import Any, List, Optional, Tuple
 | 
						|
 | 
						|
import cv2
 | 
						|
import psutil
 | 
						|
import py3nvml.py3nvml as nvml
 | 
						|
import requests
 | 
						|
 | 
						|
from frigate.const import (
 | 
						|
    DRIVER_AMD,
 | 
						|
    DRIVER_ENV_VAR,
 | 
						|
    FFMPEG_HWACCEL_NVIDIA,
 | 
						|
    FFMPEG_HWACCEL_VAAPI,
 | 
						|
)
 | 
						|
from frigate.util.builtin import clean_camera_user_pass, escape_special_characters
 | 
						|
 | 
						|
logger = logging.getLogger(__name__)
 | 
						|
 | 
						|
 | 
						|
def restart_frigate():
 | 
						|
    proc = psutil.Process(1)
 | 
						|
    # if this is running via s6, sigterm pid 1
 | 
						|
    if proc.name() == "s6-svscan":
 | 
						|
        proc.terminate()
 | 
						|
    # otherwise, just try and exit frigate
 | 
						|
    else:
 | 
						|
        os.kill(os.getpid(), signal.SIGINT)
 | 
						|
 | 
						|
 | 
						|
def print_stack(sig, frame):
 | 
						|
    traceback.print_stack(frame)
 | 
						|
 | 
						|
 | 
						|
def listen():
 | 
						|
    signal.signal(signal.SIGUSR1, print_stack)
 | 
						|
 | 
						|
 | 
						|
def get_cgroups_version() -> str:
 | 
						|
    """Determine what version of cgroups is enabled."""
 | 
						|
 | 
						|
    cgroup_path = "/sys/fs/cgroup"
 | 
						|
 | 
						|
    if not os.path.ismount(cgroup_path):
 | 
						|
        logger.debug(f"{cgroup_path} is not a mount point.")
 | 
						|
        return "unknown"
 | 
						|
 | 
						|
    try:
 | 
						|
        with open("/proc/mounts", "r") as f:
 | 
						|
            mounts = f.readlines()
 | 
						|
 | 
						|
        for mount in mounts:
 | 
						|
            mount_info = mount.split()
 | 
						|
            if mount_info[1] == cgroup_path:
 | 
						|
                fs_type = mount_info[2]
 | 
						|
                if fs_type == "cgroup2fs" or fs_type == "cgroup2":
 | 
						|
                    return "cgroup2"
 | 
						|
                elif fs_type == "tmpfs":
 | 
						|
                    return "cgroup"
 | 
						|
                else:
 | 
						|
                    logger.debug(
 | 
						|
                        f"Could not determine cgroups version: unhandled filesystem {fs_type}"
 | 
						|
                    )
 | 
						|
                break
 | 
						|
    except Exception as e:
 | 
						|
        logger.debug(f"Could not determine cgroups version: {e}")
 | 
						|
 | 
						|
    return "unknown"
 | 
						|
 | 
						|
 | 
						|
def get_docker_memlimit_bytes() -> int:
 | 
						|
    """Get mem limit in bytes set in docker if present. Returns -1 if no limit detected."""
 | 
						|
 | 
						|
    # check running a supported cgroups version
 | 
						|
    if get_cgroups_version() == "cgroup2":
 | 
						|
        memlimit_path = "/sys/fs/cgroup/memory.max"
 | 
						|
 | 
						|
        try:
 | 
						|
            with open(memlimit_path, "r") as f:
 | 
						|
                value = f.read().strip()
 | 
						|
 | 
						|
            if value.isnumeric():
 | 
						|
                return int(value)
 | 
						|
            elif value.lower() == "max":
 | 
						|
                return -1
 | 
						|
        except Exception as e:
 | 
						|
            logger.debug(f"Unable to get docker memlimit: {e}")
 | 
						|
 | 
						|
    return -1
 | 
						|
 | 
						|
 | 
						|
def get_cpu_stats() -> dict[str, dict]:
 | 
						|
    """Get cpu usages for each process id"""
 | 
						|
    usages = {}
 | 
						|
    docker_memlimit = get_docker_memlimit_bytes() / 1024
 | 
						|
    total_mem = os.sysconf("SC_PAGE_SIZE") * os.sysconf("SC_PHYS_PAGES") / 1024
 | 
						|
 | 
						|
    system_cpu = psutil.cpu_percent(
 | 
						|
        interval=None
 | 
						|
    )  # no interval as we don't want to be blocking
 | 
						|
    system_mem = psutil.virtual_memory()
 | 
						|
    usages["frigate.full_system"] = {
 | 
						|
        "cpu": str(system_cpu),
 | 
						|
        "mem": str(system_mem.percent),
 | 
						|
    }
 | 
						|
 | 
						|
    for process in psutil.process_iter(["pid", "name", "cpu_percent", "cmdline"]):
 | 
						|
        pid = str(process.info["pid"])
 | 
						|
        try:
 | 
						|
            cpu_percent = process.info["cpu_percent"]
 | 
						|
            cmdline = process.info["cmdline"]
 | 
						|
 | 
						|
            with open(f"/proc/{pid}/stat", "r") as f:
 | 
						|
                stats = f.readline().split()
 | 
						|
            utime = int(stats[13])
 | 
						|
            stime = int(stats[14])
 | 
						|
            start_time = int(stats[21])
 | 
						|
 | 
						|
            with open("/proc/uptime") as f:
 | 
						|
                system_uptime_sec = int(float(f.read().split()[0]))
 | 
						|
 | 
						|
            clk_tck = os.sysconf(os.sysconf_names["SC_CLK_TCK"])
 | 
						|
 | 
						|
            process_utime_sec = utime // clk_tck
 | 
						|
            process_stime_sec = stime // clk_tck
 | 
						|
            process_start_time_sec = start_time // clk_tck
 | 
						|
 | 
						|
            process_elapsed_sec = system_uptime_sec - process_start_time_sec
 | 
						|
            process_usage_sec = process_utime_sec + process_stime_sec
 | 
						|
            cpu_average_usage = process_usage_sec * 100 // process_elapsed_sec
 | 
						|
 | 
						|
            with open(f"/proc/{pid}/statm", "r") as f:
 | 
						|
                mem_stats = f.readline().split()
 | 
						|
            mem_res = int(mem_stats[1]) * os.sysconf("SC_PAGE_SIZE") / 1024
 | 
						|
 | 
						|
            if docker_memlimit > 0:
 | 
						|
                mem_pct = round((mem_res / docker_memlimit) * 100, 1)
 | 
						|
            else:
 | 
						|
                mem_pct = round((mem_res / total_mem) * 100, 1)
 | 
						|
 | 
						|
            usages[pid] = {
 | 
						|
                "cpu": str(cpu_percent),
 | 
						|
                "cpu_average": str(round(cpu_average_usage, 2)),
 | 
						|
                "mem": f"{mem_pct}",
 | 
						|
                "cmdline": clean_camera_user_pass(" ".join(cmdline)),
 | 
						|
            }
 | 
						|
        except Exception:
 | 
						|
            continue
 | 
						|
 | 
						|
    return usages
 | 
						|
 | 
						|
 | 
						|
def get_physical_interfaces(interfaces) -> list:
 | 
						|
    if not interfaces:
 | 
						|
        return []
 | 
						|
 | 
						|
    with open("/proc/net/dev", "r") as file:
 | 
						|
        lines = file.readlines()
 | 
						|
 | 
						|
    physical_interfaces = []
 | 
						|
    for line in lines:
 | 
						|
        if ":" in line:
 | 
						|
            interface = line.split(":")[0].strip()
 | 
						|
            for int in interfaces:
 | 
						|
                if interface.startswith(int):
 | 
						|
                    physical_interfaces.append(interface)
 | 
						|
 | 
						|
    return physical_interfaces
 | 
						|
 | 
						|
 | 
						|
def get_bandwidth_stats(config) -> dict[str, dict]:
 | 
						|
    """Get bandwidth usages for each ffmpeg process id"""
 | 
						|
    usages = {}
 | 
						|
    top_command = ["nethogs", "-t", "-v0", "-c5", "-d1"] + get_physical_interfaces(
 | 
						|
        config.telemetry.network_interfaces
 | 
						|
    )
 | 
						|
 | 
						|
    p = sp.run(
 | 
						|
        top_command,
 | 
						|
        encoding="ascii",
 | 
						|
        capture_output=True,
 | 
						|
    )
 | 
						|
 | 
						|
    if p.returncode != 0:
 | 
						|
        logger.error(f"Error getting network stats :: {p.stderr}")
 | 
						|
        return usages
 | 
						|
    else:
 | 
						|
        lines = p.stdout.split("\n")
 | 
						|
        for line in lines:
 | 
						|
            stats = list(filter(lambda a: a != "", line.strip().split("\t")))
 | 
						|
            try:
 | 
						|
                if re.search(
 | 
						|
                    r"(^ffmpeg|\/go2rtc|frigate\.detector\.[a-z]+)/([0-9]+)/", stats[0]
 | 
						|
                ):
 | 
						|
                    process = stats[0].split("/")
 | 
						|
                    usages[process[len(process) - 2]] = {
 | 
						|
                        "bandwidth": round(float(stats[1]) + float(stats[2]), 1),
 | 
						|
                    }
 | 
						|
            except (IndexError, ValueError):
 | 
						|
                continue
 | 
						|
 | 
						|
    return usages
 | 
						|
 | 
						|
 | 
						|
def is_vaapi_amd_driver() -> bool:
 | 
						|
    # Use the explicitly configured driver, if available
 | 
						|
    driver = os.environ.get(DRIVER_ENV_VAR)
 | 
						|
    if driver:
 | 
						|
        return driver == DRIVER_AMD
 | 
						|
 | 
						|
    # Otherwise, ask vainfo what is has autodetected
 | 
						|
    p = vainfo_hwaccel()
 | 
						|
 | 
						|
    if p.returncode != 0:
 | 
						|
        logger.error(f"Unable to poll vainfo: {p.stderr}")
 | 
						|
        return False
 | 
						|
    else:
 | 
						|
        output = p.stdout.decode("unicode_escape").split("\n")
 | 
						|
 | 
						|
        # VA Info will print out the friendly name of the driver
 | 
						|
        return any("AMD Radeon Graphics" in line for line in output)
 | 
						|
 | 
						|
 | 
						|
def get_amd_gpu_stats() -> Optional[dict[str, str]]:
 | 
						|
    """Get stats using radeontop."""
 | 
						|
    radeontop_command = ["radeontop", "-d", "-", "-l", "1"]
 | 
						|
 | 
						|
    p = sp.run(
 | 
						|
        radeontop_command,
 | 
						|
        encoding="ascii",
 | 
						|
        capture_output=True,
 | 
						|
    )
 | 
						|
 | 
						|
    if p.returncode != 0:
 | 
						|
        logger.error(f"Unable to poll radeon GPU stats: {p.stderr}")
 | 
						|
        return None
 | 
						|
    else:
 | 
						|
        usages = p.stdout.split(",")
 | 
						|
        results: dict[str, str] = {}
 | 
						|
 | 
						|
        for hw in usages:
 | 
						|
            if "gpu" in hw:
 | 
						|
                results["gpu"] = f"{hw.strip().split(' ')[1].replace('%', '')}%"
 | 
						|
            elif "vram" in hw:
 | 
						|
                results["mem"] = f"{hw.strip().split(' ')[1].replace('%', '')}%"
 | 
						|
 | 
						|
        return results
 | 
						|
 | 
						|
 | 
						|
def get_intel_gpu_stats(sriov: bool) -> Optional[dict[str, str]]:
 | 
						|
    """Get stats using intel_gpu_top."""
 | 
						|
 | 
						|
    def get_stats_manually(output: str) -> dict[str, str]:
 | 
						|
        """Find global stats via regex when json fails to parse."""
 | 
						|
        reading = "".join(output)
 | 
						|
        results: dict[str, str] = {}
 | 
						|
 | 
						|
        # render is used for qsv
 | 
						|
        render = []
 | 
						|
        for result in re.findall(r'"Render/3D/0":{[a-z":\d.,%]+}', reading):
 | 
						|
            packet = json.loads(result[14:])
 | 
						|
            single = packet.get("busy", 0.0)
 | 
						|
            render.append(float(single))
 | 
						|
 | 
						|
        if render:
 | 
						|
            render_avg = sum(render) / len(render)
 | 
						|
        else:
 | 
						|
            render_avg = 1
 | 
						|
 | 
						|
        # video is used for vaapi
 | 
						|
        video = []
 | 
						|
        for result in re.findall(r'"Video/\d":{[a-z":\d.,%]+}', reading):
 | 
						|
            packet = json.loads(result[10:])
 | 
						|
            single = packet.get("busy", 0.0)
 | 
						|
            video.append(float(single))
 | 
						|
 | 
						|
        if video:
 | 
						|
            video_avg = sum(video) / len(video)
 | 
						|
        else:
 | 
						|
            video_avg = 1
 | 
						|
 | 
						|
        results["gpu"] = f"{round((video_avg + render_avg) / 2, 2)}%"
 | 
						|
        results["mem"] = "-%"
 | 
						|
        return results
 | 
						|
 | 
						|
    intel_gpu_top_command = [
 | 
						|
        "timeout",
 | 
						|
        "0.5s",
 | 
						|
        "intel_gpu_top",
 | 
						|
        "-J",
 | 
						|
        "-o",
 | 
						|
        "-",
 | 
						|
        "-s",
 | 
						|
        "1",
 | 
						|
    ]
 | 
						|
 | 
						|
    if sriov:
 | 
						|
        intel_gpu_top_command += ["-d", "sriov"]
 | 
						|
 | 
						|
    try:
 | 
						|
        p = sp.run(
 | 
						|
            intel_gpu_top_command,
 | 
						|
            encoding="ascii",
 | 
						|
            capture_output=True,
 | 
						|
        )
 | 
						|
    except UnicodeDecodeError:
 | 
						|
        return None
 | 
						|
 | 
						|
    # timeout has a non-zero returncode when timeout is reached
 | 
						|
    if p.returncode != 124:
 | 
						|
        logger.error(f"Unable to poll intel GPU stats: {p.stderr}")
 | 
						|
        return None
 | 
						|
    else:
 | 
						|
        output = "".join(p.stdout.split())
 | 
						|
 | 
						|
        try:
 | 
						|
            data = json.loads(f"[{output}]")
 | 
						|
        except json.JSONDecodeError:
 | 
						|
            return get_stats_manually(output)
 | 
						|
 | 
						|
        results: dict[str, str] = {}
 | 
						|
        render = {"global": []}
 | 
						|
        video = {"global": []}
 | 
						|
 | 
						|
        for block in data:
 | 
						|
            global_engine = block.get("engines")
 | 
						|
 | 
						|
            if global_engine:
 | 
						|
                render_frame = global_engine.get("Render/3D/0", {}).get("busy")
 | 
						|
                video_frame = global_engine.get("Video/0", {}).get("busy")
 | 
						|
 | 
						|
                if render_frame is not None:
 | 
						|
                    render["global"].append(float(render_frame))
 | 
						|
 | 
						|
                if video_frame is not None:
 | 
						|
                    video["global"].append(float(video_frame))
 | 
						|
 | 
						|
            clients = block.get("clients", {})
 | 
						|
 | 
						|
            if clients and len(clients):
 | 
						|
                for client_block in clients.values():
 | 
						|
                    key = client_block["pid"]
 | 
						|
 | 
						|
                    if render.get(key) is None:
 | 
						|
                        render[key] = []
 | 
						|
                        video[key] = []
 | 
						|
 | 
						|
                    client_engine = client_block.get("engine-classes", {})
 | 
						|
 | 
						|
                    render_frame = client_engine.get("Render/3D", {}).get("busy")
 | 
						|
                    video_frame = client_engine.get("Video", {}).get("busy")
 | 
						|
 | 
						|
                    if render_frame is not None:
 | 
						|
                        render[key].append(float(render_frame))
 | 
						|
 | 
						|
                    if video_frame is not None:
 | 
						|
                        video[key].append(float(video_frame))
 | 
						|
 | 
						|
        if render["global"] and video["global"]:
 | 
						|
            results["gpu"] = (
 | 
						|
                f"{round(((sum(render['global']) / len(render['global'])) + (sum(video['global']) / len(video['global']))) / 2, 2)}%"
 | 
						|
            )
 | 
						|
            results["mem"] = "-%"
 | 
						|
 | 
						|
        if len(render.keys()) > 1:
 | 
						|
            results["clients"] = {}
 | 
						|
 | 
						|
            for key in render.keys():
 | 
						|
                if key == "global" or not render[key] or not video[key]:
 | 
						|
                    continue
 | 
						|
 | 
						|
                results["clients"][key] = (
 | 
						|
                    f"{round(((sum(render[key]) / len(render[key])) + (sum(video[key]) / len(video[key]))) / 2, 2)}%"
 | 
						|
                )
 | 
						|
 | 
						|
        return results
 | 
						|
 | 
						|
 | 
						|
def get_rockchip_gpu_stats() -> Optional[dict[str, str]]:
 | 
						|
    """Get GPU stats using rk."""
 | 
						|
    try:
 | 
						|
        with open("/sys/kernel/debug/rkrga/load", "r") as f:
 | 
						|
            content = f.read()
 | 
						|
    except FileNotFoundError:
 | 
						|
        return None
 | 
						|
 | 
						|
    load_values = []
 | 
						|
    for line in content.splitlines():
 | 
						|
        match = re.search(r"load = (\d+)%", line)
 | 
						|
        if match:
 | 
						|
            load_values.append(int(match.group(1)))
 | 
						|
 | 
						|
    if not load_values:
 | 
						|
        return None
 | 
						|
 | 
						|
    average_load = f"{round(sum(load_values) / len(load_values), 2)}%"
 | 
						|
    return {"gpu": average_load, "mem": "-"}
 | 
						|
 | 
						|
 | 
						|
def get_rockchip_npu_stats() -> Optional[dict[str, float | str]]:
 | 
						|
    """Get NPU stats using rk."""
 | 
						|
    try:
 | 
						|
        with open("/sys/kernel/debug/rknpu/load", "r") as f:
 | 
						|
            npu_output = f.read()
 | 
						|
 | 
						|
            if "Core0:" in npu_output:
 | 
						|
                # multi core NPU
 | 
						|
                core_loads = re.findall(r"Core\d+:\s*(\d+)%", npu_output)
 | 
						|
            else:
 | 
						|
                # single core NPU
 | 
						|
                core_loads = re.findall(r"NPU load:\s+(\d+)%", npu_output)
 | 
						|
    except FileNotFoundError:
 | 
						|
        core_loads = None
 | 
						|
 | 
						|
    if not core_loads:
 | 
						|
        return None
 | 
						|
 | 
						|
    percentages = [int(load) for load in core_loads]
 | 
						|
    mean = round(sum(percentages) / len(percentages), 2)
 | 
						|
    return {"npu": mean, "mem": "-"}
 | 
						|
 | 
						|
 | 
						|
def try_get_info(f, h, default="N/A"):
 | 
						|
    try:
 | 
						|
        if h:
 | 
						|
            v = f(h)
 | 
						|
        else:
 | 
						|
            v = f()
 | 
						|
    except nvml.NVMLError_NotSupported:
 | 
						|
        v = default
 | 
						|
    return v
 | 
						|
 | 
						|
 | 
						|
def get_nvidia_gpu_stats() -> dict[int, dict]:
 | 
						|
    names: dict[str, int] = {}
 | 
						|
    results = {}
 | 
						|
    try:
 | 
						|
        nvml.nvmlInit()
 | 
						|
        deviceCount = nvml.nvmlDeviceGetCount()
 | 
						|
        for i in range(deviceCount):
 | 
						|
            handle = nvml.nvmlDeviceGetHandleByIndex(i)
 | 
						|
            gpu_name = nvml.nvmlDeviceGetName(handle)
 | 
						|
 | 
						|
            # handle case where user has multiple of same GPU
 | 
						|
            if gpu_name in names:
 | 
						|
                names[gpu_name] += 1
 | 
						|
                gpu_name += f" ({names.get(gpu_name)})"
 | 
						|
            else:
 | 
						|
                names[gpu_name] = 1
 | 
						|
 | 
						|
            meminfo = try_get_info(nvml.nvmlDeviceGetMemoryInfo, handle)
 | 
						|
            util = try_get_info(nvml.nvmlDeviceGetUtilizationRates, handle)
 | 
						|
            enc = try_get_info(nvml.nvmlDeviceGetEncoderUtilization, handle)
 | 
						|
            dec = try_get_info(nvml.nvmlDeviceGetDecoderUtilization, handle)
 | 
						|
            pstate = try_get_info(nvml.nvmlDeviceGetPowerState, handle, default=None)
 | 
						|
 | 
						|
            if util != "N/A":
 | 
						|
                gpu_util = util.gpu
 | 
						|
            else:
 | 
						|
                gpu_util = 0
 | 
						|
 | 
						|
            if meminfo != "N/A":
 | 
						|
                gpu_mem_util = meminfo.used / meminfo.total * 100
 | 
						|
            else:
 | 
						|
                gpu_mem_util = -1
 | 
						|
 | 
						|
            if enc != "N/A":
 | 
						|
                enc_util = enc[0]
 | 
						|
            else:
 | 
						|
                enc_util = -1
 | 
						|
 | 
						|
            if dec != "N/A":
 | 
						|
                dec_util = dec[0]
 | 
						|
            else:
 | 
						|
                dec_util = -1
 | 
						|
 | 
						|
            results[i] = {
 | 
						|
                "name": gpu_name,
 | 
						|
                "gpu": gpu_util,
 | 
						|
                "mem": gpu_mem_util,
 | 
						|
                "enc": enc_util,
 | 
						|
                "dec": dec_util,
 | 
						|
                "pstate": pstate or "unknown",
 | 
						|
            }
 | 
						|
    except Exception:
 | 
						|
        pass
 | 
						|
    finally:
 | 
						|
        return results
 | 
						|
 | 
						|
 | 
						|
def get_jetson_stats() -> Optional[dict[int, dict]]:
 | 
						|
    results = {}
 | 
						|
 | 
						|
    try:
 | 
						|
        results["mem"] = "-"  # no discrete gpu memory
 | 
						|
 | 
						|
        with open("/sys/devices/gpu.0/load", "r") as f:
 | 
						|
            gpuload = float(f.readline()) / 10
 | 
						|
            results["gpu"] = f"{gpuload}%"
 | 
						|
    except Exception:
 | 
						|
        return None
 | 
						|
 | 
						|
    return results
 | 
						|
 | 
						|
 | 
						|
def ffprobe_stream(ffmpeg, path: str) -> sp.CompletedProcess:
 | 
						|
    """Run ffprobe on stream."""
 | 
						|
    clean_path = escape_special_characters(path)
 | 
						|
    ffprobe_cmd = [
 | 
						|
        ffmpeg.ffprobe_path,
 | 
						|
        "-timeout",
 | 
						|
        "1000000",
 | 
						|
        "-print_format",
 | 
						|
        "json",
 | 
						|
        "-show_entries",
 | 
						|
        "stream=codec_long_name,width,height,bit_rate,duration,display_aspect_ratio,avg_frame_rate",
 | 
						|
        "-loglevel",
 | 
						|
        "quiet",
 | 
						|
        clean_path,
 | 
						|
    ]
 | 
						|
    return sp.run(ffprobe_cmd, capture_output=True)
 | 
						|
 | 
						|
 | 
						|
def vainfo_hwaccel(device_name: Optional[str] = None) -> sp.CompletedProcess:
 | 
						|
    """Run vainfo."""
 | 
						|
    ffprobe_cmd = (
 | 
						|
        ["vainfo"]
 | 
						|
        if not device_name
 | 
						|
        else ["vainfo", "--display", "drm", "--device", f"/dev/dri/{device_name}"]
 | 
						|
    )
 | 
						|
    return sp.run(ffprobe_cmd, capture_output=True)
 | 
						|
 | 
						|
 | 
						|
def get_nvidia_driver_info() -> dict[str, Any]:
 | 
						|
    """Get general hardware info for nvidia GPU."""
 | 
						|
    results = {}
 | 
						|
    try:
 | 
						|
        nvml.nvmlInit()
 | 
						|
        deviceCount = nvml.nvmlDeviceGetCount()
 | 
						|
        for i in range(deviceCount):
 | 
						|
            handle = nvml.nvmlDeviceGetHandleByIndex(i)
 | 
						|
            driver = try_get_info(nvml.nvmlSystemGetDriverVersion, None, default=None)
 | 
						|
            cuda_compute = try_get_info(
 | 
						|
                nvml.nvmlDeviceGetCudaComputeCapability, handle, default=None
 | 
						|
            )
 | 
						|
            vbios = try_get_info(nvml.nvmlDeviceGetVbiosVersion, handle, default=None)
 | 
						|
            results[i] = {
 | 
						|
                "name": nvml.nvmlDeviceGetName(handle),
 | 
						|
                "driver": driver or "unknown",
 | 
						|
                "cuda_compute": cuda_compute or "unknown",
 | 
						|
                "vbios": vbios or "unknown",
 | 
						|
            }
 | 
						|
    except Exception:
 | 
						|
        pass
 | 
						|
    finally:
 | 
						|
        return results
 | 
						|
 | 
						|
 | 
						|
def auto_detect_hwaccel() -> str:
 | 
						|
    """Detect hwaccel args by default."""
 | 
						|
    try:
 | 
						|
        cuda = False
 | 
						|
        vaapi = False
 | 
						|
        resp = requests.get("http://127.0.0.1:1984/api/ffmpeg/hardware", timeout=3)
 | 
						|
 | 
						|
        if resp.status_code == 200:
 | 
						|
            data: dict[str, list[dict[str, str]]] = resp.json()
 | 
						|
            for source in data.get("sources", []):
 | 
						|
                if "cuda" in source.get("url", "") and source.get("name") == "OK":
 | 
						|
                    cuda = True
 | 
						|
 | 
						|
                if "vaapi" in source.get("url", "") and source.get("name") == "OK":
 | 
						|
                    vaapi = True
 | 
						|
    except requests.RequestException:
 | 
						|
        pass
 | 
						|
 | 
						|
    if cuda:
 | 
						|
        logger.info("Automatically detected nvidia hwaccel for video decoding")
 | 
						|
        return FFMPEG_HWACCEL_NVIDIA
 | 
						|
 | 
						|
    if vaapi:
 | 
						|
        logger.info("Automatically detected vaapi hwaccel for video decoding")
 | 
						|
        return FFMPEG_HWACCEL_VAAPI
 | 
						|
 | 
						|
    logger.warning(
 | 
						|
        "Did not detect hwaccel, using a GPU for accelerated video decoding is highly recommended"
 | 
						|
    )
 | 
						|
    return ""
 | 
						|
 | 
						|
 | 
						|
async def get_video_properties(
 | 
						|
    ffmpeg, url: str, get_duration: bool = False
 | 
						|
) -> dict[str, Any]:
 | 
						|
    async def calculate_duration(video: Optional[Any]) -> float:
 | 
						|
        duration = None
 | 
						|
 | 
						|
        if video is not None:
 | 
						|
            # Get the frames per second (fps) of the video stream
 | 
						|
            fps = video.get(cv2.CAP_PROP_FPS)
 | 
						|
            total_frames = int(video.get(cv2.CAP_PROP_FRAME_COUNT))
 | 
						|
 | 
						|
            if fps and total_frames:
 | 
						|
                duration = total_frames / fps
 | 
						|
 | 
						|
        # if cv2 failed need to use ffprobe
 | 
						|
        if duration is None:
 | 
						|
            p = await asyncio.create_subprocess_exec(
 | 
						|
                ffmpeg.ffprobe_path,
 | 
						|
                "-v",
 | 
						|
                "error",
 | 
						|
                "-show_entries",
 | 
						|
                "format=duration",
 | 
						|
                "-of",
 | 
						|
                "default=noprint_wrappers=1:nokey=1",
 | 
						|
                f"{url}",
 | 
						|
                stdout=asyncio.subprocess.PIPE,
 | 
						|
                stderr=asyncio.subprocess.PIPE,
 | 
						|
            )
 | 
						|
            await p.wait()
 | 
						|
 | 
						|
            if p.returncode == 0:
 | 
						|
                result = (await p.stdout.read()).decode()
 | 
						|
            else:
 | 
						|
                result = None
 | 
						|
 | 
						|
            if result:
 | 
						|
                try:
 | 
						|
                    duration = float(result.strip())
 | 
						|
                except ValueError:
 | 
						|
                    duration = -1
 | 
						|
            else:
 | 
						|
                duration = -1
 | 
						|
 | 
						|
        return duration
 | 
						|
 | 
						|
    width = height = 0
 | 
						|
 | 
						|
    try:
 | 
						|
        # Open the video stream using OpenCV
 | 
						|
        video = cv2.VideoCapture(url)
 | 
						|
 | 
						|
        # Check if the video stream was opened successfully
 | 
						|
        if not video.isOpened():
 | 
						|
            video = None
 | 
						|
    except Exception:
 | 
						|
        video = None
 | 
						|
 | 
						|
    result = {}
 | 
						|
 | 
						|
    if get_duration:
 | 
						|
        result["duration"] = await calculate_duration(video)
 | 
						|
 | 
						|
    if video is not None:
 | 
						|
        # Get the width of frames in the video stream
 | 
						|
        width = video.get(cv2.CAP_PROP_FRAME_WIDTH)
 | 
						|
 | 
						|
        # Get the height of frames in the video stream
 | 
						|
        height = video.get(cv2.CAP_PROP_FRAME_HEIGHT)
 | 
						|
 | 
						|
        # Get the stream encoding
 | 
						|
        fourcc_int = int(video.get(cv2.CAP_PROP_FOURCC))
 | 
						|
        fourcc = (
 | 
						|
            chr((fourcc_int >> 0) & 255)
 | 
						|
            + chr((fourcc_int >> 8) & 255)
 | 
						|
            + chr((fourcc_int >> 16) & 255)
 | 
						|
            + chr((fourcc_int >> 24) & 255)
 | 
						|
        )
 | 
						|
 | 
						|
        # Release the video stream
 | 
						|
        video.release()
 | 
						|
 | 
						|
        result["width"] = round(width)
 | 
						|
        result["height"] = round(height)
 | 
						|
        result["fourcc"] = fourcc
 | 
						|
 | 
						|
    return result
 | 
						|
 | 
						|
 | 
						|
def process_logs(
 | 
						|
    contents: str,
 | 
						|
    service: Optional[str] = None,
 | 
						|
    start: Optional[int] = None,
 | 
						|
    end: Optional[int] = None,
 | 
						|
) -> Tuple[int, List[str]]:
 | 
						|
    log_lines = []
 | 
						|
    last_message = None
 | 
						|
    last_timestamp = None
 | 
						|
    repeat_count = 0
 | 
						|
 | 
						|
    for raw_line in contents.splitlines():
 | 
						|
        clean_line = raw_line.strip()
 | 
						|
 | 
						|
        if len(clean_line) < 10:
 | 
						|
            continue
 | 
						|
 | 
						|
        # Handle cases where S6 does not include date in log line
 | 
						|
        if "  " not in clean_line:
 | 
						|
            clean_line = f"{datetime.now()}  {clean_line}"
 | 
						|
 | 
						|
        try:
 | 
						|
            # Find the position of the first double space to extract timestamp and message
 | 
						|
            date_end = clean_line.index("  ")
 | 
						|
            timestamp = clean_line[:date_end]
 | 
						|
            full_message = clean_line[date_end:].strip()
 | 
						|
 | 
						|
            # For frigate, remove the date part from message comparison
 | 
						|
            if service == "frigate":
 | 
						|
                # Skip the date at the start of the message if it exists
 | 
						|
                date_parts = full_message.split("]", 1)
 | 
						|
                if len(date_parts) > 1:
 | 
						|
                    message_part = date_parts[1].strip()
 | 
						|
                else:
 | 
						|
                    message_part = full_message
 | 
						|
            else:
 | 
						|
                message_part = full_message
 | 
						|
 | 
						|
            if message_part == last_message:
 | 
						|
                repeat_count += 1
 | 
						|
                continue
 | 
						|
            else:
 | 
						|
                if repeat_count > 0:
 | 
						|
                    # Insert a deduplication message formatted the same way as logs
 | 
						|
                    dedup_message = f"{last_timestamp}  [LOGGING] Last message repeated {repeat_count} times"
 | 
						|
                    log_lines.append(dedup_message)
 | 
						|
                    repeat_count = 0
 | 
						|
 | 
						|
                log_lines.append(clean_line)
 | 
						|
                last_timestamp = timestamp
 | 
						|
 | 
						|
                last_message = message_part
 | 
						|
 | 
						|
        except ValueError:
 | 
						|
            # If we can't parse the line properly, just add it as is
 | 
						|
            log_lines.append(clean_line)
 | 
						|
            continue
 | 
						|
 | 
						|
    # If there were repeated messages at the end, log the count
 | 
						|
    if repeat_count > 0:
 | 
						|
        dedup_message = (
 | 
						|
            f"{last_timestamp}  [LOGGING] Last message repeated {repeat_count} times"
 | 
						|
        )
 | 
						|
        log_lines.append(dedup_message)
 | 
						|
 | 
						|
    return len(log_lines), log_lines[start:end]
 | 
						|
 | 
						|
 | 
						|
def set_file_limit() -> None:
 | 
						|
    # Newer versions of containerd 2.X+ impose a very low soft file limit of 1024
 | 
						|
    # This applies to OSs like HA OS (see https://github.com/home-assistant/operating-system/issues/4110)
 | 
						|
    # Attempt to increase this limit
 | 
						|
    soft_limit = int(os.getenv("SOFT_FILE_LIMIT", "65536") or "65536")
 | 
						|
 | 
						|
    current_soft, current_hard = resource.getrlimit(resource.RLIMIT_NOFILE)
 | 
						|
    logger.debug(f"Current file limits - Soft: {current_soft}, Hard: {current_hard}")
 | 
						|
 | 
						|
    new_soft = min(soft_limit, current_hard)
 | 
						|
    resource.setrlimit(resource.RLIMIT_NOFILE, (new_soft, current_hard))
 | 
						|
    logger.debug(
 | 
						|
        f"File limit set. New soft limit: {new_soft}, Hard limit remains: {current_hard}"
 | 
						|
    )
 |