mirror of
https://github.com/blakeblackshear/frigate.git
synced 2025-08-04 13:47:37 +02:00
* Fix the `Any` typing hint treewide There has been confusion between the Any type[1] and the any function[2] in typing hints. [1] https://docs.python.org/3/library/typing.html#typing.Any [2] https://docs.python.org/3/library/functions.html#any * Fix typing for various frame_shape members Frame shapes are most likely defined by height and width, so a single int cannot express that. * Wrap gpu stats functions in Optional[] These can return `None`, so they need to be `Type | None`, which is what `Optional` expresses very nicely. * Fix return type in get_latest_segment_datetime Returns a datetime object, not an integer. * Make the return type of FrameManager.write optional This is necessary since the SharedMemoryFrameManager.write function can return None. * Fix total_seconds() return type in get_tz_modifiers The function returns a float, not an int. https://docs.python.org/3/library/datetime.html#datetime.timedelta.total_seconds * Account for floating point results in to_relative_box Because the function uses division the return types may either be int or float. * Resolve ruff deprecation warning The config has been split into formatter and linter, and the global options are deprecated.
1010 lines
30 KiB
Python
1010 lines
30 KiB
Python
"""Utilities for creating and manipulating image frames."""
|
|
|
|
import datetime
|
|
import logging
|
|
import subprocess as sp
|
|
import threading
|
|
from abc import ABC, abstractmethod
|
|
from multiprocessing import resource_tracker as _mprt
|
|
from multiprocessing import shared_memory as _mpshm
|
|
from string import printable
|
|
from typing import Any, AnyStr, Optional
|
|
|
|
import cv2
|
|
import numpy as np
|
|
from unidecode import unidecode
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def transliterate_to_latin(text: str) -> str:
|
|
"""
|
|
Transliterate a given text to Latin.
|
|
|
|
This function uses the unidecode library to transliterate the input text to Latin.
|
|
It is useful for converting texts with diacritics or non-Latin characters to a
|
|
Latin equivalent.
|
|
|
|
Args:
|
|
text (str): The text to be transliterated.
|
|
|
|
Returns:
|
|
str: The transliterated text.
|
|
|
|
Example:
|
|
>>> transliterate_to_latin('frégate')
|
|
'fregate'
|
|
"""
|
|
return unidecode(text)
|
|
|
|
|
|
def on_edge(box, frame_shape):
|
|
if (
|
|
box[0] == 0
|
|
or box[1] == 0
|
|
or box[2] == frame_shape[1] - 1
|
|
or box[3] == frame_shape[0] - 1
|
|
):
|
|
return True
|
|
|
|
|
|
def has_better_attr(current_thumb, new_obj, attr_label) -> bool:
|
|
max_new_attr = max(
|
|
[0]
|
|
+ [area(a["box"]) for a in new_obj["attributes"] if a["label"] == attr_label]
|
|
)
|
|
max_current_attr = max(
|
|
[0]
|
|
+ [
|
|
area(a["box"])
|
|
for a in current_thumb["attributes"]
|
|
if a["label"] == attr_label
|
|
]
|
|
)
|
|
|
|
# if the thumb has a higher scoring attr
|
|
return max_new_attr > max_current_attr
|
|
|
|
|
|
def is_better_thumbnail(label, current_thumb, new_obj, frame_shape) -> bool:
|
|
# larger is better
|
|
# cutoff images are less ideal, but they should also be smaller?
|
|
# better scores are obviously better too
|
|
|
|
# check face on person
|
|
if label == "person":
|
|
if has_better_attr(current_thumb, new_obj, "face"):
|
|
return True
|
|
# if the current thumb has a face attr, dont update unless it gets better
|
|
if any([a["label"] == "face" for a in current_thumb["attributes"]]):
|
|
return False
|
|
|
|
# check license_plate on car
|
|
if label in ["car", "motorcycle"]:
|
|
if has_better_attr(current_thumb, new_obj, "license_plate"):
|
|
return True
|
|
# if the current thumb has a license_plate attr, dont update unless it gets better
|
|
if any([a["label"] == "license_plate" for a in current_thumb["attributes"]]):
|
|
return False
|
|
|
|
# if the new_thumb is on an edge, and the current thumb is not
|
|
if on_edge(new_obj["box"], frame_shape) and not on_edge(
|
|
current_thumb["box"], frame_shape
|
|
):
|
|
return False
|
|
|
|
# if the score is better by more than 5%
|
|
if new_obj["score"] > current_thumb["score"] + 0.05:
|
|
return True
|
|
|
|
# if the area is 10% larger
|
|
if new_obj["area"] > current_thumb["area"] * 1.1:
|
|
return True
|
|
|
|
return False
|
|
|
|
|
|
def draw_timestamp(
|
|
frame,
|
|
timestamp,
|
|
timestamp_format,
|
|
font_effect=None,
|
|
font_thickness=2,
|
|
font_color=(255, 255, 255),
|
|
position="tl",
|
|
):
|
|
time_to_show = datetime.datetime.fromtimestamp(timestamp).strftime(timestamp_format)
|
|
|
|
# calculate a dynamic font size
|
|
size = cv2.getTextSize(
|
|
time_to_show,
|
|
cv2.FONT_HERSHEY_SIMPLEX,
|
|
fontScale=1.0,
|
|
thickness=font_thickness,
|
|
)
|
|
|
|
text_width = size[0][0]
|
|
desired_size = max(150, 0.33 * frame.shape[1])
|
|
font_scale = desired_size / text_width
|
|
|
|
# calculate the actual size with the dynamic scale
|
|
size = cv2.getTextSize(
|
|
time_to_show,
|
|
cv2.FONT_HERSHEY_SIMPLEX,
|
|
fontScale=font_scale,
|
|
thickness=font_thickness,
|
|
)
|
|
|
|
image_width = frame.shape[1]
|
|
image_height = frame.shape[0]
|
|
text_width = size[0][0]
|
|
text_height = size[0][1]
|
|
line_height = text_height + size[1]
|
|
|
|
if position == "tl":
|
|
text_offset_x = 0
|
|
text_offset_y = 0 if 0 < line_height else 0 - (line_height + 8)
|
|
elif position == "tr":
|
|
text_offset_x = image_width - text_width
|
|
text_offset_y = 0 if 0 < line_height else 0 - (line_height + 8)
|
|
elif position == "bl":
|
|
text_offset_x = 0
|
|
text_offset_y = image_height - (line_height + 8)
|
|
elif position == "br":
|
|
text_offset_x = image_width - text_width
|
|
text_offset_y = image_height - (line_height + 8)
|
|
|
|
if font_effect == "solid":
|
|
# make the coords of the box with a small padding of two pixels
|
|
timestamp_box_coords = np.array(
|
|
[
|
|
[text_offset_x, text_offset_y],
|
|
[text_offset_x + text_width, text_offset_y],
|
|
[text_offset_x + text_width, text_offset_y + line_height + 8],
|
|
[text_offset_x, text_offset_y + line_height + 8],
|
|
]
|
|
)
|
|
|
|
cv2.fillPoly(
|
|
frame,
|
|
[timestamp_box_coords],
|
|
# inverse color of text for background for max. contrast
|
|
(255 - font_color[0], 255 - font_color[1], 255 - font_color[2]),
|
|
)
|
|
elif font_effect == "shadow":
|
|
cv2.putText(
|
|
frame,
|
|
time_to_show,
|
|
(text_offset_x + 3, text_offset_y + line_height),
|
|
cv2.FONT_HERSHEY_SIMPLEX,
|
|
fontScale=font_scale,
|
|
color=(255 - font_color[0], 255 - font_color[1], 255 - font_color[2]),
|
|
thickness=font_thickness,
|
|
)
|
|
|
|
cv2.putText(
|
|
frame,
|
|
time_to_show,
|
|
(text_offset_x, text_offset_y + line_height - 3),
|
|
cv2.FONT_HERSHEY_SIMPLEX,
|
|
fontScale=font_scale,
|
|
color=font_color,
|
|
thickness=font_thickness,
|
|
)
|
|
|
|
|
|
def draw_box_with_label(
|
|
frame,
|
|
x_min,
|
|
y_min,
|
|
x_max,
|
|
y_max,
|
|
label,
|
|
info,
|
|
thickness=2,
|
|
color=None,
|
|
position="ul",
|
|
):
|
|
if color is None:
|
|
color = (0, 0, 255)
|
|
try:
|
|
display_text = transliterate_to_latin("{}: {}".format(label, info))
|
|
except Exception:
|
|
display_text = "{}: {}".format(label, info)
|
|
cv2.rectangle(frame, (x_min, y_min), (x_max, y_max), color, thickness)
|
|
font_scale = 0.5
|
|
font = cv2.FONT_HERSHEY_SIMPLEX
|
|
# get the width and height of the text box
|
|
size = cv2.getTextSize(display_text, font, fontScale=font_scale, thickness=2)
|
|
text_width = size[0][0]
|
|
text_height = size[0][1]
|
|
line_height = text_height + size[1]
|
|
# get frame height
|
|
frame_height = frame.shape[0]
|
|
# set the text start position
|
|
if position == "ul":
|
|
text_offset_x = x_min
|
|
text_offset_y = max(0, y_min - (line_height + 8))
|
|
elif position == "ur":
|
|
text_offset_x = max(0, x_max - (text_width + 8))
|
|
text_offset_y = max(0, y_min - (line_height + 8))
|
|
elif position == "bl":
|
|
text_offset_x = x_min
|
|
text_offset_y = min(frame_height - line_height, y_max)
|
|
elif position == "br":
|
|
text_offset_x = max(0, x_max - (text_width + 8))
|
|
text_offset_y = min(frame_height - line_height, y_max)
|
|
# Adjust position if it overlaps with the box or goes out of frame
|
|
if position in {"ul", "ur"}:
|
|
if text_offset_y < y_min + thickness: # Label overlaps with the box
|
|
if y_min - (line_height + 8) < 0 and y_max + line_height <= frame_height:
|
|
# Not enough space above, and there is space below
|
|
text_offset_y = y_max
|
|
elif y_min - (line_height + 8) >= 0:
|
|
# Enough space above, keep the label at the top
|
|
text_offset_y = max(0, y_min - (line_height + 8))
|
|
elif position in {"bl", "br"}:
|
|
if text_offset_y + line_height > frame_height:
|
|
# If there's not enough space below, try above the box
|
|
text_offset_y = max(0, y_min - (line_height + 8))
|
|
|
|
# make the coords of the box with a small padding of two pixels
|
|
textbox_coords = (
|
|
(text_offset_x, text_offset_y),
|
|
(text_offset_x + text_width + 2, text_offset_y + line_height),
|
|
)
|
|
cv2.rectangle(frame, textbox_coords[0], textbox_coords[1], color, cv2.FILLED)
|
|
cv2.putText(
|
|
frame,
|
|
display_text,
|
|
(text_offset_x, text_offset_y + line_height - 3),
|
|
font,
|
|
fontScale=font_scale,
|
|
color=(0, 0, 0),
|
|
thickness=2,
|
|
)
|
|
|
|
|
|
def grab_cv2_contours(cnts):
|
|
# if the length the contours tuple returned by cv2.findContours
|
|
# is '2' then we are using either OpenCV v2.4, v4-beta, or
|
|
# v4-official
|
|
if len(cnts) == 2:
|
|
return cnts[0]
|
|
|
|
# if the length of the contours tuple is '3' then we are using
|
|
# either OpenCV v3, v4-pre, or v4-alpha
|
|
elif len(cnts) == 3:
|
|
return cnts[1]
|
|
|
|
|
|
def is_label_printable(label) -> bool:
|
|
"""Check if label is printable."""
|
|
return not bool(set(label) - set(printable))
|
|
|
|
|
|
def calculate_region(frame_shape, xmin, ymin, xmax, ymax, model_size, multiplier=2):
|
|
# size is the longest edge and divisible by 4
|
|
size = int((max(xmax - xmin, ymax - ymin) * multiplier) // 4 * 4)
|
|
# dont go any smaller than the model_size
|
|
if size < model_size:
|
|
size = model_size
|
|
|
|
# x_offset is midpoint of bounding box minus half the size
|
|
x_offset = int((xmax - xmin) / 2.0 + xmin - size / 2.0)
|
|
# if outside the image
|
|
if x_offset < 0:
|
|
x_offset = 0
|
|
elif x_offset > (frame_shape[1] - size):
|
|
x_offset = max(0, (frame_shape[1] - size))
|
|
|
|
# y_offset is midpoint of bounding box minus half the size
|
|
y_offset = int((ymax - ymin) / 2.0 + ymin - size / 2.0)
|
|
# # if outside the image
|
|
if y_offset < 0:
|
|
y_offset = 0
|
|
elif y_offset > (frame_shape[0] - size):
|
|
y_offset = max(0, (frame_shape[0] - size))
|
|
|
|
return (x_offset, y_offset, x_offset + size, y_offset + size)
|
|
|
|
|
|
def calculate_16_9_crop(frame_shape, xmin, ymin, xmax, ymax, multiplier=1.25):
|
|
min_size = 200
|
|
|
|
# size is the longest edge and divisible by 4
|
|
x_size = int((xmax - xmin) * multiplier)
|
|
|
|
if x_size < min_size:
|
|
x_size = min_size
|
|
|
|
y_size = int((ymax - ymin) * multiplier)
|
|
|
|
if y_size < min_size:
|
|
y_size = min_size
|
|
|
|
if frame_shape[1] / frame_shape[0] > 16 / 9 and x_size / y_size > 4:
|
|
return None
|
|
|
|
# calculate 16x9 using height
|
|
aspect_y_size = int(9 / 16 * x_size)
|
|
|
|
# if 16:9 by height is too small
|
|
if aspect_y_size < y_size or aspect_y_size > frame_shape[0]:
|
|
x_size = int((16 / 9) * y_size) // 4 * 4
|
|
|
|
if x_size / y_size > 1.8:
|
|
return None
|
|
else:
|
|
y_size = aspect_y_size // 4 * 4
|
|
|
|
# x_offset is midpoint of bounding box minus half the size
|
|
x_offset = int((xmax - xmin) / 2.0 + xmin - x_size / 2.0)
|
|
# if outside the image
|
|
if x_offset < 0:
|
|
x_offset = 0
|
|
elif x_offset > (frame_shape[1] - x_size):
|
|
x_offset = max(0, (frame_shape[1] - x_size))
|
|
|
|
# y_offset is midpoint of bounding box minus half the size
|
|
y_offset = int((ymax - ymin) / 2.0 + ymin - y_size / 2.0)
|
|
# # if outside the image
|
|
if y_offset < 0:
|
|
y_offset = 0
|
|
elif y_offset > (frame_shape[0] - y_size):
|
|
y_offset = max(0, (frame_shape[0] - y_size))
|
|
|
|
return (x_offset, y_offset, x_offset + x_size, y_offset + y_size)
|
|
|
|
|
|
def get_yuv_crop(frame_shape, crop):
|
|
# crop should be (x1,y1,x2,y2)
|
|
frame_height = frame_shape[0] // 3 * 2
|
|
frame_width = frame_shape[1]
|
|
|
|
# compute the width/height of the uv channels
|
|
uv_width = frame_width // 2 # width of the uv channels
|
|
uv_height = frame_height // 4 # height of the uv channels
|
|
|
|
# compute the offset for upper left corner of the uv channels
|
|
uv_x_offset = crop[0] // 2 # x offset of the uv channels
|
|
uv_y_offset = crop[1] // 4 # y offset of the uv channels
|
|
|
|
# compute the width/height of the uv crops
|
|
uv_crop_width = (crop[2] - crop[0]) // 2 # width of the cropped uv channels
|
|
uv_crop_height = (crop[3] - crop[1]) // 4 # height of the cropped uv channels
|
|
|
|
# ensure crop dimensions are multiples of 2 and 4
|
|
y = (crop[0], crop[1], crop[0] + uv_crop_width * 2, crop[1] + uv_crop_height * 4)
|
|
|
|
u1 = (
|
|
0 + uv_x_offset,
|
|
frame_height + uv_y_offset,
|
|
0 + uv_x_offset + uv_crop_width,
|
|
frame_height + uv_y_offset + uv_crop_height,
|
|
)
|
|
|
|
u2 = (
|
|
uv_width + uv_x_offset,
|
|
frame_height + uv_y_offset,
|
|
uv_width + uv_x_offset + uv_crop_width,
|
|
frame_height + uv_y_offset + uv_crop_height,
|
|
)
|
|
|
|
v1 = (
|
|
0 + uv_x_offset,
|
|
frame_height + uv_height + uv_y_offset,
|
|
0 + uv_x_offset + uv_crop_width,
|
|
frame_height + uv_height + uv_y_offset + uv_crop_height,
|
|
)
|
|
|
|
v2 = (
|
|
uv_width + uv_x_offset,
|
|
frame_height + uv_height + uv_y_offset,
|
|
uv_width + uv_x_offset + uv_crop_width,
|
|
frame_height + uv_height + uv_y_offset + uv_crop_height,
|
|
)
|
|
|
|
return y, u1, u2, v1, v2
|
|
|
|
|
|
def yuv_crop_and_resize(frame, region, height=None):
|
|
# Crops and resizes a YUV frame while maintaining aspect ratio
|
|
# https://stackoverflow.com/a/57022634
|
|
height = frame.shape[0] // 3 * 2
|
|
width = frame.shape[1]
|
|
|
|
# get the crop box if the region extends beyond the frame
|
|
crop_x1 = max(0, region[0])
|
|
crop_y1 = max(0, region[1])
|
|
# ensure these are a multiple of 4
|
|
crop_x2 = min(width, region[2])
|
|
crop_y2 = min(height, region[3])
|
|
crop_box = (crop_x1, crop_y1, crop_x2, crop_y2)
|
|
|
|
y, u1, u2, v1, v2 = get_yuv_crop(frame.shape, crop_box)
|
|
|
|
# if the region starts outside the frame, indent the start point in the cropped frame
|
|
y_channel_x_offset = abs(min(0, region[0]))
|
|
y_channel_y_offset = abs(min(0, region[1]))
|
|
|
|
uv_channel_x_offset = y_channel_x_offset // 2
|
|
uv_channel_y_offset = y_channel_y_offset // 4
|
|
|
|
# create the yuv region frame
|
|
# make sure the size is a multiple of 4
|
|
# TODO: this should be based on the size after resize now
|
|
size = (region[3] - region[1]) // 4 * 4
|
|
yuv_cropped_frame = np.zeros((size + size // 2, size), np.uint8)
|
|
# fill in black
|
|
yuv_cropped_frame[:] = 128
|
|
yuv_cropped_frame[0:size, 0:size] = 16
|
|
|
|
# copy the y channel
|
|
yuv_cropped_frame[
|
|
y_channel_y_offset : y_channel_y_offset + y[3] - y[1],
|
|
y_channel_x_offset : y_channel_x_offset + y[2] - y[0],
|
|
] = frame[y[1] : y[3], y[0] : y[2]]
|
|
|
|
uv_crop_width = u1[2] - u1[0]
|
|
uv_crop_height = u1[3] - u1[1]
|
|
|
|
# copy u1
|
|
yuv_cropped_frame[
|
|
size + uv_channel_y_offset : size + uv_channel_y_offset + uv_crop_height,
|
|
0 + uv_channel_x_offset : 0 + uv_channel_x_offset + uv_crop_width,
|
|
] = frame[u1[1] : u1[3], u1[0] : u1[2]]
|
|
|
|
# copy u2
|
|
yuv_cropped_frame[
|
|
size + uv_channel_y_offset : size + uv_channel_y_offset + uv_crop_height,
|
|
size // 2 + uv_channel_x_offset : size // 2
|
|
+ uv_channel_x_offset
|
|
+ uv_crop_width,
|
|
] = frame[u2[1] : u2[3], u2[0] : u2[2]]
|
|
|
|
# copy v1
|
|
yuv_cropped_frame[
|
|
size + size // 4 + uv_channel_y_offset : size
|
|
+ size // 4
|
|
+ uv_channel_y_offset
|
|
+ uv_crop_height,
|
|
0 + uv_channel_x_offset : 0 + uv_channel_x_offset + uv_crop_width,
|
|
] = frame[v1[1] : v1[3], v1[0] : v1[2]]
|
|
|
|
# copy v2
|
|
yuv_cropped_frame[
|
|
size + size // 4 + uv_channel_y_offset : size
|
|
+ size // 4
|
|
+ uv_channel_y_offset
|
|
+ uv_crop_height,
|
|
size // 2 + uv_channel_x_offset : size // 2
|
|
+ uv_channel_x_offset
|
|
+ uv_crop_width,
|
|
] = frame[v2[1] : v2[3], v2[0] : v2[2]]
|
|
|
|
return yuv_cropped_frame
|
|
|
|
|
|
def yuv_to_3_channel_yuv(yuv_frame):
|
|
height = yuv_frame.shape[0] // 3 * 2
|
|
width = yuv_frame.shape[1]
|
|
|
|
# flatten the image into array
|
|
yuv_data = yuv_frame.ravel()
|
|
|
|
# create a numpy array to hold all the 3 channel yuv data
|
|
all_yuv_data = np.empty((height, width, 3), dtype=np.uint8)
|
|
|
|
y_count = height * width
|
|
uv_count = y_count // 4
|
|
|
|
# copy the y_channel
|
|
all_yuv_data[:, :, 0] = yuv_data[0:y_count].reshape((height, width))
|
|
# copy the u channel doubling each dimension
|
|
all_yuv_data[:, :, 1] = np.repeat(
|
|
np.reshape(
|
|
np.repeat(yuv_data[y_count : y_count + uv_count], repeats=2, axis=0),
|
|
(height // 2, width),
|
|
),
|
|
repeats=2,
|
|
axis=0,
|
|
)
|
|
# copy the v channel doubling each dimension
|
|
all_yuv_data[:, :, 2] = np.repeat(
|
|
np.reshape(
|
|
np.repeat(
|
|
yuv_data[y_count + uv_count : y_count + uv_count + uv_count],
|
|
repeats=2,
|
|
axis=0,
|
|
),
|
|
(height // 2, width),
|
|
),
|
|
repeats=2,
|
|
axis=0,
|
|
)
|
|
|
|
return all_yuv_data
|
|
|
|
|
|
def copy_yuv_to_position(
|
|
destination_frame,
|
|
destination_offset,
|
|
destination_shape,
|
|
source_frame=None,
|
|
source_channel_dim=None,
|
|
interpolation=cv2.INTER_LINEAR,
|
|
):
|
|
# get the coordinates of the channels for this position in the layout
|
|
y, u1, u2, v1, v2 = get_yuv_crop(
|
|
destination_frame.shape,
|
|
(
|
|
destination_offset[1],
|
|
destination_offset[0],
|
|
destination_offset[1] + destination_shape[1],
|
|
destination_offset[0] + destination_shape[0],
|
|
),
|
|
)
|
|
|
|
# clear y
|
|
destination_frame[
|
|
y[1] : y[3],
|
|
y[0] : y[2],
|
|
] = 16
|
|
|
|
# clear u1
|
|
destination_frame[u1[1] : u1[3], u1[0] : u1[2]] = 128
|
|
# clear u2
|
|
destination_frame[u2[1] : u2[3], u2[0] : u2[2]] = 128
|
|
# clear v1
|
|
destination_frame[v1[1] : v1[3], v1[0] : v1[2]] = 128
|
|
# clear v2
|
|
destination_frame[v2[1] : v2[3], v2[0] : v2[2]] = 128
|
|
|
|
if source_frame is not None:
|
|
# calculate the resized frame, maintaining the aspect ratio
|
|
source_aspect_ratio = source_frame.shape[1] / (source_frame.shape[0] // 3 * 2)
|
|
dest_aspect_ratio = destination_shape[1] / destination_shape[0]
|
|
|
|
if source_aspect_ratio <= dest_aspect_ratio:
|
|
y_resize_height = int(destination_shape[0] // 4 * 4)
|
|
y_resize_width = int((y_resize_height * source_aspect_ratio) // 4 * 4)
|
|
else:
|
|
y_resize_width = int(destination_shape[1] // 4 * 4)
|
|
y_resize_height = int((y_resize_width / source_aspect_ratio) // 4 * 4)
|
|
|
|
uv_resize_width = int(y_resize_width // 2)
|
|
uv_resize_height = int(y_resize_height // 4)
|
|
|
|
y_y_offset = int((destination_shape[0] - y_resize_height) / 4 // 4 * 4)
|
|
y_x_offset = int((destination_shape[1] - y_resize_width) / 2 // 4 * 4)
|
|
|
|
uv_y_offset = y_y_offset // 4
|
|
uv_x_offset = y_x_offset // 2
|
|
|
|
# resize/copy y channel
|
|
destination_frame[
|
|
y[1] + y_y_offset : y[1] + y_y_offset + y_resize_height,
|
|
y[0] + y_x_offset : y[0] + y_x_offset + y_resize_width,
|
|
] = cv2.resize(
|
|
source_frame[
|
|
source_channel_dim["y"][1] : source_channel_dim["y"][3],
|
|
source_channel_dim["y"][0] : source_channel_dim["y"][2],
|
|
],
|
|
dsize=(y_resize_width, y_resize_height),
|
|
interpolation=interpolation,
|
|
)
|
|
|
|
# resize/copy u1
|
|
destination_frame[
|
|
u1[1] + uv_y_offset : u1[1] + uv_y_offset + uv_resize_height,
|
|
u1[0] + uv_x_offset : u1[0] + uv_x_offset + uv_resize_width,
|
|
] = cv2.resize(
|
|
source_frame[
|
|
source_channel_dim["u1"][1] : source_channel_dim["u1"][3],
|
|
source_channel_dim["u1"][0] : source_channel_dim["u1"][2],
|
|
],
|
|
dsize=(uv_resize_width, uv_resize_height),
|
|
interpolation=interpolation,
|
|
)
|
|
# resize/copy u2
|
|
destination_frame[
|
|
u2[1] + uv_y_offset : u2[1] + uv_y_offset + uv_resize_height,
|
|
u2[0] + uv_x_offset : u2[0] + uv_x_offset + uv_resize_width,
|
|
] = cv2.resize(
|
|
source_frame[
|
|
source_channel_dim["u2"][1] : source_channel_dim["u2"][3],
|
|
source_channel_dim["u2"][0] : source_channel_dim["u2"][2],
|
|
],
|
|
dsize=(uv_resize_width, uv_resize_height),
|
|
interpolation=interpolation,
|
|
)
|
|
# resize/copy v1
|
|
destination_frame[
|
|
v1[1] + uv_y_offset : v1[1] + uv_y_offset + uv_resize_height,
|
|
v1[0] + uv_x_offset : v1[0] + uv_x_offset + uv_resize_width,
|
|
] = cv2.resize(
|
|
source_frame[
|
|
source_channel_dim["v1"][1] : source_channel_dim["v1"][3],
|
|
source_channel_dim["v1"][0] : source_channel_dim["v1"][2],
|
|
],
|
|
dsize=(uv_resize_width, uv_resize_height),
|
|
interpolation=interpolation,
|
|
)
|
|
# resize/copy v2
|
|
destination_frame[
|
|
v2[1] + uv_y_offset : v2[1] + uv_y_offset + uv_resize_height,
|
|
v2[0] + uv_x_offset : v2[0] + uv_x_offset + uv_resize_width,
|
|
] = cv2.resize(
|
|
source_frame[
|
|
source_channel_dim["v2"][1] : source_channel_dim["v2"][3],
|
|
source_channel_dim["v2"][0] : source_channel_dim["v2"][2],
|
|
],
|
|
dsize=(uv_resize_width, uv_resize_height),
|
|
interpolation=interpolation,
|
|
)
|
|
|
|
|
|
def get_blank_yuv_frame(width: int, height: int) -> np.ndarray:
|
|
"""Creates a black YUV 4:2:0 frame."""
|
|
yuv_height = height * 3 // 2
|
|
yuv_frame = np.zeros((yuv_height, width), dtype=np.uint8)
|
|
|
|
uv_height = height // 2
|
|
|
|
# The U and V planes are stored after the Y plane.
|
|
u_start = height # U plane starts right after Y plane
|
|
v_start = u_start + uv_height // 2 # V plane starts after U plane
|
|
yuv_frame[u_start : u_start + uv_height, :width] = 128
|
|
yuv_frame[v_start : v_start + uv_height, :width] = 128
|
|
|
|
return yuv_frame
|
|
|
|
|
|
def yuv_region_2_yuv(frame, region):
|
|
try:
|
|
# TODO: does this copy the numpy array?
|
|
yuv_cropped_frame = yuv_crop_and_resize(frame, region)
|
|
return yuv_to_3_channel_yuv(yuv_cropped_frame)
|
|
except:
|
|
print(f"frame.shape: {frame.shape}")
|
|
print(f"region: {region}")
|
|
raise
|
|
|
|
|
|
def yuv_region_2_rgb(frame, region):
|
|
try:
|
|
# TODO: does this copy the numpy array?
|
|
yuv_cropped_frame = yuv_crop_and_resize(frame, region)
|
|
return cv2.cvtColor(yuv_cropped_frame, cv2.COLOR_YUV2RGB_I420)
|
|
except:
|
|
print(f"frame.shape: {frame.shape}")
|
|
print(f"region: {region}")
|
|
raise
|
|
|
|
|
|
def yuv_region_2_bgr(frame, region):
|
|
try:
|
|
yuv_cropped_frame = yuv_crop_and_resize(frame, region)
|
|
return cv2.cvtColor(yuv_cropped_frame, cv2.COLOR_YUV2BGR_I420)
|
|
except:
|
|
print(f"frame.shape: {frame.shape}")
|
|
print(f"region: {region}")
|
|
raise
|
|
|
|
|
|
def intersection(box_a, box_b) -> Optional[list[int]]:
|
|
"""Return intersection box or None if boxes do not intersect."""
|
|
if (
|
|
box_a[2] < box_b[0]
|
|
or box_a[0] > box_b[2]
|
|
or box_a[1] > box_b[3]
|
|
or box_a[3] < box_b[1]
|
|
):
|
|
return None
|
|
|
|
return (
|
|
max(box_a[0], box_b[0]),
|
|
max(box_a[1], box_b[1]),
|
|
min(box_a[2], box_b[2]),
|
|
min(box_a[3], box_b[3]),
|
|
)
|
|
|
|
|
|
def area(box):
|
|
return (box[2] - box[0] + 1) * (box[3] - box[1] + 1)
|
|
|
|
|
|
def intersection_over_union(box_a, box_b):
|
|
# determine the (x, y)-coordinates of the intersection rectangle
|
|
intersect = intersection(box_a, box_b)
|
|
|
|
if intersect is None:
|
|
return 0.0
|
|
|
|
# compute the area of intersection rectangle
|
|
inter_area = max(0, intersect[2] - intersect[0] + 1) * max(
|
|
0, intersect[3] - intersect[1] + 1
|
|
)
|
|
|
|
if inter_area == 0:
|
|
return 0.0
|
|
|
|
# compute the area of both the prediction and ground-truth
|
|
# rectangles
|
|
box_a_area = (box_a[2] - box_a[0] + 1) * (box_a[3] - box_a[1] + 1)
|
|
box_b_area = (box_b[2] - box_b[0] + 1) * (box_b[3] - box_b[1] + 1)
|
|
|
|
# compute the intersection over union by taking the intersection
|
|
# area and dividing it by the sum of prediction + ground-truth
|
|
# areas - the intersection area
|
|
iou = inter_area / float(box_a_area + box_b_area - inter_area)
|
|
|
|
# return the intersection over union value
|
|
return iou
|
|
|
|
|
|
def clipped(obj, frame_shape):
|
|
# if the object is within 5 pixels of the region border, and the region is not on the edge
|
|
# consider the object to be clipped
|
|
box = obj[2]
|
|
region = obj[5]
|
|
if (
|
|
(region[0] > 5 and box[0] - region[0] <= 5)
|
|
or (region[1] > 5 and box[1] - region[1] <= 5)
|
|
or (frame_shape[1] - region[2] > 5 and region[2] - box[2] <= 5)
|
|
or (frame_shape[0] - region[3] > 5 and region[3] - box[3] <= 5)
|
|
):
|
|
return True
|
|
else:
|
|
return False
|
|
|
|
|
|
class FrameManager(ABC):
|
|
@abstractmethod
|
|
def create(self, name: str, size: int) -> AnyStr:
|
|
pass
|
|
|
|
@abstractmethod
|
|
def write(self, name: str) -> Optional[memoryview]:
|
|
pass
|
|
|
|
@abstractmethod
|
|
def get(self, name: str, timeout_ms: int = 0):
|
|
pass
|
|
|
|
@abstractmethod
|
|
def close(self, name: str):
|
|
pass
|
|
|
|
@abstractmethod
|
|
def delete(self, name: str):
|
|
pass
|
|
|
|
@abstractmethod
|
|
def cleanup(self):
|
|
pass
|
|
|
|
|
|
class UntrackedSharedMemory(_mpshm.SharedMemory):
|
|
# https://github.com/python/cpython/issues/82300#issuecomment-2169035092
|
|
|
|
__lock = threading.Lock()
|
|
|
|
def __init__(
|
|
self,
|
|
name: Optional[str] = None,
|
|
create: bool = False,
|
|
size: int = 0,
|
|
*,
|
|
track: bool = False,
|
|
) -> None:
|
|
self._track = track
|
|
|
|
# if tracking, normal init will suffice
|
|
if track:
|
|
return super().__init__(name=name, create=create, size=size)
|
|
|
|
# lock so that other threads don't attempt to use the
|
|
# register function during this time
|
|
with self.__lock:
|
|
# temporarily disable registration during initialization
|
|
orig_register = _mprt.register
|
|
_mprt.register = self.__tmp_register
|
|
|
|
# initialize; ensure original register function is
|
|
# re-instated
|
|
try:
|
|
super().__init__(name=name, create=create, size=size)
|
|
finally:
|
|
_mprt.register = orig_register
|
|
|
|
@staticmethod
|
|
def __tmp_register(*args, **kwargs) -> None:
|
|
return
|
|
|
|
def unlink(self) -> None:
|
|
if _mpshm._USE_POSIX and self._name:
|
|
_mpshm._posixshmem.shm_unlink(self._name)
|
|
if self._track:
|
|
_mprt.unregister(self._name, "shared_memory")
|
|
|
|
|
|
class SharedMemoryFrameManager(FrameManager):
|
|
def __init__(self):
|
|
self.shm_store: dict[str, UntrackedSharedMemory] = {}
|
|
|
|
def create(self, name: str, size) -> AnyStr:
|
|
try:
|
|
shm = UntrackedSharedMemory(
|
|
name=name,
|
|
create=True,
|
|
size=size,
|
|
)
|
|
except FileExistsError:
|
|
shm = UntrackedSharedMemory(name=name)
|
|
|
|
self.shm_store[name] = shm
|
|
return shm.buf
|
|
|
|
def write(self, name: str) -> Optional[memoryview]:
|
|
try:
|
|
if name in self.shm_store:
|
|
shm = self.shm_store[name]
|
|
else:
|
|
shm = UntrackedSharedMemory(name=name)
|
|
self.shm_store[name] = shm
|
|
return shm.buf
|
|
except FileNotFoundError:
|
|
logger.info(f"the file {name} not found")
|
|
return None
|
|
|
|
def get(self, name: str, shape) -> Optional[np.ndarray]:
|
|
try:
|
|
if name in self.shm_store:
|
|
shm = self.shm_store[name]
|
|
else:
|
|
shm = UntrackedSharedMemory(name=name)
|
|
self.shm_store[name] = shm
|
|
return np.ndarray(shape, dtype=np.uint8, buffer=shm.buf)
|
|
except FileNotFoundError:
|
|
return None
|
|
|
|
def close(self, name: str):
|
|
if name in self.shm_store:
|
|
self.shm_store[name].close()
|
|
del self.shm_store[name]
|
|
|
|
def delete(self, name: str):
|
|
if name in self.shm_store:
|
|
self.shm_store[name].close()
|
|
|
|
try:
|
|
self.shm_store[name].unlink()
|
|
except FileNotFoundError:
|
|
pass
|
|
|
|
del self.shm_store[name]
|
|
else:
|
|
try:
|
|
shm = UntrackedSharedMemory(name=name)
|
|
shm.close()
|
|
shm.unlink()
|
|
except FileNotFoundError:
|
|
pass
|
|
|
|
def cleanup(self) -> None:
|
|
for shm in self.shm_store.values():
|
|
shm.close()
|
|
|
|
try:
|
|
shm.unlink()
|
|
except FileNotFoundError:
|
|
pass
|
|
|
|
|
|
def create_mask(frame_shape, mask):
|
|
mask_img = np.zeros(frame_shape, np.uint8)
|
|
mask_img[:] = 255
|
|
|
|
if isinstance(mask, list):
|
|
for m in mask:
|
|
add_mask(m, mask_img)
|
|
|
|
elif isinstance(mask, str):
|
|
add_mask(mask, mask_img)
|
|
|
|
return mask_img
|
|
|
|
|
|
def add_mask(mask: str, mask_img: np.ndarray):
|
|
points = mask.split(",")
|
|
|
|
# masks and zones are saved as relative coordinates
|
|
# we know if any points are > 1 then it is using the
|
|
# old native resolution coordinates
|
|
if any(x > "1.0" for x in points):
|
|
raise Exception("add mask expects relative coordinates only")
|
|
|
|
contour = np.array(
|
|
[
|
|
[
|
|
int(float(points[i]) * mask_img.shape[1]),
|
|
int(float(points[i + 1]) * mask_img.shape[0]),
|
|
]
|
|
for i in range(0, len(points), 2)
|
|
]
|
|
)
|
|
cv2.fillPoly(mask_img, pts=[contour], color=(0))
|
|
|
|
|
|
def get_image_from_recording(
|
|
ffmpeg, # Ffmpeg Config
|
|
file_path: str,
|
|
relative_frame_time: float,
|
|
codec: str,
|
|
height: Optional[int] = None,
|
|
) -> Optional[Any]:
|
|
"""retrieve a frame from given time in recording file."""
|
|
|
|
ffmpeg_cmd = [
|
|
ffmpeg.ffmpeg_path,
|
|
"-hide_banner",
|
|
"-loglevel",
|
|
"warning",
|
|
"-ss",
|
|
f"00:00:{relative_frame_time}",
|
|
"-i",
|
|
file_path,
|
|
"-frames:v",
|
|
"1",
|
|
"-c:v",
|
|
codec,
|
|
"-f",
|
|
"image2pipe",
|
|
"-",
|
|
]
|
|
|
|
if height is not None:
|
|
ffmpeg_cmd.insert(-3, "-vf")
|
|
ffmpeg_cmd.insert(-3, f"scale=-1:{height}")
|
|
|
|
process = sp.run(
|
|
ffmpeg_cmd,
|
|
capture_output=True,
|
|
)
|
|
|
|
if process.returncode == 0:
|
|
return process.stdout
|
|
else:
|
|
return None
|
|
|
|
|
|
def get_histogram(image, x_min, y_min, x_max, y_max):
|
|
image_bgr = cv2.cvtColor(image, cv2.COLOR_YUV2BGR_I420)
|
|
image_bgr = image_bgr[y_min:y_max, x_min:x_max]
|
|
|
|
hist = cv2.calcHist(
|
|
[image_bgr], [0, 1, 2], None, [8, 8, 8], [0, 256, 0, 256, 0, 256]
|
|
)
|
|
return cv2.normalize(hist, hist).flatten()
|
|
|
|
|
|
def ensure_jpeg_bytes(image_data):
|
|
"""Ensure image data is jpeg bytes for genai"""
|
|
try:
|
|
img_array = np.frombuffer(image_data, dtype=np.uint8)
|
|
img = cv2.imdecode(img_array, cv2.IMREAD_COLOR)
|
|
|
|
if img is None:
|
|
return image_data
|
|
|
|
success, encoded_img = cv2.imencode(".jpg", img)
|
|
|
|
if success:
|
|
return encoded_img.tobytes()
|
|
except Exception as e:
|
|
logger.warning(f"Error when converting thumbnail to jpeg for genai: {e}")
|
|
|
|
return image_data
|