mirror of
https://github.com/blakeblackshear/frigate.git
synced 2025-01-16 00:09:14 +01:00
cb30450060
* Run pydantic migration tool * Finish removing deprecated functions * Formatting * Fix movement weights type * Fix movement weight test * Fix config checks * formatting * fix typing * formatting * Fix * Fix serialization issues * Formatting * fix model namespace warnings * Update formatting * Format go2rtc file * Cleanup migrations * Fix warnings * Don't include null values in config json * Formatting * Fix test --------- Co-authored-by: Josh Hawkins <32435876+hawkeye217@users.noreply.github.com>
1365 lines
56 KiB
Python
1365 lines
56 KiB
Python
"""Automatically pan, tilt, and zoom on detected objects via onvif."""
|
|
|
|
import copy
|
|
import logging
|
|
import os
|
|
import queue
|
|
import threading
|
|
import time
|
|
from collections import deque
|
|
from functools import partial
|
|
from multiprocessing.synchronize import Event as MpEvent
|
|
|
|
import cv2
|
|
import numpy as np
|
|
from norfair.camera_motion import (
|
|
HomographyTransformationGetter,
|
|
MotionEstimator,
|
|
TranslationTransformationGetter,
|
|
)
|
|
|
|
from frigate.comms.dispatcher import Dispatcher
|
|
from frigate.config import CameraConfig, FrigateConfig, ZoomingModeEnum
|
|
from frigate.const import (
|
|
AUTOTRACKING_MAX_AREA_RATIO,
|
|
AUTOTRACKING_MAX_MOVE_METRICS,
|
|
AUTOTRACKING_MOTION_MAX_POINTS,
|
|
AUTOTRACKING_MOTION_MIN_DISTANCE,
|
|
AUTOTRACKING_ZOOM_EDGE_THRESHOLD,
|
|
AUTOTRACKING_ZOOM_IN_HYSTERESIS,
|
|
AUTOTRACKING_ZOOM_OUT_HYSTERESIS,
|
|
CONFIG_DIR,
|
|
)
|
|
from frigate.ptz.onvif import OnvifController
|
|
from frigate.types import PTZMetricsTypes
|
|
from frigate.util.builtin import update_yaml_file
|
|
from frigate.util.image import SharedMemoryFrameManager, intersection_over_union
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def ptz_moving_at_frame_time(frame_time, ptz_start_time, ptz_stop_time):
|
|
# Determine if the PTZ was in motion at the set frame time
|
|
# for non ptz/autotracking cameras, this will always return False
|
|
# ptz_start_time is initialized to 0 on startup and only changes
|
|
# when autotracking movements are made
|
|
return (ptz_start_time != 0.0 and frame_time > ptz_start_time) and (
|
|
ptz_stop_time == 0.0 or (ptz_start_time <= frame_time <= ptz_stop_time)
|
|
)
|
|
|
|
|
|
class PtzMotionEstimator:
|
|
def __init__(
|
|
self, config: CameraConfig, ptz_metrics: dict[str, PTZMetricsTypes]
|
|
) -> None:
|
|
self.frame_manager = SharedMemoryFrameManager()
|
|
self.norfair_motion_estimator = None
|
|
self.camera_config = config
|
|
self.coord_transformations = None
|
|
self.ptz_metrics = ptz_metrics
|
|
self.ptz_start_time = self.ptz_metrics["ptz_start_time"]
|
|
self.ptz_stop_time = self.ptz_metrics["ptz_stop_time"]
|
|
|
|
self.ptz_metrics["ptz_reset"].set()
|
|
logger.debug(f"{config.name}: Motion estimator init")
|
|
|
|
def motion_estimator(self, detections, frame_time, camera):
|
|
# If we've just started up or returned to our preset, reset motion estimator for new tracking session
|
|
if self.ptz_metrics["ptz_reset"].is_set():
|
|
self.ptz_metrics["ptz_reset"].clear()
|
|
|
|
# homography is nice (zooming) but slow, translation is pan/tilt only but fast.
|
|
if (
|
|
self.camera_config.onvif.autotracking.zooming
|
|
!= ZoomingModeEnum.disabled
|
|
):
|
|
logger.debug(f"{camera}: Motion estimator reset - homography")
|
|
transformation_type = HomographyTransformationGetter()
|
|
else:
|
|
logger.debug(f"{camera}: Motion estimator reset - translation")
|
|
transformation_type = TranslationTransformationGetter()
|
|
|
|
self.norfair_motion_estimator = MotionEstimator(
|
|
transformations_getter=transformation_type,
|
|
min_distance=AUTOTRACKING_MOTION_MIN_DISTANCE,
|
|
max_points=AUTOTRACKING_MOTION_MAX_POINTS,
|
|
)
|
|
|
|
self.coord_transformations = None
|
|
|
|
if ptz_moving_at_frame_time(
|
|
frame_time, self.ptz_start_time.value, self.ptz_stop_time.value
|
|
):
|
|
logger.debug(
|
|
f"{camera}: Motion estimator running - frame time: {frame_time}"
|
|
)
|
|
|
|
frame_id = f"{camera}{frame_time}"
|
|
yuv_frame = self.frame_manager.get(
|
|
frame_id, self.camera_config.frame_shape_yuv
|
|
)
|
|
|
|
frame = cv2.cvtColor(yuv_frame, cv2.COLOR_YUV2GRAY_I420)
|
|
|
|
# mask out detections for better motion estimation
|
|
mask = np.ones(frame.shape[:2], frame.dtype)
|
|
|
|
detection_boxes = [x[2] for x in detections]
|
|
for detection in detection_boxes:
|
|
x1, y1, x2, y2 = detection
|
|
mask[y1:y2, x1:x2] = 0
|
|
|
|
# merge camera config motion mask with detections. Norfair function needs 0,1 mask
|
|
mask = np.bitwise_and(mask, self.camera_config.motion.mask).clip(max=1)
|
|
|
|
# Norfair estimator function needs color so it can convert it right back to gray
|
|
frame = cv2.cvtColor(frame, cv2.COLOR_GRAY2BGRA)
|
|
|
|
try:
|
|
self.coord_transformations = self.norfair_motion_estimator.update(
|
|
frame, mask
|
|
)
|
|
except Exception:
|
|
# sometimes opencv can't find enough features in the image to find homography, so catch this error
|
|
# https://github.com/tryolabs/norfair/pull/278
|
|
logger.warning(
|
|
f"Autotracker: motion estimator couldn't get transformations for {camera} at frame time {frame_time}"
|
|
)
|
|
self.coord_transformations = None
|
|
|
|
try:
|
|
logger.debug(
|
|
f"{camera}: Motion estimator transformation: {self.coord_transformations.rel_to_abs([[0,0]])}"
|
|
)
|
|
except Exception:
|
|
pass
|
|
|
|
self.frame_manager.close(frame_id)
|
|
|
|
return self.coord_transformations
|
|
|
|
|
|
class PtzAutoTrackerThread(threading.Thread):
|
|
def __init__(
|
|
self,
|
|
config: FrigateConfig,
|
|
onvif: OnvifController,
|
|
ptz_metrics: dict[str, PTZMetricsTypes],
|
|
dispatcher: Dispatcher,
|
|
stop_event: MpEvent,
|
|
) -> None:
|
|
threading.Thread.__init__(self)
|
|
self.name = "ptz_autotracker"
|
|
self.ptz_autotracker = PtzAutoTracker(
|
|
config, onvif, ptz_metrics, dispatcher, stop_event
|
|
)
|
|
self.stop_event = stop_event
|
|
self.config = config
|
|
|
|
def run(self):
|
|
while not self.stop_event.wait(1):
|
|
for camera, camera_config in self.config.cameras.items():
|
|
if not camera_config.enabled:
|
|
continue
|
|
|
|
if camera_config.onvif.autotracking.enabled:
|
|
self.ptz_autotracker.camera_maintenance(camera)
|
|
else:
|
|
# disabled dynamically by mqtt
|
|
if self.ptz_autotracker.tracked_object.get(camera):
|
|
self.ptz_autotracker.tracked_object[camera] = None
|
|
self.ptz_autotracker.tracked_object_history[camera].clear()
|
|
|
|
logger.info("Exiting autotracker...")
|
|
|
|
|
|
class PtzAutoTracker:
|
|
def __init__(
|
|
self,
|
|
config: FrigateConfig,
|
|
onvif: OnvifController,
|
|
ptz_metrics: PTZMetricsTypes,
|
|
dispatcher: Dispatcher,
|
|
stop_event: MpEvent,
|
|
) -> None:
|
|
self.config = config
|
|
self.onvif = onvif
|
|
self.ptz_metrics = ptz_metrics
|
|
self.dispatcher = dispatcher
|
|
self.stop_event = stop_event
|
|
self.tracked_object: dict[str, object] = {}
|
|
self.tracked_object_history: dict[str, object] = {}
|
|
self.tracked_object_metrics: dict[str, object] = {}
|
|
self.object_types: dict[str, object] = {}
|
|
self.required_zones: dict[str, object] = {}
|
|
self.move_queues: dict[str, object] = {}
|
|
self.move_queue_locks: dict[str, object] = {}
|
|
self.move_threads: dict[str, object] = {}
|
|
self.autotracker_init: dict[str, object] = {}
|
|
self.move_metrics: dict[str, object] = {}
|
|
self.calibrating: dict[str, object] = {}
|
|
self.intercept: dict[str, object] = {}
|
|
self.move_coefficients: dict[str, object] = {}
|
|
self.zoom_factor: dict[str, object] = {}
|
|
|
|
# if cam is set to autotrack, onvif should be set up
|
|
for camera, camera_config in self.config.cameras.items():
|
|
if not camera_config.enabled:
|
|
continue
|
|
|
|
self.autotracker_init[camera] = False
|
|
if (
|
|
camera_config.onvif.autotracking.enabled
|
|
and camera_config.onvif.autotracking.enabled_in_config
|
|
):
|
|
self._autotracker_setup(camera_config, camera)
|
|
|
|
def _autotracker_setup(self, camera_config, camera):
|
|
logger.debug(f"{camera}: Autotracker init")
|
|
|
|
self.object_types[camera] = camera_config.onvif.autotracking.track
|
|
self.required_zones[camera] = camera_config.onvif.autotracking.required_zones
|
|
self.zoom_factor[camera] = camera_config.onvif.autotracking.zoom_factor
|
|
|
|
self.tracked_object[camera] = None
|
|
self.tracked_object_history[camera] = deque(
|
|
maxlen=round(camera_config.detect.fps * 1.5)
|
|
)
|
|
self.tracked_object_metrics[camera] = {
|
|
"max_target_box": AUTOTRACKING_MAX_AREA_RATIO
|
|
** (1 / self.zoom_factor[camera])
|
|
}
|
|
|
|
self.calibrating[camera] = False
|
|
self.move_metrics[camera] = []
|
|
self.intercept[camera] = None
|
|
self.move_coefficients[camera] = []
|
|
|
|
self.move_queues[camera] = queue.Queue()
|
|
self.move_queue_locks[camera] = threading.Lock()
|
|
|
|
# handle onvif constructor failing due to no connection
|
|
if camera not in self.onvif.cams:
|
|
logger.warning(
|
|
f"Disabling autotracking for {camera}: onvif connection failed"
|
|
)
|
|
camera_config.onvif.autotracking.enabled = False
|
|
self.ptz_metrics[camera]["ptz_autotracker_enabled"].value = False
|
|
return
|
|
|
|
if not self.onvif.cams[camera]["init"]:
|
|
if not self.onvif._init_onvif(camera):
|
|
logger.warning(
|
|
f"Disabling autotracking for {camera}: Unable to initialize onvif"
|
|
)
|
|
camera_config.onvif.autotracking.enabled = False
|
|
self.ptz_metrics[camera]["ptz_autotracker_enabled"].value = False
|
|
return
|
|
|
|
if "pt-r-fov" not in self.onvif.cams[camera]["features"]:
|
|
logger.warning(
|
|
f"Disabling autotracking for {camera}: FOV relative movement not supported"
|
|
)
|
|
camera_config.onvif.autotracking.enabled = False
|
|
self.ptz_metrics[camera]["ptz_autotracker_enabled"].value = False
|
|
return
|
|
|
|
movestatus_supported = self.onvif.get_service_capabilities(camera)
|
|
|
|
if movestatus_supported is None or movestatus_supported.lower() != "true":
|
|
logger.warning(
|
|
f"Disabling autotracking for {camera}: ONVIF MoveStatus not supported"
|
|
)
|
|
camera_config.onvif.autotracking.enabled = False
|
|
self.ptz_metrics[camera]["ptz_autotracker_enabled"].value = False
|
|
return
|
|
|
|
if self.onvif.cams[camera]["init"]:
|
|
self.onvif.get_camera_status(camera)
|
|
|
|
# movement thread per camera
|
|
self.move_threads[camera] = threading.Thread(
|
|
name=f"ptz_move_thread_{camera}",
|
|
target=partial(self._process_move_queue, camera),
|
|
)
|
|
self.move_threads[camera].daemon = True
|
|
self.move_threads[camera].start()
|
|
|
|
if camera_config.onvif.autotracking.movement_weights:
|
|
if len(camera_config.onvif.autotracking.movement_weights) == 5:
|
|
camera_config.onvif.autotracking.movement_weights = [
|
|
float(val)
|
|
for val in camera_config.onvif.autotracking.movement_weights
|
|
]
|
|
self.ptz_metrics[camera][
|
|
"ptz_min_zoom"
|
|
].value = camera_config.onvif.autotracking.movement_weights[0]
|
|
self.ptz_metrics[camera][
|
|
"ptz_max_zoom"
|
|
].value = camera_config.onvif.autotracking.movement_weights[1]
|
|
self.intercept[camera] = (
|
|
camera_config.onvif.autotracking.movement_weights[2]
|
|
)
|
|
self.move_coefficients[camera] = (
|
|
camera_config.onvif.autotracking.movement_weights[3:]
|
|
)
|
|
else:
|
|
camera_config.onvif.autotracking.enabled = False
|
|
self.ptz_metrics[camera]["ptz_autotracker_enabled"].value = False
|
|
logger.warning(
|
|
f"Autotracker recalibration is required for {camera}. Disabling autotracking."
|
|
)
|
|
|
|
if camera_config.onvif.autotracking.calibrate_on_startup:
|
|
self._calibrate_camera(camera)
|
|
|
|
self.ptz_metrics[camera]["ptz_tracking_active"].clear()
|
|
self.dispatcher.publish(f"{camera}/ptz_autotracker/active", "OFF", retain=False)
|
|
self.autotracker_init[camera] = True
|
|
|
|
def _write_config(self, camera):
|
|
config_file = os.environ.get("CONFIG_FILE", f"{CONFIG_DIR}/config.yml")
|
|
|
|
logger.debug(
|
|
f"{camera}: Writing new config with autotracker motion coefficients: {self.config.cameras[camera].onvif.autotracking.movement_weights}"
|
|
)
|
|
|
|
update_yaml_file(
|
|
config_file,
|
|
["cameras", camera, "onvif", "autotracking", "movement_weights"],
|
|
self.config.cameras[camera].onvif.autotracking.movement_weights,
|
|
)
|
|
|
|
def _calibrate_camera(self, camera):
|
|
# move the camera from the preset in steps and measure the time it takes to move that amount
|
|
# this will allow us to predict movement times with a simple linear regression
|
|
# start with 0 so we can determine a baseline (to be used as the intercept in the regression calc)
|
|
# TODO: take zooming into account too
|
|
num_steps = 30
|
|
step_sizes = np.linspace(0, 1, num_steps)
|
|
zoom_in_values = []
|
|
zoom_out_values = []
|
|
|
|
self.calibrating[camera] = True
|
|
|
|
logger.info(f"Camera calibration for {camera} in progress")
|
|
|
|
# zoom levels test
|
|
if (
|
|
self.config.cameras[camera].onvif.autotracking.zooming
|
|
!= ZoomingModeEnum.disabled
|
|
):
|
|
logger.info(f"Calibration for {camera} in progress: 0% complete")
|
|
|
|
for i in range(2):
|
|
# absolute move to 0 - fully zoomed out
|
|
self.onvif._zoom_absolute(
|
|
camera,
|
|
self.onvif.cams[camera]["absolute_zoom_range"]["XRange"]["Min"],
|
|
1,
|
|
)
|
|
|
|
while not self.ptz_metrics[camera]["ptz_motor_stopped"].is_set():
|
|
self.onvif.get_camera_status(camera)
|
|
|
|
zoom_out_values.append(self.ptz_metrics[camera]["ptz_zoom_level"].value)
|
|
|
|
self.onvif._zoom_absolute(
|
|
camera,
|
|
self.onvif.cams[camera]["absolute_zoom_range"]["XRange"]["Max"],
|
|
1,
|
|
)
|
|
|
|
while not self.ptz_metrics[camera]["ptz_motor_stopped"].is_set():
|
|
self.onvif.get_camera_status(camera)
|
|
|
|
zoom_in_values.append(self.ptz_metrics[camera]["ptz_zoom_level"].value)
|
|
|
|
if (
|
|
self.config.cameras[camera].onvif.autotracking.zooming
|
|
== ZoomingModeEnum.relative
|
|
):
|
|
# relative move to -0.01
|
|
self.onvif._move_relative(
|
|
camera,
|
|
0,
|
|
0,
|
|
-1e-2,
|
|
1,
|
|
)
|
|
|
|
while not self.ptz_metrics[camera]["ptz_motor_stopped"].is_set():
|
|
self.onvif.get_camera_status(camera)
|
|
|
|
zoom_out_values.append(
|
|
self.ptz_metrics[camera]["ptz_zoom_level"].value
|
|
)
|
|
|
|
# relative move to 0.01
|
|
self.onvif._move_relative(
|
|
camera,
|
|
0,
|
|
0,
|
|
1e-2,
|
|
1,
|
|
)
|
|
|
|
while not self.ptz_metrics[camera]["ptz_motor_stopped"].is_set():
|
|
self.onvif.get_camera_status(camera)
|
|
|
|
zoom_in_values.append(
|
|
self.ptz_metrics[camera]["ptz_zoom_level"].value
|
|
)
|
|
|
|
self.ptz_metrics[camera]["ptz_max_zoom"].value = max(zoom_in_values)
|
|
self.ptz_metrics[camera]["ptz_min_zoom"].value = min(zoom_out_values)
|
|
|
|
logger.debug(
|
|
f'{camera}: Calibration values: max zoom: {self.ptz_metrics[camera]["ptz_max_zoom"].value}, min zoom: {self.ptz_metrics[camera]["ptz_min_zoom"].value}'
|
|
)
|
|
|
|
else:
|
|
self.ptz_metrics[camera]["ptz_max_zoom"].value = 1
|
|
self.ptz_metrics[camera]["ptz_min_zoom"].value = 0
|
|
|
|
self.onvif._move_to_preset(
|
|
camera,
|
|
self.config.cameras[camera].onvif.autotracking.return_preset.lower(),
|
|
)
|
|
self.ptz_metrics[camera]["ptz_reset"].set()
|
|
self.ptz_metrics[camera]["ptz_motor_stopped"].clear()
|
|
|
|
# Wait until the camera finishes moving
|
|
while not self.ptz_metrics[camera]["ptz_motor_stopped"].is_set():
|
|
self.onvif.get_camera_status(camera)
|
|
|
|
for step in range(num_steps):
|
|
pan = step_sizes[step]
|
|
tilt = step_sizes[step]
|
|
|
|
start_time = time.time()
|
|
self.onvif._move_relative(camera, pan, tilt, 0, 1)
|
|
|
|
# Wait until the camera finishes moving
|
|
while not self.ptz_metrics[camera]["ptz_motor_stopped"].is_set():
|
|
self.onvif.get_camera_status(camera)
|
|
stop_time = time.time()
|
|
|
|
self.move_metrics[camera].append(
|
|
{
|
|
"pan": pan,
|
|
"tilt": tilt,
|
|
"start_timestamp": start_time,
|
|
"end_timestamp": stop_time,
|
|
}
|
|
)
|
|
|
|
self.onvif._move_to_preset(
|
|
camera,
|
|
self.config.cameras[camera].onvif.autotracking.return_preset.lower(),
|
|
)
|
|
self.ptz_metrics[camera]["ptz_reset"].set()
|
|
self.ptz_metrics[camera]["ptz_motor_stopped"].clear()
|
|
|
|
# Wait until the camera finishes moving
|
|
while not self.ptz_metrics[camera]["ptz_motor_stopped"].is_set():
|
|
self.onvif.get_camera_status(camera)
|
|
|
|
logger.info(
|
|
f"Calibration for {camera} in progress: {round((step/num_steps)*100)}% complete"
|
|
)
|
|
|
|
self.calibrating[camera] = False
|
|
|
|
logger.info(f"Calibration for {camera} complete")
|
|
|
|
# calculate and save new intercept and coefficients
|
|
self._calculate_move_coefficients(camera, True)
|
|
|
|
def _calculate_move_coefficients(self, camera, calibration=False):
|
|
# calculate new coefficients when we have 50 more new values. Save up to 500
|
|
if calibration or (
|
|
len(self.move_metrics[camera]) % 50 == 0
|
|
and len(self.move_metrics[camera]) != 0
|
|
and len(self.move_metrics[camera]) <= AUTOTRACKING_MAX_MOVE_METRICS
|
|
):
|
|
X = np.array(
|
|
[abs(d["pan"]) + abs(d["tilt"]) for d in self.move_metrics[camera]]
|
|
)
|
|
y = np.array(
|
|
[
|
|
d["end_timestamp"] - d["start_timestamp"]
|
|
for d in self.move_metrics[camera]
|
|
]
|
|
)
|
|
|
|
# simple linear regression with intercept
|
|
X_with_intercept = np.column_stack((np.ones(X.shape[0]), X))
|
|
self.move_coefficients[camera] = np.linalg.lstsq(
|
|
X_with_intercept, y, rcond=None
|
|
)[0]
|
|
|
|
# only assign a new intercept if we're calibrating
|
|
if calibration:
|
|
self.intercept[camera] = y[0]
|
|
|
|
# write the min zoom, max zoom, intercept, and coefficients
|
|
# back to the config file as a comma separated string
|
|
self.config.cameras[camera].onvif.autotracking.movement_weights = ", ".join(
|
|
str(v)
|
|
for v in [
|
|
self.ptz_metrics[camera]["ptz_min_zoom"].value,
|
|
self.ptz_metrics[camera]["ptz_max_zoom"].value,
|
|
self.intercept[camera],
|
|
*self.move_coefficients[camera],
|
|
]
|
|
)
|
|
|
|
logger.debug(
|
|
f"{camera}: New regression parameters - intercept: {self.intercept[camera]}, coefficients: {self.move_coefficients[camera]}"
|
|
)
|
|
|
|
self._write_config(camera)
|
|
|
|
def _predict_movement_time(self, camera, pan, tilt):
|
|
combined_movement = abs(pan) + abs(tilt)
|
|
input_data = np.array([self.intercept[camera], combined_movement])
|
|
|
|
return np.dot(self.move_coefficients[camera], input_data)
|
|
|
|
def _predict_area_after_time(self, camera, time):
|
|
return np.dot(
|
|
self.tracked_object_metrics[camera]["area_coefficients"],
|
|
[self.tracked_object_history[camera][-1]["frame_time"] + time],
|
|
)
|
|
|
|
def _calculate_tracked_object_metrics(self, camera, obj):
|
|
def remove_outliers(data):
|
|
areas = [item["area"] for item in data]
|
|
|
|
Q1 = np.percentile(areas, 25)
|
|
Q3 = np.percentile(areas, 75)
|
|
IQR = Q3 - Q1
|
|
lower_bound = Q1 - 1.5 * IQR
|
|
upper_bound = Q3 + 1.5 * IQR
|
|
|
|
filtered_data = [
|
|
item for item in data if lower_bound <= item["area"] <= upper_bound
|
|
]
|
|
|
|
# Find and log the removed values
|
|
removed_values = [item for item in data if item not in filtered_data]
|
|
logger.debug(f"{camera}: Removed area outliers: {removed_values}")
|
|
|
|
return filtered_data
|
|
|
|
camera_config = self.config.cameras[camera]
|
|
camera_width = camera_config.frame_shape[1]
|
|
camera_height = camera_config.frame_shape[0]
|
|
|
|
# Extract areas and calculate weighted average
|
|
# grab the largest dimension of the bounding box and create a square from that
|
|
areas = [
|
|
{
|
|
"frame_time": obj["frame_time"],
|
|
"box": obj["box"],
|
|
"area": max(
|
|
obj["box"][2] - obj["box"][0], obj["box"][3] - obj["box"][1]
|
|
)
|
|
** 2,
|
|
}
|
|
for obj in self.tracked_object_history[camera]
|
|
]
|
|
|
|
filtered_areas = remove_outliers(areas) if len(areas) >= 2 else areas
|
|
|
|
# Filter entries that are not touching the frame edge
|
|
filtered_areas_not_touching_edge = [
|
|
entry
|
|
for entry in filtered_areas
|
|
if self._touching_frame_edges(camera, entry["box"]) == 0
|
|
]
|
|
|
|
# Calculate regression for area change predictions
|
|
if len(filtered_areas_not_touching_edge):
|
|
X = np.array(
|
|
[item["frame_time"] for item in filtered_areas_not_touching_edge]
|
|
)
|
|
y = np.array([item["area"] for item in filtered_areas_not_touching_edge])
|
|
|
|
self.tracked_object_metrics[camera]["area_coefficients"] = np.linalg.lstsq(
|
|
X.reshape(-1, 1), y, rcond=None
|
|
)[0]
|
|
else:
|
|
self.tracked_object_metrics[camera]["area_coefficients"] = np.array([0])
|
|
|
|
weights = np.arange(1, len(filtered_areas) + 1)
|
|
weighted_area = np.average(
|
|
[item["area"] for item in filtered_areas], weights=weights
|
|
)
|
|
|
|
self.tracked_object_metrics[camera]["target_box"] = (
|
|
weighted_area / (camera_width * camera_height)
|
|
) ** self.zoom_factor[camera]
|
|
|
|
if "original_target_box" not in self.tracked_object_metrics[camera]:
|
|
self.tracked_object_metrics[camera]["original_target_box"] = (
|
|
self.tracked_object_metrics[camera]["target_box"]
|
|
)
|
|
|
|
(
|
|
self.tracked_object_metrics[camera]["valid_velocity"],
|
|
self.tracked_object_metrics[camera]["velocity"],
|
|
) = self._get_valid_velocity(camera, obj)
|
|
self.tracked_object_metrics[camera]["distance"] = self._get_distance_threshold(
|
|
camera, obj
|
|
)
|
|
|
|
centroid_distance = np.linalg.norm(
|
|
[
|
|
obj.obj_data["centroid"][0] - camera_config.detect.width / 2,
|
|
obj.obj_data["centroid"][1] - camera_config.detect.height / 2,
|
|
]
|
|
)
|
|
|
|
logger.debug(f"{camera}: Centroid distance: {centroid_distance}")
|
|
|
|
self.tracked_object_metrics[camera]["below_distance_threshold"] = (
|
|
centroid_distance < self.tracked_object_metrics[camera]["distance"]
|
|
)
|
|
|
|
def _process_move_queue(self, camera):
|
|
camera_config = self.config.cameras[camera]
|
|
camera_config.frame_shape[1]
|
|
camera_config.frame_shape[0]
|
|
|
|
while not self.stop_event.is_set():
|
|
try:
|
|
move_data = self.move_queues[camera].get(True, 0.1)
|
|
except queue.Empty:
|
|
continue
|
|
|
|
with self.move_queue_locks[camera]:
|
|
frame_time, pan, tilt, zoom = move_data
|
|
|
|
# if we're receiving move requests during a PTZ move, ignore them
|
|
if ptz_moving_at_frame_time(
|
|
frame_time,
|
|
self.ptz_metrics[camera]["ptz_start_time"].value,
|
|
self.ptz_metrics[camera]["ptz_stop_time"].value,
|
|
):
|
|
# instead of dequeueing this might be a good place to preemptively move based
|
|
# on an estimate - for fast moving objects, etc.
|
|
logger.debug(
|
|
f"{camera}: Move queue: PTZ moving, dequeueing move request - frame time: {frame_time}, final pan: {pan}, final tilt: {tilt}, final zoom: {zoom}"
|
|
)
|
|
continue
|
|
|
|
else:
|
|
if (
|
|
self.config.cameras[camera].onvif.autotracking.zooming
|
|
== ZoomingModeEnum.relative
|
|
):
|
|
self.onvif._move_relative(camera, pan, tilt, zoom, 1)
|
|
|
|
else:
|
|
if pan != 0 or tilt != 0:
|
|
self.onvif._move_relative(camera, pan, tilt, 0, 1)
|
|
|
|
# Wait until the camera finishes moving
|
|
while not self.ptz_metrics[camera][
|
|
"ptz_motor_stopped"
|
|
].is_set():
|
|
self.onvif.get_camera_status(camera)
|
|
|
|
if (
|
|
zoom > 0
|
|
and self.ptz_metrics[camera]["ptz_zoom_level"].value != zoom
|
|
):
|
|
self.onvif._zoom_absolute(camera, zoom, 1)
|
|
|
|
# Wait until the camera finishes moving
|
|
while not self.ptz_metrics[camera]["ptz_motor_stopped"].is_set():
|
|
self.onvif.get_camera_status(camera)
|
|
|
|
if self.config.cameras[camera].onvif.autotracking.movement_weights:
|
|
logger.debug(
|
|
f"{camera}: Predicted movement time: {self._predict_movement_time(camera, pan, tilt)}"
|
|
)
|
|
logger.debug(
|
|
f'{camera}: Actual movement time: {self.ptz_metrics[camera]["ptz_stop_time"].value-self.ptz_metrics[camera]["ptz_start_time"].value}'
|
|
)
|
|
|
|
# save metrics for better estimate calculations
|
|
if (
|
|
self.intercept[camera] is not None
|
|
and len(self.move_metrics[camera])
|
|
< AUTOTRACKING_MAX_MOVE_METRICS
|
|
and (pan != 0 or tilt != 0)
|
|
and self.config.cameras[
|
|
camera
|
|
].onvif.autotracking.calibrate_on_startup
|
|
):
|
|
logger.debug(f"{camera}: Adding new values to move metrics")
|
|
self.move_metrics[camera].append(
|
|
{
|
|
"pan": pan,
|
|
"tilt": tilt,
|
|
"start_timestamp": self.ptz_metrics[camera][
|
|
"ptz_start_time"
|
|
].value,
|
|
"end_timestamp": self.ptz_metrics[camera][
|
|
"ptz_stop_time"
|
|
].value,
|
|
}
|
|
)
|
|
|
|
# calculate new coefficients if we have enough data
|
|
self._calculate_move_coefficients(camera)
|
|
|
|
def _enqueue_move(self, camera, frame_time, pan, tilt, zoom):
|
|
def split_value(value, suppress_diff=True):
|
|
clipped = np.clip(value, -1, 1)
|
|
|
|
# don't make small movements
|
|
if -0.05 < clipped < 0.05 and suppress_diff:
|
|
diff = 0.0
|
|
else:
|
|
diff = value - clipped
|
|
|
|
return clipped, diff
|
|
|
|
if (
|
|
frame_time > self.ptz_metrics[camera]["ptz_start_time"].value
|
|
and frame_time > self.ptz_metrics[camera]["ptz_stop_time"].value
|
|
and not self.move_queue_locks[camera].locked()
|
|
):
|
|
# we can split up any large moves caused by velocity estimated movements if necessary
|
|
# get an excess amount and assign it instead of 0 below
|
|
while pan != 0 or tilt != 0 or zoom != 0:
|
|
pan, _ = split_value(pan)
|
|
tilt, _ = split_value(tilt)
|
|
zoom, _ = split_value(zoom, False)
|
|
|
|
logger.debug(
|
|
f"{camera}: Enqueue movement for frame time: {frame_time} pan: {pan}, tilt: {tilt}, zoom: {zoom}"
|
|
)
|
|
move_data = (frame_time, pan, tilt, zoom)
|
|
self.move_queues[camera].put(move_data)
|
|
|
|
# reset values to not split up large movements
|
|
pan = 0
|
|
tilt = 0
|
|
zoom = 0
|
|
|
|
def _touching_frame_edges(self, camera, box):
|
|
camera_config = self.config.cameras[camera]
|
|
camera_width = camera_config.frame_shape[1]
|
|
camera_height = camera_config.frame_shape[0]
|
|
bb_left, bb_top, bb_right, bb_bottom = box
|
|
|
|
edge_threshold = AUTOTRACKING_ZOOM_EDGE_THRESHOLD
|
|
|
|
return int(
|
|
(bb_left < edge_threshold * camera_width)
|
|
+ (bb_right > (1 - edge_threshold) * camera_width)
|
|
+ (bb_top < edge_threshold * camera_height)
|
|
+ (bb_bottom > (1 - edge_threshold) * camera_height)
|
|
)
|
|
|
|
def _get_valid_velocity(self, camera, obj):
|
|
# returns a tuple and euclidean distance if the estimated velocity is valid
|
|
# if invalid, returns [0, 0] and -1
|
|
camera_config = self.config.cameras[camera]
|
|
camera_width = camera_config.frame_shape[1]
|
|
camera_height = camera_config.frame_shape[0]
|
|
camera_fps = camera_config.detect.fps
|
|
|
|
# estimate_velocity is a numpy array of bbox top,left and bottom,right velocities
|
|
velocities = obj.obj_data["estimate_velocity"]
|
|
logger.debug(
|
|
f"{camera}: Velocity (Norfair): {tuple(np.round(velocities).flatten().astype(int))}"
|
|
)
|
|
|
|
# if we are close enough to zero, return right away
|
|
if np.all(np.round(velocities) == 0):
|
|
return True, np.zeros((4,))
|
|
|
|
# Thresholds
|
|
x_mags_thresh = camera_width / camera_fps / 2
|
|
y_mags_thresh = camera_height / camera_fps / 2
|
|
dir_thresh = 0.93
|
|
delta_thresh = 20
|
|
var_thresh = 10
|
|
|
|
# Check magnitude
|
|
x_mags = np.abs(velocities[:, 0])
|
|
y_mags = np.abs(velocities[:, 1])
|
|
invalid_x_mags = np.any(x_mags > x_mags_thresh)
|
|
invalid_y_mags = np.any(y_mags > y_mags_thresh)
|
|
|
|
# Check delta
|
|
delta = np.abs(velocities[0] - velocities[1])
|
|
invalid_delta = np.any(delta > delta_thresh)
|
|
|
|
# Check variance
|
|
stdevs = np.std(velocities, axis=0)
|
|
high_variances = np.any(stdevs > var_thresh)
|
|
|
|
# Check direction difference
|
|
velocities = np.round(velocities)
|
|
invalid_dirs = False
|
|
if not np.any(np.linalg.norm(velocities, axis=1)):
|
|
cosine_sim = np.dot(velocities[0], velocities[1]) / (
|
|
np.linalg.norm(velocities[0]) * np.linalg.norm(velocities[1])
|
|
)
|
|
dir_thresh = 0.6 if np.all(delta < delta_thresh / 2) else dir_thresh
|
|
invalid_dirs = cosine_sim < dir_thresh
|
|
|
|
# Combine
|
|
invalid = (
|
|
invalid_x_mags
|
|
or invalid_y_mags
|
|
or invalid_dirs
|
|
or invalid_delta
|
|
or high_variances
|
|
)
|
|
|
|
if invalid:
|
|
logger.debug(
|
|
f"{camera}: Invalid velocity: {tuple(np.round(velocities, 2).flatten().astype(int))}: Invalid because: "
|
|
+ ", ".join(
|
|
[
|
|
var_name
|
|
for var_name, is_invalid in [
|
|
("invalid_x_mags", invalid_x_mags),
|
|
("invalid_y_mags", invalid_y_mags),
|
|
("invalid_dirs", invalid_dirs),
|
|
("invalid_delta", invalid_delta),
|
|
("high_variances", high_variances),
|
|
]
|
|
if is_invalid
|
|
]
|
|
)
|
|
)
|
|
# invalid velocity
|
|
return False, np.zeros((4,))
|
|
else:
|
|
logger.debug(f"{camera}: Valid velocity ")
|
|
return True, velocities.flatten()
|
|
|
|
def _get_distance_threshold(self, camera, obj):
|
|
# Returns true if Euclidean distance from object to center of frame is
|
|
# less than 10% of the of the larger dimension (width or height) of the frame,
|
|
# multiplied by a scaling factor for object size.
|
|
# Distance is increased if object is not moving to prevent small ptz moves
|
|
# Adjusting this percentage slightly lower will effectively cause the camera to move
|
|
# more often to keep the object in the center. Raising the percentage will cause less
|
|
# movement and will be more flexible with objects not quite being centered.
|
|
# TODO: there's probably a better way to approach this
|
|
camera_config = self.config.cameras[camera]
|
|
|
|
obj_width = obj.obj_data["box"][2] - obj.obj_data["box"][0]
|
|
obj_height = obj.obj_data["box"][3] - obj.obj_data["box"][1]
|
|
|
|
max_obj = max(obj_width, obj_height)
|
|
max_frame = (
|
|
camera_config.detect.width
|
|
if max_obj == obj_width
|
|
else camera_config.detect.height
|
|
)
|
|
|
|
# larger objects should lower the threshold, smaller objects should raise it
|
|
scaling_factor = 1 - np.log(max_obj / max_frame)
|
|
|
|
percentage = (
|
|
0.08
|
|
if camera_config.onvif.autotracking.movement_weights
|
|
and self.tracked_object_metrics[camera]["valid_velocity"]
|
|
else 0.03
|
|
)
|
|
distance_threshold = percentage * max_frame * scaling_factor
|
|
|
|
logger.debug(f"{camera}: Distance threshold: {distance_threshold}")
|
|
|
|
return distance_threshold
|
|
|
|
def _should_zoom_in(self, camera, obj, box, predicted_time, debug_zooming=False):
|
|
# returns True if we should zoom in, False if we should zoom out, None to do nothing
|
|
camera_config = self.config.cameras[camera]
|
|
camera_width = camera_config.frame_shape[1]
|
|
camera_height = camera_config.frame_shape[0]
|
|
camera_fps = camera_config.detect.fps
|
|
|
|
average_velocity = self.tracked_object_metrics[camera]["velocity"]
|
|
|
|
bb_left, bb_top, bb_right, bb_bottom = box
|
|
|
|
# calculate a velocity threshold based on movement coefficients if available
|
|
if camera_config.onvif.autotracking.movement_weights:
|
|
predicted_movement_time = self._predict_movement_time(camera, 1, 1)
|
|
velocity_threshold_x = camera_width / predicted_movement_time / camera_fps
|
|
velocity_threshold_y = camera_height / predicted_movement_time / camera_fps
|
|
else:
|
|
# use a generic velocity threshold
|
|
velocity_threshold_x = camera_width * 0.02
|
|
velocity_threshold_y = camera_height * 0.02
|
|
|
|
# return a count of the number of frame edges the bounding box is touching
|
|
touching_frame_edges = self._touching_frame_edges(camera, box)
|
|
|
|
# make sure object is centered in the frame
|
|
below_distance_threshold = self.tracked_object_metrics[camera][
|
|
"below_distance_threshold"
|
|
]
|
|
|
|
below_dimension_threshold = (bb_right - bb_left) <= camera_width * (
|
|
self.zoom_factor[camera] + 0.1
|
|
) and (bb_bottom - bb_top) <= camera_height * (self.zoom_factor[camera] + 0.1)
|
|
|
|
# ensure object is not moving quickly
|
|
below_velocity_threshold = np.all(
|
|
np.abs(average_velocity)
|
|
< np.tile([velocity_threshold_x, velocity_threshold_y], 2)
|
|
) or np.all(average_velocity == 0)
|
|
|
|
if not predicted_time:
|
|
calculated_target_box = self.tracked_object_metrics[camera]["target_box"]
|
|
else:
|
|
calculated_target_box = self.tracked_object_metrics[camera][
|
|
"target_box"
|
|
] + self._predict_area_after_time(camera, predicted_time) / (
|
|
camera_width * camera_height
|
|
)
|
|
|
|
below_area_threshold = (
|
|
calculated_target_box
|
|
< self.tracked_object_metrics[camera]["max_target_box"]
|
|
)
|
|
|
|
# introduce some hysteresis to prevent a yo-yo zooming effect
|
|
zoom_out_hysteresis = (
|
|
calculated_target_box
|
|
> self.tracked_object_metrics[camera]["max_target_box"]
|
|
* AUTOTRACKING_ZOOM_OUT_HYSTERESIS
|
|
)
|
|
zoom_in_hysteresis = (
|
|
calculated_target_box
|
|
< self.tracked_object_metrics[camera]["max_target_box"]
|
|
* AUTOTRACKING_ZOOM_IN_HYSTERESIS
|
|
)
|
|
|
|
at_max_zoom = (
|
|
self.ptz_metrics[camera]["ptz_zoom_level"].value
|
|
== self.ptz_metrics[camera]["ptz_max_zoom"].value
|
|
)
|
|
at_min_zoom = (
|
|
self.ptz_metrics[camera]["ptz_zoom_level"].value
|
|
== self.ptz_metrics[camera]["ptz_min_zoom"].value
|
|
)
|
|
|
|
# debug zooming
|
|
if debug_zooming:
|
|
logger.debug(
|
|
f"{camera}: Zoom test: touching edges: count: {touching_frame_edges} left: {bb_left < AUTOTRACKING_ZOOM_EDGE_THRESHOLD * camera_width}, right: {bb_right > (1 - AUTOTRACKING_ZOOM_EDGE_THRESHOLD) * camera_width}, top: {bb_top < AUTOTRACKING_ZOOM_EDGE_THRESHOLD * camera_height}, bottom: {bb_bottom > (1 - AUTOTRACKING_ZOOM_EDGE_THRESHOLD) * camera_height}"
|
|
)
|
|
logger.debug(
|
|
f"{camera}: Zoom test: below distance threshold: {(below_distance_threshold)}"
|
|
)
|
|
logger.debug(
|
|
f"{camera}: Zoom test: below area threshold: {(below_area_threshold)} target: {self.tracked_object_metrics[camera]['target_box']}, calculated: {calculated_target_box}, max: {self.tracked_object_metrics[camera]['max_target_box']}"
|
|
)
|
|
logger.debug(
|
|
f"{camera}: Zoom test: below dimension threshold: {below_dimension_threshold} width: {bb_right - bb_left}, max width: {camera_width * (self.zoom_factor[camera] + 0.1)}, height: {bb_bottom - bb_top}, max height: {camera_height * (self.zoom_factor[camera] + 0.1)}"
|
|
)
|
|
logger.debug(
|
|
f"{camera}: Zoom test: below velocity threshold: {below_velocity_threshold} velocity x: {abs(average_velocity[0])}, x threshold: {velocity_threshold_x}, velocity y: {abs(average_velocity[0])}, y threshold: {velocity_threshold_y}"
|
|
)
|
|
logger.debug(f"{camera}: Zoom test: at max zoom: {at_max_zoom}")
|
|
logger.debug(f"{camera}: Zoom test: at min zoom: {at_min_zoom}")
|
|
logger.debug(
|
|
f'{camera}: Zoom test: zoom in hysteresis limit: {zoom_in_hysteresis} value: {AUTOTRACKING_ZOOM_IN_HYSTERESIS} original: {self.tracked_object_metrics[camera]["original_target_box"]} max: {self.tracked_object_metrics[camera]["max_target_box"]} target: {calculated_target_box if calculated_target_box else self.tracked_object_metrics[camera]["target_box"]}'
|
|
)
|
|
logger.debug(
|
|
f'{camera}: Zoom test: zoom out hysteresis limit: {zoom_out_hysteresis} value: {AUTOTRACKING_ZOOM_OUT_HYSTERESIS} original: {self.tracked_object_metrics[camera]["original_target_box"]} max: {self.tracked_object_metrics[camera]["max_target_box"]} target: {calculated_target_box if calculated_target_box else self.tracked_object_metrics[camera]["target_box"]}'
|
|
)
|
|
|
|
# Zoom in conditions (and)
|
|
if (
|
|
zoom_in_hysteresis
|
|
and touching_frame_edges == 0
|
|
and below_velocity_threshold
|
|
and below_dimension_threshold
|
|
and below_area_threshold
|
|
and not at_max_zoom
|
|
):
|
|
return True
|
|
|
|
# Zoom out conditions (or)
|
|
if (
|
|
(
|
|
zoom_out_hysteresis
|
|
and not at_max_zoom
|
|
and (not below_area_threshold or not below_dimension_threshold)
|
|
)
|
|
or (zoom_out_hysteresis and not below_area_threshold and at_max_zoom)
|
|
or (
|
|
touching_frame_edges == 1
|
|
and (below_distance_threshold or not below_dimension_threshold)
|
|
)
|
|
or touching_frame_edges > 1
|
|
or not below_velocity_threshold
|
|
) and not at_min_zoom:
|
|
return False
|
|
|
|
# Don't zoom at all
|
|
return None
|
|
|
|
def _autotrack_move_ptz(self, camera, obj):
|
|
camera_config = self.config.cameras[camera]
|
|
camera_width = camera_config.frame_shape[1]
|
|
camera_height = camera_config.frame_shape[0]
|
|
camera_fps = camera_config.detect.fps
|
|
predicted_movement_time = 0
|
|
|
|
average_velocity = np.zeros((4,))
|
|
predicted_box = obj.obj_data["box"]
|
|
|
|
centroid_x = obj.obj_data["centroid"][0]
|
|
centroid_y = obj.obj_data["centroid"][1]
|
|
|
|
# Normalize coordinates. top right of the fov is (1,1), center is (0,0), bottom left is (-1, -1).
|
|
pan = ((centroid_x / camera_width) - 0.5) * 2
|
|
tilt = (0.5 - (centroid_y / camera_height)) * 2
|
|
|
|
if (
|
|
camera_config.onvif.autotracking.movement_weights
|
|
): # use estimates if we have available coefficients
|
|
predicted_movement_time = self._predict_movement_time(camera, pan, tilt)
|
|
|
|
_, average_velocity = (
|
|
self._get_valid_velocity(camera, obj)
|
|
if "velocity" not in self.tracked_object_metrics[camera]
|
|
else (
|
|
self.tracked_object_metrics[camera]["valid_velocity"],
|
|
self.tracked_object_metrics[camera]["velocity"],
|
|
)
|
|
)
|
|
|
|
if np.any(average_velocity):
|
|
# this box could exceed the frame boundaries if velocity is high
|
|
# but we'll handle that in _enqueue_move() as two separate moves
|
|
current_box = np.array(obj.obj_data["box"])
|
|
predicted_box = (
|
|
current_box
|
|
+ camera_fps * predicted_movement_time * average_velocity
|
|
)
|
|
|
|
predicted_box = np.round(predicted_box).astype(int)
|
|
|
|
centroid_x = round((predicted_box[0] + predicted_box[2]) / 2)
|
|
centroid_y = round((predicted_box[1] + predicted_box[3]) / 2)
|
|
|
|
# recalculate pan and tilt with new centroid
|
|
pan = ((centroid_x / camera_width) - 0.5) * 2
|
|
tilt = (0.5 - (centroid_y / camera_height)) * 2
|
|
|
|
logger.debug(f'{camera}: Original box: {obj.obj_data["box"]}')
|
|
logger.debug(f"{camera}: Predicted box: {tuple(predicted_box)}")
|
|
logger.debug(
|
|
f"{camera}: Velocity: {tuple(np.round(average_velocity).flatten().astype(int))}"
|
|
)
|
|
|
|
zoom = self._get_zoom_amount(
|
|
camera, obj, predicted_box, predicted_movement_time, debug_zoom=True
|
|
)
|
|
|
|
self._enqueue_move(camera, obj.obj_data["frame_time"], pan, tilt, zoom)
|
|
|
|
def _autotrack_move_zoom_only(self, camera, obj):
|
|
camera_config = self.config.cameras[camera]
|
|
|
|
if camera_config.onvif.autotracking.zooming != ZoomingModeEnum.disabled:
|
|
zoom = self._get_zoom_amount(camera, obj, obj.obj_data["box"], 0)
|
|
|
|
if zoom != 0:
|
|
self._enqueue_move(camera, obj.obj_data["frame_time"], 0, 0, zoom)
|
|
|
|
def _get_zoom_amount(
|
|
self, camera, obj, predicted_box, predicted_movement_time, debug_zoom=True
|
|
):
|
|
camera_config = self.config.cameras[camera]
|
|
|
|
# frame width and height
|
|
camera_width = camera_config.frame_shape[1]
|
|
camera_height = camera_config.frame_shape[0]
|
|
|
|
zoom = 0
|
|
result = None
|
|
current_zoom_level = self.ptz_metrics[camera]["ptz_zoom_level"].value
|
|
target_box = max(
|
|
obj.obj_data["box"][2] - obj.obj_data["box"][0],
|
|
obj.obj_data["box"][3] - obj.obj_data["box"][1],
|
|
) ** 2 / (camera_width * camera_height)
|
|
|
|
# absolute zooming separately from pan/tilt
|
|
if camera_config.onvif.autotracking.zooming == ZoomingModeEnum.absolute:
|
|
# don't zoom on initial move
|
|
if "target_box" not in self.tracked_object_metrics[camera]:
|
|
zoom = current_zoom_level
|
|
else:
|
|
if (
|
|
result := self._should_zoom_in(
|
|
camera,
|
|
obj,
|
|
obj.obj_data["box"],
|
|
predicted_movement_time,
|
|
debug_zoom,
|
|
)
|
|
) is not None:
|
|
# divide zoom in 10 increments and always zoom out more than in
|
|
level = (
|
|
self.ptz_metrics[camera]["ptz_max_zoom"].value
|
|
- self.ptz_metrics[camera]["ptz_min_zoom"].value
|
|
) / 20
|
|
if result:
|
|
zoom = min(1.0, current_zoom_level + level)
|
|
else:
|
|
zoom = max(0.0, current_zoom_level - 2 * level)
|
|
|
|
# relative zooming concurrently with pan/tilt
|
|
if camera_config.onvif.autotracking.zooming == ZoomingModeEnum.relative:
|
|
# this is our initial zoom in on a new object
|
|
if "target_box" not in self.tracked_object_metrics[camera]:
|
|
zoom = target_box ** self.zoom_factor[camera]
|
|
if zoom > self.tracked_object_metrics[camera]["max_target_box"]:
|
|
zoom = -(1 - zoom)
|
|
logger.debug(
|
|
f"{camera}: target box: {target_box}, max: {self.tracked_object_metrics[camera]['max_target_box']}, calc zoom: {zoom}"
|
|
)
|
|
else:
|
|
if (
|
|
result := self._should_zoom_in(
|
|
camera,
|
|
obj,
|
|
predicted_box
|
|
if camera_config.onvif.autotracking.movement_weights
|
|
else obj.obj_data["box"],
|
|
predicted_movement_time,
|
|
debug_zoom,
|
|
)
|
|
) is not None:
|
|
if predicted_movement_time:
|
|
calculated_target_box = self.tracked_object_metrics[camera][
|
|
"target_box"
|
|
] + self._predict_area_after_time(
|
|
camera, predicted_movement_time
|
|
) / (camera_width * camera_height)
|
|
logger.debug(
|
|
f"{camera}: Zooming prediction: predicted movement time: {predicted_movement_time}, original box: {self.tracked_object_metrics[camera]['target_box']}, calculated box: {calculated_target_box}"
|
|
)
|
|
else:
|
|
calculated_target_box = self.tracked_object_metrics[camera][
|
|
"target_box"
|
|
]
|
|
# zoom value
|
|
ratio = (
|
|
self.tracked_object_metrics[camera]["max_target_box"]
|
|
/ calculated_target_box
|
|
)
|
|
zoom = (ratio - 1) / (ratio + 1)
|
|
logger.debug(
|
|
f'{camera}: limit: {self.tracked_object_metrics[camera]["max_target_box"]}, ratio: {ratio} zoom calculation: {zoom}'
|
|
)
|
|
if not result:
|
|
# zoom out with special condition if zooming out because of velocity, edges, etc.
|
|
zoom = -(1 - zoom) if zoom > 0 else -(zoom * 2 + 1)
|
|
if result:
|
|
# zoom in
|
|
zoom = 1 - zoom if zoom > 0 else (zoom * 2 + 1)
|
|
|
|
logger.debug(f"{camera}: Zooming: {result} Zoom amount: {zoom}")
|
|
|
|
return zoom
|
|
|
|
def is_autotracking(self, camera):
|
|
return self.tracked_object[camera] is not None
|
|
|
|
def autotracked_object_region(self, camera):
|
|
return self.tracked_object[camera]["region"]
|
|
|
|
def autotrack_object(self, camera, obj):
|
|
camera_config = self.config.cameras[camera]
|
|
|
|
if camera_config.onvif.autotracking.enabled:
|
|
if not self.autotracker_init[camera]:
|
|
self._autotracker_setup(camera_config, camera)
|
|
|
|
if self.calibrating[camera]:
|
|
logger.debug(f"{camera}: Calibrating camera")
|
|
return
|
|
|
|
# this is a brand new object that's on our camera, has our label, entered the zone,
|
|
# is not a false positive, and is not initially motionless
|
|
if (
|
|
# new object
|
|
self.tracked_object[camera] is None
|
|
and obj.camera == camera
|
|
and obj.obj_data["label"] in self.object_types[camera]
|
|
and set(obj.entered_zones) & set(self.required_zones[camera])
|
|
and not obj.previous["false_positive"]
|
|
and not obj.false_positive
|
|
and not self.tracked_object_history[camera]
|
|
and obj.obj_data["motionless_count"] == 0
|
|
):
|
|
logger.debug(
|
|
f"{camera}: New object: {obj.obj_data['id']} {obj.obj_data['box']} {obj.obj_data['frame_time']}"
|
|
)
|
|
self.ptz_metrics[camera]["ptz_tracking_active"].set()
|
|
self.dispatcher.publish(
|
|
f"{camera}/ptz_autotracker/active", "ON", retain=False
|
|
)
|
|
self.tracked_object[camera] = obj
|
|
|
|
self.tracked_object_history[camera].append(copy.deepcopy(obj.obj_data))
|
|
self._autotrack_move_ptz(camera, obj)
|
|
|
|
return
|
|
|
|
if (
|
|
# already tracking an object
|
|
self.tracked_object[camera] is not None
|
|
and self.tracked_object_history[camera]
|
|
and obj.obj_data["id"] == self.tracked_object[camera].obj_data["id"]
|
|
and obj.obj_data["frame_time"]
|
|
!= self.tracked_object_history[camera][-1]["frame_time"]
|
|
):
|
|
self.tracked_object_history[camera].append(copy.deepcopy(obj.obj_data))
|
|
self._calculate_tracked_object_metrics(camera, obj)
|
|
|
|
if not ptz_moving_at_frame_time(
|
|
obj.obj_data["frame_time"],
|
|
self.ptz_metrics[camera]["ptz_start_time"].value,
|
|
self.ptz_metrics[camera]["ptz_stop_time"].value,
|
|
):
|
|
if self.tracked_object_metrics[camera]["below_distance_threshold"]:
|
|
logger.debug(
|
|
f"{camera}: Existing object (do NOT move ptz): {obj.obj_data['id']} {obj.obj_data['box']} {obj.obj_data['frame_time']}"
|
|
)
|
|
|
|
# no need to move, but try zooming
|
|
self._autotrack_move_zoom_only(camera, obj)
|
|
else:
|
|
logger.debug(
|
|
f"{camera}: Existing object (need to move ptz): {obj.obj_data['id']} {obj.obj_data['box']} {obj.obj_data['frame_time']}"
|
|
)
|
|
|
|
self._autotrack_move_ptz(camera, obj)
|
|
|
|
return
|
|
|
|
if (
|
|
# The tracker lost an object, so let's check the previous object's region and compare it with the incoming object
|
|
# If it's within bounds, start tracking that object.
|
|
# Should we check region (maybe too broad) or expand the previous object's box a bit and check that?
|
|
self.tracked_object[camera] is None
|
|
and obj.camera == camera
|
|
and obj.obj_data["label"] in self.object_types[camera]
|
|
and not obj.previous["false_positive"]
|
|
and not obj.false_positive
|
|
and self.tracked_object_history[camera]
|
|
):
|
|
if (
|
|
intersection_over_union(
|
|
self.tracked_object_history[camera][-1]["region"],
|
|
obj.obj_data["box"],
|
|
)
|
|
< 0.2
|
|
):
|
|
logger.debug(
|
|
f"{camera}: Reacquired object: {obj.obj_data['id']} {obj.obj_data['box']} {obj.obj_data['frame_time']}"
|
|
)
|
|
self.tracked_object[camera] = obj
|
|
|
|
self.tracked_object_history[camera].clear()
|
|
self.tracked_object_history[camera].append(
|
|
copy.deepcopy(obj.obj_data)
|
|
)
|
|
self._calculate_tracked_object_metrics(camera, obj)
|
|
self._autotrack_move_ptz(camera, obj)
|
|
|
|
return
|
|
|
|
def end_object(self, camera, obj):
|
|
if self.config.cameras[camera].onvif.autotracking.enabled:
|
|
if (
|
|
self.tracked_object[camera] is not None
|
|
and obj.obj_data["id"] == self.tracked_object[camera].obj_data["id"]
|
|
):
|
|
logger.debug(
|
|
f"{camera}: End object: {obj.obj_data['id']} {obj.obj_data['box']}"
|
|
)
|
|
self.tracked_object[camera] = None
|
|
self.tracked_object_metrics[camera] = {
|
|
"max_target_box": AUTOTRACKING_MAX_AREA_RATIO
|
|
** (1 / self.zoom_factor[camera])
|
|
}
|
|
|
|
def camera_maintenance(self, camera):
|
|
# bail and don't check anything if we're calibrating or tracking an object
|
|
if (
|
|
not self.autotracker_init[camera]
|
|
or self.calibrating[camera]
|
|
or self.tracked_object[camera] is not None
|
|
):
|
|
return
|
|
|
|
# calls get_camera_status to check/update ptz movement
|
|
# returns camera to preset after timeout when tracking is over
|
|
autotracker_config = self.config.cameras[camera].onvif.autotracking
|
|
|
|
if not self.autotracker_init[camera]:
|
|
self._autotracker_setup(self.config.cameras[camera], camera)
|
|
# regularly update camera status
|
|
if not self.ptz_metrics[camera]["ptz_motor_stopped"].is_set():
|
|
self.onvif.get_camera_status(camera)
|
|
|
|
# return to preset if tracking is over
|
|
if (
|
|
self.tracked_object[camera] is None
|
|
and self.tracked_object_history[camera]
|
|
and (
|
|
# might want to use a different timestamp here?
|
|
self.ptz_metrics[camera]["ptz_frame_time"].value
|
|
- self.tracked_object_history[camera][-1]["frame_time"]
|
|
>= autotracker_config.timeout
|
|
)
|
|
and autotracker_config.return_preset
|
|
):
|
|
# clear tracked object and reset zoom level
|
|
self.tracked_object[camera] = None
|
|
self.tracked_object_history[camera].clear()
|
|
|
|
# empty move queue
|
|
while not self.move_queues[camera].empty():
|
|
self.move_queues[camera].get()
|
|
|
|
self.ptz_metrics[camera]["ptz_motor_stopped"].wait()
|
|
logger.debug(
|
|
f"{camera}: Time is {self.ptz_metrics[camera]['ptz_frame_time'].value}, returning to preset: {autotracker_config.return_preset}"
|
|
)
|
|
self.onvif._move_to_preset(
|
|
camera,
|
|
autotracker_config.return_preset.lower(),
|
|
)
|
|
|
|
# update stored zoom level from preset
|
|
if not self.ptz_metrics[camera]["ptz_motor_stopped"].is_set():
|
|
self.onvif.get_camera_status(camera)
|
|
|
|
self.ptz_metrics[camera]["ptz_tracking_active"].clear()
|
|
self.dispatcher.publish(
|
|
f"{camera}/ptz_autotracker/active", "OFF", retain=False
|
|
)
|
|
self.ptz_metrics[camera]["ptz_reset"].set()
|