mirror of
https://github.com/blakeblackshear/frigate.git
synced 2024-11-26 19:06:11 +01:00
381 lines
14 KiB
Python
381 lines
14 KiB
Python
"""Configure and control camera via onvif."""
|
|
|
|
import datetime
|
|
import logging
|
|
import site
|
|
from enum import Enum
|
|
|
|
import numpy
|
|
from onvif import ONVIFCamera, ONVIFError
|
|
|
|
from frigate.config import FrigateConfig
|
|
from frigate.types import PTZMetricsTypes
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class OnvifCommandEnum(str, Enum):
|
|
"""Holds all possible move commands"""
|
|
|
|
init = "init"
|
|
move_down = "move_down"
|
|
move_left = "move_left"
|
|
move_right = "move_right"
|
|
move_up = "move_up"
|
|
preset = "preset"
|
|
stop = "stop"
|
|
zoom_in = "zoom_in"
|
|
zoom_out = "zoom_out"
|
|
|
|
|
|
class OnvifController:
|
|
def __init__(
|
|
self, config: FrigateConfig, ptz_metrics: dict[str, PTZMetricsTypes]
|
|
) -> None:
|
|
self.cams: dict[str, ONVIFCamera] = {}
|
|
self.ptz_metrics = ptz_metrics
|
|
|
|
for cam_name, cam in config.cameras.items():
|
|
if not cam.enabled:
|
|
continue
|
|
|
|
if cam.onvif.host:
|
|
try:
|
|
self.cams[cam_name] = {
|
|
"onvif": ONVIFCamera(
|
|
cam.onvif.host,
|
|
cam.onvif.port,
|
|
cam.onvif.user,
|
|
cam.onvif.password,
|
|
wsdl_dir=site.getsitepackages()[0].replace(
|
|
"dist-packages", "site-packages"
|
|
)
|
|
+ "/wsdl",
|
|
),
|
|
"init": False,
|
|
"active": False,
|
|
"features": [],
|
|
"presets": {},
|
|
}
|
|
except ONVIFError as e:
|
|
logger.error(f"Onvif connection to {cam.name} failed: {e}")
|
|
|
|
def _init_onvif(self, camera_name: str) -> bool:
|
|
onvif: ONVIFCamera = self.cams[camera_name]["onvif"]
|
|
|
|
# create init services
|
|
media = onvif.create_media_service()
|
|
|
|
try:
|
|
profile = media.GetProfiles()[0]
|
|
except ONVIFError as e:
|
|
logger.error(f"Unable to connect to camera: {camera_name}: {e}")
|
|
return False
|
|
|
|
ptz = onvif.create_ptz_service()
|
|
request = ptz.create_type("GetConfigurationOptions")
|
|
request.ConfigurationToken = profile.PTZConfiguration.token
|
|
ptz_config = ptz.GetConfigurationOptions(request)
|
|
|
|
fov_space_id = next(
|
|
(
|
|
i
|
|
for i, space in enumerate(
|
|
ptz_config.Spaces.RelativePanTiltTranslationSpace
|
|
)
|
|
if "TranslationSpaceFov" in space["URI"]
|
|
),
|
|
None,
|
|
)
|
|
|
|
# setup continuous moving request
|
|
move_request = ptz.create_type("ContinuousMove")
|
|
move_request.ProfileToken = profile.token
|
|
self.cams[camera_name]["move_request"] = move_request
|
|
|
|
# setup relative moving request for autotracking
|
|
move_request = ptz.create_type("RelativeMove")
|
|
move_request.ProfileToken = profile.token
|
|
if move_request.Translation is None and fov_space_id is not None:
|
|
move_request.Translation = ptz.GetStatus(
|
|
{"ProfileToken": profile.token}
|
|
).Position
|
|
move_request.Translation.PanTilt.space = ptz_config["Spaces"][
|
|
"RelativePanTiltTranslationSpace"
|
|
][fov_space_id]["URI"]
|
|
move_request.Translation.Zoom.space = ptz_config["Spaces"][
|
|
"RelativeZoomTranslationSpace"
|
|
][0]["URI"]
|
|
if move_request.Speed is None:
|
|
move_request.Speed = ptz.GetStatus({"ProfileToken": profile.token}).Position
|
|
self.cams[camera_name]["relative_move_request"] = move_request
|
|
|
|
# setup relative moving request for autotracking
|
|
move_request = ptz.create_type("AbsoluteMove")
|
|
move_request.ProfileToken = profile.token
|
|
self.cams[camera_name]["absolute_move_request"] = move_request
|
|
|
|
# status request for autotracking
|
|
status_request = ptz.create_type("GetStatus")
|
|
status_request.ProfileToken = profile.token
|
|
self.cams[camera_name]["status_request"] = status_request
|
|
|
|
# setup existing presets
|
|
try:
|
|
presets: list[dict] = ptz.GetPresets({"ProfileToken": profile.token})
|
|
except ONVIFError as e:
|
|
logger.warning(f"Unable to get presets from camera: {camera_name}: {e}")
|
|
presets = []
|
|
|
|
for preset in presets:
|
|
self.cams[camera_name]["presets"][preset["Name"].lower()] = preset["token"]
|
|
|
|
# get list of supported features
|
|
ptz_config = ptz.GetConfigurationOptions(request)
|
|
supported_features = []
|
|
|
|
if ptz_config.Spaces and ptz_config.Spaces.ContinuousPanTiltVelocitySpace:
|
|
supported_features.append("pt")
|
|
|
|
if ptz_config.Spaces and ptz_config.Spaces.ContinuousZoomVelocitySpace:
|
|
supported_features.append("zoom")
|
|
|
|
if ptz_config.Spaces and ptz_config.Spaces.RelativePanTiltTranslationSpace:
|
|
supported_features.append("pt-r")
|
|
|
|
if ptz_config.Spaces and ptz_config.Spaces.RelativeZoomTranslationSpace:
|
|
supported_features.append("zoom-r")
|
|
|
|
if fov_space_id is not None:
|
|
supported_features.append("pt-r-fov")
|
|
self.cams[camera_name][
|
|
"relative_fov_range"
|
|
] = ptz_config.Spaces.RelativePanTiltTranslationSpace[fov_space_id]
|
|
|
|
self.cams[camera_name]["relative_fov_supported"] = fov_space_id is not None
|
|
|
|
self.cams[camera_name]["features"] = supported_features
|
|
|
|
self.cams[camera_name]["init"] = True
|
|
return True
|
|
|
|
def _stop(self, camera_name: str) -> None:
|
|
onvif: ONVIFCamera = self.cams[camera_name]["onvif"]
|
|
move_request = self.cams[camera_name]["move_request"]
|
|
onvif.get_service("ptz").Stop(
|
|
{
|
|
"ProfileToken": move_request.ProfileToken,
|
|
"PanTilt": True,
|
|
"Zoom": True,
|
|
}
|
|
)
|
|
self.cams[camera_name]["active"] = False
|
|
|
|
def _move(self, camera_name: str, command: OnvifCommandEnum) -> None:
|
|
if self.cams[camera_name]["active"]:
|
|
logger.warning(
|
|
f"{camera_name} is already performing an action, stopping..."
|
|
)
|
|
self._stop(camera_name)
|
|
|
|
self.cams[camera_name]["active"] = True
|
|
onvif: ONVIFCamera = self.cams[camera_name]["onvif"]
|
|
move_request = self.cams[camera_name]["move_request"]
|
|
|
|
if command == OnvifCommandEnum.move_left:
|
|
move_request.Velocity = {"PanTilt": {"x": -0.5, "y": 0}}
|
|
elif command == OnvifCommandEnum.move_right:
|
|
move_request.Velocity = {"PanTilt": {"x": 0.5, "y": 0}}
|
|
elif command == OnvifCommandEnum.move_up:
|
|
move_request.Velocity = {
|
|
"PanTilt": {
|
|
"x": 0,
|
|
"y": 0.5,
|
|
}
|
|
}
|
|
elif command == OnvifCommandEnum.move_down:
|
|
move_request.Velocity = {
|
|
"PanTilt": {
|
|
"x": 0,
|
|
"y": -0.5,
|
|
}
|
|
}
|
|
|
|
onvif.get_service("ptz").ContinuousMove(move_request)
|
|
|
|
def _move_relative(self, camera_name: str, pan, tilt, speed) -> None:
|
|
if not self.cams[camera_name]["relative_fov_supported"]:
|
|
logger.error(f"{camera_name} does not support ONVIF RelativeMove (FOV).")
|
|
return
|
|
|
|
logger.debug(f"{camera_name} called RelativeMove: pan: {pan} tilt: {tilt}")
|
|
|
|
if self.cams[camera_name]["active"]:
|
|
logger.warning(
|
|
f"{camera_name} is already performing an action, not moving..."
|
|
)
|
|
return
|
|
|
|
self.cams[camera_name]["active"] = True
|
|
self.ptz_metrics[camera_name]["ptz_stopped"].clear()
|
|
logger.debug(f"PTZ start time: {datetime.datetime.now().timestamp()}")
|
|
self.ptz_metrics[camera_name][
|
|
"ptz_start_time"
|
|
].value = datetime.datetime.now().timestamp()
|
|
self.ptz_metrics[camera_name]["ptz_stop_time"].value = 0
|
|
onvif: ONVIFCamera = self.cams[camera_name]["onvif"]
|
|
move_request = self.cams[camera_name]["relative_move_request"]
|
|
|
|
# function takes in -1 to 1 for pan and tilt, interpolate to the values of the camera.
|
|
# The onvif spec says this can report as +INF and -INF, so this may need to be modified
|
|
pan = numpy.interp(
|
|
pan,
|
|
[-1, 1],
|
|
[
|
|
self.cams[camera_name]["relative_fov_range"]["XRange"]["Min"],
|
|
self.cams[camera_name]["relative_fov_range"]["XRange"]["Max"],
|
|
],
|
|
)
|
|
tilt = numpy.interp(
|
|
tilt,
|
|
[-1, 1],
|
|
[
|
|
self.cams[camera_name]["relative_fov_range"]["YRange"]["Min"],
|
|
self.cams[camera_name]["relative_fov_range"]["YRange"]["Max"],
|
|
],
|
|
)
|
|
|
|
move_request.Speed = {
|
|
"PanTilt": {
|
|
"x": speed,
|
|
"y": speed,
|
|
},
|
|
"Zoom": 0,
|
|
}
|
|
|
|
move_request.Translation.PanTilt.x = pan
|
|
move_request.Translation.PanTilt.y = tilt
|
|
move_request.Translation.Zoom.x = 0
|
|
|
|
onvif.get_service("ptz").RelativeMove(move_request)
|
|
|
|
self.cams[camera_name]["active"] = False
|
|
|
|
def _move_to_preset(self, camera_name: str, preset: str) -> None:
|
|
if preset not in self.cams[camera_name]["presets"]:
|
|
logger.error(f"{preset} is not a valid preset for {camera_name}")
|
|
return
|
|
|
|
self.cams[camera_name]["active"] = True
|
|
self.ptz_metrics[camera_name]["ptz_stopped"].clear()
|
|
move_request = self.cams[camera_name]["move_request"]
|
|
onvif: ONVIFCamera = self.cams[camera_name]["onvif"]
|
|
preset_token = self.cams[camera_name]["presets"][preset]
|
|
onvif.get_service("ptz").GotoPreset(
|
|
{
|
|
"ProfileToken": move_request.ProfileToken,
|
|
"PresetToken": preset_token,
|
|
}
|
|
)
|
|
self.ptz_metrics[camera_name]["ptz_stopped"].set()
|
|
self.cams[camera_name]["active"] = False
|
|
|
|
def _zoom(self, camera_name: str, command: OnvifCommandEnum) -> None:
|
|
if self.cams[camera_name]["active"]:
|
|
logger.warning(
|
|
f"{camera_name} is already performing an action, stopping..."
|
|
)
|
|
self._stop(camera_name)
|
|
|
|
self.cams[camera_name]["active"] = True
|
|
onvif: ONVIFCamera = self.cams[camera_name]["onvif"]
|
|
move_request = self.cams[camera_name]["move_request"]
|
|
|
|
if command == OnvifCommandEnum.zoom_in:
|
|
move_request.Velocity = {"Zoom": {"x": 0.5}}
|
|
elif command == OnvifCommandEnum.zoom_out:
|
|
move_request.Velocity = {"Zoom": {"x": -0.5}}
|
|
|
|
onvif.get_service("ptz").ContinuousMove(move_request)
|
|
|
|
def handle_command(
|
|
self, camera_name: str, command: OnvifCommandEnum, param: str = ""
|
|
) -> None:
|
|
if camera_name not in self.cams.keys():
|
|
logger.error(f"Onvif is not setup for {camera_name}")
|
|
return
|
|
|
|
if not self.cams[camera_name]["init"]:
|
|
if not self._init_onvif(camera_name):
|
|
return
|
|
|
|
if command == OnvifCommandEnum.init:
|
|
# already init
|
|
return
|
|
elif command == OnvifCommandEnum.stop:
|
|
self._stop(camera_name)
|
|
elif command == OnvifCommandEnum.preset:
|
|
self._move_to_preset(camera_name, param)
|
|
elif (
|
|
command == OnvifCommandEnum.zoom_in or command == OnvifCommandEnum.zoom_out
|
|
):
|
|
self._zoom(camera_name, command)
|
|
else:
|
|
self._move(camera_name, command)
|
|
|
|
def get_camera_info(self, camera_name: str) -> dict[str, any]:
|
|
if camera_name not in self.cams.keys():
|
|
logger.error(f"Onvif is not setup for {camera_name}")
|
|
return {}
|
|
|
|
if not self.cams[camera_name]["init"]:
|
|
self._init_onvif(camera_name)
|
|
|
|
return {
|
|
"name": camera_name,
|
|
"features": self.cams[camera_name]["features"],
|
|
"presets": list(self.cams[camera_name]["presets"].keys()),
|
|
}
|
|
|
|
def get_camera_status(self, camera_name: str) -> dict[str, any]:
|
|
if camera_name not in self.cams.keys():
|
|
logger.error(f"Onvif is not setup for {camera_name}")
|
|
return {}
|
|
|
|
if not self.cams[camera_name]["init"]:
|
|
self._init_onvif(camera_name)
|
|
|
|
onvif: ONVIFCamera = self.cams[camera_name]["onvif"]
|
|
status_request = self.cams[camera_name]["status_request"]
|
|
status = onvif.get_service("ptz").GetStatus(status_request)
|
|
|
|
if status.MoveStatus.PanTilt == "IDLE" and status.MoveStatus.Zoom == "IDLE":
|
|
self.cams[camera_name]["active"] = False
|
|
if not self.ptz_metrics[camera_name]["ptz_stopped"].is_set():
|
|
self.ptz_metrics[camera_name]["ptz_stopped"].set()
|
|
|
|
logger.debug(f"PTZ stop time: {datetime.datetime.now().timestamp()}")
|
|
|
|
self.ptz_metrics[camera_name][
|
|
"ptz_stop_time"
|
|
].value = datetime.datetime.now().timestamp()
|
|
else:
|
|
self.cams[camera_name]["active"] = True
|
|
if self.ptz_metrics[camera_name]["ptz_stopped"].is_set():
|
|
self.ptz_metrics[camera_name]["ptz_stopped"].clear()
|
|
|
|
logger.debug(f"PTZ start time: {datetime.datetime.now().timestamp()}")
|
|
|
|
self.ptz_metrics[camera_name][
|
|
"ptz_start_time"
|
|
].value = datetime.datetime.now().timestamp()
|
|
self.ptz_metrics[camera_name]["ptz_stop_time"].value = 0
|
|
|
|
return {
|
|
"pan": status.Position.PanTilt.x,
|
|
"tilt": status.Position.PanTilt.y,
|
|
"zoom": status.Position.Zoom.x,
|
|
"pantilt_moving": status.MoveStatus.PanTilt,
|
|
"zoom_moving": status.MoveStatus.Zoom,
|
|
}
|