mirror of
https://github.com/blakeblackshear/frigate.git
synced 2025-01-02 00:07:11 +01:00
562 lines
21 KiB
Python
562 lines
21 KiB
Python
"""Configure and control camera via onvif."""
|
|
|
|
import logging
import site
from enum import Enum
from typing import Any, Optional

import numpy
from onvif import ONVIFCamera, ONVIFError

from frigate.config import FrigateConfig, ZoomingModeEnum
from frigate.types import PTZMetricsTypes
from frigate.util.builtin import find_by_key
|
|
|
|
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class OnvifCommandEnum(str, Enum):
    """Commands accepted by ``OnvifController.handle_command``.

    Every member is also a ``str`` whose value equals its name, so a plain
    string such as ``"zoom_in"`` compares equal to the matching member.
    """

    # lifecycle
    init = "init"

    # continuous pan / tilt
    move_down = "move_down"
    move_left = "move_left"
    move_right = "move_right"
    move_up = "move_up"

    # presets and halting
    preset = "preset"
    stop = "stop"

    # continuous zoom
    zoom_in = "zoom_in"
    zoom_out = "zoom_out"
|
|
|
|
|
|
class OnvifController:
    """Configure and control the PTZ functions of cameras over ONVIF.

    One ``ONVIFCamera`` client is created for every enabled camera that has an
    ONVIF host configured.  The PTZ requests, presets and feature list of a
    camera are built lazily by ``_init_onvif`` the first time the camera is
    used and are cached in ``self.cams[camera_name]``.
    """

    def __init__(
        self, config: FrigateConfig, ptz_metrics: dict[str, PTZMetricsTypes]
    ) -> None:
        """Create an ONVIF client for each enabled camera with an ONVIF host.

        Args:
            config: Frigate configuration; ``config.cameras`` is iterated.
            ptz_metrics: Per-camera PTZ metrics, stored for the move and status
                methods to update.
        """
        # camera name -> state dict ("onvif" client, "init"/"active" flags,
        # "features", "presets" and the requests cached by _init_onvif)
        self.cams: dict[str, dict[str, Any]] = {}
        self.config = config
        self.ptz_metrics = ptz_metrics

        for cam_name, cam in config.cameras.items():
            if not cam.enabled:
                continue

            if cam.onvif.host:
                try:
                    self.cams[cam_name] = {
                        "onvif": ONVIFCamera(
                            cam.onvif.host,
                            cam.onvif.port,
                            cam.onvif.user,
                            cam.onvif.password,
                            wsdl_dir=site.getsitepackages()[0].replace(
                                "dist-packages", "site-packages"
                            )
                            + "/wsdl",
                        ),
                        "init": False,
                        "active": False,
                        "features": [],
                        "presets": {},
                    }
                except ONVIFError as e:
                    # a camera that cannot be reached is simply left out of self.cams
                    logger.error(f"Onvif connection to {cam.name} failed: {e}")

    def _init_onvif(self, camera_name: str) -> bool:
        """Query the camera and cache everything the move methods need.

        Populates ``self.cams[camera_name]`` with the continuous, relative and
        absolute move requests, the status and service-capabilities requests,
        the presets, the supported feature list and (when available) the
        absolute zoom range, zoom limits and relative FOV range.

        Args:
            camera_name: Key of the camera in ``self.cams``.

        Returns:
            ``True`` once initialisation completed, ``False`` if the media
            profiles could not be fetched.
        """
        cam = self.cams[camera_name]
        onvif: ONVIFCamera = cam["onvif"]
        autotracking = self.config.cameras[camera_name].onvif.autotracking

        # create init services
        media = onvif.create_media_service()

        try:
            profile = media.GetProfiles()[0]
        except ONVIFError as e:
            logger.error(f"Unable to connect to camera: {camera_name}: {e}")
            return False

        ptz = onvif.create_ptz_service()

        request = ptz.create_type("GetConfigurations")
        configs = ptz.GetConfigurations(request)[0]

        request = ptz.create_type("GetConfigurationOptions")
        request.ConfigurationToken = profile.PTZConfiguration.token
        ptz_config = ptz.GetConfigurationOptions(request)
        logger.debug(f"Onvif config for {camera_name}: {ptz_config}")

        service_capabilities_request = ptz.create_type("GetServiceCapabilities")
        cam["service_capabilities_request"] = service_capabilities_request

        # index of the FOV-based relative pan/tilt space, if the camera has one
        fov_space_id = next(
            (
                i
                for i, space in enumerate(
                    ptz_config.Spaces.RelativePanTiltTranslationSpace
                )
                if "TranslationSpaceFov" in space["URI"]
            ),
            None,
        )

        # autotracking relative panning/tilting needs a relative zoom value set to 0
        # if camera supports relative movement
        zoom_space_id = None  # always bound, even when the lookup below is skipped
        if autotracking.zooming:
            zoom_space_id = next(
                (
                    i
                    for i, space in enumerate(
                        ptz_config.Spaces.RelativeZoomTranslationSpace
                    )
                    if "TranslationGenericSpace" in space["URI"]
                ),
                None,
            )

        # setup continuous moving request
        move_request = ptz.create_type("ContinuousMove")
        move_request.ProfileToken = profile.token
        cam["move_request"] = move_request

        # setup relative moving request for autotracking
        move_request = ptz.create_type("RelativeMove")
        move_request.ProfileToken = profile.token
        if move_request.Translation is None and fov_space_id is not None:
            # seed Translation from the current position, then switch the
            # pan/tilt space to the FOV space found above
            move_request.Translation = ptz.GetStatus(
                {"ProfileToken": profile.token}
            ).Position
            move_request.Translation.PanTilt.space = ptz_config["Spaces"][
                "RelativePanTiltTranslationSpace"
            ][fov_space_id]["URI"]

        # try setting relative zoom translation space
        try:
            if autotracking.zooming == ZoomingModeEnum.relative:
                if zoom_space_id is not None:
                    # use the generic space that was located above rather than
                    # whatever space happens to be listed first
                    move_request.Translation.Zoom.space = ptz_config["Spaces"][
                        "RelativeZoomTranslationSpace"
                    ][zoom_space_id]["URI"]
        except Exception:
            if autotracking.zooming == ZoomingModeEnum.relative:
                autotracking.zooming = ZoomingModeEnum.disabled
                logger.warning(
                    f"Disabling autotracking zooming for {camera_name}: Relative zoom not supported"
                )

        if move_request.Speed is None:
            move_request.Speed = ptz.GetStatus({"ProfileToken": profile.token}).Position
        cam["relative_move_request"] = move_request

        # setup absolute moving request for autotracking zooming
        move_request = ptz.create_type("AbsoluteMove")
        move_request.ProfileToken = profile.token
        cam["absolute_move_request"] = move_request

        # status request for autotracking
        status_request = ptz.create_type("GetStatus")
        status_request.ProfileToken = profile.token
        cam["status_request"] = status_request
        status = ptz.GetStatus(status_request)
        logger.debug(f"Onvif status config for {camera_name}: {status}")

        # setup existing presets
        try:
            presets: list[dict] = ptz.GetPresets({"ProfileToken": profile.token})
        except ONVIFError as e:
            logger.warning(f"Unable to get presets from camera: {camera_name}: {e}")
            presets = []

        for preset in presets:
            # preset names are stored lowercased; unnamed presets get a
            # synthetic "preset <token>" name
            cam["presets"][
                (getattr(preset, "Name") or f"preset {preset['token']}").lower()
            ] = preset["token"]

        # get list of supported features
        ptz_config = ptz.GetConfigurationOptions(request)
        supported_features = []

        if ptz_config.Spaces and ptz_config.Spaces.ContinuousPanTiltVelocitySpace:
            supported_features.append("pt")

        if ptz_config.Spaces and ptz_config.Spaces.ContinuousZoomVelocitySpace:
            supported_features.append("zoom")

        if ptz_config.Spaces and ptz_config.Spaces.RelativePanTiltTranslationSpace:
            supported_features.append("pt-r")

        if ptz_config.Spaces and ptz_config.Spaces.RelativeZoomTranslationSpace:
            supported_features.append("zoom-r")

        if ptz_config.Spaces and ptz_config.Spaces.AbsoluteZoomPositionSpace:
            supported_features.append("zoom-a")
            try:
                # get camera's zoom limits from onvif config
                cam["absolute_zoom_range"] = ptz_config.Spaces.AbsoluteZoomPositionSpace[
                    0
                ]
                cam["zoom_limits"] = configs.ZoomLimits
            except Exception:
                # disable with the enum member (as the relative-zoom fallback
                # does) and only when zooming was actually enabled
                zooming = autotracking.zooming
                if zooming and zooming != ZoomingModeEnum.disabled:
                    autotracking.zooming = ZoomingModeEnum.disabled
                    logger.warning(
                        f"Disabling autotracking zooming for {camera_name}: Absolute zoom not supported"
                    )

        # set relative pan/tilt space for autotracker
        if fov_space_id is not None:
            supported_features.append("pt-r-fov")
            cam[
                "relative_fov_range"
            ] = ptz_config.Spaces.RelativePanTiltTranslationSpace[fov_space_id]

        cam["features"] = supported_features

        cam["init"] = True
        return True

    def _stop(self, camera_name: str) -> None:
        """Send a PTZ ``Stop`` for pan/tilt and zoom and mark the camera idle."""
        onvif: ONVIFCamera = self.cams[camera_name]["onvif"]
        move_request = self.cams[camera_name]["move_request"]
        onvif.get_service("ptz").Stop(
            {
                "ProfileToken": move_request.ProfileToken,
                "PanTilt": True,
                "Zoom": True,
            }
        )
        self.cams[camera_name]["active"] = False

    def _move(self, camera_name: str, command: OnvifCommandEnum) -> None:
        """Start a continuous pan/tilt move in the direction given by ``command``.

        A move that is already in progress is stopped first.  The velocity is
        fixed at 0.5 on the relevant axis; any command other than the four
        ``move_*`` members leaves the cached request's velocity untouched.
        """
        if self.cams[camera_name]["active"]:
            logger.warning(
                f"{camera_name} is already performing an action, stopping..."
            )
            self._stop(camera_name)

        self.cams[camera_name]["active"] = True
        onvif: ONVIFCamera = self.cams[camera_name]["onvif"]
        move_request = self.cams[camera_name]["move_request"]

        if command == OnvifCommandEnum.move_left:
            move_request.Velocity = {"PanTilt": {"x": -0.5, "y": 0}}
        elif command == OnvifCommandEnum.move_right:
            move_request.Velocity = {"PanTilt": {"x": 0.5, "y": 0}}
        elif command == OnvifCommandEnum.move_up:
            move_request.Velocity = {"PanTilt": {"x": 0, "y": 0.5}}
        elif command == OnvifCommandEnum.move_down:
            move_request.Velocity = {"PanTilt": {"x": 0, "y": -0.5}}

        onvif.get_service("ptz").ContinuousMove(move_request)

    def _move_relative(self, camera_name: str, pan, tilt, zoom, speed) -> None:
        """Issue a ``RelativeMove`` in the camera's FOV translation space.

        Args:
            camera_name: Key of the camera in ``self.cams``.
            pan: Pan amount in the range -1 to 1.
            tilt: Tilt amount in the range -1 to 1.
            zoom: Relative zoom amount; only sent when the camera lists the
                ``zoom-r`` feature.
            speed: Speed applied to pan, tilt and (if sent) zoom.

        Nothing is sent when the camera lacks the ``pt-r-fov`` feature or is
        already marked active.
        """
        cam = self.cams[camera_name]
        metrics = self.ptz_metrics[camera_name]

        if "pt-r-fov" not in cam["features"]:
            logger.error(f"{camera_name} does not support ONVIF RelativeMove (FOV).")
            return

        logger.debug(f"{camera_name} called RelativeMove: pan: {pan} tilt: {tilt}")

        if cam["active"]:
            logger.warning(
                f"{camera_name} is already performing an action, not moving..."
            )
            return

        cam["active"] = True
        metrics["ptz_stopped"].clear()
        logger.debug(
            f"PTZ start time: {self.ptz_metrics[camera_name]['ptz_frame_time'].value}"
        )
        metrics["ptz_start_time"].value = metrics["ptz_frame_time"].value
        metrics["ptz_stop_time"].value = 0
        onvif: ONVIFCamera = cam["onvif"]
        move_request = cam["relative_move_request"]

        # function takes in -1 to 1 for pan and tilt, interpolate to the values of the camera.
        # The onvif spec says this can report as +INF and -INF, so this may need to be modified
        fov_range = cam["relative_fov_range"]
        pan = numpy.interp(
            pan,
            [-1, 1],
            [fov_range["XRange"]["Min"], fov_range["XRange"]["Max"]],
        )
        tilt = numpy.interp(
            tilt,
            [-1, 1],
            [fov_range["YRange"]["Min"], fov_range["YRange"]["Max"]],
        )

        supports_relative_zoom = "zoom-r" in cam["features"]

        # one speed for every axis that is part of this request
        requested_speed = {"PanTilt": {"x": speed, "y": speed}}
        if supports_relative_zoom:
            requested_speed["Zoom"] = {"x": speed}
        move_request.Speed = requested_speed

        move_request.Translation.PanTilt.x = pan
        move_request.Translation.PanTilt.y = tilt
        if supports_relative_zoom:
            move_request.Translation.Zoom.x = zoom

        onvif.get_service("ptz").RelativeMove(move_request)

        # reset after the move request
        move_request.Translation.PanTilt.x = 0
        move_request.Translation.PanTilt.y = 0

        if supports_relative_zoom:
            move_request.Translation.Zoom.x = 0

        cam["active"] = False

    def _move_to_preset(self, camera_name: str, preset: str) -> None:
        """Send the camera to a stored preset.

        Preset names are cached lowercased by ``_init_onvif``, so a name that is
        not found verbatim is retried in lowercase before being rejected.
        """
        cam = self.cams[camera_name]
        presets = cam["presets"]

        preset_key = preset
        if preset_key not in presets and isinstance(preset, str):
            preset_key = preset.lower()

        if preset_key not in presets:
            logger.error(f"{preset} is not a valid preset for {camera_name}")
            return

        cam["active"] = True
        self.ptz_metrics[camera_name]["ptz_stopped"].clear()
        move_request = cam["move_request"]
        onvif: ONVIFCamera = cam["onvif"]
        preset_token = presets[preset_key]
        onvif.get_service("ptz").GotoPreset(
            {
                "ProfileToken": move_request.ProfileToken,
                "PresetToken": preset_token,
            }
        )
        self.ptz_metrics[camera_name]["ptz_stopped"].set()
        cam["active"] = False

    def _zoom(self, camera_name: str, command: OnvifCommandEnum) -> None:
        """Start a continuous zoom in or out at a fixed velocity of 0.5.

        A move that is already in progress is stopped first.
        """
        if self.cams[camera_name]["active"]:
            logger.warning(
                f"{camera_name} is already performing an action, stopping..."
            )
            self._stop(camera_name)

        self.cams[camera_name]["active"] = True
        onvif: ONVIFCamera = self.cams[camera_name]["onvif"]
        move_request = self.cams[camera_name]["move_request"]

        if command == OnvifCommandEnum.zoom_in:
            move_request.Velocity = {"Zoom": {"x": 0.5}}
        elif command == OnvifCommandEnum.zoom_out:
            move_request.Velocity = {"Zoom": {"x": -0.5}}

        onvif.get_service("ptz").ContinuousMove(move_request)

    def _zoom_absolute(self, camera_name: str, zoom, speed) -> None:
        """Issue an ``AbsoluteMove`` that only changes the zoom position.

        Args:
            camera_name: Key of the camera in ``self.cams``.
            zoom: Target zoom in the range 0 to 1; mapped onto the camera's
                cached absolute zoom range before being sent.
            speed: Zoom speed placed in the request as-is.

        Nothing is sent when the camera lacks the ``zoom-a`` feature or is
        already marked active.
        """
        cam = self.cams[camera_name]
        metrics = self.ptz_metrics[camera_name]

        if "zoom-a" not in cam["features"]:
            logger.error(f"{camera_name} does not support ONVIF AbsoluteMove zooming.")
            return

        logger.debug(f"{camera_name} called AbsoluteMove: zoom: {zoom}")

        if cam["active"]:
            logger.warning(
                f"{camera_name} is already performing an action, not moving..."
            )
            return

        cam["active"] = True
        metrics["ptz_stopped"].clear()
        logger.debug(
            f"PTZ start time: {self.ptz_metrics[camera_name]['ptz_frame_time'].value}"
        )
        metrics["ptz_start_time"].value = metrics["ptz_frame_time"].value
        metrics["ptz_stop_time"].value = 0
        onvif: ONVIFCamera = cam["onvif"]
        move_request = cam["absolute_move_request"]

        # function takes in 0 to 1 for zoom, interpolate to the values of the camera.
        zoom_range = cam["absolute_zoom_range"]["XRange"]
        zoom = numpy.interp(zoom, [0, 1], [zoom_range["Min"], zoom_range["Max"]])

        move_request.Speed = {"Zoom": speed}
        move_request.Position = {"Zoom": zoom}

        logger.debug(f"Absolute zoom: {zoom}")

        onvif.get_service("ptz").AbsoluteMove(move_request)

        cam["active"] = False

    def handle_command(
        self, camera_name: str, command: OnvifCommandEnum, param: str = ""
    ) -> None:
        """Dispatch a PTZ command to the matching private method.

        The camera is initialised on first use; if that fails the command is
        dropped.

        Args:
            camera_name: Camera to control.
            command: Command to run.
            param: Preset name, only used for ``OnvifCommandEnum.preset``.
        """
        if camera_name not in self.cams:
            logger.error(f"Onvif is not setup for {camera_name}")
            return

        if not self.cams[camera_name]["init"]:
            if not self._init_onvif(camera_name):
                return

        if command == OnvifCommandEnum.init:
            # already init
            return
        elif command == OnvifCommandEnum.stop:
            self._stop(camera_name)
        elif command == OnvifCommandEnum.preset:
            self._move_to_preset(camera_name, param)
        elif command in (OnvifCommandEnum.zoom_in, OnvifCommandEnum.zoom_out):
            self._zoom(camera_name, command)
        else:
            # every remaining command is a continuous pan/tilt move
            self._move(camera_name, command)

    def get_camera_info(self, camera_name: str) -> dict[str, Any]:
        """Return the name, supported features and preset names of a camera.

        Returns an empty dict when the camera has no ONVIF client.  If lazy
        initialisation fails, the feature and preset lists are returned as
        they currently stand in the cache.
        """
        if camera_name not in self.cams:
            logger.error(f"Onvif is not setup for {camera_name}")
            return {}

        if not self.cams[camera_name]["init"]:
            self._init_onvif(camera_name)

        return {
            "name": camera_name,
            "features": self.cams[camera_name]["features"],
            "presets": list(self.cams[camera_name]["presets"].keys()),
        }

    def get_service_capabilities(self, camera_name: str) -> Any:
        """Return the ``MoveStatus`` entry of the PTZ service capabilities.

        Returns an empty dict when the camera has no ONVIF client or could not
        be initialised; otherwise whatever ``find_by_key`` finds under
        ``"MoveStatus"`` in the capabilities response.
        """
        if camera_name not in self.cams:
            logger.error(f"Onvif is not setup for {camera_name}")
            return {}

        if not self.cams[camera_name]["init"]:
            # without a successful init the cached request below does not exist
            if not self._init_onvif(camera_name):
                return {}

        onvif: ONVIFCamera = self.cams[camera_name]["onvif"]
        service_capabilities_request = self.cams[camera_name][
            "service_capabilities_request"
        ]
        service_capabilities = onvif.get_service("ptz").GetServiceCapabilities(
            service_capabilities_request
        )

        logger.debug(
            f"Onvif service capabilities for {camera_name}: {service_capabilities}"
        )

        # MoveStatus is required for autotracking - should return "true" if supported
        return find_by_key(vars(service_capabilities), "MoveStatus")

    def get_camera_status(self, camera_name: str) -> Optional[dict[str, Any]]:
        """Poll ``GetStatus`` and update the active flag and PTZ metrics.

        When the camera reports idle, the camera is marked inactive and, if
        ``ptz_stopped`` was clear, it is set and ``ptz_stop_time`` is stamped
        with the current ``ptz_frame_time``.  Otherwise the camera is marked
        active and, if ``ptz_stopped`` was set, it is cleared and the start and
        stop times are reset.  In absolute zooming mode the zoom position is
        also stored in ``ptz_zoom_level`` on a 0 to 1 scale.

        Returns:
            An empty dict when the camera has no ONVIF client (kept for
            backward compatibility), ``None`` in every other case.
        """
        if camera_name not in self.cams:
            logger.error(f"Onvif is not setup for {camera_name}")
            return {}

        if not self.cams[camera_name]["init"]:
            # without a successful init the cached request below does not exist
            if not self._init_onvif(camera_name):
                return None

        cam = self.cams[camera_name]
        metrics = self.ptz_metrics[camera_name]

        onvif: ONVIFCamera = cam["onvif"]
        status_request = cam["status_request"]
        status = onvif.get_service("ptz").GetStatus(status_request)

        # there doesn't seem to be an onvif standard with this optional parameter
        # some cameras can report MoveStatus with or without PanTilt or Zoom attributes
        move_status = getattr(status, "MoveStatus", None)
        pan_tilt_status = getattr(move_status, "PanTilt", None)
        zoom_status = getattr(move_status, "Zoom", None)

        # if it's not an attribute, see if MoveStatus even exists in the status result
        if pan_tilt_status is None:
            pan_tilt_status = move_status

        # we're unsupported
        if not isinstance(pan_tilt_status, str) or pan_tilt_status.lower() not in [
            "idle",
            "moving",
        ]:
            logger.error(
                f"Camera {camera_name} does not support the ONVIF GetStatus method. Autotracking will not function correctly and must be disabled in your config."
            )
            return None

        if pan_tilt_status.lower() == "idle" and (
            zoom_status is None or zoom_status.lower() == "idle"
        ):
            cam["active"] = False
            if not metrics["ptz_stopped"].is_set():
                metrics["ptz_stopped"].set()

                logger.debug(
                    f"PTZ stop time: {self.ptz_metrics[camera_name]['ptz_frame_time'].value}"
                )

                metrics["ptz_stop_time"].value = metrics["ptz_frame_time"].value
        else:
            cam["active"] = True
            if metrics["ptz_stopped"].is_set():
                metrics["ptz_stopped"].clear()

                logger.debug(
                    f"PTZ start time: {self.ptz_metrics[camera_name]['ptz_frame_time'].value}"
                )

                metrics["ptz_start_time"].value = metrics["ptz_frame_time"].value
                metrics["ptz_stop_time"].value = 0

        if (
            self.config.cameras[camera_name].onvif.autotracking.zooming
            == ZoomingModeEnum.absolute
        ):
            # store absolute zoom level as 0 to 1 interpolated from the values of the camera
            # (camera range -> 0..1, the inverse of the mapping in _zoom_absolute)
            zoom_range = cam["absolute_zoom_range"]["XRange"]
            metrics["ptz_zoom_level"].value = numpy.interp(
                round(status.Position.Zoom.x, 2),
                [zoom_range["Min"], zoom_range["Max"]],
                [0, 1],
            )
            logger.debug(
                f'Camera zoom level: {self.ptz_metrics[camera_name]["ptz_zoom_level"].value}'
            )

        return None
|