"""Configure and control camera via onvif.""" import datetime import logging import site from enum import Enum import numpy from onvif import ONVIFCamera, ONVIFError from frigate.config import FrigateConfig from frigate.types import PTZMetricsTypes logger = logging.getLogger(__name__) class OnvifCommandEnum(str, Enum): """Holds all possible move commands""" init = "init" move_down = "move_down" move_left = "move_left" move_right = "move_right" move_up = "move_up" preset = "preset" stop = "stop" zoom_in = "zoom_in" zoom_out = "zoom_out" class OnvifController: def __init__( self, config: FrigateConfig, ptz_metrics: dict[str, PTZMetricsTypes] ) -> None: self.cams: dict[str, ONVIFCamera] = {} self.ptz_metrics = ptz_metrics for cam_name, cam in config.cameras.items(): if not cam.enabled: continue if cam.onvif.host: try: self.cams[cam_name] = { "onvif": ONVIFCamera( cam.onvif.host, cam.onvif.port, cam.onvif.user, cam.onvif.password, wsdl_dir=site.getsitepackages()[0].replace( "dist-packages", "site-packages" ) + "/wsdl", ), "init": False, "active": False, "presets": {}, } except ONVIFError as e: logger.error(f"Onvif connection to {cam.name} failed: {e}") def _init_onvif(self, camera_name: str) -> bool: onvif: ONVIFCamera = self.cams[camera_name]["onvif"] # create init services media = onvif.create_media_service() try: profile = media.GetProfiles()[0] except ONVIFError as e: logger.error(f"Unable to connect to camera: {camera_name}: {e}") return False ptz = onvif.create_ptz_service() request = ptz.create_type("GetConfigurationOptions") request.ConfigurationToken = profile.PTZConfiguration.token ptz_config = ptz.GetConfigurationOptions(request) fov_space_id = next( ( i for i, space in enumerate( ptz_config.Spaces.RelativePanTiltTranslationSpace ) if "TranslationSpaceFov" in space["URI"] ), None, ) # setup continuous moving request move_request = ptz.create_type("ContinuousMove") move_request.ProfileToken = profile.token self.cams[camera_name]["move_request"] = move_request # setup relative moving request for autotracking move_request = ptz.create_type("RelativeMove") move_request.ProfileToken = profile.token if move_request.Translation is None and fov_space_id is not None: move_request.Translation = ptz.GetStatus( {"ProfileToken": profile.token} ).Position move_request.Translation.PanTilt.space = ptz_config["Spaces"][ "RelativePanTiltTranslationSpace" ][fov_space_id]["URI"] move_request.Translation.Zoom.space = ptz_config["Spaces"][ "RelativeZoomTranslationSpace" ][0]["URI"] if move_request.Speed is None: move_request.Speed = ptz.GetStatus({"ProfileToken": profile.token}).Position self.cams[camera_name]["relative_move_request"] = move_request # setup relative moving request for autotracking move_request = ptz.create_type("AbsoluteMove") move_request.ProfileToken = profile.token self.cams[camera_name]["absolute_move_request"] = move_request # status request for autotracking status_request = ptz.create_type("GetStatus") status_request.ProfileToken = profile.token self.cams[camera_name]["status_request"] = status_request # setup existing presets try: presets: list[dict] = ptz.GetPresets({"ProfileToken": profile.token}) except ONVIFError as e: logger.warning(f"Unable to get presets from camera: {camera_name}: {e}") presets = [] for preset in presets: self.cams[camera_name]["presets"][preset["Name"].lower()] = preset["token"] # get list of supported features ptz_config = ptz.GetConfigurationOptions(request) supported_features = [] if ptz_config.Spaces and ptz_config.Spaces.ContinuousPanTiltVelocitySpace: supported_features.append("pt") if ptz_config.Spaces and ptz_config.Spaces.ContinuousZoomVelocitySpace: supported_features.append("zoom") if ptz_config.Spaces and ptz_config.Spaces.RelativePanTiltTranslationSpace: supported_features.append("pt-r") if ptz_config.Spaces and ptz_config.Spaces.RelativeZoomTranslationSpace: supported_features.append("zoom-r") if fov_space_id is not None: supported_features.append("pt-r-fov") self.cams[camera_name][ "relative_fov_range" ] = ptz_config.Spaces.RelativePanTiltTranslationSpace[fov_space_id] self.cams[camera_name]["relative_fov_supported"] = fov_space_id is not None self.cams[camera_name]["features"] = supported_features self.cams[camera_name]["init"] = True return True def _stop(self, camera_name: str) -> None: onvif: ONVIFCamera = self.cams[camera_name]["onvif"] move_request = self.cams[camera_name]["move_request"] onvif.get_service("ptz").Stop( { "ProfileToken": move_request.ProfileToken, "PanTilt": True, "Zoom": True, } ) self.cams[camera_name]["active"] = False def _move(self, camera_name: str, command: OnvifCommandEnum) -> None: if self.cams[camera_name]["active"]: logger.warning( f"{camera_name} is already performing an action, stopping..." ) self._stop(camera_name) self.cams[camera_name]["active"] = True onvif: ONVIFCamera = self.cams[camera_name]["onvif"] move_request = self.cams[camera_name]["move_request"] if command == OnvifCommandEnum.move_left: move_request.Velocity = {"PanTilt": {"x": -0.5, "y": 0}} elif command == OnvifCommandEnum.move_right: move_request.Velocity = {"PanTilt": {"x": 0.5, "y": 0}} elif command == OnvifCommandEnum.move_up: move_request.Velocity = { "PanTilt": { "x": 0, "y": 0.5, } } elif command == OnvifCommandEnum.move_down: move_request.Velocity = { "PanTilt": { "x": 0, "y": -0.5, } } onvif.get_service("ptz").ContinuousMove(move_request) def _move_relative(self, camera_name: str, pan, tilt, speed) -> None: if not self.cams[camera_name]["relative_fov_supported"]: logger.error(f"{camera_name} does not support ONVIF RelativeMove (FOV).") return logger.debug(f"{camera_name} called RelativeMove: pan: {pan} tilt: {tilt}") if self.cams[camera_name]["active"]: logger.warning( f"{camera_name} is already performing an action, not moving..." ) return self.cams[camera_name]["active"] = True self.ptz_metrics[camera_name]["ptz_stopped"].clear() logger.debug(f"PTZ start time: {datetime.datetime.now().timestamp()}") self.ptz_metrics[camera_name][ "ptz_start_time" ].value = datetime.datetime.now().timestamp() self.ptz_metrics[camera_name]["ptz_stop_time"].value = 0 onvif: ONVIFCamera = self.cams[camera_name]["onvif"] move_request = self.cams[camera_name]["relative_move_request"] # function takes in -1 to 1 for pan and tilt, interpolate to the values of the camera. # The onvif spec says this can report as +INF and -INF, so this may need to be modified pan = numpy.interp( pan, [-1, 1], [ self.cams[camera_name]["relative_fov_range"]["XRange"]["Min"], self.cams[camera_name]["relative_fov_range"]["XRange"]["Max"], ], ) tilt = numpy.interp( tilt, [-1, 1], [ self.cams[camera_name]["relative_fov_range"]["YRange"]["Min"], self.cams[camera_name]["relative_fov_range"]["YRange"]["Max"], ], ) move_request.Speed = { "PanTilt": { "x": speed, "y": speed, }, "Zoom": 0, } move_request.Translation.PanTilt.x = pan move_request.Translation.PanTilt.y = tilt move_request.Translation.Zoom.x = 0 onvif.get_service("ptz").RelativeMove(move_request) self.cams[camera_name]["active"] = False def _move_to_preset(self, camera_name: str, preset: str) -> None: if preset not in self.cams[camera_name]["presets"]: logger.error(f"{preset} is not a valid preset for {camera_name}") return self.cams[camera_name]["active"] = True self.ptz_metrics[camera_name]["ptz_stopped"].clear() move_request = self.cams[camera_name]["move_request"] onvif: ONVIFCamera = self.cams[camera_name]["onvif"] preset_token = self.cams[camera_name]["presets"][preset] onvif.get_service("ptz").GotoPreset( { "ProfileToken": move_request.ProfileToken, "PresetToken": preset_token, } ) self.ptz_metrics[camera_name]["ptz_stopped"].set() self.cams[camera_name]["active"] = False def _zoom(self, camera_name: str, command: OnvifCommandEnum) -> None: if self.cams[camera_name]["active"]: logger.warning( f"{camera_name} is already performing an action, stopping..." ) self._stop(camera_name) self.cams[camera_name]["active"] = True onvif: ONVIFCamera = self.cams[camera_name]["onvif"] move_request = self.cams[camera_name]["move_request"] if command == OnvifCommandEnum.zoom_in: move_request.Velocity = {"Zoom": {"x": 0.5}} elif command == OnvifCommandEnum.zoom_out: move_request.Velocity = {"Zoom": {"x": -0.5}} onvif.get_service("ptz").ContinuousMove(move_request) def handle_command( self, camera_name: str, command: OnvifCommandEnum, param: str = "" ) -> None: if camera_name not in self.cams.keys(): logger.error(f"Onvif is not setup for {camera_name}") return if not self.cams[camera_name]["init"]: if not self._init_onvif(camera_name): return if command == OnvifCommandEnum.init: # already init return elif command == OnvifCommandEnum.stop: self._stop(camera_name) elif command == OnvifCommandEnum.preset: self._move_to_preset(camera_name, param) elif ( command == OnvifCommandEnum.zoom_in or command == OnvifCommandEnum.zoom_out ): self._zoom(camera_name, command) else: self._move(camera_name, command) def get_camera_info(self, camera_name: str) -> dict[str, any]: if camera_name not in self.cams.keys(): logger.error(f"Onvif is not setup for {camera_name}") return {} if not self.cams[camera_name]["init"]: self._init_onvif(camera_name) return { "name": camera_name, "features": self.cams[camera_name]["features"], "presets": list(self.cams[camera_name]["presets"].keys()), } def get_camera_status(self, camera_name: str) -> dict[str, any]: if camera_name not in self.cams.keys(): logger.error(f"Onvif is not setup for {camera_name}") return {} if not self.cams[camera_name]["init"]: self._init_onvif(camera_name) onvif: ONVIFCamera = self.cams[camera_name]["onvif"] status_request = self.cams[camera_name]["status_request"] status = onvif.get_service("ptz").GetStatus(status_request) if status.MoveStatus.PanTilt == "IDLE" and status.MoveStatus.Zoom == "IDLE": self.cams[camera_name]["active"] = False if not self.ptz_metrics[camera_name]["ptz_stopped"].is_set(): self.ptz_metrics[camera_name]["ptz_stopped"].set() logger.debug(f"PTZ stop time: {datetime.datetime.now().timestamp()}") self.ptz_metrics[camera_name][ "ptz_stop_time" ].value = datetime.datetime.now().timestamp() else: self.cams[camera_name]["active"] = True if self.ptz_metrics[camera_name]["ptz_stopped"].is_set(): self.ptz_metrics[camera_name]["ptz_stopped"].clear() logger.debug(f"PTZ start time: {datetime.datetime.now().timestamp()}") self.ptz_metrics[camera_name][ "ptz_start_time" ].value = datetime.datetime.now().timestamp() self.ptz_metrics[camera_name]["ptz_stop_time"].value = 0 return { "pan": status.Position.PanTilt.x, "tilt": status.Position.PanTilt.y, "zoom": status.Position.Zoom.x, "pantilt_moving": status.MoveStatus.PanTilt, "zoom_moving": status.MoveStatus.Zoom, }