2023-04-26 13:08:53 +02:00
""" Configure and control camera via onvif. """
import logging
from enum import Enum
2024-04-13 19:25:58 +02:00
from importlib . util import find_spec
from pathlib import Path
2023-05-29 12:31:17 +02:00
2023-07-08 14:04:47 +02:00
import numpy
2023-04-26 13:08:53 +02:00
from onvif import ONVIFCamera , ONVIFError
2024-02-06 00:52:47 +01:00
from zeep . exceptions import Fault , TransportError
2024-08-31 23:15:10 +02:00
from zeep . transports import Transport
2023-04-26 13:08:53 +02:00
2023-09-27 13:19:10 +02:00
from frigate . config import FrigateConfig , ZoomingModeEnum
2023-07-11 13:23:20 +02:00
from frigate . types import PTZMetricsTypes
2023-09-27 13:19:10 +02:00
from frigate . util . builtin import find_by_key
2023-04-26 13:08:53 +02:00
logger = logging . getLogger ( __name__ )
class OnvifCommandEnum ( str , Enum ) :
""" Holds all possible move commands """
init = " init "
move_down = " move_down "
move_left = " move_left "
2024-03-23 17:53:33 +01:00
move_relative = " move_relative "
2023-04-26 13:08:53 +02:00
move_right = " move_right "
move_up = " move_up "
preset = " preset "
stop = " stop "
zoom_in = " zoom_in "
zoom_out = " zoom_out "
class OnvifController :
2023-07-08 14:04:47 +02:00
def __init__ (
2023-07-11 13:23:20 +02:00
self , config : FrigateConfig , ptz_metrics : dict [ str , PTZMetricsTypes ]
2023-07-08 14:04:47 +02:00
) - > None :
2023-04-26 13:08:53 +02:00
self . cams : dict [ str , ONVIFCamera ] = { }
2023-09-27 13:19:10 +02:00
self . config = config
2023-07-11 13:23:20 +02:00
self . ptz_metrics = ptz_metrics
2023-04-26 13:08:53 +02:00
for cam_name , cam in config . cameras . items ( ) :
if not cam . enabled :
continue
if cam . onvif . host :
try :
2024-08-31 23:15:10 +02:00
transport = Transport ( timeout = 10 , operation_timeout = 10 )
2023-04-26 13:08:53 +02:00
self . cams [ cam_name ] = {
" onvif " : O NVIFCamera (
cam . onvif . host ,
cam . onvif . port ,
cam . onvif . user ,
cam . onvif . password ,
2024-04-16 22:55:24 +02:00
wsdl_dir = str (
Path ( find_spec ( " onvif " ) . origin ) . parent / " wsdl "
) . replace ( " dist-packages/onvif " , " site-packages " ) ,
2024-07-09 21:00:47 +02:00
adjust_time = cam . onvif . ignore_time_mismatch ,
2024-08-31 23:15:10 +02:00
transport = transport ,
2023-04-26 13:08:53 +02:00
) ,
" init " : False ,
" active " : False ,
2023-07-21 14:27:07 +02:00
" features " : [ ] ,
2023-04-26 13:08:53 +02:00
" presets " : { } ,
}
except ONVIFError as e :
logger . error ( f " Onvif connection to { cam . name } failed: { e } " )
def _init_onvif ( self , camera_name : str ) - > bool :
onvif : ONVIFCamera = self . cams [ camera_name ] [ " onvif " ]
# create init services
media = onvif . create_media_service ( )
2024-02-10 20:41:24 +01:00
logger . debug ( f " Onvif media xaddr for { camera_name } : { media . xaddr } " )
2023-04-26 13:08:53 +02:00
try :
2024-02-06 00:52:47 +01:00
# this will fire an exception if camera is not a ptz
capabilities = onvif . get_definition ( " ptz " )
logger . debug ( f " Onvif capabilities for { camera_name } : { capabilities } " )
except ( ONVIFError , Fault , TransportError ) as e :
2024-02-10 20:41:24 +01:00
logger . error (
f " Unable to get Onvif capabilities for camera: { camera_name } : { e } "
)
2023-04-26 13:08:53 +02:00
return False
2024-02-10 20:41:24 +01:00
try :
profiles = media . GetProfiles ( )
2024-07-14 19:12:26 +02:00
logger . debug ( f " Onvif profiles for { camera_name } : { profiles } " )
2024-02-10 20:41:24 +01:00
except ( ONVIFError , Fault , TransportError ) as e :
logger . error (
f " Unable to get Onvif media profiles for camera: { camera_name } : { e } "
)
return False
profile = None
for key , onvif_profile in enumerate ( profiles ) :
if (
onvif_profile . VideoEncoderConfiguration
2024-02-24 14:49:34 +01:00
and onvif_profile . PTZConfiguration
2024-07-12 19:01:52 +02:00
and (
onvif_profile . PTZConfiguration . DefaultContinuousPanTiltVelocitySpace
is not None
or onvif_profile . PTZConfiguration . DefaultContinuousZoomVelocitySpace
is not None
)
2024-02-10 20:41:24 +01:00
) :
2024-07-14 20:29:49 +02:00
# use the first profile that has a valid ptz configuration
2024-02-10 20:41:24 +01:00
profile = onvif_profile
logger . debug ( f " Selected Onvif profile for { camera_name } : { profile } " )
break
if profile is None :
logger . error (
f " No appropriate Onvif profiles found for camera: { camera_name } . "
)
return False
2023-09-27 13:19:10 +02:00
2024-02-10 20:41:24 +01:00
# get the PTZ config for the profile
try :
configs = profile . PTZConfiguration
logger . debug (
f " Onvif ptz config for media profile in { camera_name } : { configs } "
)
except Exception as e :
logger . error (
f " Invalid Onvif PTZ configuration for camera: { camera_name } : { e } "
)
return False
2023-09-27 13:19:10 +02:00
2024-02-10 20:41:24 +01:00
ptz = onvif . create_ptz_service ( )
2023-09-27 13:19:10 +02:00
2024-03-09 15:48:31 +01:00
# setup continuous moving request
move_request = ptz . create_type ( " ContinuousMove " )
move_request . ProfileToken = profile . token
self . cams [ camera_name ] [ " move_request " ] = move_request
2023-10-22 16:08:05 +02:00
2024-03-09 15:48:31 +01:00
# extra setup for autotracking cameras
2024-02-24 14:49:34 +01:00
if (
2024-03-09 15:48:31 +01:00
self . config . cameras [ camera_name ] . onvif . autotracking . enabled_in_config
and self . config . cameras [ camera_name ] . onvif . autotracking . enabled
2024-02-24 14:49:34 +01:00
) :
2024-03-09 15:48:31 +01:00
request = ptz . create_type ( " GetConfigurationOptions " )
request . ConfigurationToken = profile . PTZConfiguration . token
ptz_config = ptz . GetConfigurationOptions ( request )
logger . debug ( f " Onvif config for { camera_name } : { ptz_config } " )
service_capabilities_request = ptz . create_type ( " GetServiceCapabilities " )
self . cams [ camera_name ] [ " service_capabilities_request " ] = (
service_capabilities_request
)
fov_space_id = next (
2023-09-27 13:19:10 +02:00
(
i
for i , space in enumerate (
2024-03-09 15:48:31 +01:00
ptz_config . Spaces . RelativePanTiltTranslationSpace
2023-09-27 13:19:10 +02:00
)
2024-03-09 15:48:31 +01:00
if " TranslationSpaceFov " in space [ " URI " ]
2023-09-27 13:19:10 +02:00
) ,
None ,
)
2024-03-09 15:48:31 +01:00
# status request for autotracking and filling ptz-parameters
status_request = ptz . create_type ( " GetStatus " )
status_request . ProfileToken = profile . token
self . cams [ camera_name ] [ " status_request " ] = status_request
try :
status = ptz . GetStatus ( status_request )
logger . debug ( f " Onvif status config for { camera_name } : { status } " )
except Exception as e :
logger . warning ( f " Unable to get status from camera: { camera_name } : { e } " )
status = None
2024-05-20 15:37:56 +02:00
# autotracking relative panning/tilting needs a relative zoom value set to 0
2024-03-09 15:48:31 +01:00
# if camera supports relative movement
2023-10-07 16:17:54 +02:00
if (
self . config . cameras [ camera_name ] . onvif . autotracking . zooming
2024-02-24 14:49:34 +01:00
!= ZoomingModeEnum . disabled
2023-10-07 16:17:54 +02:00
) :
2024-03-09 15:48:31 +01:00
zoom_space_id = next (
(
i
for i , space in enumerate (
ptz_config . Spaces . RelativeZoomTranslationSpace
)
if " TranslationGenericSpace " in space [ " URI " ]
) ,
None ,
)
2023-09-01 14:07:18 +02:00
2024-03-09 15:48:31 +01:00
# setup relative moving request for autotracking
move_request = ptz . create_type ( " RelativeMove " )
move_request . ProfileToken = profile . token
logger . debug ( f " { camera_name } : Relative move request: { move_request } " )
if move_request . Translation is None and fov_space_id is not None :
move_request . Translation = status . Position
move_request . Translation . PanTilt . space = ptz_config [ " Spaces " ] [
" RelativePanTiltTranslationSpace "
] [ fov_space_id ] [ " URI " ]
# try setting relative zoom translation space
try :
if (
self . config . cameras [ camera_name ] . onvif . autotracking . zooming
!= ZoomingModeEnum . disabled
) :
if zoom_space_id is not None :
move_request . Translation . Zoom . space = ptz_config [ " Spaces " ] [
" RelativeZoomTranslationSpace "
] [ zoom_space_id ] [ " URI " ]
else :
move_request . Translation . Zoom = [ ]
except Exception :
self . config . cameras [
camera_name
] . onvif . autotracking . zooming = ZoomingModeEnum . disabled
logger . warning (
f " Disabling autotracking zooming for { camera_name } : Relative zoom not supported "
)
2023-07-08 14:04:47 +02:00
2024-03-09 15:48:31 +01:00
if move_request . Speed is None :
move_request . Speed = configs . DefaultPTZSpeed if configs else None
logger . debug (
f " { camera_name } : Relative move request after setup: { move_request } "
)
self . cams [ camera_name ] [ " relative_move_request " ] = move_request
# setup absolute moving request for autotracking zooming
move_request = ptz . create_type ( " AbsoluteMove " )
move_request . ProfileToken = profile . token
self . cams [ camera_name ] [ " absolute_move_request " ] = move_request
2023-07-08 14:04:47 +02:00
2023-04-26 13:08:53 +02:00
# setup existing presets
try :
presets : list [ dict ] = ptz . GetPresets ( { " ProfileToken " : profile . token } )
except ONVIFError as e :
2023-05-17 14:42:56 +02:00
logger . warning ( f " Unable to get presets from camera: { camera_name } : { e } " )
presets = [ ]
2023-04-26 13:08:53 +02:00
for preset in presets :
2023-10-07 16:20:42 +02:00
self . cams [ camera_name ] [ " presets " ] [
2023-10-21 01:27:47 +02:00
( getattr ( preset , " Name " ) or f " preset { preset [ ' token ' ] } " ) . lower ( )
2023-10-07 16:20:42 +02:00
] = preset [ " token " ]
2023-04-26 13:08:53 +02:00
# get list of supported features
supported_features = [ ]
2024-02-06 00:52:47 +01:00
if configs . DefaultContinuousPanTiltVelocitySpace :
2023-04-26 13:08:53 +02:00
supported_features . append ( " pt " )
2024-02-06 00:52:47 +01:00
if configs . DefaultContinuousZoomVelocitySpace :
2023-04-26 13:08:53 +02:00
supported_features . append ( " zoom " )
2024-02-06 00:52:47 +01:00
if configs . DefaultRelativePanTiltTranslationSpace :
2023-07-08 14:04:47 +02:00
supported_features . append ( " pt-r " )
2024-02-06 00:52:47 +01:00
if configs . DefaultRelativeZoomTranslationSpace :
2023-07-08 14:04:47 +02:00
supported_features . append ( " zoom-r " )
2024-03-09 15:48:31 +01:00
if (
self . config . cameras [ camera_name ] . onvif . autotracking . enabled_in_config
and self . config . cameras [ camera_name ] . onvif . autotracking . enabled
) :
try :
# get camera's zoom limits from onvif config
self . cams [ camera_name ] [ " relative_zoom_range " ] = (
ptz_config . Spaces . RelativeZoomTranslationSpace [ 0 ]
2023-10-22 18:59:13 +02:00
)
2024-03-09 15:48:31 +01:00
except Exception :
if (
self . config . cameras [ camera_name ] . onvif . autotracking . zooming
== ZoomingModeEnum . relative
) :
self . config . cameras [
camera_name
] . onvif . autotracking . zooming = ZoomingModeEnum . disabled
logger . warning (
f " Disabling autotracking zooming for { camera_name } : Relative zoom not supported "
)
2023-07-08 14:04:47 +02:00
2024-02-06 00:52:47 +01:00
if configs . DefaultAbsoluteZoomPositionSpace :
2023-09-27 13:19:10 +02:00
supported_features . append ( " zoom-a " )
2024-03-09 15:48:31 +01:00
if (
self . config . cameras [ camera_name ] . onvif . autotracking . enabled_in_config
and self . config . cameras [ camera_name ] . onvif . autotracking . enabled
) :
try :
# get camera's zoom limits from onvif config
self . cams [ camera_name ] [ " absolute_zoom_range " ] = (
ptz_config . Spaces . AbsoluteZoomPositionSpace [ 0 ]
2023-09-27 13:19:10 +02:00
)
2024-03-09 15:48:31 +01:00
self . cams [ camera_name ] [ " zoom_limits " ] = configs . ZoomLimits
except Exception :
if self . config . cameras [ camera_name ] . onvif . autotracking . zooming :
self . config . cameras [
camera_name
] . onvif . autotracking . zooming = ZoomingModeEnum . disabled
logger . warning (
f " Disabling autotracking zooming for { camera_name } : Absolute zoom not supported "
)
2023-09-27 13:19:10 +02:00
# set relative pan/tilt space for autotracker
2024-02-06 00:52:47 +01:00
if (
2024-03-09 15:48:31 +01:00
self . config . cameras [ camera_name ] . onvif . autotracking . enabled_in_config
and self . config . cameras [ camera_name ] . onvif . autotracking . enabled
and fov_space_id is not None
2024-02-06 00:52:47 +01:00
and configs . DefaultRelativePanTiltTranslationSpace is not None
) :
2023-07-08 14:04:47 +02:00
supported_features . append ( " pt-r-fov " )
2024-03-01 00:10:13 +01:00
self . cams [ camera_name ] [ " relative_fov_range " ] = (
ptz_config . Spaces . RelativePanTiltTranslationSpace [ fov_space_id ]
)
2023-07-08 14:04:47 +02:00
2023-04-26 13:08:53 +02:00
self . cams [ camera_name ] [ " features " ] = supported_features
self . cams [ camera_name ] [ " init " ] = True
return True
def _stop ( self , camera_name : str ) - > None :
onvif : ONVIFCamera = self . cams [ camera_name ] [ " onvif " ]
move_request = self . cams [ camera_name ] [ " move_request " ]
onvif . get_service ( " ptz " ) . Stop (
{
" ProfileToken " : move_request . ProfileToken ,
" PanTilt " : True ,
" Zoom " : True ,
}
)
self . cams [ camera_name ] [ " active " ] = False
def _move ( self , camera_name : str , command : OnvifCommandEnum ) - > None :
if self . cams [ camera_name ] [ " active " ] :
logger . warning (
f " { camera_name } is already performing an action, stopping... "
)
self . _stop ( camera_name )
self . cams [ camera_name ] [ " active " ] = True
onvif : ONVIFCamera = self . cams [ camera_name ] [ " onvif " ]
move_request = self . cams [ camera_name ] [ " move_request " ]
if command == OnvifCommandEnum . move_left :
move_request . Velocity = { " PanTilt " : { " x " : - 0.5 , " y " : 0 } }
elif command == OnvifCommandEnum . move_right :
move_request . Velocity = { " PanTilt " : { " x " : 0.5 , " y " : 0 } }
elif command == OnvifCommandEnum . move_up :
move_request . Velocity = {
" PanTilt " : {
" x " : 0 ,
" y " : 0.5 ,
}
}
elif command == OnvifCommandEnum . move_down :
move_request . Velocity = {
" PanTilt " : {
" x " : 0 ,
" y " : - 0.5 ,
}
}
2024-04-01 21:19:24 +02:00
try :
onvif . get_service ( " ptz " ) . ContinuousMove ( move_request )
except ONVIFError as e :
logger . warning ( f " Onvif sending move request to { camera_name } failed: { e } " )
2023-04-26 13:08:53 +02:00
2023-09-27 13:19:10 +02:00
def _move_relative ( self , camera_name : str , pan , tilt , zoom , speed ) - > None :
if " pt-r-fov " not in self . cams [ camera_name ] [ " features " ] :
2023-07-08 14:04:47 +02:00
logger . error ( f " { camera_name } does not support ONVIF RelativeMove (FOV). " )
return
2023-10-22 18:59:13 +02:00
logger . debug (
f " { camera_name } called RelativeMove: pan: { pan } tilt: { tilt } zoom: { zoom } "
)
2023-07-08 14:04:47 +02:00
if self . cams [ camera_name ] [ " active " ] :
logger . warning (
f " { camera_name } is already performing an action, not moving... "
)
return
self . cams [ camera_name ] [ " active " ] = True
2023-11-02 00:20:26 +01:00
self . ptz_metrics [ camera_name ] [ " ptz_motor_stopped " ] . clear ( )
2023-09-27 13:19:10 +02:00
logger . debug (
2023-10-22 18:59:13 +02:00
f " { camera_name } : PTZ start time: { self . ptz_metrics [ camera_name ] [ ' ptz_frame_time ' ] . value } "
2023-09-27 13:19:10 +02:00
)
self . ptz_metrics [ camera_name ] [ " ptz_start_time " ] . value = self . ptz_metrics [
camera_name
] [ " ptz_frame_time " ] . value
2023-07-11 13:23:20 +02:00
self . ptz_metrics [ camera_name ] [ " ptz_stop_time " ] . value = 0
2023-07-08 14:04:47 +02:00
onvif : ONVIFCamera = self . cams [ camera_name ] [ " onvif " ]
move_request = self . cams [ camera_name ] [ " relative_move_request " ]
# function takes in -1 to 1 for pan and tilt, interpolate to the values of the camera.
# The onvif spec says this can report as +INF and -INF, so this may need to be modified
pan = numpy . interp (
pan ,
[ - 1 , 1 ] ,
[
self . cams [ camera_name ] [ " relative_fov_range " ] [ " XRange " ] [ " Min " ] ,
self . cams [ camera_name ] [ " relative_fov_range " ] [ " XRange " ] [ " Max " ] ,
] ,
)
tilt = numpy . interp (
tilt ,
[ - 1 , 1 ] ,
[
self . cams [ camera_name ] [ " relative_fov_range " ] [ " YRange " ] [ " Min " ] ,
self . cams [ camera_name ] [ " relative_fov_range " ] [ " YRange " ] [ " Max " ] ,
] ,
)
move_request . Speed = {
" PanTilt " : {
" x " : speed ,
" y " : speed ,
} ,
}
move_request . Translation . PanTilt . x = pan
move_request . Translation . PanTilt . y = tilt
2023-09-27 13:19:10 +02:00
2024-02-24 14:49:34 +01:00
if (
" zoom-r " in self . cams [ camera_name ] [ " features " ]
and self . config . cameras [ camera_name ] . onvif . autotracking . zooming
== ZoomingModeEnum . relative
) :
2023-09-27 13:19:10 +02:00
move_request . Speed = {
" PanTilt " : {
" x " : speed ,
" y " : speed ,
} ,
" Zoom " : { " x " : speed } ,
}
move_request . Translation . Zoom . x = zoom
2023-07-08 14:04:47 +02:00
onvif . get_service ( " ptz " ) . RelativeMove ( move_request )
2023-09-27 13:19:10 +02:00
# reset after the move request
move_request . Translation . PanTilt . x = 0
move_request . Translation . PanTilt . y = 0
2024-02-24 14:49:34 +01:00
if (
" zoom-r " in self . cams [ camera_name ] [ " features " ]
and self . config . cameras [ camera_name ] . onvif . autotracking . zooming
== ZoomingModeEnum . relative
) :
2023-09-27 13:19:10 +02:00
move_request . Translation . Zoom . x = 0
2023-07-08 14:04:47 +02:00
self . cams [ camera_name ] [ " active " ] = False
2023-04-26 13:08:53 +02:00
def _move_to_preset ( self , camera_name : str , preset : str ) - > None :
2023-05-29 12:31:17 +02:00
if preset not in self . cams [ camera_name ] [ " presets " ] :
2023-04-26 13:08:53 +02:00
logger . error ( f " { preset } is not a valid preset for { camera_name } " )
return
self . cams [ camera_name ] [ " active " ] = True
2023-11-02 00:20:26 +01:00
self . ptz_metrics [ camera_name ] [ " ptz_motor_stopped " ] . clear ( )
2023-10-22 18:59:13 +02:00
self . ptz_metrics [ camera_name ] [ " ptz_start_time " ] . value = 0
self . ptz_metrics [ camera_name ] [ " ptz_stop_time " ] . value = 0
2023-04-26 13:08:53 +02:00
move_request = self . cams [ camera_name ] [ " move_request " ]
onvif : ONVIFCamera = self . cams [ camera_name ] [ " onvif " ]
preset_token = self . cams [ camera_name ] [ " presets " ] [ preset ]
onvif . get_service ( " ptz " ) . GotoPreset (
{
" ProfileToken " : move_request . ProfileToken ,
" PresetToken " : preset_token ,
}
)
2023-10-22 18:59:13 +02:00
2023-04-26 13:08:53 +02:00
self . cams [ camera_name ] [ " active " ] = False
def _zoom ( self , camera_name : str , command : OnvifCommandEnum ) - > None :
if self . cams [ camera_name ] [ " active " ] :
logger . warning (
f " { camera_name } is already performing an action, stopping... "
)
self . _stop ( camera_name )
self . cams [ camera_name ] [ " active " ] = True
onvif : ONVIFCamera = self . cams [ camera_name ] [ " onvif " ]
move_request = self . cams [ camera_name ] [ " move_request " ]
if command == OnvifCommandEnum . zoom_in :
move_request . Velocity = { " Zoom " : { " x " : 0.5 } }
elif command == OnvifCommandEnum . zoom_out :
move_request . Velocity = { " Zoom " : { " x " : - 0.5 } }
onvif . get_service ( " ptz " ) . ContinuousMove ( move_request )
2023-09-27 13:19:10 +02:00
def _zoom_absolute ( self , camera_name : str , zoom , speed ) - > None :
if " zoom-a " not in self . cams [ camera_name ] [ " features " ] :
logger . error ( f " { camera_name } does not support ONVIF AbsoluteMove zooming. " )
return
logger . debug ( f " { camera_name } called AbsoluteMove: zoom: { zoom } " )
if self . cams [ camera_name ] [ " active " ] :
logger . warning (
f " { camera_name } is already performing an action, not moving... "
)
return
self . cams [ camera_name ] [ " active " ] = True
2023-11-02 00:20:26 +01:00
self . ptz_metrics [ camera_name ] [ " ptz_motor_stopped " ] . clear ( )
2023-09-27 13:19:10 +02:00
logger . debug (
2023-10-22 18:59:13 +02:00
f " { camera_name } : PTZ start time: { self . ptz_metrics [ camera_name ] [ ' ptz_frame_time ' ] . value } "
2023-09-27 13:19:10 +02:00
)
self . ptz_metrics [ camera_name ] [ " ptz_start_time " ] . value = self . ptz_metrics [
camera_name
] [ " ptz_frame_time " ] . value
self . ptz_metrics [ camera_name ] [ " ptz_stop_time " ] . value = 0
onvif : ONVIFCamera = self . cams [ camera_name ] [ " onvif " ]
move_request = self . cams [ camera_name ] [ " absolute_move_request " ]
# function takes in 0 to 1 for zoom, interpolate to the values of the camera.
zoom = numpy . interp (
zoom ,
[ 0 , 1 ] ,
[
self . cams [ camera_name ] [ " absolute_zoom_range " ] [ " XRange " ] [ " Min " ] ,
self . cams [ camera_name ] [ " absolute_zoom_range " ] [ " XRange " ] [ " Max " ] ,
] ,
)
move_request . Speed = { " Zoom " : speed }
move_request . Position = { " Zoom " : zoom }
2023-10-22 18:59:13 +02:00
logger . debug ( f " { camera_name } : Absolute zoom: { zoom } " )
2023-09-27 13:19:10 +02:00
onvif . get_service ( " ptz " ) . AbsoluteMove ( move_request )
self . cams [ camera_name ] [ " active " ] = False
2023-04-26 13:08:53 +02:00
def handle_command (
self , camera_name : str , command : OnvifCommandEnum , param : str = " "
) - > None :
if camera_name not in self . cams . keys ( ) :
logger . error ( f " Onvif is not setup for { camera_name } " )
return
if not self . cams [ camera_name ] [ " init " ] :
if not self . _init_onvif ( camera_name ) :
return
if command == OnvifCommandEnum . init :
# already init
return
elif command == OnvifCommandEnum . stop :
self . _stop ( camera_name )
elif command == OnvifCommandEnum . preset :
self . _move_to_preset ( camera_name , param )
2024-03-23 17:53:33 +01:00
elif command == OnvifCommandEnum . move_relative :
_ , pan , tilt = param . split ( " _ " )
self . _move_relative ( camera_name , float ( pan ) , float ( tilt ) , 0 , 1 )
2023-04-26 13:08:53 +02:00
elif (
command == OnvifCommandEnum . zoom_in or command == OnvifCommandEnum . zoom_out
) :
self . _zoom ( camera_name , command )
else :
self . _move ( camera_name , command )
def get_camera_info ( self , camera_name : str ) - > dict [ str , any ] :
if camera_name not in self . cams . keys ( ) :
logger . error ( f " Onvif is not setup for { camera_name } " )
return { }
if not self . cams [ camera_name ] [ " init " ] :
self . _init_onvif ( camera_name )
return {
" name " : camera_name ,
" features " : self . cams [ camera_name ] [ " features " ] ,
" presets " : list ( self . cams [ camera_name ] [ " presets " ] . keys ( ) ) ,
}
2023-07-08 14:04:47 +02:00
2023-09-27 13:19:10 +02:00
def get_service_capabilities ( self , camera_name : str ) - > None :
if camera_name not in self . cams . keys ( ) :
logger . error ( f " Onvif is not setup for { camera_name } " )
return { }
if not self . cams [ camera_name ] [ " init " ] :
self . _init_onvif ( camera_name )
onvif : ONVIFCamera = self . cams [ camera_name ] [ " onvif " ]
service_capabilities_request = self . cams [ camera_name ] [
" service_capabilities_request "
]
2024-03-09 15:48:31 +01:00
try :
service_capabilities = onvif . get_service ( " ptz " ) . GetServiceCapabilities (
service_capabilities_request
)
2023-09-27 13:19:10 +02:00
2024-03-09 15:48:31 +01:00
logger . debug (
f " Onvif service capabilities for { camera_name } : { service_capabilities } "
)
2023-09-27 13:19:10 +02:00
2024-03-09 15:48:31 +01:00
# MoveStatus is required for autotracking - should return "true" if supported
return find_by_key ( vars ( service_capabilities ) , " MoveStatus " )
except Exception :
logger . warning (
f " Camera { camera_name } does not support the ONVIF GetServiceCapabilities method. Autotracking will not function correctly and must be disabled in your config. "
)
return False
2023-09-27 13:19:10 +02:00
def get_camera_status ( self , camera_name : str ) - > None :
2023-07-08 14:04:47 +02:00
if camera_name not in self . cams . keys ( ) :
logger . error ( f " Onvif is not setup for { camera_name } " )
return { }
if not self . cams [ camera_name ] [ " init " ] :
self . _init_onvif ( camera_name )
onvif : ONVIFCamera = self . cams [ camera_name ] [ " onvif " ]
status_request = self . cams [ camera_name ] [ " status_request " ]
2023-10-22 16:08:05 +02:00
try :
status = onvif . get_service ( " ptz " ) . GetStatus ( status_request )
except Exception :
pass # We're unsupported, that'll be reported in the next check.
2023-07-08 14:04:47 +02:00
2024-02-02 13:23:14 +01:00
try :
pan_tilt_status = getattr ( status . MoveStatus , " PanTilt " , None )
zoom_status = getattr ( status . MoveStatus , " Zoom " , None )
# if it's not an attribute, see if MoveStatus even exists in the status result
if pan_tilt_status is None :
pan_tilt_status = getattr ( status , " MoveStatus " , None )
# we're unsupported
if pan_tilt_status is None or pan_tilt_status not in [
" IDLE " ,
" MOVING " ,
] :
raise Exception
except Exception :
logger . warning (
f " Camera { camera_name } does not support the ONVIF GetStatus method. Autotracking will not function correctly and must be disabled in your config. "
)
return
2023-09-27 13:19:10 +02:00
2023-12-28 14:48:44 +01:00
if pan_tilt_status == " IDLE " and ( zoom_status is None or zoom_status == " IDLE " ) :
2023-07-08 14:04:47 +02:00
self . cams [ camera_name ] [ " active " ] = False
2023-11-02 00:20:26 +01:00
if not self . ptz_metrics [ camera_name ] [ " ptz_motor_stopped " ] . is_set ( ) :
self . ptz_metrics [ camera_name ] [ " ptz_motor_stopped " ] . set ( )
2023-07-11 13:23:20 +02:00
2023-09-27 13:19:10 +02:00
logger . debug (
2023-10-22 18:59:13 +02:00
f " { camera_name } : PTZ stop time: { self . ptz_metrics [ camera_name ] [ ' ptz_frame_time ' ] . value } "
2023-09-27 13:19:10 +02:00
)
2023-07-11 13:23:20 +02:00
2023-09-27 13:19:10 +02:00
self . ptz_metrics [ camera_name ] [ " ptz_stop_time " ] . value = self . ptz_metrics [
camera_name
] [ " ptz_frame_time " ] . value
2023-07-08 14:04:47 +02:00
else :
self . cams [ camera_name ] [ " active " ] = True
2023-11-02 00:20:26 +01:00
if self . ptz_metrics [ camera_name ] [ " ptz_motor_stopped " ] . is_set ( ) :
self . ptz_metrics [ camera_name ] [ " ptz_motor_stopped " ] . clear ( )
2023-07-11 13:23:20 +02:00
2023-09-27 13:19:10 +02:00
logger . debug (
2023-10-22 18:59:13 +02:00
f " { camera_name } : PTZ start time: { self . ptz_metrics [ camera_name ] [ ' ptz_frame_time ' ] . value } "
2023-09-27 13:19:10 +02:00
)
2023-07-11 13:23:20 +02:00
self . ptz_metrics [ camera_name ] [
" ptz_start_time "
2023-09-27 13:19:10 +02:00
] . value = self . ptz_metrics [ camera_name ] [ " ptz_frame_time " ] . value
2023-07-11 13:23:20 +02:00
self . ptz_metrics [ camera_name ] [ " ptz_stop_time " ] . value = 0
2023-07-08 14:04:47 +02:00
2023-09-27 13:19:10 +02:00
if (
self . config . cameras [ camera_name ] . onvif . autotracking . zooming
2023-10-22 18:59:13 +02:00
!= ZoomingModeEnum . disabled
2023-09-27 13:19:10 +02:00
) :
# store absolute zoom level as 0 to 1 interpolated from the values of the camera
self . ptz_metrics [ camera_name ] [ " ptz_zoom_level " ] . value = numpy . interp (
round ( status . Position . Zoom . x , 2 ) ,
[ 0 , 1 ] ,
[
self . cams [ camera_name ] [ " absolute_zoom_range " ] [ " XRange " ] [ " Min " ] ,
self . cams [ camera_name ] [ " absolute_zoom_range " ] [ " XRange " ] [ " Max " ] ,
] ,
)
logger . debug (
2023-10-22 18:59:13 +02:00
f ' { camera_name } : Camera zoom level: { self . ptz_metrics [ camera_name ] [ " ptz_zoom_level " ] . value } '
)
# some hikvision cams won't update MoveStatus, so warn if it hasn't changed
if (
2023-11-02 00:20:26 +01:00
not self . ptz_metrics [ camera_name ] [ " ptz_motor_stopped " ] . is_set ( )
2023-10-22 18:59:13 +02:00
and not self . ptz_metrics [ camera_name ] [ " ptz_reset " ] . is_set ( )
and self . ptz_metrics [ camera_name ] [ " ptz_start_time " ] . value != 0
and self . ptz_metrics [ camera_name ] [ " ptz_frame_time " ] . value
> ( self . ptz_metrics [ camera_name ] [ " ptz_start_time " ] . value + 10 )
and self . ptz_metrics [ camera_name ] [ " ptz_stop_time " ] . value == 0
) :
logger . debug (
f ' Start time: { self . ptz_metrics [ camera_name ] [ " ptz_start_time " ] . value } , Stop time: { self . ptz_metrics [ camera_name ] [ " ptz_stop_time " ] . value } , Frame time: { self . ptz_metrics [ camera_name ] [ " ptz_frame_time " ] . value } '
2023-09-27 13:19:10 +02:00
)
2023-10-22 18:59:13 +02:00
# set the stop time so we don't come back into this again and spam the logs
self . ptz_metrics [ camera_name ] [ " ptz_stop_time " ] . value = self . ptz_metrics [
camera_name
] [ " ptz_frame_time " ] . value
logger . warning ( f " Camera { camera_name } is still in ONVIF ' MOVING ' status. " )