2023-07-08 14:04:47 +02:00
""" Automatically pan, tilt, and zoom on detected objects via onvif. """
import copy
import logging
2023-09-27 13:19:10 +02:00
import os
2023-07-08 14:04:47 +02:00
import queue
import threading
import time
2023-10-22 18:59:13 +02:00
from collections import deque
2023-07-08 14:04:47 +02:00
from functools import partial
from multiprocessing . synchronize import Event as MpEvent
import cv2
import numpy as np
2023-09-27 13:19:10 +02:00
from norfair . camera_motion import (
HomographyTransformationGetter ,
MotionEstimator ,
TranslationTransformationGetter ,
)
2023-11-02 00:20:26 +01:00
from frigate . comms . dispatcher import Dispatcher
2023-09-27 13:19:10 +02:00
from frigate . config import CameraConfig , FrigateConfig , ZoomingModeEnum
2023-10-22 18:59:13 +02:00
from frigate . const import (
AUTOTRACKING_MAX_AREA_RATIO ,
AUTOTRACKING_MAX_MOVE_METRICS ,
AUTOTRACKING_MOTION_MAX_POINTS ,
AUTOTRACKING_MOTION_MIN_DISTANCE ,
AUTOTRACKING_ZOOM_EDGE_THRESHOLD ,
AUTOTRACKING_ZOOM_IN_HYSTERESIS ,
AUTOTRACKING_ZOOM_OUT_HYSTERESIS ,
CONFIG_DIR ,
)
2023-07-08 14:04:47 +02:00
from frigate . ptz . onvif import OnvifController
2023-07-11 13:23:20 +02:00
from frigate . types import PTZMetricsTypes
2023-09-27 13:19:10 +02:00
from frigate . util . builtin import update_yaml_file
2023-07-08 14:04:47 +02:00
from frigate . util . image import SharedMemoryFrameManager , intersection_over_union
logger = logging . getLogger ( __name__ )
2023-07-11 13:23:20 +02:00
def ptz_moving_at_frame_time ( frame_time , ptz_start_time , ptz_stop_time ) :
# Determine if the PTZ was in motion at the set frame time
# for non ptz/autotracking cameras, this will always return False
# ptz_start_time is initialized to 0 on startup and only changes
# when autotracking movements are made
2023-09-27 13:19:10 +02:00
return ( ptz_start_time != 0.0 and frame_time > ptz_start_time ) and (
ptz_stop_time == 0.0 or ( ptz_start_time < = frame_time < = ptz_stop_time )
2023-07-11 13:23:20 +02:00
)
2023-07-08 14:04:47 +02:00
class PtzMotionEstimator :
2023-07-13 12:32:51 +02:00
def __init__ (
self , config : CameraConfig , ptz_metrics : dict [ str , PTZMetricsTypes ]
) - > None :
2023-07-08 14:04:47 +02:00
self . frame_manager = SharedMemoryFrameManager ( )
2023-07-13 12:32:51 +02:00
self . norfair_motion_estimator = None
2023-07-08 14:04:47 +02:00
self . camera_config = config
self . coord_transformations = None
2023-07-11 13:23:20 +02:00
self . ptz_metrics = ptz_metrics
self . ptz_start_time = self . ptz_metrics [ " ptz_start_time " ]
self . ptz_stop_time = self . ptz_metrics [ " ptz_stop_time " ]
2023-07-13 12:32:51 +02:00
self . ptz_metrics [ " ptz_reset " ] . set ( )
2023-10-22 18:59:13 +02:00
logger . debug ( f " { config . name } : Motion estimator init " )
2023-07-08 14:04:47 +02:00
2023-10-22 18:59:13 +02:00
def motion_estimator ( self , detections , frame_time , camera ) :
2023-07-13 12:32:51 +02:00
# If we've just started up or returned to our preset, reset motion estimator for new tracking session
if self . ptz_metrics [ " ptz_reset " ] . is_set ( ) :
self . ptz_metrics [ " ptz_reset " ] . clear ( )
2023-09-27 13:19:10 +02:00
2023-07-13 12:32:51 +02:00
# homography is nice (zooming) but slow, translation is pan/tilt only but fast.
2023-09-27 13:19:10 +02:00
if (
self . camera_config . onvif . autotracking . zooming
!= ZoomingModeEnum . disabled
) :
2023-10-22 18:59:13 +02:00
logger . debug ( f " { camera } : Motion estimator reset - homography " )
2023-09-27 13:19:10 +02:00
transformation_type = HomographyTransformationGetter ( )
else :
2023-10-22 18:59:13 +02:00
logger . debug ( f " { camera } : Motion estimator reset - translation " )
2023-09-27 13:19:10 +02:00
transformation_type = TranslationTransformationGetter ( )
2023-07-13 12:32:51 +02:00
self . norfair_motion_estimator = MotionEstimator (
2023-09-27 13:19:10 +02:00
transformations_getter = transformation_type ,
2023-10-22 18:59:13 +02:00
min_distance = AUTOTRACKING_MOTION_MIN_DISTANCE ,
max_points = AUTOTRACKING_MOTION_MAX_POINTS ,
2023-07-13 12:32:51 +02:00
)
2023-09-27 13:19:10 +02:00
2023-07-13 12:32:51 +02:00
self . coord_transformations = None
2023-07-11 13:23:20 +02:00
if ptz_moving_at_frame_time (
frame_time , self . ptz_start_time . value , self . ptz_stop_time . value
2023-07-08 14:04:47 +02:00
) :
logger . debug (
2023-10-22 18:59:13 +02:00
f " { camera } : Motion estimator running - frame time: { frame_time } "
2023-07-08 14:04:47 +02:00
)
2023-10-22 18:59:13 +02:00
frame_id = f " { camera } { frame_time } "
2023-07-08 14:04:47 +02:00
yuv_frame = self . frame_manager . get (
frame_id , self . camera_config . frame_shape_yuv
)
frame = cv2 . cvtColor ( yuv_frame , cv2 . COLOR_YUV2GRAY_I420 )
# mask out detections for better motion estimation
mask = np . ones ( frame . shape [ : 2 ] , frame . dtype )
detection_boxes = [ x [ 2 ] for x in detections ]
for detection in detection_boxes :
x1 , y1 , x2 , y2 = detection
mask [ y1 : y2 , x1 : x2 ] = 0
# merge camera config motion mask with detections. Norfair function needs 0,1 mask
mask = np . bitwise_and ( mask , self . camera_config . motion . mask ) . clip ( max = 1 )
# Norfair estimator function needs color so it can convert it right back to gray
frame = cv2 . cvtColor ( frame , cv2 . COLOR_GRAY2BGRA )
2023-09-29 01:21:37 +02:00
try :
self . coord_transformations = self . norfair_motion_estimator . update (
frame , mask
)
except Exception :
# sometimes opencv can't find enough features in the image to find homography, so catch this error
2023-10-22 18:59:13 +02:00
# https://github.com/tryolabs/norfair/pull/278
2023-09-29 01:21:37 +02:00
logger . warning (
2023-10-22 18:59:13 +02:00
f " Autotracker: motion estimator couldn ' t get transformations for { camera } at frame time { frame_time } "
2023-09-29 01:21:37 +02:00
)
self . coord_transformations = None
2023-07-08 14:04:47 +02:00
2023-10-22 18:59:13 +02:00
try :
logger . debug (
f " { camera } : Motion estimator transformation: { self . coord_transformations . rel_to_abs ( [ [ 0 , 0 ] ] ) } "
)
except Exception :
pass
2023-07-08 14:04:47 +02:00
self . frame_manager . close ( frame_id )
2023-07-11 13:23:20 +02:00
return self . coord_transformations
2023-07-08 14:04:47 +02:00
class PtzAutoTrackerThread ( threading . Thread ) :
def __init__ (
self ,
config : FrigateConfig ,
onvif : OnvifController ,
2023-07-11 13:23:20 +02:00
ptz_metrics : dict [ str , PTZMetricsTypes ] ,
2023-11-02 00:20:26 +01:00
dispatcher : Dispatcher ,
2023-07-08 14:04:47 +02:00
stop_event : MpEvent ,
) - > None :
threading . Thread . __init__ ( self )
self . name = " ptz_autotracker "
2023-11-11 01:12:20 +01:00
self . ptz_autotracker = PtzAutoTracker (
config , onvif , ptz_metrics , dispatcher , stop_event
)
2023-07-08 14:04:47 +02:00
self . stop_event = stop_event
self . config = config
def run ( self ) :
2023-07-15 02:01:10 +02:00
while not self . stop_event . wait ( 1 ) :
2023-10-22 18:59:13 +02:00
for camera , camera_config in self . config . cameras . items ( ) :
if not camera_config . enabled :
2023-09-24 12:05:29 +02:00
continue
2023-10-22 18:59:13 +02:00
if camera_config . onvif . autotracking . enabled :
self . ptz_autotracker . camera_maintenance ( camera )
2023-07-08 14:04:47 +02:00
else :
# disabled dynamically by mqtt
2023-10-22 18:59:13 +02:00
if self . ptz_autotracker . tracked_object . get ( camera ) :
self . ptz_autotracker . tracked_object [ camera ] = None
self . ptz_autotracker . tracked_object_history [ camera ] . clear ( )
2023-07-15 02:01:10 +02:00
2023-07-08 14:04:47 +02:00
logger . info ( " Exiting autotracker... " )
class PtzAutoTracker :
def __init__ (
self ,
config : FrigateConfig ,
onvif : OnvifController ,
2023-07-11 13:23:20 +02:00
ptz_metrics : PTZMetricsTypes ,
2023-11-02 00:20:26 +01:00
dispatcher : Dispatcher ,
2023-11-11 01:12:20 +01:00
stop_event : MpEvent ,
2023-07-08 14:04:47 +02:00
) - > None :
self . config = config
self . onvif = onvif
2023-07-11 13:23:20 +02:00
self . ptz_metrics = ptz_metrics
2023-11-02 00:20:26 +01:00
self . dispatcher = dispatcher
2023-11-11 01:12:20 +01:00
self . stop_event = stop_event
2023-07-08 14:04:47 +02:00
self . tracked_object : dict [ str , object ] = { }
2023-10-22 18:59:13 +02:00
self . tracked_object_history : dict [ str , object ] = { }
self . tracked_object_metrics : dict [ str , object ] = { }
2023-09-27 13:19:10 +02:00
self . object_types : dict [ str , object ] = { }
self . required_zones : dict [ str , object ] = { }
self . move_queues : dict [ str , object ] = { }
2023-09-29 01:01:05 +02:00
self . move_queue_locks : dict [ str , object ] = { }
2023-09-27 13:19:10 +02:00
self . move_threads : dict [ str , object ] = { }
self . autotracker_init : dict [ str , object ] = { }
self . move_metrics : dict [ str , object ] = { }
self . calibrating : dict [ str , object ] = { }
self . intercept : dict [ str , object ] = { }
self . move_coefficients : dict [ str , object ] = { }
2023-09-29 01:21:37 +02:00
self . zoom_factor : dict [ str , object ] = { }
2023-07-08 14:04:47 +02:00
# if cam is set to autotrack, onvif should be set up
2023-10-22 18:59:13 +02:00
for camera , camera_config in self . config . cameras . items ( ) :
if not camera_config . enabled :
2023-09-24 12:05:29 +02:00
continue
2023-10-22 18:59:13 +02:00
self . autotracker_init [ camera ] = False
2023-11-16 02:25:48 +01:00
if (
camera_config . onvif . autotracking . enabled
and camera_config . onvif . autotracking . enabled_in_config
) :
2023-10-22 18:59:13 +02:00
self . _autotracker_setup ( camera_config , camera )
2023-07-08 14:04:47 +02:00
2023-10-22 18:59:13 +02:00
def _autotracker_setup ( self , camera_config , camera ) :
logger . debug ( f " { camera } : Autotracker init " )
2023-07-08 14:04:47 +02:00
2023-10-22 18:59:13 +02:00
self . object_types [ camera ] = camera_config . onvif . autotracking . track
self . required_zones [ camera ] = camera_config . onvif . autotracking . required_zones
self . zoom_factor [ camera ] = camera_config . onvif . autotracking . zoom_factor
2023-07-08 14:04:47 +02:00
2023-10-22 18:59:13 +02:00
self . tracked_object [ camera ] = None
self . tracked_object_history [ camera ] = deque (
maxlen = round ( camera_config . detect . fps * 1.5 )
)
self . tracked_object_metrics [ camera ] = {
2023-11-01 12:12:43 +01:00
" max_target_box " : AUTOTRACKING_MAX_AREA_RATIO
* * ( 1 / self . zoom_factor [ camera ] )
2023-10-22 18:59:13 +02:00
}
2023-07-08 14:04:47 +02:00
2023-10-22 18:59:13 +02:00
self . calibrating [ camera ] = False
self . move_metrics [ camera ] = [ ]
self . intercept [ camera ] = None
self . move_coefficients [ camera ] = [ ]
2023-09-27 13:19:10 +02:00
2023-10-22 18:59:13 +02:00
self . move_queues [ camera ] = queue . Queue ( )
self . move_queue_locks [ camera ] = threading . Lock ( )
2023-07-08 14:04:47 +02:00
2023-10-22 18:59:13 +02:00
if not self . onvif . cams [ camera ] [ " init " ] :
if not self . onvif . _init_onvif ( camera ) :
logger . warning ( f " Unable to initialize onvif for { camera } " )
camera_config . onvif . autotracking . enabled = False
self . ptz_metrics [ camera ] [ " ptz_autotracker_enabled " ] . value = False
2023-07-08 14:04:47 +02:00
return
2023-10-22 18:59:13 +02:00
if " pt-r-fov " not in self . onvif . cams [ camera ] [ " features " ] :
camera_config . onvif . autotracking . enabled = False
self . ptz_metrics [ camera ] [ " ptz_autotracker_enabled " ] . value = False
2023-07-08 14:04:47 +02:00
logger . warning (
2023-10-22 18:59:13 +02:00
f " Disabling autotracking for { camera } : FOV relative movement not supported "
2023-07-08 14:04:47 +02:00
)
return
2023-10-22 18:59:13 +02:00
movestatus_supported = self . onvif . get_service_capabilities ( camera )
2023-09-27 13:19:10 +02:00
if movestatus_supported is None or movestatus_supported . lower ( ) != " true " :
2023-10-22 18:59:13 +02:00
camera_config . onvif . autotracking . enabled = False
self . ptz_metrics [ camera ] [ " ptz_autotracker_enabled " ] . value = False
2023-09-27 13:19:10 +02:00
logger . warning (
2023-10-22 18:59:13 +02:00
f " Disabling autotracking for { camera } : ONVIF MoveStatus not supported "
2023-09-27 13:19:10 +02:00
)
return
2023-10-22 18:59:13 +02:00
if self . onvif . cams [ camera ] [ " init " ] :
self . onvif . get_camera_status ( camera )
2023-09-27 13:19:10 +02:00
2023-07-08 14:04:47 +02:00
# movement thread per camera
2023-10-22 18:59:13 +02:00
self . move_threads [ camera ] = threading . Thread (
name = f " ptz_move_thread_ { camera } " ,
target = partial ( self . _process_move_queue , camera ) ,
2023-10-07 16:18:07 +02:00
)
2023-10-22 18:59:13 +02:00
self . move_threads [ camera ] . daemon = True
self . move_threads [ camera ] . start ( )
if camera_config . onvif . autotracking . movement_weights :
if len ( camera_config . onvif . autotracking . movement_weights ) == 5 :
2023-11-01 12:12:43 +01:00
camera_config . onvif . autotracking . movement_weights = [
float ( val )
for val in camera_config . onvif . autotracking . movement_weights
]
2023-10-22 18:59:13 +02:00
self . ptz_metrics [ camera ] [
" ptz_min_zoom "
] . value = camera_config . onvif . autotracking . movement_weights [ 0 ]
self . ptz_metrics [ camera ] [
" ptz_max_zoom "
] . value = camera_config . onvif . autotracking . movement_weights [ 1 ]
self . intercept [
camera
] = camera_config . onvif . autotracking . movement_weights [ 2 ]
self . move_coefficients [
camera
] = camera_config . onvif . autotracking . movement_weights [ 3 : ]
else :
camera_config . onvif . autotracking . enabled = False
self . ptz_metrics [ camera ] [ " ptz_autotracker_enabled " ] . value = False
logger . warning (
f " Autotracker recalibration is required for { camera } . Disabling autotracking. "
)
2023-09-27 13:19:10 +02:00
2023-10-22 18:59:13 +02:00
if camera_config . onvif . autotracking . calibrate_on_startup :
self . _calibrate_camera ( camera )
2023-09-27 13:19:10 +02:00
2023-11-02 00:20:26 +01:00
self . ptz_metrics [ camera ] [ " ptz_tracking_active " ] . clear ( )
self . dispatcher . publish ( f " { camera } /ptz_autotracker/active " , " OFF " , retain = False )
2023-10-22 18:59:13 +02:00
self . autotracker_init [ camera ] = True
2023-07-08 14:04:47 +02:00
2023-10-22 18:59:13 +02:00
def _write_config ( self , camera ) :
2023-09-27 13:19:10 +02:00
config_file = os . environ . get ( " CONFIG_FILE " , f " { CONFIG_DIR } /config.yml " )
logger . debug (
2023-10-22 18:59:13 +02:00
f " { camera } : Writing new config with autotracker motion coefficients: { self . config . cameras [ camera ] . onvif . autotracking . movement_weights } "
2023-09-27 13:19:10 +02:00
)
update_yaml_file (
config_file ,
[ " cameras " , camera , " onvif " , " autotracking " , " movement_weights " ] ,
self . config . cameras [ camera ] . onvif . autotracking . movement_weights ,
)
def _calibrate_camera ( self , camera ) :
# move the camera from the preset in steps and measure the time it takes to move that amount
# this will allow us to predict movement times with a simple linear regression
# start with 0 so we can determine a baseline (to be used as the intercept in the regression calc)
# TODO: take zooming into account too
num_steps = 30
step_sizes = np . linspace ( 0 , 1 , num_steps )
2023-10-22 18:59:13 +02:00
zoom_in_values = [ ]
zoom_out_values = [ ]
2023-09-27 13:19:10 +02:00
self . calibrating [ camera ] = True
logger . info ( f " Camera calibration for { camera } in progress " )
2023-10-22 18:59:13 +02:00
# zoom levels test
if (
self . config . cameras [ camera ] . onvif . autotracking . zooming
!= ZoomingModeEnum . disabled
) :
logger . info ( f " Calibration for { camera } in progress: 0% complete " )
for i in range ( 2 ) :
# absolute move to 0 - fully zoomed out
self . onvif . _zoom_absolute (
camera ,
self . onvif . cams [ camera ] [ " absolute_zoom_range " ] [ " XRange " ] [ " Min " ] ,
1 ,
)
2023-11-02 00:20:26 +01:00
while not self . ptz_metrics [ camera ] [ " ptz_motor_stopped " ] . is_set ( ) :
2023-10-22 18:59:13 +02:00
self . onvif . get_camera_status ( camera )
zoom_out_values . append ( self . ptz_metrics [ camera ] [ " ptz_zoom_level " ] . value )
self . onvif . _zoom_absolute (
camera ,
self . onvif . cams [ camera ] [ " absolute_zoom_range " ] [ " XRange " ] [ " Max " ] ,
1 ,
)
2023-11-02 00:20:26 +01:00
while not self . ptz_metrics [ camera ] [ " ptz_motor_stopped " ] . is_set ( ) :
2023-10-22 18:59:13 +02:00
self . onvif . get_camera_status ( camera )
zoom_in_values . append ( self . ptz_metrics [ camera ] [ " ptz_zoom_level " ] . value )
if (
self . config . cameras [ camera ] . onvif . autotracking . zooming
== ZoomingModeEnum . relative
) :
# relative move to -0.01
self . onvif . _move_relative (
camera ,
0 ,
0 ,
- 1e-2 ,
1 ,
)
2023-11-02 00:20:26 +01:00
while not self . ptz_metrics [ camera ] [ " ptz_motor_stopped " ] . is_set ( ) :
2023-10-22 18:59:13 +02:00
self . onvif . get_camera_status ( camera )
zoom_out_values . append (
self . ptz_metrics [ camera ] [ " ptz_zoom_level " ] . value
)
# relative move to 0.01
self . onvif . _move_relative (
camera ,
0 ,
0 ,
1e-2 ,
1 ,
)
2023-11-02 00:20:26 +01:00
while not self . ptz_metrics [ camera ] [ " ptz_motor_stopped " ] . is_set ( ) :
2023-10-22 18:59:13 +02:00
self . onvif . get_camera_status ( camera )
zoom_in_values . append (
self . ptz_metrics [ camera ] [ " ptz_zoom_level " ] . value
)
self . ptz_metrics [ camera ] [ " ptz_max_zoom " ] . value = max ( zoom_in_values )
self . ptz_metrics [ camera ] [ " ptz_min_zoom " ] . value = min ( zoom_out_values )
logger . debug (
f ' { camera } : Calibration values: max zoom: { self . ptz_metrics [ camera ] [ " ptz_max_zoom " ] . value } , min zoom: { self . ptz_metrics [ camera ] [ " ptz_min_zoom " ] . value } '
)
else :
self . ptz_metrics [ camera ] [ " ptz_max_zoom " ] . value = 1
self . ptz_metrics [ camera ] [ " ptz_min_zoom " ] . value = 0
2023-09-27 13:19:10 +02:00
self . onvif . _move_to_preset (
camera ,
self . config . cameras [ camera ] . onvif . autotracking . return_preset . lower ( ) ,
)
self . ptz_metrics [ camera ] [ " ptz_reset " ] . set ( )
2023-11-02 00:20:26 +01:00
self . ptz_metrics [ camera ] [ " ptz_motor_stopped " ] . clear ( )
2023-09-27 13:19:10 +02:00
# Wait until the camera finishes moving
2023-11-02 00:20:26 +01:00
while not self . ptz_metrics [ camera ] [ " ptz_motor_stopped " ] . is_set ( ) :
2023-09-27 13:19:10 +02:00
self . onvif . get_camera_status ( camera )
for step in range ( num_steps ) :
pan = step_sizes [ step ]
tilt = step_sizes [ step ]
start_time = time . time ( )
self . onvif . _move_relative ( camera , pan , tilt , 0 , 1 )
# Wait until the camera finishes moving
2023-11-02 00:20:26 +01:00
while not self . ptz_metrics [ camera ] [ " ptz_motor_stopped " ] . is_set ( ) :
2023-09-27 13:19:10 +02:00
self . onvif . get_camera_status ( camera )
stop_time = time . time ( )
self . move_metrics [ camera ] . append (
{
" pan " : pan ,
" tilt " : tilt ,
" start_timestamp " : start_time ,
" end_timestamp " : stop_time ,
}
)
self . onvif . _move_to_preset (
camera ,
self . config . cameras [ camera ] . onvif . autotracking . return_preset . lower ( ) ,
)
self . ptz_metrics [ camera ] [ " ptz_reset " ] . set ( )
2023-11-02 00:20:26 +01:00
self . ptz_metrics [ camera ] [ " ptz_motor_stopped " ] . clear ( )
2023-09-27 13:19:10 +02:00
# Wait until the camera finishes moving
2023-11-02 00:20:26 +01:00
while not self . ptz_metrics [ camera ] [ " ptz_motor_stopped " ] . is_set ( ) :
2023-09-27 13:19:10 +02:00
self . onvif . get_camera_status ( camera )
2023-10-22 18:59:13 +02:00
logger . info (
f " Calibration for { camera } in progress: { round ( ( step / num_steps ) * 100 ) } % complete "
)
2023-09-27 13:19:10 +02:00
self . calibrating [ camera ] = False
logger . info ( f " Calibration for { camera } complete " )
# calculate and save new intercept and coefficients
self . _calculate_move_coefficients ( camera , True )
def _calculate_move_coefficients ( self , camera , calibration = False ) :
# calculate new coefficients when we have 50 more new values. Save up to 500
if calibration or (
len ( self . move_metrics [ camera ] ) % 50 == 0
and len ( self . move_metrics [ camera ] ) != 0
2023-10-22 18:59:13 +02:00
and len ( self . move_metrics [ camera ] ) < = AUTOTRACKING_MAX_MOVE_METRICS
2023-09-27 13:19:10 +02:00
) :
X = np . array (
[ abs ( d [ " pan " ] ) + abs ( d [ " tilt " ] ) for d in self . move_metrics [ camera ] ]
)
y = np . array (
[
d [ " end_timestamp " ] - d [ " start_timestamp " ]
for d in self . move_metrics [ camera ]
]
)
# simple linear regression with intercept
X_with_intercept = np . column_stack ( ( np . ones ( X . shape [ 0 ] ) , X ) )
self . move_coefficients [ camera ] = np . linalg . lstsq (
X_with_intercept , y , rcond = None
) [ 0 ]
# only assign a new intercept if we're calibrating
if calibration :
self . intercept [ camera ] = y [ 0 ]
2023-10-22 18:59:13 +02:00
# write the min zoom, max zoom, intercept, and coefficients
# back to the config file as a comma separated string
2023-09-27 13:19:10 +02:00
self . config . cameras [ camera ] . onvif . autotracking . movement_weights = " , " . join (
2023-10-22 18:59:13 +02:00
str ( v )
for v in [
self . ptz_metrics [ camera ] [ " ptz_min_zoom " ] . value ,
self . ptz_metrics [ camera ] [ " ptz_max_zoom " ] . value ,
self . intercept [ camera ] ,
* self . move_coefficients [ camera ] ,
]
2023-09-27 13:19:10 +02:00
)
logger . debug (
2023-10-22 18:59:13 +02:00
f " { camera } : New regression parameters - intercept: { self . intercept [ camera ] } , coefficients: { self . move_coefficients [ camera ] } "
2023-09-27 13:19:10 +02:00
)
2023-10-22 18:59:13 +02:00
self . _write_config ( camera )
2023-09-27 13:19:10 +02:00
def _predict_movement_time ( self , camera , pan , tilt ) :
combined_movement = abs ( pan ) + abs ( tilt )
input_data = np . array ( [ self . intercept [ camera ] , combined_movement ] )
return np . dot ( self . move_coefficients [ camera ] , input_data )
2023-10-27 00:21:58 +02:00
def _calculate_tracked_object_metrics ( self , camera , obj ) :
2023-10-22 18:59:13 +02:00
def remove_outliers ( data ) :
Q1 = np . percentile ( data , 25 )
Q3 = np . percentile ( data , 75 )
IQR = Q3 - Q1
lower_bound = Q1 - 1.5 * IQR
upper_bound = Q3 + 1.5 * IQR
filtered_data = [ x for x in data if lower_bound < = x < = upper_bound ]
removed_values = [ x for x in data if x not in filtered_data ]
logger . debug ( f " { camera } : Removed area outliers: { removed_values } " )
return filtered_data
camera_config = self . config . cameras [ camera ]
camera_width = camera_config . frame_shape [ 1 ]
camera_height = camera_config . frame_shape [ 0 ]
# Extract areas and calculate weighted average
2023-11-01 12:12:43 +01:00
# grab the largest dimension of the bounding box and create a square from that
areas = [
max ( obj [ " box " ] [ 2 ] - obj [ " box " ] [ 0 ] , obj [ " box " ] [ 3 ] - obj [ " box " ] [ 1 ] ) * * 2
for obj in self . tracked_object_history [ camera ]
]
2023-10-22 18:59:13 +02:00
filtered_areas = (
remove_outliers ( areas )
if len ( areas ) > = self . config . cameras [ camera ] . detect . fps / 2
else areas
)
weights = np . arange ( 1 , len ( filtered_areas ) + 1 )
weighted_area = np . average ( filtered_areas , weights = weights )
self . tracked_object_metrics [ camera ] [ " target_box " ] = (
weighted_area / ( camera_width * camera_height )
) * * self . zoom_factor [ camera ]
if " original_target_box " not in self . tracked_object_metrics [ camera ] :
self . tracked_object_metrics [ camera ] [
" original_target_box "
] = self . tracked_object_metrics [ camera ] [ " target_box " ]
2023-10-27 00:21:58 +02:00
(
self . tracked_object_metrics [ camera ] [ " valid_velocity " ] ,
self . tracked_object_metrics [ camera ] [ " velocity " ] ,
) = self . _get_valid_velocity ( camera , obj )
self . tracked_object_metrics [ camera ] [ " distance " ] = self . _get_distance_threshold (
camera , obj
)
centroid_distance = np . linalg . norm (
[
obj . obj_data [ " centroid " ] [ 0 ] - camera_config . detect . width / 2 ,
obj . obj_data [ " centroid " ] [ 1 ] - camera_config . detect . height / 2 ,
]
)
logger . debug ( f " { camera } : Centroid distance: { centroid_distance } " )
self . tracked_object_metrics [ camera ] [ " below_distance_threshold " ] = (
centroid_distance < self . tracked_object_metrics [ camera ] [ " distance " ]
)
2023-07-08 14:04:47 +02:00
def _process_move_queue ( self , camera ) :
2023-10-22 18:59:13 +02:00
camera_config = self . config . cameras [ camera ]
camera_config . frame_shape [ 1 ]
camera_config . frame_shape [ 0 ]
2023-11-11 01:12:20 +01:00
while not self . stop_event . is_set ( ) :
2023-11-18 22:36:13 +01:00
try :
move_data = self . move_queues [ camera ] . get ( True , 0.1 )
except queue . Empty :
continue
2023-09-29 01:01:05 +02:00
with self . move_queue_locks [ camera ] :
2023-09-27 13:19:10 +02:00
frame_time , pan , tilt , zoom = move_data
2023-07-15 02:01:10 +02:00
# if we're receiving move requests during a PTZ move, ignore them
if ptz_moving_at_frame_time (
frame_time ,
self . ptz_metrics [ camera ] [ " ptz_start_time " ] . value ,
self . ptz_metrics [ camera ] [ " ptz_stop_time " ] . value ,
) :
# instead of dequeueing this might be a good place to preemptively move based
# on an estimate - for fast moving objects, etc.
logger . debug (
2023-10-22 18:59:13 +02:00
f " { camera } : Move queue: PTZ moving, dequeueing move request - frame time: { frame_time } , final pan: { pan } , final tilt: { tilt } , final zoom: { zoom } "
2023-07-15 02:01:10 +02:00
)
continue
2023-07-08 14:04:47 +02:00
2023-07-13 12:32:51 +02:00
else :
2023-09-27 13:19:10 +02:00
if (
self . config . cameras [ camera ] . onvif . autotracking . zooming
== ZoomingModeEnum . relative
) :
self . onvif . _move_relative ( camera , pan , tilt , zoom , 1 )
2023-10-22 18:59:13 +02:00
2023-07-15 02:01:10 +02:00
else :
2023-10-22 18:59:13 +02:00
if pan != 0 or tilt != 0 :
2023-09-27 13:19:10 +02:00
self . onvif . _move_relative ( camera , pan , tilt , 0 , 1 )
2023-07-15 02:01:10 +02:00
2023-10-22 18:59:13 +02:00
# Wait until the camera finishes moving
2023-11-02 00:20:26 +01:00
while not self . ptz_metrics [ camera ] [
" ptz_motor_stopped "
] . is_set ( ) :
2023-10-22 18:59:13 +02:00
self . onvif . get_camera_status ( camera )
if (
zoom > 0
and self . ptz_metrics [ camera ] [ " ptz_zoom_level " ] . value != zoom
) :
self . onvif . _zoom_absolute ( camera , zoom , 1 )
2023-07-15 02:01:10 +02:00
# Wait until the camera finishes moving
2023-11-02 00:20:26 +01:00
while not self . ptz_metrics [ camera ] [ " ptz_motor_stopped " ] . is_set ( ) :
2023-07-15 02:01:10 +02:00
self . onvif . get_camera_status ( camera )
2023-07-08 23:02:54 +02:00
2023-09-27 13:19:10 +02:00
if self . config . cameras [ camera ] . onvif . autotracking . movement_weights :
logger . debug (
2023-10-22 18:59:13 +02:00
f " { camera } : Predicted movement time: { self . _predict_movement_time ( camera , pan , tilt ) } "
2023-09-27 13:19:10 +02:00
)
logger . debug (
2023-10-22 18:59:13 +02:00
f ' { camera } : Actual movement time: { self . ptz_metrics [ camera ] [ " ptz_stop_time " ] . value - self . ptz_metrics [ camera ] [ " ptz_start_time " ] . value } '
2023-09-27 13:19:10 +02:00
)
# save metrics for better estimate calculations
if (
self . intercept [ camera ] is not None
2023-10-22 18:59:13 +02:00
and len ( self . move_metrics [ camera ] )
< AUTOTRACKING_MAX_MOVE_METRICS
and ( pan != 0 or tilt != 0 )
and self . config . cameras [
camera
] . onvif . autotracking . calibrate_on_startup
2023-09-27 13:19:10 +02:00
) :
2023-10-22 18:59:13 +02:00
logger . debug ( f " { camera } : Adding new values to move metrics " )
2023-09-27 13:19:10 +02:00
self . move_metrics [ camera ] . append (
{
" pan " : pan ,
" tilt " : tilt ,
" start_timestamp " : self . ptz_metrics [ camera ] [
" ptz_start_time "
] . value ,
" end_timestamp " : self . ptz_metrics [ camera ] [
" ptz_stop_time "
] . value ,
}
)
# calculate new coefficients if we have enough data
self . _calculate_move_coefficients ( camera )
def _enqueue_move ( self , camera , frame_time , pan , tilt , zoom ) :
def split_value ( value ) :
clipped = np . clip ( value , - 1 , 1 )
return clipped , value - clipped
2023-07-13 12:32:51 +02:00
if (
frame_time > self . ptz_metrics [ camera ] [ " ptz_start_time " ] . value
and frame_time > self . ptz_metrics [ camera ] [ " ptz_stop_time " ] . value
2023-09-29 01:01:05 +02:00
and not self . move_queue_locks [ camera ] . locked ( )
2023-07-13 12:32:51 +02:00
) :
2023-09-27 13:19:10 +02:00
# don't make small movements
if abs ( pan ) < 0.02 :
pan = 0
if abs ( tilt ) < 0.02 :
tilt = 0
# split up any large moves caused by velocity estimated movements
while pan != 0 or tilt != 0 or zoom != 0 :
pan , pan_excess = split_value ( pan )
tilt , tilt_excess = split_value ( tilt )
zoom , zoom_excess = split_value ( zoom )
logger . debug (
2023-10-22 18:59:13 +02:00
f " { camera } : Enqueue movement for frame time: { frame_time } pan: { pan } , tilt: { tilt } , zoom: { zoom } "
2023-09-27 13:19:10 +02:00
)
move_data = ( frame_time , pan , tilt , zoom )
self . move_queues [ camera ] . put ( move_data )
pan = pan_excess
tilt = tilt_excess
zoom = zoom_excess
2023-10-22 18:59:13 +02:00
def _get_valid_velocity ( self , camera , obj ) :
# returns a tuple and euclidean distance if the estimated velocity is valid
# if invalid, returns [0, 0] and -1
camera_config = self . config . cameras [ camera ]
camera_width = camera_config . frame_shape [ 1 ]
camera_height = camera_config . frame_shape [ 0 ]
camera_fps = camera_config . detect . fps
2023-11-01 12:12:43 +01:00
# estimate_velocity is a numpy array of bbox top,left and bottom,right velocities
2023-10-22 18:59:13 +02:00
velocities = obj . obj_data [ " estimate_velocity " ]
logger . debug ( f " { camera } : Velocities from norfair: { velocities } " )
2023-10-27 00:21:58 +02:00
# if we are close enough to zero, return right away
if np . all ( np . round ( velocities ) == 0 ) :
2023-11-01 12:12:43 +01:00
return True , np . zeros ( ( 4 , ) )
2023-10-27 00:21:58 +02:00
2023-10-22 18:59:13 +02:00
# Thresholds
x_mags_thresh = camera_width / camera_fps / 2
y_mags_thresh = camera_height / camera_fps / 2
2023-10-27 00:21:58 +02:00
dir_thresh = 0.93
2023-11-01 12:12:43 +01:00
delta_thresh = 20
var_thresh = 10
2023-10-22 18:59:13 +02:00
# Check magnitude
x_mags = np . abs ( velocities [ : , 0 ] )
y_mags = np . abs ( velocities [ : , 1 ] )
invalid_x_mags = np . any ( x_mags > x_mags_thresh )
invalid_y_mags = np . any ( y_mags > y_mags_thresh )
# Check delta
delta = np . abs ( velocities [ 0 ] - velocities [ 1 ] )
invalid_delta = np . any ( delta > delta_thresh )
# Check variance
stdevs = np . std ( velocities , axis = 0 )
high_variances = np . any ( stdevs > var_thresh )
# Check direction difference
2023-10-27 00:21:58 +02:00
velocities = np . round ( velocities )
invalid_dirs = False
if not np . any ( np . linalg . norm ( velocities , axis = 1 ) ) :
cosine_sim = np . dot ( velocities [ 0 ] , velocities [ 1 ] ) / (
np . linalg . norm ( velocities [ 0 ] ) * np . linalg . norm ( velocities [ 1 ] )
)
dir_thresh = 0.6 if np . all ( delta < delta_thresh / 2 ) else dir_thresh
invalid_dirs = cosine_sim < dir_thresh
2023-10-22 18:59:13 +02:00
# Combine
invalid = (
invalid_x_mags
or invalid_y_mags
or invalid_dirs
or invalid_delta
or high_variances
)
if invalid :
logger . debug (
f " { camera } : Invalid velocity: { tuple ( np . round ( velocities , 2 ) . flatten ( ) . astype ( int ) ) } : Invalid because: "
+ " , " . join (
[
var_name
for var_name , is_invalid in [
( " invalid_x_mags " , invalid_x_mags ) ,
( " invalid_y_mags " , invalid_y_mags ) ,
( " invalid_dirs " , invalid_dirs ) ,
( " invalid_delta " , invalid_delta ) ,
( " high_variances " , high_variances ) ,
]
if is_invalid
]
)
)
# invalid velocity
2023-11-01 12:12:43 +01:00
return False , np . zeros ( ( 4 , ) )
2023-10-22 18:59:13 +02:00
else :
2023-10-27 00:21:58 +02:00
logger . debug ( f " { camera } : Valid velocity " )
2023-11-01 12:12:43 +01:00
return True , velocities . flatten ( )
2023-10-22 18:59:13 +02:00
2023-10-27 00:21:58 +02:00
def _get_distance_threshold ( self , camera , obj ) :
2023-10-22 18:59:13 +02:00
# Returns true if Euclidean distance from object to center of frame is
# less than 10% of the of the larger dimension (width or height) of the frame,
# multiplied by a scaling factor for object size.
# Distance is increased if object is not moving to prevent small ptz moves
# Adjusting this percentage slightly lower will effectively cause the camera to move
# more often to keep the object in the center. Raising the percentage will cause less
# movement and will be more flexible with objects not quite being centered.
# TODO: there's probably a better way to approach this
camera_config = self . config . cameras [ camera ]
obj_width = obj . obj_data [ " box " ] [ 2 ] - obj . obj_data [ " box " ] [ 0 ]
obj_height = obj . obj_data [ " box " ] [ 3 ] - obj . obj_data [ " box " ] [ 1 ]
max_obj = max ( obj_width , obj_height )
max_frame = (
camera_config . detect . width
if max_obj == obj_width
else camera_config . detect . height
)
# larger objects should lower the threshold, smaller objects should raise it
scaling_factor = 1 - np . log ( max_obj / max_frame )
2023-10-27 00:21:58 +02:00
percentage = (
0.08
if camera_config . onvif . autotracking . movement_weights
and self . tracked_object_metrics [ camera ] [ " valid_velocity " ]
else 0.03
)
2023-10-22 18:59:13 +02:00
distance_threshold = percentage * max_frame * scaling_factor
2023-10-27 00:21:58 +02:00
logger . debug ( f " { camera } : Distance threshold: { distance_threshold } " )
2023-10-22 18:59:13 +02:00
2023-10-27 00:21:58 +02:00
return distance_threshold
2023-10-22 18:59:13 +02:00
def _should_zoom_in ( self , camera , obj , box , debug_zooming = False ) :
# returns True if we should zoom in, False if we should zoom out, None to do nothing
2023-09-27 13:19:10 +02:00
camera_config = self . config . cameras [ camera ]
camera_width = camera_config . frame_shape [ 1 ]
camera_height = camera_config . frame_shape [ 0 ]
2023-10-22 18:59:13 +02:00
camera_fps = camera_config . detect . fps
2023-10-27 00:21:58 +02:00
average_velocity = self . tracked_object_metrics [ camera ] [ " velocity " ]
2023-09-27 13:19:10 +02:00
bb_left , bb_top , bb_right , bb_bottom = box
# TODO: Take into account the area changing when an object is moving out of frame
2023-10-22 18:59:13 +02:00
edge_threshold = AUTOTRACKING_ZOOM_EDGE_THRESHOLD
# calculate a velocity threshold based on movement coefficients if available
if camera_config . onvif . autotracking . movement_weights :
predicted_movement_time = self . _predict_movement_time ( camera , 1 , 1 )
velocity_threshold_x = camera_width / predicted_movement_time / camera_fps
velocity_threshold_y = camera_height / predicted_movement_time / camera_fps
else :
# use a generic velocity threshold
velocity_threshold_x = camera_width * 0.02
velocity_threshold_y = camera_height * 0.02
touching_frame_edges = int (
( bb_left < edge_threshold * camera_width )
+ ( bb_right > ( 1 - edge_threshold ) * camera_width )
+ ( bb_top < edge_threshold * camera_height )
+ ( bb_bottom > ( 1 - edge_threshold ) * camera_height )
2023-09-27 13:19:10 +02:00
)
2023-07-08 14:04:47 +02:00
2023-10-22 18:59:13 +02:00
# make sure object is centered in the frame
2023-10-27 00:21:58 +02:00
below_distance_threshold = self . tracked_object_metrics [ camera ] [
" below_distance_threshold "
]
2023-10-22 18:59:13 +02:00
below_dimension_threshold = ( bb_right - bb_left ) < = camera_width * (
self . zoom_factor [ camera ] + 0.1
) and ( bb_bottom - bb_top ) < = camera_height * ( self . zoom_factor [ camera ] + 0.1 )
# ensure object is not moving quickly
below_velocity_threshold = np . all (
np . abs ( average_velocity )
2023-11-01 12:12:43 +01:00
< np . tile ( [ velocity_threshold_x , velocity_threshold_y ] , 2 )
2023-10-22 18:59:13 +02:00
) or np . all ( average_velocity == 0 )
below_area_threshold = (
self . tracked_object_metrics [ camera ] [ " target_box " ]
< self . tracked_object_metrics [ camera ] [ " max_target_box " ]
)
# introduce some hysteresis to prevent a yo-yo zooming effect
zoom_out_hysteresis = (
self . tracked_object_metrics [ camera ] [ " target_box " ]
> self . tracked_object_metrics [ camera ] [ " max_target_box " ]
* AUTOTRACKING_ZOOM_OUT_HYSTERESIS
)
zoom_in_hysteresis = (
self . tracked_object_metrics [ camera ] [ " target_box " ]
< self . tracked_object_metrics [ camera ] [ " max_target_box " ]
* AUTOTRACKING_ZOOM_IN_HYSTERESIS
)
at_max_zoom = (
self . ptz_metrics [ camera ] [ " ptz_zoom_level " ] . value
== self . ptz_metrics [ camera ] [ " ptz_max_zoom " ] . value
)
at_min_zoom = (
self . ptz_metrics [ camera ] [ " ptz_zoom_level " ] . value
== self . ptz_metrics [ camera ] [ " ptz_min_zoom " ] . value
)
# debug zooming
if debug_zooming :
logger . debug (
f " { camera } : Zoom test: touching edges: count: { touching_frame_edges } left: { bb_left < edge_threshold * camera_width } , right: { bb_right > ( 1 - edge_threshold ) * camera_width } , top: { bb_top < edge_threshold * camera_height } , bottom: { bb_bottom > ( 1 - edge_threshold ) * camera_height } "
)
logger . debug (
f " { camera } : Zoom test: below distance threshold: { ( below_distance_threshold ) } "
)
logger . debug (
f " { camera } : Zoom test: below area threshold: { ( below_area_threshold ) } target: { self . tracked_object_metrics [ camera ] [ ' target_box ' ] } max: { self . tracked_object_metrics [ camera ] [ ' max_target_box ' ] } "
)
logger . debug (
f " { camera } : Zoom test: below dimension threshold: { below_dimension_threshold } width: { bb_right - bb_left } , max width: { camera_width * ( self . zoom_factor [ camera ] + 0.1 ) } , height: { bb_bottom - bb_top } , max height: { camera_height * ( self . zoom_factor [ camera ] + 0.1 ) } "
)
logger . debug (
f " { camera } : Zoom test: below velocity threshold: { below_velocity_threshold } velocity x: { abs ( average_velocity [ 0 ] ) } , x threshold: { velocity_threshold_x } , velocity y: { abs ( average_velocity [ 0 ] ) } , y threshold: { velocity_threshold_y } "
)
logger . debug ( f " { camera } : Zoom test: at max zoom: { at_max_zoom } " )
logger . debug ( f " { camera } : Zoom test: at min zoom: { at_min_zoom } " )
logger . debug (
f ' { camera } : Zoom test: zoom in hysteresis limit: { zoom_in_hysteresis } value: { AUTOTRACKING_ZOOM_IN_HYSTERESIS } original: { self . tracked_object_metrics [ camera ] [ " original_target_box " ] } max: { self . tracked_object_metrics [ camera ] [ " max_target_box " ] } target: { self . tracked_object_metrics [ camera ] [ " target_box " ] } '
)
logger . debug (
f ' { camera } : Zoom test: zoom out hysteresis limit: { zoom_out_hysteresis } value: { AUTOTRACKING_ZOOM_OUT_HYSTERESIS } original: { self . tracked_object_metrics [ camera ] [ " original_target_box " ] } max: { self . tracked_object_metrics [ camera ] [ " max_target_box " ] } target: { self . tracked_object_metrics [ camera ] [ " target_box " ] } '
)
# Zoom in conditions (and)
if (
zoom_in_hysteresis
and touching_frame_edges == 0
and below_velocity_threshold
and below_dimension_threshold
and below_area_threshold
and not at_max_zoom
) :
return True
# Zoom out conditions (or)
if (
(
zoom_out_hysteresis
and not at_max_zoom
and ( not below_area_threshold or not below_dimension_threshold )
)
or ( zoom_out_hysteresis and not below_area_threshold and at_max_zoom )
or (
touching_frame_edges == 1
and ( below_distance_threshold or not below_dimension_threshold )
)
or touching_frame_edges > 1
or not below_velocity_threshold
) and not at_min_zoom :
return False
# Don't zoom at all
return None
2023-07-08 14:04:47 +02:00
def _autotrack_move_ptz ( self , camera , obj ) :
camera_config = self . config . cameras [ camera ]
camera_width = camera_config . frame_shape [ 1 ]
camera_height = camera_config . frame_shape [ 0 ]
2023-09-27 13:19:10 +02:00
camera_fps = camera_config . detect . fps
2023-11-01 12:12:43 +01:00
average_velocity = np . zeros ( ( 4 , ) )
2023-10-22 18:59:13 +02:00
predicted_box = obj . obj_data [ " box " ]
2023-09-27 13:19:10 +02:00
centroid_x = obj . obj_data [ " centroid " ] [ 0 ]
centroid_y = obj . obj_data [ " centroid " ] [ 1 ]
2023-07-08 14:04:47 +02:00
2023-07-13 12:32:51 +02:00
# Normalize coordinates. top right of the fov is (1,1), center is (0,0), bottom left is (-1, -1).
2023-09-27 13:19:10 +02:00
pan = ( ( centroid_x / camera_width ) - 0.5 ) * 2
tilt = ( 0.5 - ( centroid_y / camera_height ) ) * 2
if (
camera_config . onvif . autotracking . movement_weights
) : # use estimates if we have available coefficients
predicted_movement_time = self . _predict_movement_time ( camera , pan , tilt )
2023-10-27 00:21:58 +02:00
_ , average_velocity = (
self . _get_valid_velocity ( camera , obj )
if " velocity " not in self . tracked_object_metrics [ camera ]
else (
self . tracked_object_metrics [ camera ] [ " valid_velocity " ] ,
self . tracked_object_metrics [ camera ] [ " velocity " ] ,
)
)
2023-09-29 01:21:37 +02:00
2023-10-22 18:59:13 +02:00
if np . any ( average_velocity ) :
2023-09-29 01:21:37 +02:00
# this box could exceed the frame boundaries if velocity is high
# but we'll handle that in _enqueue_move() as two separate moves
2023-10-22 18:59:13 +02:00
current_box = np . array ( obj . obj_data [ " box " ] )
predicted_box = (
current_box
+ camera_fps * predicted_movement_time * average_velocity
)
2023-09-27 13:19:10 +02:00
2023-10-22 18:59:13 +02:00
predicted_box = np . round ( predicted_box ) . astype ( int )
2023-09-27 13:19:10 +02:00
2023-10-22 18:59:13 +02:00
centroid_x = round ( ( predicted_box [ 0 ] + predicted_box [ 2 ] ) / 2 )
centroid_y = round ( ( predicted_box [ 1 ] + predicted_box [ 3 ] ) / 2 )
2023-07-08 14:04:47 +02:00
2023-10-22 18:59:13 +02:00
# recalculate pan and tilt with new centroid
pan = ( ( centroid_x / camera_width ) - 0.5 ) * 2
tilt = ( 0.5 - ( centroid_y / camera_height ) ) * 2
2023-09-27 13:19:10 +02:00
2023-10-22 18:59:13 +02:00
logger . debug ( f ' { camera } : Original box: { obj . obj_data [ " box " ] } ' )
logger . debug ( f " { camera } : Predicted box: { tuple ( predicted_box ) } " )
logger . debug (
f " { camera } : Velocity: { tuple ( np . round ( average_velocity ) . flatten ( ) . astype ( int ) ) } "
2023-09-29 01:21:37 +02:00
)
2023-10-22 18:59:13 +02:00
zoom = self . _get_zoom_amount ( camera , obj , predicted_box , debug_zoom = True )
2023-09-27 13:19:10 +02:00
2023-10-22 18:59:13 +02:00
self . _enqueue_move ( camera , obj . obj_data [ " frame_time " ] , pan , tilt , zoom )
2023-09-27 13:19:10 +02:00
2023-10-22 18:59:13 +02:00
def _autotrack_move_zoom_only ( self , camera , obj ) :
camera_config = self . config . cameras [ camera ]
2023-09-27 13:19:10 +02:00
2023-10-22 18:59:13 +02:00
if camera_config . onvif . autotracking . zooming != ZoomingModeEnum . disabled :
zoom = self . _get_zoom_amount ( camera , obj , obj . obj_data [ " box " ] )
if zoom != 0 :
self . _enqueue_move ( camera , obj . obj_data [ " frame_time " ] , 0 , 0 , zoom )
2023-09-27 13:19:10 +02:00
2023-10-22 18:59:13 +02:00
def _get_zoom_amount ( self , camera , obj , predicted_box , debug_zoom = True ) :
2023-09-27 13:19:10 +02:00
camera_config = self . config . cameras [ camera ]
2023-10-22 18:59:13 +02:00
# frame width and height
camera_width = camera_config . frame_shape [ 1 ]
camera_height = camera_config . frame_shape [ 0 ]
zoom = 0
result = None
current_zoom_level = self . ptz_metrics [ camera ] [ " ptz_zoom_level " ] . value
2023-11-01 12:12:43 +01:00
target_box = max (
obj . obj_data [ " box " ] [ 2 ] - obj . obj_data [ " box " ] [ 0 ] ,
obj . obj_data [ " box " ] [ 3 ] - obj . obj_data [ " box " ] [ 1 ] ,
) * * 2 / ( camera_width * camera_height )
2023-10-22 18:59:13 +02:00
2023-09-27 13:19:10 +02:00
# absolute zooming separately from pan/tilt
if camera_config . onvif . autotracking . zooming == ZoomingModeEnum . absolute :
2023-10-22 18:59:13 +02:00
# don't zoom on initial move
if " target_box " not in self . tracked_object_metrics [ camera ] :
zoom = current_zoom_level
else :
if (
result := self . _should_zoom_in (
camera , obj , obj . obj_data [ " box " ] , debug_zoom
)
) is not None :
# divide zoom in 10 increments and always zoom out more than in
level = (
self . ptz_metrics [ camera ] [ " ptz_max_zoom " ] . value
- self . ptz_metrics [ camera ] [ " ptz_min_zoom " ] . value
2023-10-27 00:21:58 +02:00
) / 20
2023-10-22 18:59:13 +02:00
if result :
zoom = min ( 1.0 , current_zoom_level + level )
else :
zoom = max ( 0.0 , current_zoom_level - 2 * level )
2023-09-27 13:19:10 +02:00
2023-10-22 18:59:13 +02:00
# relative zooming concurrently with pan/tilt
if camera_config . onvif . autotracking . zooming == ZoomingModeEnum . relative :
# this is our initial zoom in on a new object
if " target_box " not in self . tracked_object_metrics [ camera ] :
zoom = target_box * * self . zoom_factor [ camera ]
if zoom > self . tracked_object_metrics [ camera ] [ " max_target_box " ] :
zoom = - ( 1 - zoom )
logger . debug (
f " { camera } : target box: { target_box } , max: { self . tracked_object_metrics [ camera ] [ ' max_target_box ' ] } , calc zoom: { zoom } "
)
else :
if (
result := self . _should_zoom_in (
camera ,
obj ,
predicted_box
if camera_config . onvif . autotracking . movement_weights
else obj . obj_data [ " box " ] ,
debug_zoom ,
)
) is not None :
# zoom value
2023-11-07 12:31:39 +01:00
ratio = (
self . tracked_object_metrics [ camera ] [ " max_target_box " ]
/ self . tracked_object_metrics [ camera ] [ " target_box " ]
2023-10-22 18:59:13 +02:00
)
2023-11-01 12:12:43 +01:00
zoom = ( ratio - 1 ) / ( ratio + 1 )
2023-11-07 12:31:39 +01:00
logger . debug (
f ' { camera } : limit: { self . tracked_object_metrics [ camera ] [ " max_target_box " ] } , ratio: { ratio } zoom calculation: { zoom } '
)
2023-10-22 18:59:13 +02:00
if not result :
# zoom out with special condition if zooming out because of velocity, edges, etc.
2023-11-01 12:12:43 +01:00
zoom = - ( 1 - zoom ) if zoom > 0 else - ( zoom * 2 + 1 )
2023-10-22 18:59:13 +02:00
if result :
# zoom in
2023-11-01 12:12:43 +01:00
zoom = 1 - zoom if zoom > 0 else ( zoom * 2 + 1 )
2023-10-22 18:59:13 +02:00
logger . debug ( f " { camera } : Zooming: { result } Zoom amount: { zoom } " )
return zoom
2023-09-27 13:19:10 +02:00
2023-10-22 18:59:13 +02:00
def is_autotracking ( self , camera ) :
return self . tracked_object [ camera ] is not None
def autotracked_object_region ( self , camera ) :
return self . tracked_object [ camera ] [ " region " ]
2023-07-08 14:04:47 +02:00
def autotrack_object ( self , camera , obj ) :
camera_config = self . config . cameras [ camera ]
2023-07-11 13:23:20 +02:00
if camera_config . onvif . autotracking . enabled :
2023-07-13 12:32:51 +02:00
if not self . autotracker_init [ camera ] :
2023-10-22 18:59:13 +02:00
self . _autotracker_setup ( camera_config , camera )
2023-07-13 12:32:51 +02:00
2023-09-27 13:19:10 +02:00
if self . calibrating [ camera ] :
2023-10-22 18:59:13 +02:00
logger . debug ( f " { camera } : Calibrating camera " )
2023-09-27 13:19:10 +02:00
return
2023-10-22 18:59:13 +02:00
# this is a brand new object that's on our camera, has our label, entered the zone,
# is not a false positive, and is not initially motionless
2023-07-08 14:04:47 +02:00
if (
# new object
self . tracked_object [ camera ] is None
and obj . camera == camera
and obj . obj_data [ " label " ] in self . object_types [ camera ]
and set ( obj . entered_zones ) & set ( self . required_zones [ camera ] )
and not obj . previous [ " false_positive " ]
and not obj . false_positive
2023-10-22 18:59:13 +02:00
and not self . tracked_object_history [ camera ]
2023-07-08 14:04:47 +02:00
and obj . obj_data [ " motionless_count " ] == 0
) :
logger . debug (
2023-10-22 18:59:13 +02:00
f " { camera } : New object: { obj . obj_data [ ' id ' ] } { obj . obj_data [ ' box ' ] } { obj . obj_data [ ' frame_time ' ] } "
2023-07-08 14:04:47 +02:00
)
2023-11-02 00:20:26 +01:00
self . ptz_metrics [ camera ] [ " ptz_tracking_active " ] . set ( )
self . dispatcher . publish (
f " { camera } /ptz_autotracker/active " , " ON " , retain = False
)
2023-07-08 14:04:47 +02:00
self . tracked_object [ camera ] = obj
2023-10-22 18:59:13 +02:00
self . tracked_object_history [ camera ] . append ( copy . deepcopy ( obj . obj_data ) )
2023-07-13 12:32:51 +02:00
self . _autotrack_move_ptz ( camera , obj )
2023-07-08 14:04:47 +02:00
return
if (
# already tracking an object
self . tracked_object [ camera ] is not None
2023-10-22 18:59:13 +02:00
and self . tracked_object_history [ camera ]
2023-07-08 14:04:47 +02:00
and obj . obj_data [ " id " ] == self . tracked_object [ camera ] . obj_data [ " id " ]
2023-10-22 18:59:13 +02:00
and obj . obj_data [ " frame_time " ]
!= self . tracked_object_history [ camera ] [ - 1 ] [ " frame_time " ]
and not ptz_moving_at_frame_time (
obj . obj_data [ " frame_time " ] ,
self . ptz_metrics [ camera ] [ " ptz_start_time " ] . value ,
self . ptz_metrics [ camera ] [ " ptz_stop_time " ] . value ,
2023-07-11 13:23:20 +02:00
)
2023-10-22 18:59:13 +02:00
) :
self . tracked_object_history [ camera ] . append ( copy . deepcopy ( obj . obj_data ) )
2023-10-27 00:21:58 +02:00
self . _calculate_tracked_object_metrics ( camera , obj )
2023-07-11 13:23:20 +02:00
2023-10-27 00:21:58 +02:00
if self . tracked_object_metrics [ camera ] [ " below_distance_threshold " ] :
2023-07-08 14:04:47 +02:00
logger . debug (
2023-10-22 18:59:13 +02:00
f " { camera } : Existing object (do NOT move ptz): { obj . obj_data [ ' id ' ] } { obj . obj_data [ ' box ' ] } { obj . obj_data [ ' frame_time ' ] } "
2023-07-08 14:04:47 +02:00
)
2023-09-27 13:19:10 +02:00
2023-10-22 18:59:13 +02:00
# no need to move, but try zooming
self . _autotrack_move_zoom_only ( camera , obj )
else :
logger . debug (
f " { camera } : Existing object (need to move ptz): { obj . obj_data [ ' id ' ] } { obj . obj_data [ ' box ' ] } { obj . obj_data [ ' frame_time ' ] } "
)
2023-07-08 14:04:47 +02:00
2023-10-22 18:59:13 +02:00
self . _autotrack_move_ptz ( camera , obj )
2023-09-27 13:19:10 +02:00
2023-07-08 14:04:47 +02:00
return
if (
# The tracker lost an object, so let's check the previous object's region and compare it with the incoming object
# If it's within bounds, start tracking that object.
# Should we check region (maybe too broad) or expand the previous object's box a bit and check that?
self . tracked_object [ camera ] is None
and obj . camera == camera
and obj . obj_data [ " label " ] in self . object_types [ camera ]
and not obj . previous [ " false_positive " ]
and not obj . false_positive
2023-10-22 18:59:13 +02:00
and self . tracked_object_history [ camera ]
2023-07-08 14:04:47 +02:00
) :
if (
intersection_over_union (
2023-10-22 18:59:13 +02:00
self . tracked_object_history [ camera ] [ - 1 ] [ " region " ] ,
2023-07-08 14:04:47 +02:00
obj . obj_data [ " box " ] ,
)
< 0.2
) :
logger . debug (
2023-10-22 18:59:13 +02:00
f " { camera } : Reacquired object: { obj . obj_data [ ' id ' ] } { obj . obj_data [ ' box ' ] } { obj . obj_data [ ' frame_time ' ] } "
2023-07-08 14:04:47 +02:00
)
self . tracked_object [ camera ] = obj
2023-10-22 18:59:13 +02:00
self . tracked_object_history [ camera ] . clear ( )
self . tracked_object_history [ camera ] . append (
copy . deepcopy ( obj . obj_data )
)
2023-10-27 00:21:58 +02:00
self . _calculate_tracked_object_metrics ( camera , obj )
2023-07-13 12:32:51 +02:00
self . _autotrack_move_ptz ( camera , obj )
2023-07-08 14:04:47 +02:00
return
def end_object ( self , camera , obj ) :
if self . config . cameras [ camera ] . onvif . autotracking . enabled :
if (
self . tracked_object [ camera ] is not None
and obj . obj_data [ " id " ] == self . tracked_object [ camera ] . obj_data [ " id " ]
) :
logger . debug (
2023-10-22 18:59:13 +02:00
f " { camera } : End object: { obj . obj_data [ ' id ' ] } { obj . obj_data [ ' box ' ] } "
2023-07-08 14:04:47 +02:00
)
self . tracked_object [ camera ] = None
2023-10-22 18:59:13 +02:00
self . tracked_object_metrics [ camera ] = {
2023-11-01 12:12:43 +01:00
" max_target_box " : AUTOTRACKING_MAX_AREA_RATIO
* * ( 1 / self . zoom_factor [ camera ] )
2023-10-22 18:59:13 +02:00
}
2023-07-08 14:04:47 +02:00
def camera_maintenance ( self , camera ) :
2023-09-27 13:19:10 +02:00
# bail and don't check anything if we're calibrating or tracking an object
2023-10-22 18:59:13 +02:00
if (
not self . autotracker_init [ camera ]
or self . calibrating [ camera ]
or self . tracked_object [ camera ] is not None
) :
2023-09-27 13:19:10 +02:00
return
2023-07-08 14:04:47 +02:00
# calls get_camera_status to check/update ptz movement
# returns camera to preset after timeout when tracking is over
autotracker_config = self . config . cameras [ camera ] . onvif . autotracking
if not self . autotracker_init [ camera ] :
self . _autotracker_setup ( self . config . cameras [ camera ] , camera )
# regularly update camera status
2023-11-02 00:20:26 +01:00
if not self . ptz_metrics [ camera ] [ " ptz_motor_stopped " ] . is_set ( ) :
2023-07-08 14:04:47 +02:00
self . onvif . get_camera_status ( camera )
# return to preset if tracking is over
if (
self . tracked_object [ camera ] is None
2023-10-22 18:59:13 +02:00
and self . tracked_object_history [ camera ]
2023-07-08 14:04:47 +02:00
and (
# might want to use a different timestamp here?
2023-09-27 13:19:10 +02:00
self . ptz_metrics [ camera ] [ " ptz_frame_time " ] . value
2023-10-22 18:59:13 +02:00
- self . tracked_object_history [ camera ] [ - 1 ] [ " frame_time " ]
> = autotracker_config . timeout
2023-07-08 14:04:47 +02:00
)
and autotracker_config . return_preset
) :
2023-10-22 18:59:13 +02:00
# clear tracked object and reset zoom level
self . tracked_object [ camera ] = None
self . tracked_object_history [ camera ] . clear ( )
2023-09-27 13:19:10 +02:00
# empty move queue
while not self . move_queues [ camera ] . empty ( ) :
self . move_queues [ camera ] . get ( )
2023-11-02 00:20:26 +01:00
self . ptz_metrics [ camera ] [ " ptz_motor_stopped " ] . wait ( )
2023-07-08 14:04:47 +02:00
logger . debug (
2023-10-22 18:59:13 +02:00
f " { camera } : Time is { self . ptz_metrics [ camera ] [ ' ptz_frame_time ' ] . value } , returning to preset: { autotracker_config . return_preset } "
2023-07-08 14:04:47 +02:00
)
self . onvif . _move_to_preset (
camera ,
autotracker_config . return_preset . lower ( ) ,
)
2023-10-22 18:59:13 +02:00
# update stored zoom level from preset
2023-11-02 00:20:26 +01:00
if not self . ptz_metrics [ camera ] [ " ptz_motor_stopped " ] . is_set ( ) :
2023-10-22 18:59:13 +02:00
self . onvif . get_camera_status ( camera )
2023-11-02 00:20:26 +01:00
self . ptz_metrics [ camera ] [ " ptz_tracking_active " ] . clear ( )
self . dispatcher . publish (
f " { camera } /ptz_autotracker/active " , " OFF " , retain = False
)
2023-07-13 12:32:51 +02:00
self . ptz_metrics [ camera ] [ " ptz_reset " ] . set ( )