blakeblackshear.frigate/frigate/ptz/autotrack.py

1369 lines
56 KiB
Python

"""Automatically pan, tilt, and zoom on detected objects via onvif."""
import copy
import logging
import os
import queue
import threading
import time
from collections import deque
from functools import partial
from multiprocessing.synchronize import Event as MpEvent
import cv2
import numpy as np
from norfair.camera_motion import (
HomographyTransformationGetter,
MotionEstimator,
TranslationTransformationGetter,
)
from frigate.camera import PTZMetrics
from frigate.comms.dispatcher import Dispatcher
from frigate.config import CameraConfig, FrigateConfig, ZoomingModeEnum
from frigate.const import (
AUTOTRACKING_MAX_AREA_RATIO,
AUTOTRACKING_MAX_MOVE_METRICS,
AUTOTRACKING_MOTION_MAX_POINTS,
AUTOTRACKING_MOTION_MIN_DISTANCE,
AUTOTRACKING_ZOOM_EDGE_THRESHOLD,
AUTOTRACKING_ZOOM_IN_HYSTERESIS,
AUTOTRACKING_ZOOM_OUT_HYSTERESIS,
CONFIG_DIR,
)
from frigate.ptz.onvif import OnvifController
from frigate.util.builtin import update_yaml_file
from frigate.util.image import SharedMemoryFrameManager, intersection_over_union
logger = logging.getLogger(__name__)
def ptz_moving_at_frame_time(frame_time, ptz_start_time, ptz_stop_time):
# Determine if the PTZ was in motion at the set frame time
# for non ptz/autotracking cameras, this will always return False
# ptz_start_time is initialized to 0 on startup and only changes
# when autotracking movements are made
return (ptz_start_time != 0.0 and frame_time > ptz_start_time) and (
ptz_stop_time == 0.0 or (ptz_start_time <= frame_time <= ptz_stop_time)
)
class PtzMotionEstimator:
def __init__(self, config: CameraConfig, ptz_metrics: PTZMetrics) -> None:
self.frame_manager = SharedMemoryFrameManager()
self.norfair_motion_estimator = None
self.camera_config = config
self.coord_transformations = None
self.ptz_metrics = ptz_metrics
self.ptz_metrics.reset.set()
logger.debug(f"{config.name}: Motion estimator init")
def motion_estimator(self, detections, frame_time, camera):
# If we've just started up or returned to our preset, reset motion estimator for new tracking session
if self.ptz_metrics.reset.is_set():
self.ptz_metrics.reset.clear()
# homography is nice (zooming) but slow, translation is pan/tilt only but fast.
if (
self.camera_config.onvif.autotracking.zooming
!= ZoomingModeEnum.disabled
):
logger.debug(f"{camera}: Motion estimator reset - homography")
transformation_type = HomographyTransformationGetter()
else:
logger.debug(f"{camera}: Motion estimator reset - translation")
transformation_type = TranslationTransformationGetter()
self.norfair_motion_estimator = MotionEstimator(
transformations_getter=transformation_type,
min_distance=AUTOTRACKING_MOTION_MIN_DISTANCE,
max_points=AUTOTRACKING_MOTION_MAX_POINTS,
)
self.coord_transformations = None
if ptz_moving_at_frame_time(
frame_time,
self.ptz_metrics.start_time.value,
self.ptz_metrics.stop_time.value,
):
logger.debug(
f"{camera}: Motion estimator running - frame time: {frame_time}"
)
frame_id = f"{camera}{frame_time}"
yuv_frame = self.frame_manager.get(
frame_id, self.camera_config.frame_shape_yuv
)
if yuv_frame is None:
self.coord_transformations = None
return None
frame = cv2.cvtColor(yuv_frame, cv2.COLOR_YUV2GRAY_I420)
# mask out detections for better motion estimation
mask = np.ones(frame.shape[:2], frame.dtype)
detection_boxes = [x[2] for x in detections]
for detection in detection_boxes:
x1, y1, x2, y2 = detection
mask[y1:y2, x1:x2] = 0
# merge camera config motion mask with detections. Norfair function needs 0,1 mask
mask = np.bitwise_and(mask, self.camera_config.motion.mask).clip(max=1)
# Norfair estimator function needs color so it can convert it right back to gray
frame = cv2.cvtColor(frame, cv2.COLOR_GRAY2BGRA)
try:
self.coord_transformations = self.norfair_motion_estimator.update(
frame, mask
)
except Exception:
# sometimes opencv can't find enough features in the image to find homography, so catch this error
# https://github.com/tryolabs/norfair/pull/278
logger.warning(
f"Autotracker: motion estimator couldn't get transformations for {camera} at frame time {frame_time}"
)
self.coord_transformations = None
try:
logger.debug(
f"{camera}: Motion estimator transformation: {self.coord_transformations.rel_to_abs([[0,0]])}"
)
except Exception:
pass
self.frame_manager.close(frame_id)
return self.coord_transformations
class PtzAutoTrackerThread(threading.Thread):
def __init__(
self,
config: FrigateConfig,
onvif: OnvifController,
ptz_metrics: dict[str, PTZMetrics],
dispatcher: Dispatcher,
stop_event: MpEvent,
) -> None:
super().__init__(name="ptz_autotracker")
self.ptz_autotracker = PtzAutoTracker(
config, onvif, ptz_metrics, dispatcher, stop_event
)
self.stop_event = stop_event
self.config = config
def run(self):
while not self.stop_event.wait(1):
for camera, camera_config in self.config.cameras.items():
if not camera_config.enabled:
continue
if camera_config.onvif.autotracking.enabled:
self.ptz_autotracker.camera_maintenance(camera)
else:
# disabled dynamically by mqtt
if self.ptz_autotracker.tracked_object.get(camera):
self.ptz_autotracker.tracked_object[camera] = None
self.ptz_autotracker.tracked_object_history[camera].clear()
logger.info("Exiting autotracker...")
class PtzAutoTracker:
def __init__(
self,
config: FrigateConfig,
onvif: OnvifController,
ptz_metrics: PTZMetrics,
dispatcher: Dispatcher,
stop_event: MpEvent,
) -> None:
self.config = config
self.onvif = onvif
self.ptz_metrics = ptz_metrics
self.dispatcher = dispatcher
self.stop_event = stop_event
self.tracked_object: dict[str, object] = {}
self.tracked_object_history: dict[str, object] = {}
self.tracked_object_metrics: dict[str, object] = {}
self.object_types: dict[str, object] = {}
self.required_zones: dict[str, object] = {}
self.move_queues: dict[str, object] = {}
self.move_queue_locks: dict[str, object] = {}
self.move_threads: dict[str, object] = {}
self.autotracker_init: dict[str, object] = {}
self.move_metrics: dict[str, object] = {}
self.calibrating: dict[str, object] = {}
self.intercept: dict[str, object] = {}
self.move_coefficients: dict[str, object] = {}
self.zoom_factor: dict[str, object] = {}
# if cam is set to autotrack, onvif should be set up
for camera, camera_config in self.config.cameras.items():
if not camera_config.enabled:
continue
self.autotracker_init[camera] = False
if (
camera_config.onvif.autotracking.enabled
and camera_config.onvif.autotracking.enabled_in_config
):
self._autotracker_setup(camera_config, camera)
def _autotracker_setup(self, camera_config, camera):
logger.debug(f"{camera}: Autotracker init")
self.object_types[camera] = camera_config.onvif.autotracking.track
self.required_zones[camera] = camera_config.onvif.autotracking.required_zones
self.zoom_factor[camera] = camera_config.onvif.autotracking.zoom_factor
self.tracked_object[camera] = None
self.tracked_object_history[camera] = deque(
maxlen=round(camera_config.detect.fps * 1.5)
)
self.tracked_object_metrics[camera] = {
"max_target_box": AUTOTRACKING_MAX_AREA_RATIO
** (1 / self.zoom_factor[camera])
}
self.calibrating[camera] = False
self.move_metrics[camera] = []
self.intercept[camera] = None
self.move_coefficients[camera] = []
self.move_queues[camera] = queue.Queue()
self.move_queue_locks[camera] = threading.Lock()
# handle onvif constructor failing due to no connection
if camera not in self.onvif.cams:
logger.warning(
f"Disabling autotracking for {camera}: onvif connection failed"
)
camera_config.onvif.autotracking.enabled = False
self.ptz_metrics[camera].autotracker_enabled.value = False
return
if not self.onvif.cams[camera]["init"]:
if not self.onvif._init_onvif(camera):
logger.warning(
f"Disabling autotracking for {camera}: Unable to initialize onvif"
)
camera_config.onvif.autotracking.enabled = False
self.ptz_metrics[camera].autotracker_enabled.value = False
return
if "pt-r-fov" not in self.onvif.cams[camera]["features"]:
logger.warning(
f"Disabling autotracking for {camera}: FOV relative movement not supported"
)
camera_config.onvif.autotracking.enabled = False
self.ptz_metrics[camera].autotracker_enabled.value = False
return
move_status_supported = self.onvif.get_service_capabilities(camera)
if move_status_supported is None or move_status_supported.lower() != "true":
logger.warning(
f"Disabling autotracking for {camera}: ONVIF MoveStatus not supported"
)
camera_config.onvif.autotracking.enabled = False
self.ptz_metrics[camera].autotracker_enabled.value = False
return
if self.onvif.cams[camera]["init"]:
self.onvif.get_camera_status(camera)
# movement thread per camera
self.move_threads[camera] = threading.Thread(
name=f"ptz_move_thread_{camera}",
target=partial(self._process_move_queue, camera),
)
self.move_threads[camera].daemon = True
self.move_threads[camera].start()
if camera_config.onvif.autotracking.movement_weights:
if len(camera_config.onvif.autotracking.movement_weights) == 5:
camera_config.onvif.autotracking.movement_weights = [
float(val)
for val in camera_config.onvif.autotracking.movement_weights
]
self.ptz_metrics[
camera
].min_zoom.value = (
camera_config.onvif.autotracking.movement_weights[0]
)
self.ptz_metrics[
camera
].max_zoom.value = (
camera_config.onvif.autotracking.movement_weights[1]
)
self.intercept[camera] = (
camera_config.onvif.autotracking.movement_weights[2]
)
self.move_coefficients[camera] = (
camera_config.onvif.autotracking.movement_weights[3:]
)
else:
camera_config.onvif.autotracking.enabled = False
self.ptz_metrics[camera].autotracker_enabled.value = False
logger.warning(
f"Autotracker recalibration is required for {camera}. Disabling autotracking."
)
if camera_config.onvif.autotracking.calibrate_on_startup:
self._calibrate_camera(camera)
self.ptz_metrics[camera].tracking_active.clear()
self.dispatcher.publish(f"{camera}/ptz_autotracker/active", "OFF", retain=False)
self.autotracker_init[camera] = True
def _write_config(self, camera):
config_file = os.environ.get("CONFIG_FILE", f"{CONFIG_DIR}/config.yml")
# Check if we can use .yaml instead of .yml
config_file_yaml = config_file.replace(".yml", ".yaml")
if os.path.isfile(config_file_yaml):
config_file = config_file_yaml
logger.debug(
f"{camera}: Writing new config with autotracker motion coefficients: {self.config.cameras[camera].onvif.autotracking.movement_weights}"
)
update_yaml_file(
config_file,
["cameras", camera, "onvif", "autotracking", "movement_weights"],
self.config.cameras[camera].onvif.autotracking.movement_weights,
)
def _calibrate_camera(self, camera):
# move the camera from the preset in steps and measure the time it takes to move that amount
# this will allow us to predict movement times with a simple linear regression
# start with 0 so we can determine a baseline (to be used as the intercept in the regression calc)
# TODO: take zooming into account too
num_steps = 30
step_sizes = np.linspace(0, 1, num_steps)
zoom_in_values = []
zoom_out_values = []
self.calibrating[camera] = True
logger.info(f"Camera calibration for {camera} in progress")
# zoom levels test
if (
self.config.cameras[camera].onvif.autotracking.zooming
!= ZoomingModeEnum.disabled
):
logger.info(f"Calibration for {camera} in progress: 0% complete")
for i in range(2):
# absolute move to 0 - fully zoomed out
self.onvif._zoom_absolute(
camera,
self.onvif.cams[camera]["absolute_zoom_range"]["XRange"]["Min"],
1,
)
while not self.ptz_metrics[camera].motor_stopped.is_set():
self.onvif.get_camera_status(camera)
zoom_out_values.append(self.ptz_metrics[camera].zoom_level.value)
self.onvif._zoom_absolute(
camera,
self.onvif.cams[camera]["absolute_zoom_range"]["XRange"]["Max"],
1,
)
while not self.ptz_metrics[camera].motor_stopped.is_set():
self.onvif.get_camera_status(camera)
zoom_in_values.append(self.ptz_metrics[camera].zoom_level.value)
if (
self.config.cameras[camera].onvif.autotracking.zooming
== ZoomingModeEnum.relative
):
# relative move to -0.01
self.onvif._move_relative(
camera,
0,
0,
-1e-2,
1,
)
while not self.ptz_metrics[camera].motor_stopped.is_set():
self.onvif.get_camera_status(camera)
zoom_out_values.append(self.ptz_metrics[camera].zoom_level.value)
# relative move to 0.01
self.onvif._move_relative(
camera,
0,
0,
1e-2,
1,
)
while not self.ptz_metrics[camera].motor_stopped.is_set():
self.onvif.get_camera_status(camera)
zoom_in_values.append(self.ptz_metrics[camera].zoom_level.value)
self.ptz_metrics[camera].max_zoom.value = max(zoom_in_values)
self.ptz_metrics[camera].min_zoom.value = min(zoom_out_values)
logger.debug(
f"{camera}: Calibration values: max zoom: {self.ptz_metrics[camera].max_zoom.value}, min zoom: {self.ptz_metrics[camera].min_zoom.value}"
)
else:
self.ptz_metrics[camera].max_zoom.value = 1
self.ptz_metrics[camera].min_zoom.value = 0
self.onvif._move_to_preset(
camera,
self.config.cameras[camera].onvif.autotracking.return_preset.lower(),
)
self.ptz_metrics[camera].reset.set()
self.ptz_metrics[camera].motor_stopped.clear()
# Wait until the camera finishes moving
while not self.ptz_metrics[camera].motor_stopped.is_set():
self.onvif.get_camera_status(camera)
for step in range(num_steps):
pan = step_sizes[step]
tilt = step_sizes[step]
start_time = time.time()
self.onvif._move_relative(camera, pan, tilt, 0, 1)
# Wait until the camera finishes moving
while not self.ptz_metrics[camera].motor_stopped.is_set():
self.onvif.get_camera_status(camera)
stop_time = time.time()
self.move_metrics[camera].append(
{
"pan": pan,
"tilt": tilt,
"start_timestamp": start_time,
"end_timestamp": stop_time,
}
)
self.onvif._move_to_preset(
camera,
self.config.cameras[camera].onvif.autotracking.return_preset.lower(),
)
self.ptz_metrics[camera].reset.set()
self.ptz_metrics[camera].motor_stopped.clear()
# Wait until the camera finishes moving
while not self.ptz_metrics[camera].motor_stopped.is_set():
self.onvif.get_camera_status(camera)
logger.info(
f"Calibration for {camera} in progress: {round((step/num_steps)*100)}% complete"
)
self.calibrating[camera] = False
logger.info(f"Calibration for {camera} complete")
# calculate and save new intercept and coefficients
self._calculate_move_coefficients(camera, True)
def _calculate_move_coefficients(self, camera, calibration=False):
# calculate new coefficients when we have 50 more new values. Save up to 500
if calibration or (
len(self.move_metrics[camera]) % 50 == 0
and len(self.move_metrics[camera]) != 0
and len(self.move_metrics[camera]) <= AUTOTRACKING_MAX_MOVE_METRICS
):
X = np.array(
[abs(d["pan"]) + abs(d["tilt"]) for d in self.move_metrics[camera]]
)
y = np.array(
[
d["end_timestamp"] - d["start_timestamp"]
for d in self.move_metrics[camera]
]
)
# simple linear regression with intercept
X_with_intercept = np.column_stack((np.ones(X.shape[0]), X))
self.move_coefficients[camera] = np.linalg.lstsq(
X_with_intercept, y, rcond=None
)[0]
# only assign a new intercept if we're calibrating
if calibration:
self.intercept[camera] = y[0]
# write the min zoom, max zoom, intercept, and coefficients
# back to the config file as a comma separated string
self.config.cameras[camera].onvif.autotracking.movement_weights = ", ".join(
str(v)
for v in [
self.ptz_metrics[camera].min_zoom.value,
self.ptz_metrics[camera].max_zoom.value,
self.intercept[camera],
*self.move_coefficients[camera],
]
)
logger.debug(
f"{camera}: New regression parameters - intercept: {self.intercept[camera]}, coefficients: {self.move_coefficients[camera]}"
)
self._write_config(camera)
def _predict_movement_time(self, camera, pan, tilt):
combined_movement = abs(pan) + abs(tilt)
input_data = np.array([self.intercept[camera], combined_movement])
return np.dot(self.move_coefficients[camera], input_data)
def _predict_area_after_time(self, camera, time):
return np.dot(
self.tracked_object_metrics[camera]["area_coefficients"],
[self.tracked_object_history[camera][-1]["frame_time"] + time],
)
def _calculate_tracked_object_metrics(self, camera, obj):
def remove_outliers(data):
areas = [item["area"] for item in data]
Q1 = np.percentile(areas, 25)
Q3 = np.percentile(areas, 75)
IQR = Q3 - Q1
lower_bound = Q1 - 1.5 * IQR
upper_bound = Q3 + 1.5 * IQR
filtered_data = [
item for item in data if lower_bound <= item["area"] <= upper_bound
]
# Find and log the removed values
removed_values = [item for item in data if item not in filtered_data]
logger.debug(f"{camera}: Removed area outliers: {removed_values}")
return filtered_data
camera_config = self.config.cameras[camera]
camera_width = camera_config.frame_shape[1]
camera_height = camera_config.frame_shape[0]
# Extract areas and calculate weighted average
# grab the largest dimension of the bounding box and create a square from that
areas = [
{
"frame_time": obj["frame_time"],
"box": obj["box"],
"area": max(
obj["box"][2] - obj["box"][0], obj["box"][3] - obj["box"][1]
)
** 2,
}
for obj in self.tracked_object_history[camera]
]
filtered_areas = remove_outliers(areas) if len(areas) >= 2 else areas
# Filter entries that are not touching the frame edge
filtered_areas_not_touching_edge = [
entry
for entry in filtered_areas
if self._touching_frame_edges(camera, entry["box"]) == 0
]
# Calculate regression for area change predictions
if len(filtered_areas_not_touching_edge):
X = np.array(
[item["frame_time"] for item in filtered_areas_not_touching_edge]
)
y = np.array([item["area"] for item in filtered_areas_not_touching_edge])
self.tracked_object_metrics[camera]["area_coefficients"] = np.linalg.lstsq(
X.reshape(-1, 1), y, rcond=None
)[0]
else:
self.tracked_object_metrics[camera]["area_coefficients"] = np.array([0])
weights = np.arange(1, len(filtered_areas) + 1)
weighted_area = np.average(
[item["area"] for item in filtered_areas], weights=weights
)
self.tracked_object_metrics[camera]["target_box"] = (
weighted_area / (camera_width * camera_height)
) ** self.zoom_factor[camera]
if "original_target_box" not in self.tracked_object_metrics[camera]:
self.tracked_object_metrics[camera]["original_target_box"] = (
self.tracked_object_metrics[camera]["target_box"]
)
(
self.tracked_object_metrics[camera]["valid_velocity"],
self.tracked_object_metrics[camera]["velocity"],
) = self._get_valid_velocity(camera, obj)
self.tracked_object_metrics[camera]["distance"] = self._get_distance_threshold(
camera, obj
)
centroid_distance = np.linalg.norm(
[
obj.obj_data["centroid"][0] - camera_config.detect.width / 2,
obj.obj_data["centroid"][1] - camera_config.detect.height / 2,
]
)
logger.debug(f"{camera}: Centroid distance: {centroid_distance}")
self.tracked_object_metrics[camera]["below_distance_threshold"] = (
centroid_distance < self.tracked_object_metrics[camera]["distance"]
)
def _process_move_queue(self, camera):
camera_config = self.config.cameras[camera]
camera_config.frame_shape[1]
camera_config.frame_shape[0]
while not self.stop_event.is_set():
try:
move_data = self.move_queues[camera].get(True, 0.1)
except queue.Empty:
continue
with self.move_queue_locks[camera]:
frame_time, pan, tilt, zoom = move_data
# if we're receiving move requests during a PTZ move, ignore them
if ptz_moving_at_frame_time(
frame_time,
self.ptz_metrics[camera].start_time.value,
self.ptz_metrics[camera].stop_time.value,
):
# instead of dequeueing this might be a good place to preemptively move based
# on an estimate - for fast moving objects, etc.
logger.debug(
f"{camera}: Move queue: PTZ moving, dequeueing move request - frame time: {frame_time}, final pan: {pan}, final tilt: {tilt}, final zoom: {zoom}"
)
continue
else:
if (
self.config.cameras[camera].onvif.autotracking.zooming
== ZoomingModeEnum.relative
):
self.onvif._move_relative(camera, pan, tilt, zoom, 1)
else:
if pan != 0 or tilt != 0:
self.onvif._move_relative(camera, pan, tilt, 0, 1)
# Wait until the camera finishes moving
while not self.ptz_metrics[camera].motor_stopped.is_set():
self.onvif.get_camera_status(camera)
if (
zoom > 0
and self.ptz_metrics[camera].zoom_level.value != zoom
):
self.onvif._zoom_absolute(camera, zoom, 1)
# Wait until the camera finishes moving
while not self.ptz_metrics[camera].motor_stopped.is_set():
self.onvif.get_camera_status(camera)
if self.config.cameras[camera].onvif.autotracking.movement_weights:
logger.debug(
f"{camera}: Predicted movement time: {self._predict_movement_time(camera, pan, tilt)}"
)
logger.debug(
f"{camera}: Actual movement time: {self.ptz_metrics[camera].stop_time.value-self.ptz_metrics[camera].start_time.value}"
)
# save metrics for better estimate calculations
if (
self.intercept[camera] is not None
and len(self.move_metrics[camera])
< AUTOTRACKING_MAX_MOVE_METRICS
and (pan != 0 or tilt != 0)
and self.config.cameras[
camera
].onvif.autotracking.calibrate_on_startup
):
logger.debug(f"{camera}: Adding new values to move metrics")
self.move_metrics[camera].append(
{
"pan": pan,
"tilt": tilt,
"start_timestamp": self.ptz_metrics[
camera
].start_time.value,
"end_timestamp": self.ptz_metrics[
camera
].stop_time.value,
}
)
# calculate new coefficients if we have enough data
self._calculate_move_coefficients(camera)
def _enqueue_move(self, camera, frame_time, pan, tilt, zoom):
def split_value(value, suppress_diff=True):
clipped = np.clip(value, -1, 1)
# don't make small movements
if -0.05 < clipped < 0.05 and suppress_diff:
diff = 0.0
else:
diff = value - clipped
return clipped, diff
if (
frame_time > self.ptz_metrics[camera].start_time.value
and frame_time > self.ptz_metrics[camera].stop_time.value
and not self.move_queue_locks[camera].locked()
):
# we can split up any large moves caused by velocity estimated movements if necessary
# get an excess amount and assign it instead of 0 below
while pan != 0 or tilt != 0 or zoom != 0:
pan, _ = split_value(pan)
tilt, _ = split_value(tilt)
zoom, _ = split_value(zoom, False)
logger.debug(
f"{camera}: Enqueue movement for frame time: {frame_time} pan: {pan}, tilt: {tilt}, zoom: {zoom}"
)
move_data = (frame_time, pan, tilt, zoom)
self.move_queues[camera].put(move_data)
# reset values to not split up large movements
pan = 0
tilt = 0
zoom = 0
def _touching_frame_edges(self, camera, box):
camera_config = self.config.cameras[camera]
camera_width = camera_config.frame_shape[1]
camera_height = camera_config.frame_shape[0]
bb_left, bb_top, bb_right, bb_bottom = box
edge_threshold = AUTOTRACKING_ZOOM_EDGE_THRESHOLD
return int(
(bb_left < edge_threshold * camera_width)
+ (bb_right > (1 - edge_threshold) * camera_width)
+ (bb_top < edge_threshold * camera_height)
+ (bb_bottom > (1 - edge_threshold) * camera_height)
)
def _get_valid_velocity(self, camera, obj):
# returns a tuple and euclidean distance if the estimated velocity is valid
# if invalid, returns [0, 0] and -1
camera_config = self.config.cameras[camera]
camera_width = camera_config.frame_shape[1]
camera_height = camera_config.frame_shape[0]
camera_fps = camera_config.detect.fps
# estimate_velocity is a numpy array of bbox top,left and bottom,right velocities
velocities = obj.obj_data["estimate_velocity"]
logger.debug(
f"{camera}: Velocity (Norfair): {tuple(np.round(velocities).flatten().astype(int))}"
)
# if we are close enough to zero, return right away
if np.all(np.round(velocities) == 0):
return True, np.zeros((4,))
# Thresholds
x_mags_thresh = camera_width / camera_fps / 2
y_mags_thresh = camera_height / camera_fps / 2
dir_thresh = 0.93
delta_thresh = 20
var_thresh = 10
# Check magnitude
x_mags = np.abs(velocities[:, 0])
y_mags = np.abs(velocities[:, 1])
invalid_x_mags = np.any(x_mags > x_mags_thresh)
invalid_y_mags = np.any(y_mags > y_mags_thresh)
# Check delta
delta = np.abs(velocities[0] - velocities[1])
invalid_delta = np.any(delta > delta_thresh)
# Check variance
stdev_list = np.std(velocities, axis=0)
high_variances = np.any(stdev_list > var_thresh)
# Check direction difference
velocities = np.round(velocities)
invalid_dirs = False
if not np.any(np.linalg.norm(velocities, axis=1)):
cosine_sim = np.dot(velocities[0], velocities[1]) / (
np.linalg.norm(velocities[0]) * np.linalg.norm(velocities[1])
)
dir_thresh = 0.6 if np.all(delta < delta_thresh / 2) else dir_thresh
invalid_dirs = cosine_sim < dir_thresh
# Combine
invalid = (
invalid_x_mags
or invalid_y_mags
or invalid_dirs
or invalid_delta
or high_variances
)
if invalid:
logger.debug(
f"{camera}: Invalid velocity: {tuple(np.round(velocities, 2).flatten().astype(int))}: Invalid because: "
+ ", ".join(
[
var_name
for var_name, is_invalid in [
("invalid_x_mags", invalid_x_mags),
("invalid_y_mags", invalid_y_mags),
("invalid_dirs", invalid_dirs),
("invalid_delta", invalid_delta),
("high_variances", high_variances),
]
if is_invalid
]
)
)
# invalid velocity
return False, np.zeros((4,))
else:
logger.debug(f"{camera}: Valid velocity ")
return True, velocities.flatten()
def _get_distance_threshold(self, camera, obj):
# Returns true if Euclidean distance from object to center of frame is
# less than 10% of the of the larger dimension (width or height) of the frame,
# multiplied by a scaling factor for object size.
# Distance is increased if object is not moving to prevent small ptz moves
# Adjusting this percentage slightly lower will effectively cause the camera to move
# more often to keep the object in the center. Raising the percentage will cause less
# movement and will be more flexible with objects not quite being centered.
# TODO: there's probably a better way to approach this
camera_config = self.config.cameras[camera]
obj_width = obj.obj_data["box"][2] - obj.obj_data["box"][0]
obj_height = obj.obj_data["box"][3] - obj.obj_data["box"][1]
max_obj = max(obj_width, obj_height)
max_frame = (
camera_config.detect.width
if max_obj == obj_width
else camera_config.detect.height
)
# larger objects should lower the threshold, smaller objects should raise it
scaling_factor = 1 - np.log(max_obj / max_frame)
percentage = (
0.08
if camera_config.onvif.autotracking.movement_weights
and self.tracked_object_metrics[camera]["valid_velocity"]
else 0.03
)
distance_threshold = percentage * max_frame * scaling_factor
logger.debug(f"{camera}: Distance threshold: {distance_threshold}")
return distance_threshold
def _should_zoom_in(self, camera, obj, box, predicted_time, debug_zooming=False):
# returns True if we should zoom in, False if we should zoom out, None to do nothing
camera_config = self.config.cameras[camera]
camera_width = camera_config.frame_shape[1]
camera_height = camera_config.frame_shape[0]
camera_fps = camera_config.detect.fps
average_velocity = self.tracked_object_metrics[camera]["velocity"]
bb_left, bb_top, bb_right, bb_bottom = box
# calculate a velocity threshold based on movement coefficients if available
if camera_config.onvif.autotracking.movement_weights:
predicted_movement_time = self._predict_movement_time(camera, 1, 1)
velocity_threshold_x = camera_width / predicted_movement_time / camera_fps
velocity_threshold_y = camera_height / predicted_movement_time / camera_fps
else:
# use a generic velocity threshold
velocity_threshold_x = camera_width * 0.02
velocity_threshold_y = camera_height * 0.02
# return a count of the number of frame edges the bounding box is touching
touching_frame_edges = self._touching_frame_edges(camera, box)
# make sure object is centered in the frame
below_distance_threshold = self.tracked_object_metrics[camera][
"below_distance_threshold"
]
below_dimension_threshold = (bb_right - bb_left) <= camera_width * (
self.zoom_factor[camera] + 0.1
) and (bb_bottom - bb_top) <= camera_height * (self.zoom_factor[camera] + 0.1)
# ensure object is not moving quickly
below_velocity_threshold = np.all(
np.abs(average_velocity)
< np.tile([velocity_threshold_x, velocity_threshold_y], 2)
) or np.all(average_velocity == 0)
if not predicted_time:
calculated_target_box = self.tracked_object_metrics[camera]["target_box"]
else:
calculated_target_box = self.tracked_object_metrics[camera][
"target_box"
] + self._predict_area_after_time(camera, predicted_time) / (
camera_width * camera_height
)
below_area_threshold = (
calculated_target_box
< self.tracked_object_metrics[camera]["max_target_box"]
)
# introduce some hysteresis to prevent a yo-yo zooming effect
zoom_out_hysteresis = (
calculated_target_box
> self.tracked_object_metrics[camera]["max_target_box"]
* AUTOTRACKING_ZOOM_OUT_HYSTERESIS
)
zoom_in_hysteresis = (
calculated_target_box
< self.tracked_object_metrics[camera]["max_target_box"]
* AUTOTRACKING_ZOOM_IN_HYSTERESIS
)
at_max_zoom = (
self.ptz_metrics[camera].zoom_level.value
== self.ptz_metrics[camera].max_zoom.value
)
at_min_zoom = (
self.ptz_metrics[camera].zoom_level.value
== self.ptz_metrics[camera].min_zoom.value
)
# debug zooming
if debug_zooming:
logger.debug(
f"{camera}: Zoom test: touching edges: count: {touching_frame_edges} left: {bb_left < AUTOTRACKING_ZOOM_EDGE_THRESHOLD * camera_width}, right: {bb_right > (1 - AUTOTRACKING_ZOOM_EDGE_THRESHOLD) * camera_width}, top: {bb_top < AUTOTRACKING_ZOOM_EDGE_THRESHOLD * camera_height}, bottom: {bb_bottom > (1 - AUTOTRACKING_ZOOM_EDGE_THRESHOLD) * camera_height}"
)
logger.debug(
f"{camera}: Zoom test: below distance threshold: {(below_distance_threshold)}"
)
logger.debug(
f"{camera}: Zoom test: below area threshold: {(below_area_threshold)} target: {self.tracked_object_metrics[camera]['target_box']}, calculated: {calculated_target_box}, max: {self.tracked_object_metrics[camera]['max_target_box']}"
)
logger.debug(
f"{camera}: Zoom test: below dimension threshold: {below_dimension_threshold} width: {bb_right - bb_left}, max width: {camera_width * (self.zoom_factor[camera] + 0.1)}, height: {bb_bottom - bb_top}, max height: {camera_height * (self.zoom_factor[camera] + 0.1)}"
)
logger.debug(
f"{camera}: Zoom test: below velocity threshold: {below_velocity_threshold} velocity x: {abs(average_velocity[0])}, x threshold: {velocity_threshold_x}, velocity y: {abs(average_velocity[0])}, y threshold: {velocity_threshold_y}"
)
logger.debug(f"{camera}: Zoom test: at max zoom: {at_max_zoom}")
logger.debug(f"{camera}: Zoom test: at min zoom: {at_min_zoom}")
logger.debug(
f'{camera}: Zoom test: zoom in hysteresis limit: {zoom_in_hysteresis} value: {AUTOTRACKING_ZOOM_IN_HYSTERESIS} original: {self.tracked_object_metrics[camera]["original_target_box"]} max: {self.tracked_object_metrics[camera]["max_target_box"]} target: {calculated_target_box if calculated_target_box else self.tracked_object_metrics[camera]["target_box"]}'
)
logger.debug(
f'{camera}: Zoom test: zoom out hysteresis limit: {zoom_out_hysteresis} value: {AUTOTRACKING_ZOOM_OUT_HYSTERESIS} original: {self.tracked_object_metrics[camera]["original_target_box"]} max: {self.tracked_object_metrics[camera]["max_target_box"]} target: {calculated_target_box if calculated_target_box else self.tracked_object_metrics[camera]["target_box"]}'
)
# Zoom in conditions (and)
if (
zoom_in_hysteresis
and touching_frame_edges == 0
and below_velocity_threshold
and below_dimension_threshold
and below_area_threshold
and not at_max_zoom
):
return True
# Zoom out conditions (or)
if (
(
zoom_out_hysteresis
and not at_max_zoom
and (not below_area_threshold or not below_dimension_threshold)
)
or (zoom_out_hysteresis and not below_area_threshold and at_max_zoom)
or (
touching_frame_edges == 1
and (below_distance_threshold or not below_dimension_threshold)
)
or touching_frame_edges > 1
or not below_velocity_threshold
) and not at_min_zoom:
return False
# Don't zoom at all
return None
def _autotrack_move_ptz(self, camera, obj):
camera_config = self.config.cameras[camera]
camera_width = camera_config.frame_shape[1]
camera_height = camera_config.frame_shape[0]
camera_fps = camera_config.detect.fps
predicted_movement_time = 0
average_velocity = np.zeros((4,))
predicted_box = obj.obj_data["box"]
centroid_x = obj.obj_data["centroid"][0]
centroid_y = obj.obj_data["centroid"][1]
# Normalize coordinates. top right of the fov is (1,1), center is (0,0), bottom left is (-1, -1).
pan = ((centroid_x / camera_width) - 0.5) * 2
tilt = (0.5 - (centroid_y / camera_height)) * 2
if (
camera_config.onvif.autotracking.movement_weights
): # use estimates if we have available coefficients
predicted_movement_time = self._predict_movement_time(camera, pan, tilt)
_, average_velocity = (
self._get_valid_velocity(camera, obj)
if "velocity" not in self.tracked_object_metrics[camera]
else (
self.tracked_object_metrics[camera]["valid_velocity"],
self.tracked_object_metrics[camera]["velocity"],
)
)
if np.any(average_velocity):
# this box could exceed the frame boundaries if velocity is high
# but we'll handle that in _enqueue_move() as two separate moves
current_box = np.array(obj.obj_data["box"])
predicted_box = (
current_box
+ camera_fps * predicted_movement_time * average_velocity
)
predicted_box = np.round(predicted_box).astype(int)
centroid_x = round((predicted_box[0] + predicted_box[2]) / 2)
centroid_y = round((predicted_box[1] + predicted_box[3]) / 2)
# recalculate pan and tilt with new centroid
pan = ((centroid_x / camera_width) - 0.5) * 2
tilt = (0.5 - (centroid_y / camera_height)) * 2
logger.debug(f'{camera}: Original box: {obj.obj_data["box"]}')
logger.debug(f"{camera}: Predicted box: {tuple(predicted_box)}")
logger.debug(
f"{camera}: Velocity: {tuple(np.round(average_velocity).flatten().astype(int))}"
)
zoom = self._get_zoom_amount(
camera, obj, predicted_box, predicted_movement_time, debug_zoom=True
)
self._enqueue_move(camera, obj.obj_data["frame_time"], pan, tilt, zoom)
def _autotrack_move_zoom_only(self, camera, obj):
camera_config = self.config.cameras[camera]
if camera_config.onvif.autotracking.zooming != ZoomingModeEnum.disabled:
zoom = self._get_zoom_amount(camera, obj, obj.obj_data["box"], 0)
if zoom != 0:
self._enqueue_move(camera, obj.obj_data["frame_time"], 0, 0, zoom)
def _get_zoom_amount(
self, camera, obj, predicted_box, predicted_movement_time, debug_zoom=True
):
camera_config = self.config.cameras[camera]
# frame width and height
camera_width = camera_config.frame_shape[1]
camera_height = camera_config.frame_shape[0]
zoom = 0
result = None
current_zoom_level = self.ptz_metrics[camera].zoom_level.value
target_box = max(
obj.obj_data["box"][2] - obj.obj_data["box"][0],
obj.obj_data["box"][3] - obj.obj_data["box"][1],
) ** 2 / (camera_width * camera_height)
# absolute zooming separately from pan/tilt
if camera_config.onvif.autotracking.zooming == ZoomingModeEnum.absolute:
# don't zoom on initial move
if "target_box" not in self.tracked_object_metrics[camera]:
zoom = current_zoom_level
else:
if (
result := self._should_zoom_in(
camera,
obj,
obj.obj_data["box"],
predicted_movement_time,
debug_zoom,
)
) is not None:
# divide zoom in 10 increments and always zoom out more than in
level = (
self.ptz_metrics[camera].max_zoom.value
- self.ptz_metrics[camera].min_zoom.value
) / 20
if result:
zoom = min(1.0, current_zoom_level + level)
else:
zoom = max(0.0, current_zoom_level - 2 * level)
# relative zooming concurrently with pan/tilt
if camera_config.onvif.autotracking.zooming == ZoomingModeEnum.relative:
# this is our initial zoom in on a new object
if "target_box" not in self.tracked_object_metrics[camera]:
zoom = target_box ** self.zoom_factor[camera]
if zoom > self.tracked_object_metrics[camera]["max_target_box"]:
zoom = -(1 - zoom)
logger.debug(
f"{camera}: target box: {target_box}, max: {self.tracked_object_metrics[camera]['max_target_box']}, calc zoom: {zoom}"
)
else:
if (
result := self._should_zoom_in(
camera,
obj,
predicted_box
if camera_config.onvif.autotracking.movement_weights
else obj.obj_data["box"],
predicted_movement_time,
debug_zoom,
)
) is not None:
if predicted_movement_time:
calculated_target_box = self.tracked_object_metrics[camera][
"target_box"
] + self._predict_area_after_time(
camera, predicted_movement_time
) / (camera_width * camera_height)
logger.debug(
f"{camera}: Zooming prediction: predicted movement time: {predicted_movement_time}, original box: {self.tracked_object_metrics[camera]['target_box']}, calculated box: {calculated_target_box}"
)
else:
calculated_target_box = self.tracked_object_metrics[camera][
"target_box"
]
# zoom value
ratio = (
self.tracked_object_metrics[camera]["max_target_box"]
/ calculated_target_box
)
zoom = (ratio - 1) / (ratio + 1)
logger.debug(
f'{camera}: limit: {self.tracked_object_metrics[camera]["max_target_box"]}, ratio: {ratio} zoom calculation: {zoom}'
)
if not result:
# zoom out with special condition if zooming out because of velocity, edges, etc.
zoom = -(1 - zoom) if zoom > 0 else -(zoom * 2 + 1)
if result:
# zoom in
zoom = 1 - zoom if zoom > 0 else (zoom * 2 + 1)
logger.debug(f"{camera}: Zooming: {result} Zoom amount: {zoom}")
return zoom
def is_autotracking(self, camera):
return self.tracked_object[camera] is not None
def autotracked_object_region(self, camera):
return self.tracked_object[camera]["region"]
def autotrack_object(self, camera, obj):
camera_config = self.config.cameras[camera]
if camera_config.onvif.autotracking.enabled:
if not self.autotracker_init[camera]:
self._autotracker_setup(camera_config, camera)
if self.calibrating[camera]:
logger.debug(f"{camera}: Calibrating camera")
return
# this is a brand new object that's on our camera, has our label, entered the zone,
# is not a false positive, and is not initially motionless
if (
# new object
self.tracked_object[camera] is None
and obj.camera == camera
and obj.obj_data["label"] in self.object_types[camera]
and set(obj.entered_zones) & set(self.required_zones[camera])
and not obj.previous["false_positive"]
and not obj.false_positive
and not self.tracked_object_history[camera]
and obj.obj_data["motionless_count"] == 0
):
logger.debug(
f"{camera}: New object: {obj.obj_data['id']} {obj.obj_data['box']} {obj.obj_data['frame_time']}"
)
self.ptz_metrics[camera].tracking_active.set()
self.dispatcher.publish(
f"{camera}/ptz_autotracker/active", "ON", retain=False
)
self.tracked_object[camera] = obj
self.tracked_object_history[camera].append(copy.deepcopy(obj.obj_data))
self._autotrack_move_ptz(camera, obj)
return
if (
# already tracking an object
self.tracked_object[camera] is not None
and self.tracked_object_history[camera]
and obj.obj_data["id"] == self.tracked_object[camera].obj_data["id"]
and obj.obj_data["frame_time"]
!= self.tracked_object_history[camera][-1]["frame_time"]
):
self.tracked_object_history[camera].append(copy.deepcopy(obj.obj_data))
self._calculate_tracked_object_metrics(camera, obj)
if not ptz_moving_at_frame_time(
obj.obj_data["frame_time"],
self.ptz_metrics[camera].start_time.value,
self.ptz_metrics[camera].stop_time.value,
):
if self.tracked_object_metrics[camera]["below_distance_threshold"]:
logger.debug(
f"{camera}: Existing object (do NOT move ptz): {obj.obj_data['id']} {obj.obj_data['box']} {obj.obj_data['frame_time']}"
)
# no need to move, but try zooming
self._autotrack_move_zoom_only(camera, obj)
else:
logger.debug(
f"{camera}: Existing object (need to move ptz): {obj.obj_data['id']} {obj.obj_data['box']} {obj.obj_data['frame_time']}"
)
self._autotrack_move_ptz(camera, obj)
return
if (
# The tracker lost an object, so let's check the previous object's region and compare it with the incoming object
# If it's within bounds, start tracking that object.
# Should we check region (maybe too broad) or expand the previous object's box a bit and check that?
self.tracked_object[camera] is None
and obj.camera == camera
and obj.obj_data["label"] in self.object_types[camera]
and not obj.previous["false_positive"]
and not obj.false_positive
and self.tracked_object_history[camera]
):
if (
intersection_over_union(
self.tracked_object_history[camera][-1]["region"],
obj.obj_data["box"],
)
< 0.2
):
logger.debug(
f"{camera}: Reacquired object: {obj.obj_data['id']} {obj.obj_data['box']} {obj.obj_data['frame_time']}"
)
self.tracked_object[camera] = obj
self.tracked_object_history[camera].clear()
self.tracked_object_history[camera].append(
copy.deepcopy(obj.obj_data)
)
self._calculate_tracked_object_metrics(camera, obj)
self._autotrack_move_ptz(camera, obj)
return
def end_object(self, camera, obj):
if self.config.cameras[camera].onvif.autotracking.enabled:
if (
self.tracked_object[camera] is not None
and obj.obj_data["id"] == self.tracked_object[camera].obj_data["id"]
):
logger.debug(
f"{camera}: End object: {obj.obj_data['id']} {obj.obj_data['box']}"
)
self.tracked_object[camera] = None
self.tracked_object_metrics[camera] = {
"max_target_box": AUTOTRACKING_MAX_AREA_RATIO
** (1 / self.zoom_factor[camera])
}
def camera_maintenance(self, camera):
# bail and don't check anything if we're calibrating or tracking an object
if (
not self.autotracker_init[camera]
or self.calibrating[camera]
or self.tracked_object[camera] is not None
):
return
# calls get_camera_status to check/update ptz movement
# returns camera to preset after timeout when tracking is over
autotracker_config = self.config.cameras[camera].onvif.autotracking
if not self.autotracker_init[camera]:
self._autotracker_setup(self.config.cameras[camera], camera)
# regularly update camera status
if not self.ptz_metrics[camera].motor_stopped.is_set():
self.onvif.get_camera_status(camera)
# return to preset if tracking is over
if (
self.tracked_object[camera] is None
and self.tracked_object_history[camera]
and (
# might want to use a different timestamp here?
self.ptz_metrics[camera].frame_time.value
- self.tracked_object_history[camera][-1]["frame_time"]
>= autotracker_config.timeout
)
and autotracker_config.return_preset
):
# clear tracked object and reset zoom level
self.tracked_object[camera] = None
self.tracked_object_history[camera].clear()
# empty move queue
while not self.move_queues[camera].empty():
self.move_queues[camera].get()
self.ptz_metrics[camera].motor_stopped.wait()
logger.debug(
f"{camera}: Time is {self.ptz_metrics[camera].frame_time.value}, returning to preset: {autotracker_config.return_preset}"
)
self.onvif._move_to_preset(
camera,
autotracker_config.return_preset.lower(),
)
# update stored zoom level from preset
if not self.ptz_metrics[camera].motor_stopped.is_set():
self.onvif.get_camera_status(camera)
self.ptz_metrics[camera].tracking_active.clear()
self.dispatcher.publish(
f"{camera}/ptz_autotracker/active", "OFF", retain=False
)
self.ptz_metrics[camera].reset.set()