"""Shared helpers: config merging, frame drawing, YUV cropping, system/GPU
statistics, frame managers and a size-limited multiprocessing queue."""

import copy
import ctypes
import datetime
import json
import logging
import multiprocessing
import os
import re
import shlex
import signal
import subprocess as sp
import time
import traceback
import urllib.parse
from abc import ABC, abstractmethod
from collections import Counter
from collections.abc import Mapping
from multiprocessing import shared_memory
from queue import Empty, Full
from typing import Any, AnyStr, Optional, Tuple

import cv2
import numpy as np
import psutil
import py3nvml.py3nvml as nvml
import pytz
import yaml
from faster_fifo import DEFAULT_CIRCULAR_BUFFER_SIZE, DEFAULT_TIMEOUT
from faster_fifo import Queue as FFQueue

from frigate.const import REGEX_HTTP_CAMERA_USER_PASS, REGEX_RTSP_CAMERA_USER_PASS

logger = logging.getLogger(__name__)


def deep_merge(dct1: dict, dct2: dict, override=False, merge_lists=False) -> dict:
    """Recursively merge ``dct2`` into a deep copy of ``dct1``.

    :param dct1: First dict to merge
    :param dct2: Second dict to merge
    :param override: if same key exists in both dictionaries, should override?
        otherwise ignore. (default=False)
    :param merge_lists: if both values for a key are lists, concatenate them
        (``dct1`` items first); otherwise the list from ``dct1`` is kept.
        (default=False)
    :return: The merge dictionary
    """
    merged = copy.deepcopy(dct1)
    for k, v2 in dct2.items():
        if k in merged:
            v1 = merged[k]
            if isinstance(v1, dict) and isinstance(v2, Mapping):
                # propagate merge_lists too, otherwise nested lists are never merged
                merged[k] = deep_merge(v1, v2, override, merge_lists)
            elif isinstance(v1, list) and isinstance(v2, list):
                # two lists are never overridden, only (optionally) concatenated
                if merge_lists:
                    merged[k] = v1 + v2
            else:
                if override:
                    merged[k] = copy.deepcopy(v2)
        else:
            merged[k] = copy.deepcopy(v2)
    return merged


def load_config_with_no_duplicates(raw_config) -> dict:
    """Get config ensuring duplicate keys are not allowed.

    Raises ValueError when a mapping defines the same key more than once.
    """

    # https://stackoverflow.com/a/71751051
    # NOTE(review): this derives from the full yaml Loader, which can construct
    # arbitrary Python objects - only feed it trusted config; consider SafeLoader.
    class PreserveDuplicatesLoader(yaml.loader.Loader):
        pass

    def map_constructor(loader, node, deep=False):
        keys = [
            loader.construct_object(key_node, deep=deep) for key_node, _ in node.value
        ]
        vals = [
            loader.construct_object(val_node, deep=deep) for _, val_node in node.value
        ]
        key_count = Counter(keys)
        data = {}
        for key, val in zip(keys, vals):
            if key_count[key] > 1:
                raise ValueError(
                    f"Config input {key} is defined multiple times for the same field, this is not allowed."
                )
            else:
                data[key] = val
        return data

    PreserveDuplicatesLoader.add_constructor(
        yaml.resolver.BaseResolver.DEFAULT_MAPPING_TAG, map_constructor
    )
    return yaml.load(raw_config, PreserveDuplicatesLoader)


def draw_timestamp(
    frame,
    timestamp,
    timestamp_format,
    font_effect=None,
    font_thickness=2,
    font_color=(255, 255, 255),
    position="tl",
):
    """Draw ``timestamp`` (formatted with ``timestamp_format``) onto ``frame``.

    ``position`` is one of "tl", "tr", "bl", "br". ``font_effect`` may be
    "solid" (filled background box), "shadow" (offset inverse-colour copy of
    the text) or anything else for plain text. The frame is modified in place.
    """
    time_to_show = datetime.datetime.fromtimestamp(timestamp).strftime(timestamp_format)

    # calculate a dynamic font size
    size = cv2.getTextSize(
        time_to_show,
        cv2.FONT_HERSHEY_SIMPLEX,
        fontScale=1.0,
        thickness=font_thickness,
    )

    text_width = size[0][0]
    # target roughly a third of the frame width, but never less than 150 px
    desired_size = max(150, 0.33 * frame.shape[1])
    font_scale = desired_size / text_width

    # calculate the actual size with the dynamic scale
    size = cv2.getTextSize(
        time_to_show,
        cv2.FONT_HERSHEY_SIMPLEX,
        fontScale=font_scale,
        thickness=font_thickness,
    )

    image_width = frame.shape[1]
    image_height = frame.shape[0]

    text_width = size[0][0]
    text_height = size[0][1]
    line_height = text_height + size[1]

    if position == "tl":
        text_offset_x = 0
        text_offset_y = 0 if 0 < line_height else 0 - (line_height + 8)
    elif position == "tr":
        text_offset_x = image_width - text_width
        text_offset_y = 0 if 0 < line_height else 0 - (line_height + 8)
    elif position == "bl":
        text_offset_x = 0
        text_offset_y = image_height - (line_height + 8)
    elif position == "br":
        text_offset_x = image_width - text_width
        text_offset_y = image_height - (line_height + 8)

    if font_effect == "solid":
        # make the coords of the box with a small padding of two pixels
        timestamp_box_coords = np.array(
            [
                [text_offset_x, text_offset_y],
                [text_offset_x + text_width, text_offset_y],
                [text_offset_x + text_width, text_offset_y + line_height + 8],
                [text_offset_x, text_offset_y + line_height + 8],
            ]
        )

        cv2.fillPoly(
            frame,
            [timestamp_box_coords],
            # inverse color of text for background for max. contrast
            (255 - font_color[0], 255 - font_color[1], 255 - font_color[2]),
        )
    elif font_effect == "shadow":
        cv2.putText(
            frame,
            time_to_show,
            (text_offset_x + 3, text_offset_y + line_height),
            cv2.FONT_HERSHEY_SIMPLEX,
            fontScale=font_scale,
            color=(255 - font_color[0], 255 - font_color[1], 255 - font_color[2]),
            thickness=font_thickness,
        )

    cv2.putText(
        frame,
        time_to_show,
        (text_offset_x, text_offset_y + line_height - 3),
        cv2.FONT_HERSHEY_SIMPLEX,
        fontScale=font_scale,
        color=font_color,
        thickness=font_thickness,
    )


def draw_box_with_label(
    frame,
    x_min,
    y_min,
    x_max,
    y_max,
    label,
    info,
    thickness=2,
    color=None,
    position="ul",
):
    """Draw a rectangle and a filled "label: info" text box onto ``frame``.

    ``position`` selects the corner the label is attached to: "ul", "ur",
    "bl" or "br". ``color`` defaults to (0, 0, 255). The frame is modified
    in place.
    """
    if color is None:
        color = (0, 0, 255)
    display_text = "{}: {}".format(label, info)
    cv2.rectangle(frame, (x_min, y_min), (x_max, y_max), color, thickness)
    font_scale = 0.5
    font = cv2.FONT_HERSHEY_SIMPLEX
    # get the width and height of the text box
    size = cv2.getTextSize(display_text, font, fontScale=font_scale, thickness=2)
    text_width = size[0][0]
    text_height = size[0][1]
    line_height = text_height + size[1]
    # set the text start position
    if position == "ul":
        text_offset_x = x_min
        text_offset_y = 0 if y_min < line_height else y_min - (line_height + 8)
    elif position == "ur":
        text_offset_x = x_max - (text_width + 8)
        text_offset_y = 0 if y_min < line_height else y_min - (line_height + 8)
    elif position == "bl":
        text_offset_x = x_min
        text_offset_y = y_max
    elif position == "br":
        text_offset_x = x_max - (text_width + 8)
        text_offset_y = y_max
    # make the coords of the box with a small padding of two pixels
    textbox_coords = (
        (text_offset_x, text_offset_y),
        (text_offset_x + text_width + 2, text_offset_y + line_height),
    )
    cv2.rectangle(frame, textbox_coords[0], textbox_coords[1], color, cv2.FILLED)
    cv2.putText(
        frame,
        display_text,
        (text_offset_x, text_offset_y + line_height - 3),
        font,
        fontScale=font_scale,
        color=(0, 0, 0),
        thickness=2,
    )


def calculate_region(frame_shape, xmin, ymin, xmax, ymax, model_size, multiplier=2):
    """Return a square region (x1, y1, x2, y2) centred on the given box.

    The edge is the longest box edge times ``multiplier``, rounded down to a
    multiple of 4 and never smaller than ``model_size``. The region is shifted
    to stay inside ``frame_shape`` (height, width, ...) where possible.
    """
    # size is the longest edge and divisible by 4
    size = int((max(xmax - xmin, ymax - ymin) * multiplier) // 4 * 4)
    # dont go any smaller than the model_size
    if size < model_size:
        size = model_size

    # x_offset is midpoint of bounding box minus half the size
    x_offset = int((xmax - xmin) / 2.0 + xmin - size / 2.0)
    # if outside the image
    if x_offset < 0:
        x_offset = 0
    elif x_offset > (frame_shape[1] - size):
        x_offset = max(0, (frame_shape[1] - size))

    # y_offset is midpoint of bounding box minus half the size
    y_offset = int((ymax - ymin) / 2.0 + ymin - size / 2.0)
    # if outside the image
    if y_offset < 0:
        y_offset = 0
    elif y_offset > (frame_shape[0] - size):
        y_offset = max(0, (frame_shape[0] - size))

    return (x_offset, y_offset, x_offset + size, y_offset + size)


def get_yuv_crop(frame_shape, crop):
    """Compute the y/u1/u2/v1/v2 sub-rectangles of a planar YUV frame for ``crop``.

    ``frame_shape`` is the shape of the 2D YUV array (its first dimension is
    1.5x the image height). Each returned rectangle is (x1, y1, x2, y2) in
    that array's coordinates.
    """
    # crop should be (x1,y1,x2,y2)
    frame_height = frame_shape[0] // 3 * 2
    frame_width = frame_shape[1]

    # compute the width/height of the uv channels
    uv_width = frame_width // 2  # width of the uv channels
    uv_height = frame_height // 4  # height of the uv channels

    # compute the offset for upper left corner of the uv channels
    uv_x_offset = crop[0] // 2  # x offset of the uv channels
    uv_y_offset = crop[1] // 4  # y offset of the uv channels

    # compute the width/height of the uv crops
    uv_crop_width = (crop[2] - crop[0]) // 2  # width of the cropped uv channels
    uv_crop_height = (crop[3] - crop[1]) // 4  # height of the cropped uv channels

    # ensure crop dimensions are multiples of 2 and 4
    y = (crop[0], crop[1], crop[0] + uv_crop_width * 2, crop[1] + uv_crop_height * 4)

    u1 = (
        0 + uv_x_offset,
        frame_height + uv_y_offset,
        0 + uv_x_offset + uv_crop_width,
        frame_height + uv_y_offset + uv_crop_height,
    )

    u2 = (
        uv_width + uv_x_offset,
        frame_height + uv_y_offset,
        uv_width + uv_x_offset + uv_crop_width,
        frame_height + uv_y_offset + uv_crop_height,
    )

    v1 = (
        0 + uv_x_offset,
        frame_height + uv_height + uv_y_offset,
        0 + uv_x_offset + uv_crop_width,
        frame_height + uv_height + uv_y_offset + uv_crop_height,
    )

    v2 = (
        uv_width + uv_x_offset,
        frame_height + uv_height + uv_y_offset,
        uv_width + uv_x_offset + uv_crop_width,
        frame_height + uv_height + uv_y_offset + uv_crop_height,
    )

    return y, u1, u2, v1, v2


def yuv_crop_and_resize(frame, region, height=None):
    """Crop ``region`` (x1, y1, x2, y2) out of a planar YUV ``frame``.

    Parts of the region outside the frame are filled with black (Y=16,
    U/V=128). Returns a new square YUV array sized from the region height.

    NOTE: the ``height`` argument is ignored; it is overwritten from the
    frame shape and is kept only for signature compatibility.
    """
    # Crops and resizes a YUV frame while maintaining aspect ratio
    # https://stackoverflow.com/a/57022634
    height = frame.shape[0] // 3 * 2
    width = frame.shape[1]

    # get the crop box if the region extends beyond the frame
    crop_x1 = max(0, region[0])
    crop_y1 = max(0, region[1])
    # ensure these are a multiple of 4
    crop_x2 = min(width, region[2])
    crop_y2 = min(height, region[3])
    crop_box = (crop_x1, crop_y1, crop_x2, crop_y2)

    y, u1, u2, v1, v2 = get_yuv_crop(frame.shape, crop_box)

    # if the region starts outside the frame, indent the start point in the cropped frame
    y_channel_x_offset = abs(min(0, region[0]))
    y_channel_y_offset = abs(min(0, region[1]))

    uv_channel_x_offset = y_channel_x_offset // 2
    uv_channel_y_offset = y_channel_y_offset // 4

    # create the yuv region frame
    # make sure the size is a multiple of 4
    # TODO: this should be based on the size after resize now
    size = (region[3] - region[1]) // 4 * 4
    yuv_cropped_frame = np.zeros((size + size // 2, size), np.uint8)
    # fill in black
    yuv_cropped_frame[:] = 128
    yuv_cropped_frame[0:size, 0:size] = 16

    # copy the y channel
    yuv_cropped_frame[
        y_channel_y_offset : y_channel_y_offset + y[3] - y[1],
        y_channel_x_offset : y_channel_x_offset + y[2] - y[0],
    ] = frame[y[1] : y[3], y[0] : y[2]]

    uv_crop_width = u1[2] - u1[0]
    uv_crop_height = u1[3] - u1[1]

    # copy u1
    yuv_cropped_frame[
        size + uv_channel_y_offset : size + uv_channel_y_offset + uv_crop_height,
        0 + uv_channel_x_offset : 0 + uv_channel_x_offset + uv_crop_width,
    ] = frame[u1[1] : u1[3], u1[0] : u1[2]]

    # copy u2
    yuv_cropped_frame[
        size + uv_channel_y_offset : size + uv_channel_y_offset + uv_crop_height,
        size // 2
        + uv_channel_x_offset : size // 2
        + uv_channel_x_offset
        + uv_crop_width,
    ] = frame[u2[1] : u2[3], u2[0] : u2[2]]

    # copy v1
    yuv_cropped_frame[
        size
        + size // 4
        + uv_channel_y_offset : size
        + size // 4
        + uv_channel_y_offset
        + uv_crop_height,
        0 + uv_channel_x_offset : 0 + uv_channel_x_offset + uv_crop_width,
    ] = frame[v1[1] : v1[3], v1[0] : v1[2]]

    # copy v2
    yuv_cropped_frame[
        size
        + size // 4
        + uv_channel_y_offset : size
        + size // 4
        + uv_channel_y_offset
        + uv_crop_height,
        size // 2
        + uv_channel_x_offset : size // 2
        + uv_channel_x_offset
        + uv_crop_width,
    ] = frame[v2[1] : v2[3], v2[0] : v2[2]]

    return yuv_cropped_frame


def yuv_to_3_channel_yuv(yuv_frame):
    """Expand a planar YUV frame into a (height, width, 3) uint8 array.

    The U and V planes are upsampled by repeating each sample twice in both
    dimensions.
    """
    height = yuv_frame.shape[0] // 3 * 2
    width = yuv_frame.shape[1]

    # flatten the image into array
    yuv_data = yuv_frame.ravel()

    # create a numpy array to hold all the 3 chanel yuv data
    all_yuv_data = np.empty((height, width, 3), dtype=np.uint8)

    y_count = height * width
    uv_count = y_count // 4

    # copy the y_channel
    all_yuv_data[:, :, 0] = yuv_data[0:y_count].reshape((height, width))
    # copy the u channel doubling each dimension
    all_yuv_data[:, :, 1] = np.repeat(
        np.reshape(
            np.repeat(yuv_data[y_count : y_count + uv_count], repeats=2, axis=0),
            (height // 2, width),
        ),
        repeats=2,
        axis=0,
    )
    # copy the v channel doubling each dimension
    all_yuv_data[:, :, 2] = np.repeat(
        np.reshape(
            np.repeat(
                yuv_data[y_count + uv_count : y_count + uv_count + uv_count],
                repeats=2,
                axis=0,
            ),
            (height // 2, width),
        ),
        repeats=2,
        axis=0,
    )

    return all_yuv_data


def copy_yuv_to_position(
    destination_frame,
    destination_offset,
    destination_shape,
    source_frame=None,
    source_channel_dim=None,
):
    """Clear a slot of ``destination_frame`` and optionally paste a source into it.

    ``destination_offset`` is (y, x) and ``destination_shape`` is
    (height, width). The slot is first cleared to black. When
    ``source_frame`` is given it is resized (aspect ratio preserved) into the
    slot; ``source_channel_dim`` must then map "y", "u1", "u2", "v1", "v2" to
    (x1, y1, x2, y2) rectangles inside ``source_frame``.
    """
    # get the coordinates of the channels for this position in the layout
    y, u1, u2, v1, v2 = get_yuv_crop(
        destination_frame.shape,
        (
            destination_offset[1],
            destination_offset[0],
            destination_offset[1] + destination_shape[1],
            destination_offset[0] + destination_shape[0],
        ),
    )

    # clear y
    destination_frame[
        y[1] : y[3],
        y[0] : y[2],
    ] = 16

    # clear u1
    destination_frame[u1[1] : u1[3], u1[0] : u1[2]] = 128
    # clear u2
    destination_frame[u2[1] : u2[3], u2[0] : u2[2]] = 128
    # clear v1
    destination_frame[v1[1] : v1[3], v1[0] : v1[2]] = 128
    # clear v2
    destination_frame[v2[1] : v2[3], v2[0] : v2[2]] = 128

    if source_frame is not None:
        # calculate the resized frame, maintaining the aspect ratio
        source_aspect_ratio = source_frame.shape[1] / (source_frame.shape[0] // 3 * 2)
        dest_aspect_ratio = destination_shape[1] / destination_shape[0]

        if source_aspect_ratio <= dest_aspect_ratio:
            y_resize_height = int(destination_shape[0] // 4 * 4)
            y_resize_width = int((y_resize_height * source_aspect_ratio) // 4 * 4)
        else:
            y_resize_width = int(destination_shape[1] // 4 * 4)
            y_resize_height = int((y_resize_width / source_aspect_ratio) // 4 * 4)

        uv_resize_width = int(y_resize_width // 2)
        uv_resize_height = int(y_resize_height // 4)

        y_y_offset = int((destination_shape[0] - y_resize_height) / 4 // 4 * 4)
        y_x_offset = int((destination_shape[1] - y_resize_width) / 2 // 4 * 4)

        uv_y_offset = y_y_offset // 4
        uv_x_offset = y_x_offset // 2

        interpolation = cv2.INTER_LINEAR
        # resize/copy y channel
        destination_frame[
            y[1] + y_y_offset : y[1] + y_y_offset + y_resize_height,
            y[0] + y_x_offset : y[0] + y_x_offset + y_resize_width,
        ] = cv2.resize(
            source_frame[
                source_channel_dim["y"][1] : source_channel_dim["y"][3],
                source_channel_dim["y"][0] : source_channel_dim["y"][2],
            ],
            dsize=(y_resize_width, y_resize_height),
            interpolation=interpolation,
        )

        # resize/copy u1
        destination_frame[
            u1[1] + uv_y_offset : u1[1] + uv_y_offset + uv_resize_height,
            u1[0] + uv_x_offset : u1[0] + uv_x_offset + uv_resize_width,
        ] = cv2.resize(
            source_frame[
                source_channel_dim["u1"][1] : source_channel_dim["u1"][3],
                source_channel_dim["u1"][0] : source_channel_dim["u1"][2],
            ],
            dsize=(uv_resize_width, uv_resize_height),
            interpolation=interpolation,
        )
        # resize/copy u2
        destination_frame[
            u2[1] + uv_y_offset : u2[1] + uv_y_offset + uv_resize_height,
            u2[0] + uv_x_offset : u2[0] + uv_x_offset + uv_resize_width,
        ] = cv2.resize(
            source_frame[
                source_channel_dim["u2"][1] : source_channel_dim["u2"][3],
                source_channel_dim["u2"][0] : source_channel_dim["u2"][2],
            ],
            dsize=(uv_resize_width, uv_resize_height),
            interpolation=interpolation,
        )
        # resize/copy v1
        destination_frame[
            v1[1] + uv_y_offset : v1[1] + uv_y_offset + uv_resize_height,
            v1[0] + uv_x_offset : v1[0] + uv_x_offset + uv_resize_width,
        ] = cv2.resize(
            source_frame[
                source_channel_dim["v1"][1] : source_channel_dim["v1"][3],
                source_channel_dim["v1"][0] : source_channel_dim["v1"][2],
            ],
            dsize=(uv_resize_width, uv_resize_height),
            interpolation=interpolation,
        )
        # resize/copy v2
        destination_frame[
            v2[1] + uv_y_offset : v2[1] + uv_y_offset + uv_resize_height,
            v2[0] + uv_x_offset : v2[0] + uv_x_offset + uv_resize_width,
        ] = cv2.resize(
            source_frame[
                source_channel_dim["v2"][1] : source_channel_dim["v2"][3],
                source_channel_dim["v2"][0] : source_channel_dim["v2"][2],
            ],
            dsize=(uv_resize_width, uv_resize_height),
            interpolation=interpolation,
        )


def yuv_region_2_yuv(frame, region):
    """Crop ``region`` from a planar YUV frame and return it as 3-channel YUV.

    On failure the frame shape and region are printed and the error re-raised.
    """
    try:
        # TODO: does this copy the numpy array?
        yuv_cropped_frame = yuv_crop_and_resize(frame, region)
        return yuv_to_3_channel_yuv(yuv_cropped_frame)
    except Exception:
        print(f"frame.shape: {frame.shape}")
        print(f"region: {region}")
        raise


def yuv_region_2_rgb(frame, region):
    """Crop ``region`` from a planar YUV frame and return it converted to RGB.

    On failure the frame shape and region are printed and the error re-raised.
    """
    try:
        # TODO: does this copy the numpy array?
        yuv_cropped_frame = yuv_crop_and_resize(frame, region)
        return cv2.cvtColor(yuv_cropped_frame, cv2.COLOR_YUV2RGB_I420)
    except Exception:
        print(f"frame.shape: {frame.shape}")
        print(f"region: {region}")
        raise


def yuv_region_2_bgr(frame, region):
    """Crop ``region`` from a planar YUV frame and return it converted to BGR.

    On failure the frame shape and region are printed and the error re-raised.
    """
    try:
        yuv_cropped_frame = yuv_crop_and_resize(frame, region)
        return cv2.cvtColor(yuv_cropped_frame, cv2.COLOR_YUV2BGR_I420)
    except Exception:
        print(f"frame.shape: {frame.shape}")
        print(f"region: {region}")
        raise


def intersection(box_a, box_b) -> Optional[tuple]:
    """Return intersection box or None if boxes do not intersect.

    Boxes are (x1, y1, x2, y2); the result is a tuple in the same layout.
    """
    if (
        box_a[2] < box_b[0]
        or box_a[0] > box_b[2]
        or box_a[1] > box_b[3]
        or box_a[3] < box_b[1]
    ):
        return None

    return (
        max(box_a[0], box_b[0]),
        max(box_a[1], box_b[1]),
        min(box_a[2], box_b[2]),
        min(box_a[3], box_b[3]),
    )


def area(box):
    """Return the area of an (x1, y1, x2, y2) box, treating both edges as inclusive."""
    return (box[2] - box[0] + 1) * (box[3] - box[1] + 1)


def intersection_over_union(box_a, box_b):
    """Return the IoU of two (x1, y1, x2, y2) boxes, or 0.0 if they do not overlap."""
    # determine the (x, y)-coordinates of the intersection rectangle
    intersect = intersection(box_a, box_b)

    if intersect is None:
        return 0.0

    # compute the area of intersection rectangle
    inter_area = max(0, intersect[2] - intersect[0] + 1) * max(
        0, intersect[3] - intersect[1] + 1
    )

    if inter_area == 0:
        return 0.0

    # compute the area of both the prediction and ground-truth
    # rectangles
    box_a_area = (box_a[2] - box_a[0] + 1) * (box_a[3] - box_a[1] + 1)
    box_b_area = (box_b[2] - box_b[0] + 1) * (box_b[3] - box_b[1] + 1)

    # compute the intersection over union by taking the intersection
    # area and dividing it by the sum of prediction + ground-truth
    # areas - the interesection area
    iou = inter_area / float(box_a_area + box_b_area - inter_area)

    # return the intersection over union value
    return iou


def clipped(obj, frame_shape):
    """Return True if the object's box touches an inner border of its region.

    ``obj[2]`` is the box and ``obj[5]`` the region, both (x1, y1, x2, y2).
    """
    # if the object is within 5 pixels of the region border, and the region is not on the edge
    # consider the object to be clipped
    box = obj[2]
    region = obj[5]
    return bool(
        (region[0] > 5 and box[0] - region[0] <= 5)
        or (region[1] > 5 and box[1] - region[1] <= 5)
        or (frame_shape[1] - region[2] > 5 and region[2] - box[2] <= 5)
        or (frame_shape[0] - region[3] > 5 and region[3] - box[3] <= 5)
    )


def restart_frigate():
    """Terminate pid 1 when it is s6-svscan, otherwise SIGTERM this process."""
    proc = psutil.Process(1)
    # if this is running via s6, sigterm pid 1
    if proc.name() == "s6-svscan":
        proc.terminate()
    # otherwise, just try and exit frigate
    else:
        os.kill(os.getpid(), signal.SIGTERM)


class EventsPerSecond:
    """Track event timestamps and report an approximate events-per-second rate."""

    def __init__(self, max_events=1000, last_n_seconds=10):
        self._start = None
        self._max_events = max_events
        self._last_n_seconds = last_n_seconds
        self._timestamps = []

    def start(self):
        """Record the current time as the start of measurement."""
        self._start = datetime.datetime.now().timestamp()

    def update(self):
        """Record one event at the current time."""
        now = datetime.datetime.now().timestamp()
        if self._start is None:
            self._start = now
        self._timestamps.append(now)
        # truncate the list when it goes 100 over the max_size
        if len(self._timestamps) > self._max_events + 100:
            self._timestamps = self._timestamps[(1 - self._max_events) :]
        self.expire_timestamps(now)

    def eps(self):
        """Return the number of retained events divided by the elapsed window."""
        now = datetime.datetime.now().timestamp()
        if self._start is None:
            self._start = now
        # compute the (approximate) events in the last n seconds
        self.expire_timestamps(now)
        seconds = min(now - self._start, self._last_n_seconds)
        # avoid divide by zero
        if seconds == 0:
            seconds = 1
        return len(self._timestamps) / seconds

    # remove aged out timestamps
    def expire_timestamps(self, now):
        """Drop timestamps older than ``last_n_seconds`` before ``now``."""
        threshold = now - self._last_n_seconds
        while self._timestamps and self._timestamps[0] < threshold:
            del self._timestamps[0]


def print_stack(sig, frame):
    """Signal handler that prints the stack of the interrupted frame."""
    traceback.print_stack(frame)


def listen():
    """Install ``print_stack`` as the SIGUSR1 handler."""
    signal.signal(signal.SIGUSR1, print_stack)


def create_mask(frame_shape, mask):
    """Return a uint8 mask of ``frame_shape``: 255 everywhere, 0 inside polygons.

    ``mask`` is either one "x1,y1,x2,y2,..." string or a list of such strings;
    any other type yields an all-255 mask.
    """
    mask_img = np.zeros(frame_shape, np.uint8)
    mask_img[:] = 255

    if isinstance(mask, list):
        for m in mask:
            add_mask(m, mask_img)

    elif isinstance(mask, str):
        add_mask(mask, mask_img)

    return mask_img


def add_mask(mask, mask_img):
    """Fill the polygon described by the "x1,y1,x2,y2,..." string with 0 in place."""
    points = mask.split(",")
    contour = np.array(
        [[int(points[i]), int(points[i + 1])] for i in range(0, len(points), 2)]
    )
    cv2.fillPoly(mask_img, pts=[contour], color=(0))


def load_labels(path, encoding="utf-8"):
    """Loads labels from file (with or without index numbers).

    Args:
      path: path to label file.
      encoding: label file encoding.
    Returns:
      Dictionary mapping indices to labels. Indices 0-90 default to
      "unknown"; an empty file yields an empty dictionary.
    """
    with open(path, "r", encoding=encoding) as f:
        labels = {index: "unknown" for index in range(91)}
        lines = f.readlines()
        if not lines:
            return {}

        # the first line decides whether the file is "index label" formatted
        if lines[0].split(" ", maxsplit=1)[0].isdigit():
            pairs = [line.split(" ", maxsplit=1) for line in lines]
            labels.update({int(index): label.strip() for index, label in pairs})
        else:
            labels.update({index: line.strip() for index, line in enumerate(lines)})
        return labels


def clean_camera_user_pass(line: str) -> str:
    """Removes user and password from line."""
    if "rtsp://" in line:
        return re.sub(REGEX_RTSP_CAMERA_USER_PASS, "://*:*@", line)
    else:
        return re.sub(REGEX_HTTP_CAMERA_USER_PASS, "user=*&password=*", line)


def escape_special_characters(path: str) -> str:
    """Cleans reserved characters to encodings for ffmpeg."""
    try:
        # strip the leading "://" and trailing "@" from the match -> "user:pass"
        found = re.search(REGEX_RTSP_CAMERA_USER_PASS, path).group(0)[3:-1]
        pw = found[(found.index(":") + 1) :]
        return path.replace(pw, urllib.parse.quote_plus(pw))
    except AttributeError:
        # path does not have user:pass
        return path


def get_cgroups_version() -> str:
    """Determine what version of cgroups is enabled.

    Returns "cgroup2", "cgroup" or "unknown".
    """

    cgroup_path = "/sys/fs/cgroup"

    if not os.path.ismount(cgroup_path):
        logger.debug(f"{cgroup_path} is not a mount point.")
        return "unknown"

    try:
        with open("/proc/mounts", "r") as f:
            mounts = f.readlines()

        for mount in mounts:
            mount_info = mount.split()
            if mount_info[1] == cgroup_path:
                fs_type = mount_info[2]
                if fs_type == "cgroup2fs" or fs_type == "cgroup2":
                    return "cgroup2"
                elif fs_type == "tmpfs":
                    return "cgroup"
                else:
                    logger.debug(
                        f"Could not determine cgroups version: unhandled filesystem {fs_type}"
                    )
                break
    except Exception as e:
        logger.debug(f"Could not determine cgroups version: {e}")

    return "unknown"


def get_docker_memlimit_bytes() -> int:
    """Get mem limit in bytes set in docker if present. Returns -1 if no limit detected."""

    # check running a supported cgroups version
    if get_cgroups_version() == "cgroup2":
        memlimit_path = "/sys/fs/cgroup/memory.max"

        try:
            with open(memlimit_path, "r") as f:
                value = f.read().strip()

            if value.isnumeric():
                return int(value)
            elif value.lower() == "max":
                return -1
        except Exception as e:
            logger.debug(f"Unable to get docker memlimit: {e}")

    return -1


def get_cpu_stats() -> dict[str, dict]:
    """Get cpu usages for each process id

    Returns a dict keyed by pid with "cpu", "cpu_average", "mem" and
    "cmdline" strings. Processes whose /proc entries cannot be read or parsed
    are skipped.
    """
    usages = {}
    docker_memlimit = get_docker_memlimit_bytes() / 1024
    total_mem = os.sysconf("SC_PAGE_SIZE") * os.sysconf("SC_PHYS_PAGES") / 1024

    for process in psutil.process_iter(["pid", "name", "cpu_percent", "cmdline"]):
        pid = process.info["pid"]
        try:
            cpu_percent = process.info["cpu_percent"]
            cmdline = process.info["cmdline"]

            with open(f"/proc/{pid}/stat", "r") as f:
                stats = f.readline().split()
            utime = int(stats[13])
            stime = int(stats[14])
            starttime = int(stats[21])

            with open("/proc/uptime") as f:
                system_uptime_sec = int(float(f.read().split()[0]))

            clk_tck = os.sysconf(os.sysconf_names["SC_CLK_TCK"])

            process_utime_sec = utime // clk_tck
            process_stime_sec = stime // clk_tck
            process_starttime_sec = starttime // clk_tck

            process_elapsed_sec = system_uptime_sec - process_starttime_sec
            process_usage_sec = process_utime_sec + process_stime_sec
            # a process started within the current second divides by zero here
            # and is skipped by the except below
            cpu_average_usage = process_usage_sec * 100 // process_elapsed_sec

            with open(f"/proc/{pid}/statm", "r") as f:
                mem_stats = f.readline().split()
            mem_res = int(mem_stats[1]) * os.sysconf("SC_PAGE_SIZE") / 1024

            # prefer the container limit when one is set
            if docker_memlimit > 0:
                mem_pct = round((mem_res / docker_memlimit) * 100, 1)
            else:
                mem_pct = round((mem_res / total_mem) * 100, 1)

            usages[pid] = {
                "cpu": str(cpu_percent),
                "cpu_average": str(round(cpu_average_usage, 2)),
                "mem": f"{mem_pct}",
                "cmdline": " ".join(cmdline),
            }
        except Exception:
            continue

    return usages


def get_physical_interfaces(interfaces) -> list:
    """Return the names in /proc/net/dev that start with any prefix in ``interfaces``."""
    with open("/proc/net/dev", "r") as file:
        lines = file.readlines()

    # str.startswith accepts a tuple; an empty tuple matches nothing
    prefixes = tuple(interfaces)
    physical_interfaces = []
    for line in lines:
        if ":" in line:
            interface = line.split(":")[0].strip()
            if interface.startswith(prefixes):
                physical_interfaces.append(interface)

    return physical_interfaces


def get_bandwidth_stats(config) -> dict[str, dict]:
    """Get bandwidth usages for each ffmpeg process id

    Runs ``nethogs`` on the configured interfaces and returns
    ``{pid: {"bandwidth": value}}``; an empty dict when nethogs fails.
    """
    usages = {}
    top_command = ["nethogs", "-t", "-v0", "-c5", "-d1"] + get_physical_interfaces(
        config.telemetry.network_interfaces
    )

    p = sp.run(
        top_command,
        encoding="ascii",
        capture_output=True,
    )

    if p.returncode != 0:
        return usages
    else:
        lines = p.stdout.split("\n")
        for line in lines:
            stats = list(filter(lambda a: a != "", line.strip().split("\t")))
            try:
                if re.search(
                    r"(^ffmpeg|\/go2rtc|frigate\.detector\.[a-z]+)/([0-9]+)/", stats[0]
                ):
                    process = stats[0].split("/")
                    usages[process[len(process) - 2]] = {
                        "bandwidth": round(float(stats[1]) + float(stats[2]), 1),
                    }
            except (IndexError, ValueError):
                continue

    return usages


def get_amd_gpu_stats() -> Optional[dict[str, str]]:
    """Get stats using radeontop.

    Returns None when radeontop exits with a non-zero code.
    """
    radeontop_command = ["radeontop", "-d", "-", "-l", "1"]

    p = sp.run(
        radeontop_command,
        encoding="ascii",
        capture_output=True,
    )

    if p.returncode != 0:
        logger.error(f"Unable to poll radeon GPU stats: {p.stderr}")
        return None
    else:
        usages = p.stdout.split(",")
        results: dict[str, str] = {}

        for hw in usages:
            if "gpu" in hw:
                results["gpu"] = f"{hw.strip().split(' ')[1].replace('%', '')}%"
            elif "vram" in hw:
                results["mem"] = f"{hw.strip().split(' ')[1].replace('%', '')}%"

        return results


def get_intel_gpu_stats() -> Optional[dict[str, str]]:
    """Get stats using intel_gpu_top.

    Returns None when the command does not end via the expected timeout.
    """
    intel_gpu_top_command = [
        "timeout",
        "0.5s",
        "intel_gpu_top",
        "-J",
        "-o",
        "-",
        "-s",
        "1",
    ]

    p = sp.run(
        intel_gpu_top_command,
        encoding="ascii",
        capture_output=True,
    )

    # timeout has a non-zero returncode when timeout is reached
    if p.returncode != 124:
        logger.error(f"Unable to poll intel GPU stats: {p.stderr}")
        return None
    else:
        reading = "".join(p.stdout.split())
        results: dict[str, str] = {}

        # render is used for qsv
        render = []
        for result in re.findall(r'"Render/3D/0":{[a-z":\d.,%]+}', reading):
            # drop the 14-character '"Render/3D/0":' prefix to leave the JSON object
            packet = json.loads(result[14:])
            single = packet.get("busy", 0.0)
            render.append(float(single))

        if render:
            render_avg = sum(render) / len(render)
        else:
            render_avg = 1

        # video is used for vaapi
        video = []
        for result in re.findall(r'"Video/\d":{[a-z":\d.,%]+}', reading):
            # drop the 10-character '"Video/N":' prefix to leave the JSON object
            packet = json.loads(result[10:])
            single = packet.get("busy", 0.0)
            video.append(float(single))

        if video:
            video_avg = sum(video) / len(video)
        else:
            video_avg = 1

        results["gpu"] = f"{round((video_avg + render_avg) / 2, 2)}%"
        results["mem"] = "-%"
        return results


def try_get_info(f, h, default="N/A"):
    """Call ``f(h)`` and return ``default`` if NVML reports it as not supported."""
    try:
        v = f(h)
    except nvml.NVMLError_NotSupported:
        v = default
    return v


def get_nvidia_gpu_stats() -> dict[int, dict]:
    """Get name, utilisation and memory usage for each NVIDIA GPU via NVML.

    Returns a dict keyed by device index. On any NVML failure the devices
    collected so far (possibly none) are returned.
    """
    results = {}
    try:
        nvml.nvmlInit()
        deviceCount = nvml.nvmlDeviceGetCount()
        for i in range(deviceCount):
            handle = nvml.nvmlDeviceGetHandleByIndex(i)
            meminfo = try_get_info(nvml.nvmlDeviceGetMemoryInfo, handle)
            util = try_get_info(nvml.nvmlDeviceGetUtilizationRates, handle)
            if util != "N/A":
                gpu_util = util.gpu
            else:
                gpu_util = 0

            if meminfo != "N/A":
                gpu_mem_util = meminfo.used / meminfo.total * 100
            else:
                gpu_mem_util = -1

            results[i] = {
                "name": nvml.nvmlDeviceGetName(handle),
                "gpu": gpu_util,
                "mem": gpu_mem_util,
            }
    except Exception as e:
        # best effort: keep partial results, but leave a trace of the failure
        logger.debug(f"Unable to poll nvidia GPU stats: {e}")

    # returned outside of a finally block so that KeyboardInterrupt / SystemExit
    # are no longer silently swallowed
    return results


def ffprobe_stream(path: str) -> sp.CompletedProcess:
    """Run ffprobe on stream."""
    clean_path = escape_special_characters(path)
    ffprobe_cmd = [
        "ffprobe",
        "-timeout",
        "1000000",
        "-print_format",
        "json",
        "-show_entries",
        "stream=codec_long_name,width,height,bit_rate,duration,display_aspect_ratio,avg_frame_rate",
        "-loglevel",
        "quiet",
        clean_path,
    ]
    return sp.run(ffprobe_cmd, capture_output=True)


def vainfo_hwaccel(device_name: Optional[str] = None) -> sp.CompletedProcess:
    """Run vainfo."""
    vainfo_cmd = (
        ["vainfo"]
        if not device_name
        else ["vainfo", "--display", "drm", "--device", f"/dev/dri/{device_name}"]
    )
    return sp.run(vainfo_cmd, capture_output=True)


def get_ffmpeg_arg_list(arg: Any) -> list:
    """Use arg if list or convert to list format."""
    return arg if isinstance(arg, list) else shlex.split(arg)


class FrameManager(ABC):
    """Interface for storing and retrieving named frame buffers."""

    @abstractmethod
    def create(self, name, size) -> AnyStr:
        pass

    # NOTE(review): the concrete managers in this file define get(name, shape);
    # this abstract signature differs - confirm which one is intended.
    @abstractmethod
    def get(self, name, timeout_ms=0):
        pass

    @abstractmethod
    def close(self, name):
        pass

    @abstractmethod
    def delete(self, name):
        pass


class DictFrameManager(FrameManager):
    """Frame manager that keeps frames as bytearrays in a local dict."""

    def __init__(self):
        self.frames = {}

    def create(self, name, size) -> AnyStr:
        mem = bytearray(size)
        self.frames[name] = mem
        return mem

    def get(self, name, shape):
        mem = self.frames[name]
        return np.ndarray(shape, dtype=np.uint8, buffer=mem)

    def close(self, name):
        pass

    def delete(self, name):
        del self.frames[name]


class SharedMemoryFrameManager(FrameManager):
    """Frame manager backed by named ``multiprocessing.shared_memory`` segments."""

    def __init__(self):
        # cache of opened segments, keyed by name
        self.shm_store = {}

    def create(self, name, size) -> AnyStr:
        shm = shared_memory.SharedMemory(name=name, create=True, size=size)
        self.shm_store[name] = shm
        return shm.buf

    def get(self, name, shape):
        if name in self.shm_store:
            shm = self.shm_store[name]
        else:
            # attach to a segment that was not created or opened by this manager
            shm = shared_memory.SharedMemory(name=name)
            self.shm_store[name] = shm
        return np.ndarray(shape, dtype=np.uint8, buffer=shm.buf)

    def close(self, name):
        if name in self.shm_store:
            self.shm_store[name].close()
            del self.shm_store[name]

    def delete(self, name):
        if name in self.shm_store:
            self.shm_store[name].close()
            self.shm_store[name].unlink()
            del self.shm_store[name]


def get_tz_modifiers(tz_name: str) -> Tuple[str, str]:
    """Return ("<h> hour", "<m> minute") strings for the zone's current UTC offset."""
    seconds_offset = (
        datetime.datetime.now(pytz.timezone(tz_name)).utcoffset().total_seconds()
    )
    hours_offset = int(seconds_offset / 60 / 60)
    minutes_offset = int(seconds_offset / 60 - hours_offset * 60)
    hour_modifier = f"{hours_offset} hour"
    minute_modifier = f"{minutes_offset} minute"
    return hour_modifier, minute_modifier


def to_relative_box(
    width: int, height: int, box: Tuple[int, int, int, int]
) -> Tuple[float, float, float, float]:
    """Convert an absolute (x1, y1, x2, y2) box to relative (x, y, w, h)."""
    return (
        box[0] / width,  # x
        box[1] / height,  # y
        (box[2] - box[0]) / width,  # w
        (box[3] - box[1]) / height,  # h
    )


def get_video_properties(url, get_duration=False):
    """Return the width and height (and optionally duration) of a video.

    The result dict has "width" and "height" (0 when the stream could not be
    opened) and, when ``get_duration`` is set, "duration" in seconds (-1 when
    neither OpenCV nor ffprobe could determine it).
    """

    def calculate_duration(video: Optional[Any]) -> float:
        duration = None

        if video is not None:
            # Get the frames per second (fps) of the video stream
            fps = video.get(cv2.CAP_PROP_FPS)
            total_frames = int(video.get(cv2.CAP_PROP_FRAME_COUNT))

            if fps and total_frames:
                duration = total_frames / fps

        # if cv2 failed need to use ffprobe
        if duration is None:
            ffprobe_cmd = [
                "ffprobe",
                "-v",
                "error",
                "-show_entries",
                "format=duration",
                "-of",
                "default=noprint_wrappers=1:nokey=1",
                f"{url}",
            ]
            p = sp.run(ffprobe_cmd, capture_output=True)

            if p.returncode == 0 and p.stdout.decode():
                duration = float(p.stdout.decode().strip())
            else:
                duration = -1

        return duration

    width = height = 0

    try:
        # Open the video stream
        video = cv2.VideoCapture(url)

        # Check if the video stream was opened successfully
        if not video.isOpened():
            video = None
    except Exception:
        video = None

    result = {}

    if get_duration:
        result["duration"] = calculate_duration(video)

    if video is not None:
        # Get the width of frames in the video stream
        width = video.get(cv2.CAP_PROP_FRAME_WIDTH)

        # Get the height of frames in the video stream
        height = video.get(cv2.CAP_PROP_FRAME_HEIGHT)

        # Release the video stream
        video.release()

    result["width"] = round(width)
    result["height"] = round(height)

    return result


class LimitedQueue(FFQueue):
    """faster_fifo queue with an optional cap on the number of queued items.

    The item count lives in a ``multiprocessing.RawValue``, which has no
    lock, so the count is approximate when several processes put/get
    concurrently.
    """

    def __init__(
        self,
        maxsize=0,
        max_size_bytes=DEFAULT_CIRCULAR_BUFFER_SIZE,
        loads=None,
        dumps=None,
    ):
        super().__init__(max_size_bytes=max_size_bytes, loads=loads, dumps=dumps)
        self.maxsize = maxsize
        self.size = multiprocessing.RawValue(
            ctypes.c_int, 0
        )  # Add a counter for the number of items in the queue

    def put(self, x, block=True, timeout=DEFAULT_TIMEOUT):
        """Enqueue ``x``; raise ``Full`` if no slot frees up within ``timeout``."""
        if self.maxsize > 0 and self.size.value >= self.maxsize:
            if block:
                start_time = time.time()
                while self.size.value >= self.maxsize:
                    remaining = timeout - (time.time() - start_time)
                    if remaining <= 0.0:
                        raise Full
                    time.sleep(min(remaining, 0.1))
            else:
                raise Full
        # reserve the slot first so other producers see it immediately
        self.size.value += 1
        try:
            return super().put(x, block=block, timeout=timeout)
        except Exception:
            # nothing was enqueued, so give the slot back
            self.size.value -= 1
            raise

    def get(self, block=True, timeout=DEFAULT_TIMEOUT):
        """Dequeue one item; raise ``Empty`` if none is available."""
        if self.size.value <= 0 and not block:
            raise Empty
        item = super().get(block=block, timeout=timeout)
        # only count the item as removed once it has actually been retrieved
        self.size.value -= 1
        return item

    def qsize(self):
        """Return the current item count as an int."""
        return self.size.value

    def empty(self):
        """Return True when the item count is zero or less."""
        return self.qsize() <= 0

    def full(self):
        """Return True when a positive ``maxsize`` has been reached."""
        return self.maxsize > 0 and self.qsize() >= self.maxsize