blakeblackshear.frigate/frigate/util.py

import copy
import datetime
import logging
import subprocess as sp
import json
import re
import signal
import traceback
import urllib.parse
import yaml
from abc import ABC, abstractmethod
from collections import Counter
from collections.abc import Mapping
from multiprocessing import shared_memory
from typing import Any, AnyStr, Optional
import cv2
import numpy as np
import os
import psutil
from frigate.const import REGEX_HTTP_CAMERA_USER_PASS, REGEX_RTSP_CAMERA_USER_PASS
logger = logging.getLogger(__name__)
def deep_merge(dct1: dict, dct2: dict, override=False, merge_lists=False) -> dict:
    """
    :param dct1: First dict to merge
    :param dct2: Second dict to merge
    :param override: if the same key exists in both dictionaries, take the value
        from dct2; otherwise keep the value from dct1. (default=False)
    :param merge_lists: if the same key maps to a list in both dictionaries,
        concatenate the lists instead of keeping dct1's list. (default=False)
    :return: The merged dictionary
    """
    merged = copy.deepcopy(dct1)
    for k, v2 in dct2.items():
        if k in merged:
            v1 = merged[k]
            if isinstance(v1, dict) and isinstance(v2, Mapping):
                # recurse, propagating the merge options to nested dicts
                merged[k] = deep_merge(v1, v2, override, merge_lists)
            elif isinstance(v1, list) and isinstance(v2, list):
                if merge_lists:
                    merged[k] = v1 + v2
            else:
                if override:
                    merged[k] = copy.deepcopy(v2)
        else:
            merged[k] = copy.deepcopy(v2)
    return merged
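
# Usage sketch (illustrative values, not part of the original module): by default
# deep_merge keeps dct1's values on conflicts and only concatenates lists when
# merge_lists is set.
#
#     >>> deep_merge({"a": {"x": 1}, "b": [1]}, {"a": {"y": 2}, "b": [2], "c": 3})
#     {'a': {'x': 1, 'y': 2}, 'b': [1], 'c': 3}
#     >>> deep_merge({"b": [1]}, {"b": [2]}, merge_lists=True)
#     {'b': [1, 2]}
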
def load_config_with_no_duplicates(raw_config) -> dict:
"""Get config ensuring duplicate keys are not allowed."""
# https://stackoverflow.com/a/71751051
class PreserveDuplicatesLoader(yaml.loader.Loader):
pass
def map_constructor(loader, node, deep=False):
keys = [loader.construct_object(node, deep=deep) for node, _ in node.value]
vals = [loader.construct_object(node, deep=deep) for _, node in node.value]
key_count = Counter(keys)
data = {}
for key, val in zip(keys, vals):
if key_count[key] > 1:
raise ValueError(
f"Config input {key} is defined multiple times for the same field, this is not allowed."
)
else:
data[key] = val
return data
PreserveDuplicatesLoader.add_constructor(
yaml.resolver.BaseResolver.DEFAULT_MAPPING_TAG, map_constructor
)
return yaml.load(raw_config, PreserveDuplicatesLoader)
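
# Example of the duplicate-key check (hypothetical YAML snippets, for
# illustration only): standard YAML loaders silently keep the last value for a
# repeated key, whereas this loader rejects the config outright.
#
#     >>> load_config_with_no_duplicates("a: 1\nb: 2\n")
#     {'a': 1, 'b': 2}
#     >>> load_config_with_no_duplicates("a: 1\na: 2\n")   # raises ValueError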
def draw_timestamp(
frame,
timestamp,
timestamp_format,
font_effect=None,
font_thickness=2,
font_color=(255, 255, 255),
position="tl",
):
time_to_show = datetime.datetime.fromtimestamp(timestamp).strftime(timestamp_format)
# calculate a dynamic font size
size = cv2.getTextSize(
time_to_show,
cv2.FONT_HERSHEY_SIMPLEX,
fontScale=1.0,
thickness=font_thickness,
)
text_width = size[0][0]
desired_size = max(150, 0.33 * frame.shape[1])
font_scale = desired_size / text_width
# calculate the actual size with the dynamic scale
size = cv2.getTextSize(
time_to_show,
cv2.FONT_HERSHEY_SIMPLEX,
fontScale=font_scale,
thickness=font_thickness,
)
image_width = frame.shape[1]
image_height = frame.shape[0]
text_width = size[0][0]
text_height = size[0][1]
line_height = text_height + size[1]
if position == "tl":
text_offset_x = 0
text_offset_y = 0 if 0 < line_height else 0 - (line_height + 8)
elif position == "tr":
text_offset_x = image_width - text_width
text_offset_y = 0 if 0 < line_height else 0 - (line_height + 8)
elif position == "bl":
text_offset_x = 0
text_offset_y = image_height - (line_height + 8)
elif position == "br":
text_offset_x = image_width - text_width
text_offset_y = image_height - (line_height + 8)
if font_effect == "solid":
        # make the coords of the background box with a small padding
timestamp_box_coords = np.array(
[
[text_offset_x, text_offset_y],
[text_offset_x + text_width, text_offset_y],
[text_offset_x + text_width, text_offset_y + line_height + 8],
[text_offset_x, text_offset_y + line_height + 8],
]
)
cv2.fillPoly(
frame,
[timestamp_box_coords],
# inverse color of text for background for max. contrast
(255 - font_color[0], 255 - font_color[1], 255 - font_color[2]),
)
elif font_effect == "shadow":
cv2.putText(
frame,
time_to_show,
(text_offset_x + 3, text_offset_y + line_height),
cv2.FONT_HERSHEY_SIMPLEX,
fontScale=font_scale,
color=(255 - font_color[0], 255 - font_color[1], 255 - font_color[2]),
thickness=font_thickness,
)
cv2.putText(
frame,
time_to_show,
(text_offset_x, text_offset_y + line_height - 3),
cv2.FONT_HERSHEY_SIMPLEX,
fontScale=font_scale,
color=font_color,
thickness=font_thickness,
)
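
# Usage sketch (hypothetical frame and format string): the font scale is derived
# from the frame width so the rendered text spans roughly a third of the frame
# (never less than 150 px wide), keeping timestamps legible at any resolution.
#
#     frame = np.zeros((720, 1280, 3), np.uint8)
#     draw_timestamp(frame, 1672531200.0, "%m/%d/%Y %H:%M:%S", font_effect="shadow", position="br")
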
def draw_box_with_label(
frame,
x_min,
y_min,
x_max,
y_max,
label,
info,
thickness=2,
color=None,
position="ul",
):
if color is None:
color = (0, 0, 255)
display_text = "{}: {}".format(label, info)
cv2.rectangle(frame, (x_min, y_min), (x_max, y_max), color, thickness)
font_scale = 0.5
font = cv2.FONT_HERSHEY_SIMPLEX
# get the width and height of the text box
size = cv2.getTextSize(display_text, font, fontScale=font_scale, thickness=2)
text_width = size[0][0]
text_height = size[0][1]
line_height = text_height + size[1]
# set the text start position
if position == "ul":
text_offset_x = x_min
text_offset_y = 0 if y_min < line_height else y_min - (line_height + 8)
elif position == "ur":
text_offset_x = x_max - (text_width + 8)
text_offset_y = 0 if y_min < line_height else y_min - (line_height + 8)
elif position == "bl":
text_offset_x = x_min
text_offset_y = y_max
elif position == "br":
text_offset_x = x_max - (text_width + 8)
text_offset_y = y_max
# make the coords of the box with a small padding of two pixels
textbox_coords = (
(text_offset_x, text_offset_y),
(text_offset_x + text_width + 2, text_offset_y + line_height),
)
cv2.rectangle(frame, textbox_coords[0], textbox_coords[1], color, cv2.FILLED)
cv2.putText(
frame,
display_text,
(text_offset_x, text_offset_y + line_height - 3),
font,
fontScale=font_scale,
color=(0, 0, 0),
thickness=2,
)
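
# Usage sketch (made-up coordinates): draws a red bounding box with a filled
# label bar anchored at the requested corner.
#
#     frame = np.zeros((480, 640, 3), np.uint8)
#     draw_box_with_label(frame, 100, 120, 220, 300, "person", "0.87", position="ul")
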
def calculate_region(frame_shape, xmin, ymin, xmax, ymax, model_size, multiplier=2):
    # size is the longest edge and divisible by 4
    size = int((max(xmax - xmin, ymax - ymin) * multiplier) // 4 * 4)
    # don't go any smaller than the model_size
if size < model_size:
size = model_size
# x_offset is midpoint of bounding box minus half the size
x_offset = int((xmax - xmin) / 2.0 + xmin - size / 2.0)
# if outside the image
if x_offset < 0:
x_offset = 0
elif x_offset > (frame_shape[1] - size):
x_offset = max(0, (frame_shape[1] - size))
# y_offset is midpoint of bounding box minus half the size
    y_offset = int((ymax - ymin) / 2.0 + ymin - size / 2.0)
    # if outside the image
if y_offset < 0:
y_offset = 0
elif y_offset > (frame_shape[0] - size):
y_offset = max(0, (frame_shape[0] - size))
return (x_offset, y_offset, x_offset + size, y_offset + size)
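
# Worked example (illustrative numbers): a 60x40 box near the top-left of a
# 640x480 frame is expanded to at least model_size and clamped to the frame,
# always returning a square whose side is a multiple of 4.
#
#     >>> calculate_region((480, 640), 100, 100, 160, 140, 320)
#     (0, 0, 320, 320)
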
def get_yuv_crop(frame_shape, crop):
# crop should be (x1,y1,x2,y2)
frame_height = frame_shape[0] // 3 * 2
frame_width = frame_shape[1]
# compute the width/height of the uv channels
uv_width = frame_width // 2 # width of the uv channels
uv_height = frame_height // 4 # height of the uv channels
# compute the offset for upper left corner of the uv channels
uv_x_offset = crop[0] // 2 # x offset of the uv channels
uv_y_offset = crop[1] // 4 # y offset of the uv channels
# compute the width/height of the uv crops
uv_crop_width = (crop[2] - crop[0]) // 2 # width of the cropped uv channels
uv_crop_height = (crop[3] - crop[1]) // 4 # height of the cropped uv channels
# ensure crop dimensions are multiples of 2 and 4
y = (crop[0], crop[1], crop[0] + uv_crop_width * 2, crop[1] + uv_crop_height * 4)
u1 = (
0 + uv_x_offset,
frame_height + uv_y_offset,
0 + uv_x_offset + uv_crop_width,
frame_height + uv_y_offset + uv_crop_height,
)
u2 = (
uv_width + uv_x_offset,
frame_height + uv_y_offset,
uv_width + uv_x_offset + uv_crop_width,
frame_height + uv_y_offset + uv_crop_height,
)
v1 = (
0 + uv_x_offset,
frame_height + uv_height + uv_y_offset,
0 + uv_x_offset + uv_crop_width,
frame_height + uv_height + uv_y_offset + uv_crop_height,
)
v2 = (
uv_width + uv_x_offset,
frame_height + uv_height + uv_y_offset,
uv_width + uv_x_offset + uv_crop_width,
frame_height + uv_height + uv_y_offset + uv_crop_height,
)
return y, u1, u2, v1, v2
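
# Worked example (illustrative 1280x720 I420 frame, stored as a 1080x1280 plane):
# rows 0-719 hold the Y plane, and the half-resolution U/V data sits below it as
# four half-width blocks, so a 320x320 crop at the origin maps to:
#
#     >>> get_yuv_crop((1080, 1280), (0, 0, 320, 320))
#     ((0, 0, 320, 320), (0, 720, 160, 800), (640, 720, 800, 800), (0, 900, 160, 980), (640, 900, 800, 980))
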
def yuv_crop_and_resize(frame, region, height=None):
# Crops and resizes a YUV frame while maintaining aspect ratio
# https://stackoverflow.com/a/57022634
height = frame.shape[0] // 3 * 2
width = frame.shape[1]
# get the crop box if the region extends beyond the frame
crop_x1 = max(0, region[0])
crop_y1 = max(0, region[1])
# ensure these are a multiple of 4
crop_x2 = min(width, region[2])
crop_y2 = min(height, region[3])
crop_box = (crop_x1, crop_y1, crop_x2, crop_y2)
y, u1, u2, v1, v2 = get_yuv_crop(frame.shape, crop_box)
# if the region starts outside the frame, indent the start point in the cropped frame
y_channel_x_offset = abs(min(0, region[0]))
y_channel_y_offset = abs(min(0, region[1]))
uv_channel_x_offset = y_channel_x_offset // 2
uv_channel_y_offset = y_channel_y_offset // 4
# create the yuv region frame
# make sure the size is a multiple of 4
# TODO: this should be based on the size after resize now
size = (region[3] - region[1]) // 4 * 4
yuv_cropped_frame = np.zeros((size + size // 2, size), np.uint8)
    # fill in black (Y plane = 16, U/V planes = neutral 128)
yuv_cropped_frame[:] = 128
yuv_cropped_frame[0:size, 0:size] = 16
# copy the y channel
yuv_cropped_frame[
y_channel_y_offset : y_channel_y_offset + y[3] - y[1],
y_channel_x_offset : y_channel_x_offset + y[2] - y[0],
] = frame[y[1] : y[3], y[0] : y[2]]
uv_crop_width = u1[2] - u1[0]
uv_crop_height = u1[3] - u1[1]
# copy u1
yuv_cropped_frame[
size + uv_channel_y_offset : size + uv_channel_y_offset + uv_crop_height,
0 + uv_channel_x_offset : 0 + uv_channel_x_offset + uv_crop_width,
] = frame[u1[1] : u1[3], u1[0] : u1[2]]
# copy u2
yuv_cropped_frame[
size + uv_channel_y_offset : size + uv_channel_y_offset + uv_crop_height,
size // 2
+ uv_channel_x_offset : size // 2
+ uv_channel_x_offset
+ uv_crop_width,
] = frame[u2[1] : u2[3], u2[0] : u2[2]]
# copy v1
yuv_cropped_frame[
size
+ size // 4
+ uv_channel_y_offset : size
+ size // 4
+ uv_channel_y_offset
+ uv_crop_height,
0 + uv_channel_x_offset : 0 + uv_channel_x_offset + uv_crop_width,
] = frame[v1[1] : v1[3], v1[0] : v1[2]]
# copy v2
yuv_cropped_frame[
size
+ size // 4
+ uv_channel_y_offset : size
+ size // 4
+ uv_channel_y_offset
+ uv_crop_height,
size // 2
+ uv_channel_x_offset : size // 2
+ uv_channel_x_offset
+ uv_crop_width,
] = frame[v2[1] : v2[3], v2[0] : v2[2]]
return yuv_cropped_frame
def yuv_to_3_channel_yuv(yuv_frame):
height = yuv_frame.shape[0] // 3 * 2
width = yuv_frame.shape[1]
# flatten the image into array
yuv_data = yuv_frame.ravel()
    # create a numpy array to hold all the 3 channel yuv data
all_yuv_data = np.empty((height, width, 3), dtype=np.uint8)
y_count = height * width
uv_count = y_count // 4
# copy the y_channel
all_yuv_data[:, :, 0] = yuv_data[0:y_count].reshape((height, width))
# copy the u channel doubling each dimension
all_yuv_data[:, :, 1] = np.repeat(
np.reshape(
np.repeat(yuv_data[y_count : y_count + uv_count], repeats=2, axis=0),
(height // 2, width),
),
repeats=2,
axis=0,
)
# copy the v channel doubling each dimension
all_yuv_data[:, :, 2] = np.repeat(
np.reshape(
np.repeat(
yuv_data[y_count + uv_count : y_count + uv_count + uv_count],
repeats=2,
axis=0,
),
(height // 2, width),
),
repeats=2,
axis=0,
)
return all_yuv_data
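
# Shape sketch (illustrative): a 1280x720 I420 frame arrives as a flat
# (1080, 1280) plane and comes back as (720, 1280, 3), with U and V repeated 2x
# in each dimension to match the Y resolution.
#
#     >>> yuv_to_3_channel_yuv(np.zeros((1080, 1280), np.uint8)).shape
#     (720, 1280, 3)
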
def copy_yuv_to_position(
destination_frame,
destination_offset,
destination_shape,
source_frame=None,
source_channel_dim=None,
):
# get the coordinates of the channels for this position in the layout
y, u1, u2, v1, v2 = get_yuv_crop(
destination_frame.shape,
(
destination_offset[1],
destination_offset[0],
destination_offset[1] + destination_shape[1],
destination_offset[0] + destination_shape[0],
),
)
# clear y
destination_frame[
y[1] : y[3],
y[0] : y[2],
] = 16
# clear u1
destination_frame[u1[1] : u1[3], u1[0] : u1[2]] = 128
# clear u2
destination_frame[u2[1] : u2[3], u2[0] : u2[2]] = 128
# clear v1
destination_frame[v1[1] : v1[3], v1[0] : v1[2]] = 128
# clear v2
destination_frame[v2[1] : v2[3], v2[0] : v2[2]] = 128
    if source_frame is not None:
# calculate the resized frame, maintaining the aspect ratio
source_aspect_ratio = source_frame.shape[1] / (source_frame.shape[0] // 3 * 2)
dest_aspect_ratio = destination_shape[1] / destination_shape[0]
if source_aspect_ratio <= dest_aspect_ratio:
y_resize_height = int(destination_shape[0] // 4 * 4)
y_resize_width = int((y_resize_height * source_aspect_ratio) // 4 * 4)
else:
y_resize_width = int(destination_shape[1] // 4 * 4)
y_resize_height = int((y_resize_width / source_aspect_ratio) // 4 * 4)
uv_resize_width = int(y_resize_width // 2)
uv_resize_height = int(y_resize_height // 4)
y_y_offset = int((destination_shape[0] - y_resize_height) / 4 // 4 * 4)
y_x_offset = int((destination_shape[1] - y_resize_width) / 2 // 4 * 4)
uv_y_offset = y_y_offset // 4
uv_x_offset = y_x_offset // 2
interpolation = cv2.INTER_LINEAR
# resize/copy y channel
destination_frame[
y[1] + y_y_offset : y[1] + y_y_offset + y_resize_height,
y[0] + y_x_offset : y[0] + y_x_offset + y_resize_width,
] = cv2.resize(
source_frame[
source_channel_dim["y"][1] : source_channel_dim["y"][3],
source_channel_dim["y"][0] : source_channel_dim["y"][2],
],
dsize=(y_resize_width, y_resize_height),
interpolation=interpolation,
)
# resize/copy u1
destination_frame[
u1[1] + uv_y_offset : u1[1] + uv_y_offset + uv_resize_height,
u1[0] + uv_x_offset : u1[0] + uv_x_offset + uv_resize_width,
] = cv2.resize(
source_frame[
source_channel_dim["u1"][1] : source_channel_dim["u1"][3],
source_channel_dim["u1"][0] : source_channel_dim["u1"][2],
],
dsize=(uv_resize_width, uv_resize_height),
interpolation=interpolation,
)
# resize/copy u2
destination_frame[
u2[1] + uv_y_offset : u2[1] + uv_y_offset + uv_resize_height,
u2[0] + uv_x_offset : u2[0] + uv_x_offset + uv_resize_width,
] = cv2.resize(
source_frame[
source_channel_dim["u2"][1] : source_channel_dim["u2"][3],
source_channel_dim["u2"][0] : source_channel_dim["u2"][2],
],
dsize=(uv_resize_width, uv_resize_height),
interpolation=interpolation,
)
# resize/copy v1
destination_frame[
v1[1] + uv_y_offset : v1[1] + uv_y_offset + uv_resize_height,
v1[0] + uv_x_offset : v1[0] + uv_x_offset + uv_resize_width,
] = cv2.resize(
source_frame[
source_channel_dim["v1"][1] : source_channel_dim["v1"][3],
source_channel_dim["v1"][0] : source_channel_dim["v1"][2],
],
dsize=(uv_resize_width, uv_resize_height),
interpolation=interpolation,
)
# resize/copy v2
destination_frame[
v2[1] + uv_y_offset : v2[1] + uv_y_offset + uv_resize_height,
v2[0] + uv_x_offset : v2[0] + uv_x_offset + uv_resize_width,
] = cv2.resize(
source_frame[
source_channel_dim["v2"][1] : source_channel_dim["v2"][3],
source_channel_dim["v2"][0] : source_channel_dim["v2"][2],
],
dsize=(uv_resize_width, uv_resize_height),
interpolation=interpolation,
)
def yuv_region_2_yuv(frame, region):
try:
# TODO: does this copy the numpy array?
yuv_cropped_frame = yuv_crop_and_resize(frame, region)
return yuv_to_3_channel_yuv(yuv_cropped_frame)
except:
print(f"frame.shape: {frame.shape}")
print(f"region: {region}")
raise
def yuv_region_2_rgb(frame, region):
try:
# TODO: does this copy the numpy array?
yuv_cropped_frame = yuv_crop_and_resize(frame, region)
return cv2.cvtColor(yuv_cropped_frame, cv2.COLOR_YUV2RGB_I420)
except:
print(f"frame.shape: {frame.shape}")
print(f"region: {region}")
raise
def yuv_region_2_bgr(frame, region):
try:
yuv_cropped_frame = yuv_crop_and_resize(frame, region)
return cv2.cvtColor(yuv_cropped_frame, cv2.COLOR_YUV2BGR_I420)
except:
print(f"frame.shape: {frame.shape}")
print(f"region: {region}")
raise
def intersection(box_a, box_b):
return (
max(box_a[0], box_b[0]),
max(box_a[1], box_b[1]),
min(box_a[2], box_b[2]),
min(box_a[3], box_b[3]),
)
def area(box):
return (box[2] - box[0] + 1) * (box[3] - box[1] + 1)
def intersection_over_union(box_a, box_b):
# determine the (x, y)-coordinates of the intersection rectangle
intersect = intersection(box_a, box_b)
# compute the area of intersection rectangle
inter_area = max(0, intersect[2] - intersect[0] + 1) * max(
0, intersect[3] - intersect[1] + 1
)
if inter_area == 0:
return 0.0
# compute the area of both the prediction and ground-truth
# rectangles
box_a_area = (box_a[2] - box_a[0] + 1) * (box_a[3] - box_a[1] + 1)
box_b_area = (box_b[2] - box_b[0] + 1) * (box_b[3] - box_b[1] + 1)
    # compute the intersection over union by taking the intersection
    # area and dividing it by the sum of prediction + ground-truth
    # areas - the intersection area
iou = inter_area / float(box_a_area + box_b_area - inter_area)
# return the intersection over union value
return iou
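
# Worked example (illustrative boxes, inclusive pixel coordinates): two 100x100
# boxes overlapping by half share 5000 of 15000 combined pixels.
#
#     >>> intersection_over_union((0, 0, 99, 99), (50, 0, 149, 99))
#     0.3333333333333333
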
def clipped(obj, frame_shape):
# if the object is within 5 pixels of the region border, and the region is not on the edge
# consider the object to be clipped
box = obj[2]
region = obj[5]
if (
(region[0] > 5 and box[0] - region[0] <= 5)
or (region[1] > 5 and box[1] - region[1] <= 5)
or (frame_shape[1] - region[2] > 5 and region[2] - box[2] <= 5)
or (frame_shape[0] - region[3] > 5 and region[3] - box[3] <= 5)
):
return True
else:
return False
def restart_frigate():
proc = psutil.Process(1)
# if this is running via s6, sigterm pid 1
if proc.name() == "s6-svscan":
proc.terminate()
# otherwise, just try and exit frigate
else:
os.kill(os.getpid(), signal.SIGTERM)
class EventsPerSecond:
def __init__(self, max_events=1000):
self._start = None
self._max_events = max_events
self._timestamps = []
def start(self):
self._start = datetime.datetime.now().timestamp()
def update(self):
if self._start is None:
self.start()
self._timestamps.append(datetime.datetime.now().timestamp())
# truncate the list when it goes 100 over the max_size
if len(self._timestamps) > self._max_events + 100:
self._timestamps = self._timestamps[(1 - self._max_events) :]
def eps(self, last_n_seconds=10):
if self._start is None:
self.start()
# compute the (approximate) events in the last n seconds
now = datetime.datetime.now().timestamp()
seconds = min(now - self._start, last_n_seconds)
# avoid divide by zero
if seconds == 0:
seconds = 1
return (
len([t for t in self._timestamps if t > (now - last_n_seconds)]) / seconds
)
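
# Usage sketch (hypothetical loop): meters how often update() is called, e.g. to
# report a camera's processed FPS.
#
#     fps_tracker = EventsPerSecond()
#     fps_tracker.start()
#     while True:               # hypothetical frame loop
#         process_frame()       # placeholder for real work
#         fps_tracker.update()
#         current_fps = fps_tracker.eps(last_n_seconds=10)
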
def print_stack(sig, frame):
traceback.print_stack(frame)
def listen():
signal.signal(signal.SIGUSR1, print_stack)
def create_mask(frame_shape, mask):
mask_img = np.zeros(frame_shape, np.uint8)
mask_img[:] = 255
if isinstance(mask, list):
for m in mask:
add_mask(m, mask_img)
elif isinstance(mask, str):
add_mask(mask, mask_img)
return mask_img
def add_mask(mask, mask_img):
points = mask.split(",")
contour = np.array(
[[int(points[i]), int(points[i + 1])] for i in range(0, len(points), 2)]
)
cv2.fillPoly(mask_img, pts=[contour], color=(0))
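
# Usage sketch (made-up coordinates): a mask is a flat "x,y,x,y,..." polygon
# string; the polygon is filled with 0 on an all-255 canvas so masked pixels can
# be ignored downstream (e.g. by motion detection).
#
#     mask_img = create_mask((480, 640), "0,0,200,0,200,100,0,100")
#     # mask_img[50, 50] == 0 (inside the polygon), mask_img[300, 300] == 255
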
def load_labels(path, encoding="utf-8"):
"""Loads labels from file (with or without index numbers).
Args:
path: path to label file.
encoding: label file encoding.
Returns:
Dictionary mapping indices to labels.
"""
with open(path, "r", encoding=encoding) as f:
lines = f.readlines()
if not lines:
return {}
if lines[0].split(" ", maxsplit=1)[0].isdigit():
pairs = [line.split(" ", maxsplit=1) for line in lines]
return {int(index): label.strip() for index, label in pairs}
else:
return {index: line.strip() for index, line in enumerate(lines)}
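
# Example label file contents (hypothetical): either "0 person\n1 bicycle\n"
# (explicit indices) or "person\nbicycle\n" (line order) yields
# {0: 'person', 1: 'bicycle'}.
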
def clean_camera_user_pass(line: str) -> str:
"""Removes user and password from line."""
if line.startswith("rtsp://"):
return re.sub(REGEX_RTSP_CAMERA_USER_PASS, "://*:*@", line)
else:
return re.sub(REGEX_HTTP_CAMERA_USER_PASS, "user=*&password=*", line)
def escape_special_characters(path: str) -> str:
"""Cleans reserved characters to encodings for ffmpeg."""
try:
found = re.search(REGEX_RTSP_CAMERA_USER_PASS, path).group(0)[3:-1]
pw = found[(found.index(":") + 1) :]
return path.replace(pw, urllib.parse.quote_plus(pw))
except AttributeError:
# path does not have user:pass
return path
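
# Example (hypothetical URL): clean_camera_user_pass turns
# "rtsp://user:secret@192.168.1.10:554/stream" into
# "rtsp://*:*@192.168.1.10:554/stream" for logging, while
# escape_special_characters percent-encodes a password containing reserved
# characters (e.g. "pa#ss" becomes "pa%23ss") so ffmpeg can parse the URL,
# assuming the credentials match REGEX_RTSP_CAMERA_USER_PASS.
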
def get_cpu_stats() -> dict[str, dict]:
"""Get cpu usages for each process id"""
usages = {}
    # take two samples (-n 2); values from the second sample overwrite the first
    # when parsing, avoiding skewed initial readings
top_command = ["top", "-b", "-n", "2"]
p = sp.run(
top_command,
encoding="ascii",
capture_output=True,
)
if p.returncode != 0:
logger.error(p.stderr)
return usages
else:
lines = p.stdout.split("\n")
for line in lines:
stats = list(filter(lambda a: a != "", line.strip().split(" ")))
try:
usages[stats[0]] = {
"cpu": stats[8],
"mem": stats[9],
}
except:
continue
return usages
def get_amd_gpu_stats() -> Optional[dict[str, str]]:
"""Get stats using radeontop."""
radeontop_command = ["radeontop", "-d", "-", "-l", "1"]
p = sp.run(
radeontop_command,
encoding="ascii",
capture_output=True,
)
if p.returncode != 0:
logger.error(p.stderr)
return None
else:
usages = p.stdout.split(",")
results: dict[str, str] = {}
for hw in usages:
if "gpu" in hw:
results["gpu"] = f"{hw.strip().split(' ')[1].replace('%', '')} %"
elif "vram" in hw:
results["mem"] = f"{hw.strip().split(' ')[1].replace('%', '')} %"
return results
def get_intel_gpu_stats() -> Optional[dict[str, str]]:
"""Get stats using intel_gpu_top."""
intel_gpu_top_command = [
"timeout",
"0.1s",
"intel_gpu_top",
"-J",
"-o",
"-",
"-s",
"1",
]
p = sp.run(
intel_gpu_top_command,
encoding="ascii",
capture_output=True,
)
if p.returncode != 0:
logger.error(p.stderr)
return None
else:
readings = json.loads(f"[{p.stdout}]")
results: dict[str, str] = {}
for reading in readings:
if reading.get("engines", {}).get("Video/0", {}).get(
"busy", 0
) or reading.get("engines", {}).get("Video/1", {}).get("busy", 0):
gpu_usage = round(
float(reading.get("engines", {}).get("Video/0", {}).get("busy", 0))
+ float(
reading.get("engines", {}).get("Video/1", {}).get("busy", 0)
),
2,
)
results["gpu"] = f"{gpu_usage} %"
break
results["mem"] = "- %"
return results
def get_nvidia_gpu_stats() -> Optional[dict[str, str]]:
"""Get stats using nvidia-smi."""
nvidia_smi_command = [
"nvidia-smi",
"--query-gpu=gpu_name,utilization.gpu,memory.used,memory.total",
"--format=csv",
]
p = sp.run(
nvidia_smi_command,
encoding="ascii",
capture_output=True,
)
if p.returncode != 0:
logger.error(p.stderr)
return None
else:
usages = p.stdout.split("\n")[1].strip().split(",")
memory_percent = f"{round(float(usages[2].replace(' MiB', '').strip()) / float(usages[3].replace(' MiB', '').strip()) * 100, 1)} %"
results: dict[str, str] = {
"name": usages[0],
"gpu": usages[1].strip(),
"mem": memory_percent,
}
return results
def ffprobe_stream(path: str) -> sp.CompletedProcess:
"""Run ffprobe on stream."""
ffprobe_cmd = [
"ffprobe",
"-print_format",
"json",
"-show_entries",
"stream=codec_long_name,width,height,bit_rate,duration,display_aspect_ratio,avg_frame_rate",
"-loglevel",
"quiet",
path,
]
return sp.run(ffprobe_cmd, capture_output=True)
def vainfo_hwaccel() -> sp.CompletedProcess:
"""Run vainfo."""
    vainfo_cmd = ["vainfo"]
    return sp.run(vainfo_cmd, capture_output=True)
def get_ffmpeg_arg_list(arg: Any) -> list:
"""Use arg if list or convert to list format."""
return arg if isinstance(arg, list) else arg.split(" ")
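
# Example (illustrative): both config styles produce the same argument list.
#
#     >>> get_ffmpeg_arg_list("-rtsp_transport tcp")
#     ['-rtsp_transport', 'tcp']
#     >>> get_ffmpeg_arg_list(["-rtsp_transport", "tcp"])
#     ['-rtsp_transport', 'tcp']
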
class FrameManager(ABC):
@abstractmethod
def create(self, name, size) -> AnyStr:
pass
@abstractmethod
def get(self, name, timeout_ms=0):
pass
@abstractmethod
def close(self, name):
pass
@abstractmethod
def delete(self, name):
pass
class DictFrameManager(FrameManager):
def __init__(self):
self.frames = {}
def create(self, name, size) -> AnyStr:
mem = bytearray(size)
self.frames[name] = mem
return mem
def get(self, name, shape):
mem = self.frames[name]
return np.ndarray(shape, dtype=np.uint8, buffer=mem)
def close(self, name):
pass
def delete(self, name):
del self.frames[name]
class SharedMemoryFrameManager(FrameManager):
def __init__(self):
self.shm_store = {}
def create(self, name, size) -> AnyStr:
shm = shared_memory.SharedMemory(name=name, create=True, size=size)
self.shm_store[name] = shm
return shm.buf
def get(self, name, shape):
if name in self.shm_store:
shm = self.shm_store[name]
else:
shm = shared_memory.SharedMemory(name=name)
self.shm_store[name] = shm
return np.ndarray(shape, dtype=np.uint8, buffer=shm.buf)
def close(self, name):
if name in self.shm_store:
self.shm_store[name].close()
del self.shm_store[name]
def delete(self, name):
if name in self.shm_store:
self.shm_store[name].close()
self.shm_store[name].unlink()
del self.shm_store[name]
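
# Usage sketch (hypothetical names and sizes): a producer process creates a named
# shared-memory segment and writes an I420 frame into it; a consumer maps the
# same name back into a numpy view without copying.
#
#     manager = SharedMemoryFrameManager()
#     buf = manager.create("front_door1234567890", 1080 * 1280)   # 1280x720 I420 buffer
#     frame = manager.get("front_door1234567890", (1080, 1280))   # numpy view over the segment
#     # ... process the frame ...
#     manager.close("front_door1234567890")    # consumer detaches
#     manager.delete("front_door1234567890")   # producer unlinks the segment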