mirror of
https://github.com/blakeblackshear/frigate.git
synced 2024-11-26 19:06:11 +01:00
ab50d0b006
* Add isort and ruff linter Both linters are pretty common among modern python code bases. The isort tool provides stable sorting and grouping, as well as pruning of unused imports. Ruff is a modern linter, that is very fast due to being written in rust. It can detect many common issues in a python codebase. Removes the pylint dev requirement, since ruff replaces it. * treewide: fix issues detected by ruff * treewide: fix bare except clauses * .devcontainer: Set up isort * treewide: optimize imports * treewide: apply black * treewide: make regex patterns raw strings This is necessary for escape sequences to be properly recognized.
219 lines
7.4 KiB
Python
219 lines
7.4 KiB
Python
"""Configure and control camera via onvif."""
|
|
|
|
import logging
import site
from enum import Enum
from typing import Any

from onvif import ONVIFCamera, ONVIFError

from frigate.config import FrigateConfig
|
|
|
|
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class OnvifCommandEnum(str, Enum):
    """Commands accepted by the ONVIF controller.

    Every member is also a ``str`` whose value equals its name, so a member
    compares equal to the plain command string.
    """

    # Set up the camera's ONVIF services without moving it.
    init = "init"

    # Continuous pan / tilt.
    move_down = "move_down"
    move_left = "move_left"
    move_right = "move_right"
    move_up = "move_up"

    # Go to a stored preset.
    preset = "preset"

    # Halt the current movement.
    stop = "stop"

    # Continuous zoom.
    zoom_in = "zoom_in"
    zoom_out = "zoom_out"
|
|
|
|
|
|
class OnvifController:
    """Drive pan/tilt/zoom and presets on cameras through ONVIF.

    An ``ONVIFCamera`` client is created in ``__init__`` for every enabled
    camera that has an ONVIF host configured. The media/PTZ services are only
    queried lazily, the first time a command or an info request arrives for a
    camera (see ``_init_onvif``).
    """

    def __init__(self, config: FrigateConfig) -> None:
        """Create an ONVIF client for each enabled camera with an ONVIF host.

        A camera whose client cannot be created (``ONVIFError``) is logged and
        left out of ``self.cams``.
        """
        # Per-camera state, keyed by camera name. Each entry holds:
        #   "onvif":        the ONVIFCamera client
        #   "init":         True once _init_onvif() has succeeded
        #   "active":       True while a move/zoom started here is running
        #   "presets":      lower-cased preset name -> preset token
        #   "move_request": reusable ContinuousMove request (added on init)
        #   "features":     supported feature names (added on init)
        self.cams: dict[str, dict[str, Any]] = {}

        for cam_name, cam in config.cameras.items():
            if not cam.enabled or not cam.onvif.host:
                continue

            try:
                self.cams[cam_name] = {
                    "onvif": ONVIFCamera(
                        cam.onvif.host,
                        cam.onvif.port,
                        cam.onvif.user,
                        cam.onvif.password,
                        # WSDL files are looked up in "<site-packages>/wsdl";
                        # a "dist-packages" path is rewritten to
                        # "site-packages" first.
                        wsdl_dir=site.getsitepackages()[0].replace(
                            "dist-packages", "site-packages"
                        )
                        + "/wsdl",
                    ),
                    "init": False,
                    "active": False,
                    "presets": {},
                }
            except ONVIFError as e:
                logger.error(f"Onvif connection to {cam.name} failed: {e}")

    def _init_onvif(self, camera_name: str) -> bool:
        """Query the camera and cache what later commands need.

        Uses the first media profile to build a reusable ContinuousMove
        request, loads the existing presets and records which features
        ("pt", "zoom") the PTZ configuration reports.

        Returns:
            True when initialisation completed, False when the media profiles
            could not be fetched.
        """
        cam = self.cams[camera_name]
        onvif: ONVIFCamera = cam["onvif"]

        # create init services
        media = onvif.create_media_service()

        try:
            profile = media.GetProfiles()[0]
        except ONVIFError as e:
            logger.error(f"Unable to connect to camera: {camera_name}: {e}")
            return False

        ptz = onvif.create_ptz_service()
        request = ptz.create_type("GetConfigurationOptions")
        request.ConfigurationToken = profile.PTZConfiguration.token

        # setup moving request
        move_request = ptz.create_type("ContinuousMove")
        move_request.ProfileToken = profile.token
        cam["move_request"] = move_request

        # setup existing presets; failing to read them is not fatal
        try:
            presets: list[dict] = ptz.GetPresets({"ProfileToken": profile.token})
        except ONVIFError as e:
            logger.warning(f"Unable to get presets from camera: {camera_name}: {e}")
            presets = []

        for preset in presets:
            # names are stored lower-cased; lookups must lower-case as well
            cam["presets"][preset["Name"].lower()] = preset["token"]

        # get list of supported features
        ptz_config = ptz.GetConfigurationOptions(request)
        spaces = ptz_config.Spaces
        supported_features = []

        if spaces and spaces.ContinuousPanTiltVelocitySpace:
            supported_features.append("pt")

        if spaces and spaces.ContinuousZoomVelocitySpace:
            supported_features.append("zoom")

        cam["features"] = supported_features
        cam["init"] = True
        return True

    def _stop(self, camera_name: str) -> None:
        """Send a PTZ Stop for pan/tilt and zoom and clear the active flag."""
        cam = self.cams[camera_name]
        onvif: ONVIFCamera = cam["onvif"]
        move_request = cam["move_request"]
        onvif.get_service("ptz").Stop(
            {
                "ProfileToken": move_request.ProfileToken,
                "PanTilt": True,
                "Zoom": True,
            }
        )
        cam["active"] = False

    def _begin_action(self, camera_name: str) -> Any:
        """Stop a running action, mark the camera active, return its move request."""
        cam = self.cams[camera_name]

        if cam["active"]:
            logger.warning(
                f"{camera_name} is already performing an action, stopping..."
            )
            self._stop(camera_name)

        cam["active"] = True
        return cam["move_request"]

    def _move(self, camera_name: str, command: OnvifCommandEnum) -> None:
        """Start a continuous pan/tilt move in the direction given by ``command``.

        For a command that is not one of the four move commands, the request's
        velocity is left as it was before the ContinuousMove call is sent.
        """
        move_request = self._begin_action(camera_name)
        onvif: ONVIFCamera = self.cams[camera_name]["onvif"]

        if command == OnvifCommandEnum.move_left:
            move_request.Velocity = {"PanTilt": {"x": -0.5, "y": 0}}
        elif command == OnvifCommandEnum.move_right:
            move_request.Velocity = {"PanTilt": {"x": 0.5, "y": 0}}
        elif command == OnvifCommandEnum.move_up:
            move_request.Velocity = {"PanTilt": {"x": 0, "y": 0.5}}
        elif command == OnvifCommandEnum.move_down:
            move_request.Velocity = {"PanTilt": {"x": 0, "y": -0.5}}

        onvif.get_service("ptz").ContinuousMove(move_request)

    def _move_to_preset(self, camera_name: str, preset: str) -> None:
        """Move the camera to a stored preset.

        The name is matched case-insensitively because presets are stored
        under their lower-cased name. An unknown preset is logged and ignored.
        """
        cam = self.cams[camera_name]
        preset_key = preset.lower()

        if preset_key not in cam["presets"]:
            logger.error(f"{preset} is not a valid preset for {camera_name}")
            return

        cam["active"] = True
        try:
            cam["onvif"].get_service("ptz").GotoPreset(
                {
                    "ProfileToken": cam["move_request"].ProfileToken,
                    "PresetToken": cam["presets"][preset_key],
                }
            )
        finally:
            # never leave the camera flagged as busy, even if the call raised
            cam["active"] = False

    def _zoom(self, camera_name: str, command: OnvifCommandEnum) -> None:
        """Start a continuous zoom in or out, depending on ``command``."""
        move_request = self._begin_action(camera_name)
        onvif: ONVIFCamera = self.cams[camera_name]["onvif"]

        if command == OnvifCommandEnum.zoom_in:
            move_request.Velocity = {"Zoom": {"x": 0.5}}
        elif command == OnvifCommandEnum.zoom_out:
            move_request.Velocity = {"Zoom": {"x": -0.5}}

        onvif.get_service("ptz").ContinuousMove(move_request)

    def handle_command(
        self, camera_name: str, command: OnvifCommandEnum, param: str = ""
    ) -> None:
        """Run ``command`` on ``camera_name``, initialising the camera if needed.

        Args:
            camera_name: name of a camera that was set up in ``__init__``.
            command: the action to perform.
            param: preset name; only used by the ``preset`` command.
        """
        if camera_name not in self.cams:
            logger.error(f"Onvif is not setup for {camera_name}")
            return

        if not self.cams[camera_name]["init"] and not self._init_onvif(camera_name):
            return

        if command == OnvifCommandEnum.init:
            # already init
            return
        elif command == OnvifCommandEnum.stop:
            self._stop(camera_name)
        elif command == OnvifCommandEnum.preset:
            self._move_to_preset(camera_name, param)
        elif command in (OnvifCommandEnum.zoom_in, OnvifCommandEnum.zoom_out):
            self._zoom(camera_name, command)
        else:
            self._move(camera_name, command)

    def get_camera_info(self, camera_name: str) -> dict[str, Any]:
        """Return the name, supported features and preset names of a camera.

        Returns:
            A dict with "name", "features" and "presets" keys, or an empty
            dict when ONVIF is not set up for the camera or its
            initialisation fails.
        """
        if camera_name not in self.cams:
            logger.error(f"Onvif is not setup for {camera_name}")
            return {}

        cam = self.cams[camera_name]

        if not cam["init"] and not self._init_onvif(camera_name):
            # "features" is only stored by a successful init
            return {}

        return {
            "name": camera_name,
            "features": cam["features"],
            "presets": list(cam["presets"].keys()),
        }
|