blakeblackshear.frigate/frigate/ptz/onvif.py
Josh Hawkins 88fc0fac8f
Basic PTZ object autotracking functionality (#6913)
* Basic functionality

* Threaded motion estimator

* Revert "Threaded motion estimator"

This reverts commit 3171801607.

* Don't detect motion when ptz is moving

* fix motion logic

* fix mypy error

* Add threaded queue for movement for slower ptzs

* Move queues per camera

* Move autotracker start to app.py

* iou value for tracked object

* mqtt callback

* tracked object should be initially motionless

* only draw thicker box if autotracking is enabled

* Init if enabled when initially disabled in config

* Fix init

* Thread names

* Always use motion estimator

* docs

* clarify fov support

* remove size ratio

* use mp event instead of value for ptz status

* update autotrack at half fps

* fix merge conflict

* fix event type for mypy

* clean up

* Clean up

* remove unused code

* merge conflict fix

* docs: update link to object_detectors page

* Update docs/docs/configuration/autotracking.md

Co-authored-by: Nicolas Mowen <nickmowen213@gmail.com>

* clarify wording

* pass actual instances directly

* default return preset

* fix type

* Error message when onvif init fails

* disable autotracking if onvif init fails

* disable autotracking if onvif init fails

* ptz module

* verify required_zones in config

* update util after dev merge

---------

Co-authored-by: Nicolas Mowen <nickmowen213@gmail.com>
2023-07-08 07:04:47 -05:00

367 lines
13 KiB
Python

"""Configure and control camera via onvif."""
import logging
import site
from enum import Enum
import numpy
from onvif import ONVIFCamera, ONVIFError
from frigate.config import FrigateConfig
from frigate.types import CameraMetricsTypes
logger = logging.getLogger(__name__)
class OnvifCommandEnum(str, Enum):
"""Holds all possible move commands"""
init = "init"
move_down = "move_down"
move_left = "move_left"
move_right = "move_right"
move_up = "move_up"
preset = "preset"
stop = "stop"
zoom_in = "zoom_in"
zoom_out = "zoom_out"
class OnvifController:
def __init__(
self, config: FrigateConfig, camera_metrics: dict[str, CameraMetricsTypes]
) -> None:
self.cams: dict[str, ONVIFCamera] = {}
self.camera_metrics = camera_metrics
for cam_name, cam in config.cameras.items():
if not cam.enabled:
continue
if cam.onvif.host:
try:
self.cams[cam_name] = {
"onvif": ONVIFCamera(
cam.onvif.host,
cam.onvif.port,
cam.onvif.user,
cam.onvif.password,
wsdl_dir=site.getsitepackages()[0].replace(
"dist-packages", "site-packages"
)
+ "/wsdl",
),
"init": False,
"active": False,
"presets": {},
}
except ONVIFError as e:
logger.error(f"Onvif connection to {cam.name} failed: {e}")
def _init_onvif(self, camera_name: str) -> bool:
onvif: ONVIFCamera = self.cams[camera_name]["onvif"]
# create init services
media = onvif.create_media_service()
try:
profile = media.GetProfiles()[0]
except ONVIFError as e:
logger.error(f"Unable to connect to camera: {camera_name}: {e}")
return False
ptz = onvif.create_ptz_service()
request = ptz.create_type("GetConfigurationOptions")
request.ConfigurationToken = profile.PTZConfiguration.token
ptz_config = ptz.GetConfigurationOptions(request)
fov_space_id = next(
(
i
for i, space in enumerate(
ptz_config.Spaces.RelativePanTiltTranslationSpace
)
if "TranslationSpaceFov" in space["URI"]
),
None,
)
# setup continuous moving request
move_request = ptz.create_type("ContinuousMove")
move_request.ProfileToken = profile.token
self.cams[camera_name]["move_request"] = move_request
# setup relative moving request for autotracking
move_request = ptz.create_type("RelativeMove")
move_request.ProfileToken = profile.token
if move_request.Translation is None and fov_space_id is not None:
move_request.Translation = ptz.GetStatus(
{"ProfileToken": profile.token}
).Position
move_request.Translation.PanTilt.space = ptz_config["Spaces"][
"RelativePanTiltTranslationSpace"
][fov_space_id]["URI"]
move_request.Translation.Zoom.space = ptz_config["Spaces"][
"RelativeZoomTranslationSpace"
][0]["URI"]
if move_request.Speed is None:
move_request.Speed = ptz.GetStatus({"ProfileToken": profile.token}).Position
self.cams[camera_name]["relative_move_request"] = move_request
# setup relative moving request for autotracking
move_request = ptz.create_type("AbsoluteMove")
move_request.ProfileToken = profile.token
self.cams[camera_name]["absolute_move_request"] = move_request
# status request for autotracking
status_request = ptz.create_type("GetStatus")
status_request.ProfileToken = profile.token
self.cams[camera_name]["status_request"] = status_request
# setup existing presets
try:
presets: list[dict] = ptz.GetPresets({"ProfileToken": profile.token})
except ONVIFError as e:
logger.warning(f"Unable to get presets from camera: {camera_name}: {e}")
presets = []
for preset in presets:
self.cams[camera_name]["presets"][preset["Name"].lower()] = preset["token"]
# get list of supported features
ptz_config = ptz.GetConfigurationOptions(request)
supported_features = []
if ptz_config.Spaces and ptz_config.Spaces.ContinuousPanTiltVelocitySpace:
supported_features.append("pt")
if ptz_config.Spaces and ptz_config.Spaces.ContinuousZoomVelocitySpace:
supported_features.append("zoom")
if ptz_config.Spaces and ptz_config.Spaces.RelativePanTiltTranslationSpace:
supported_features.append("pt-r")
if ptz_config.Spaces and ptz_config.Spaces.RelativeZoomTranslationSpace:
supported_features.append("zoom-r")
if fov_space_id is not None:
supported_features.append("pt-r-fov")
self.cams[camera_name][
"relative_fov_range"
] = ptz_config.Spaces.RelativePanTiltTranslationSpace[fov_space_id]
self.cams[camera_name]["relative_fov_supported"] = fov_space_id is not None
self.cams[camera_name]["features"] = supported_features
self.cams[camera_name]["init"] = True
return True
def _stop(self, camera_name: str) -> None:
onvif: ONVIFCamera = self.cams[camera_name]["onvif"]
move_request = self.cams[camera_name]["move_request"]
onvif.get_service("ptz").Stop(
{
"ProfileToken": move_request.ProfileToken,
"PanTilt": True,
"Zoom": True,
}
)
self.cams[camera_name]["active"] = False
def _move(self, camera_name: str, command: OnvifCommandEnum) -> None:
if self.cams[camera_name]["active"]:
logger.warning(
f"{camera_name} is already performing an action, stopping..."
)
self._stop(camera_name)
self.cams[camera_name]["active"] = True
onvif: ONVIFCamera = self.cams[camera_name]["onvif"]
move_request = self.cams[camera_name]["move_request"]
if command == OnvifCommandEnum.move_left:
move_request.Velocity = {"PanTilt": {"x": -0.5, "y": 0}}
elif command == OnvifCommandEnum.move_right:
move_request.Velocity = {"PanTilt": {"x": 0.5, "y": 0}}
elif command == OnvifCommandEnum.move_up:
move_request.Velocity = {
"PanTilt": {
"x": 0,
"y": 0.5,
}
}
elif command == OnvifCommandEnum.move_down:
move_request.Velocity = {
"PanTilt": {
"x": 0,
"y": -0.5,
}
}
onvif.get_service("ptz").ContinuousMove(move_request)
def _move_relative(self, camera_name: str, pan, tilt, speed) -> None:
if not self.cams[camera_name]["relative_fov_supported"]:
logger.error(f"{camera_name} does not support ONVIF RelativeMove (FOV).")
return
logger.debug(f"{camera_name} called RelativeMove: pan: {pan} tilt: {tilt}")
self.get_camera_status(camera_name)
if self.cams[camera_name]["active"]:
logger.warning(
f"{camera_name} is already performing an action, not moving..."
)
return
self.cams[camera_name]["active"] = True
self.camera_metrics[camera_name]["ptz_stopped"].clear()
onvif: ONVIFCamera = self.cams[camera_name]["onvif"]
move_request = self.cams[camera_name]["relative_move_request"]
# function takes in -1 to 1 for pan and tilt, interpolate to the values of the camera.
# The onvif spec says this can report as +INF and -INF, so this may need to be modified
pan = numpy.interp(
pan,
[-1, 1],
[
self.cams[camera_name]["relative_fov_range"]["XRange"]["Min"],
self.cams[camera_name]["relative_fov_range"]["XRange"]["Max"],
],
)
tilt = numpy.interp(
tilt,
[-1, 1],
[
self.cams[camera_name]["relative_fov_range"]["YRange"]["Min"],
self.cams[camera_name]["relative_fov_range"]["YRange"]["Max"],
],
)
move_request.Speed = {
"PanTilt": {
"x": speed,
"y": speed,
},
"Zoom": 0,
}
# move pan and tilt separately
move_request.Translation.PanTilt.x = pan
move_request.Translation.PanTilt.y = 0
move_request.Translation.Zoom.x = 0
onvif.get_service("ptz").RelativeMove(move_request)
move_request.Translation.PanTilt.x = 0
move_request.Translation.PanTilt.y = tilt
move_request.Translation.Zoom.x = 0
onvif.get_service("ptz").RelativeMove(move_request)
self.cams[camera_name]["active"] = False
def _move_to_preset(self, camera_name: str, preset: str) -> None:
if preset not in self.cams[camera_name]["presets"]:
logger.error(f"{preset} is not a valid preset for {camera_name}")
return
self.cams[camera_name]["active"] = True
self.camera_metrics[camera_name]["ptz_stopped"].clear()
move_request = self.cams[camera_name]["move_request"]
onvif: ONVIFCamera = self.cams[camera_name]["onvif"]
preset_token = self.cams[camera_name]["presets"][preset]
onvif.get_service("ptz").GotoPreset(
{
"ProfileToken": move_request.ProfileToken,
"PresetToken": preset_token,
}
)
self.camera_metrics[camera_name]["ptz_stopped"].set()
self.cams[camera_name]["active"] = False
def _zoom(self, camera_name: str, command: OnvifCommandEnum) -> None:
if self.cams[camera_name]["active"]:
logger.warning(
f"{camera_name} is already performing an action, stopping..."
)
self._stop(camera_name)
self.cams[camera_name]["active"] = True
onvif: ONVIFCamera = self.cams[camera_name]["onvif"]
move_request = self.cams[camera_name]["move_request"]
if command == OnvifCommandEnum.zoom_in:
move_request.Velocity = {"Zoom": {"x": 0.5}}
elif command == OnvifCommandEnum.zoom_out:
move_request.Velocity = {"Zoom": {"x": -0.5}}
onvif.get_service("ptz").ContinuousMove(move_request)
def handle_command(
self, camera_name: str, command: OnvifCommandEnum, param: str = ""
) -> None:
if camera_name not in self.cams.keys():
logger.error(f"Onvif is not setup for {camera_name}")
return
if not self.cams[camera_name]["init"]:
if not self._init_onvif(camera_name):
return
if command == OnvifCommandEnum.init:
# already init
return
elif command == OnvifCommandEnum.stop:
self._stop(camera_name)
elif command == OnvifCommandEnum.preset:
self._move_to_preset(camera_name, param)
elif (
command == OnvifCommandEnum.zoom_in or command == OnvifCommandEnum.zoom_out
):
self._zoom(camera_name, command)
else:
self._move(camera_name, command)
def get_camera_info(self, camera_name: str) -> dict[str, any]:
if camera_name not in self.cams.keys():
logger.error(f"Onvif is not setup for {camera_name}")
return {}
if not self.cams[camera_name]["init"]:
self._init_onvif(camera_name)
return {
"name": camera_name,
"features": self.cams[camera_name]["features"],
"presets": list(self.cams[camera_name]["presets"].keys()),
}
def get_camera_status(self, camera_name: str) -> dict[str, any]:
if camera_name not in self.cams.keys():
logger.error(f"Onvif is not setup for {camera_name}")
return {}
if not self.cams[camera_name]["init"]:
self._init_onvif(camera_name)
onvif: ONVIFCamera = self.cams[camera_name]["onvif"]
status_request = self.cams[camera_name]["status_request"]
status = onvif.get_service("ptz").GetStatus(status_request)
if status.MoveStatus.PanTilt == "IDLE" or status.MoveStatus.Zoom == "IDLE":
self.cams[camera_name]["active"] = False
self.camera_metrics[camera_name]["ptz_stopped"].set()
else:
self.cams[camera_name]["active"] = True
self.camera_metrics[camera_name]["ptz_stopped"].clear()
return {
"pan": status.Position.PanTilt.x,
"tilt": status.Position.PanTilt.y,
"zoom": status.Position.Zoom.x,
"pantilt_moving": status.MoveStatus.PanTilt,
"zoom_moving": status.MoveStatus.Zoom,
}