Refactor async ONVIF (#18093)

* use async/await instead of asyncio.run()

* fix autotracking

* create cameras in same event loop that will use them

* more debug

* try using existing event loop instead of creating a new one

* merge dev

* fixes

* run get_camera_info onvifcontroller calls in dedicated loop

* move coroutine call with loop to api

* use asyncio for autotracking move queues

* clean up

* fix calibration

* improve exception logging
This commit is contained in:
Josh Hawkins 2025-05-07 08:53:29 -05:00 committed by GitHub
parent 83188e7ea4
commit da1fb935b4
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 208 additions and 135 deletions

View File

@ -1,5 +1,6 @@
"""Image and video apis.""" """Image and video apis."""
import asyncio
import glob import glob
import logging import logging
import math import math
@ -110,9 +111,12 @@ def imagestream(
@router.get("/{camera_name}/ptz/info") @router.get("/{camera_name}/ptz/info")
async def camera_ptz_info(request: Request, camera_name: str): async def camera_ptz_info(request: Request, camera_name: str):
if camera_name in request.app.frigate_config.cameras: if camera_name in request.app.frigate_config.cameras:
return JSONResponse( # Schedule get_camera_info in the OnvifController's event loop
content=await request.app.onvif.get_camera_info(camera_name), future = asyncio.run_coroutine_threadsafe(
request.app.onvif.get_camera_info(camera_name), request.app.onvif.loop
) )
result = future.result()
return JSONResponse(content=result)
else: else:
return JSONResponse( return JSONResponse(
content={"success": False, "message": "Camera not found"}, content={"success": False, "message": "Camera not found"},

View File

@ -699,6 +699,10 @@ class FrigateApp:
self.audio_process.terminate() self.audio_process.terminate()
self.audio_process.join() self.audio_process.join()
# stop the onvif controller
if self.onvif_controller:
self.onvif_controller.close()
# ensure the capture processes are done # ensure the capture processes are done
for camera, metrics in self.camera_metrics.items(): for camera, metrics in self.camera_metrics.items():
capture_process = metrics.capture_process capture_process = metrics.capture_process

View File

@ -3,11 +3,9 @@
import asyncio import asyncio
import copy import copy
import logging import logging
import queue
import threading import threading
import time import time
from collections import deque from collections import deque
from functools import partial
from multiprocessing.synchronize import Event as MpEvent from multiprocessing.synchronize import Event as MpEvent
import cv2 import cv2
@ -169,7 +167,12 @@ class PtzAutoTrackerThread(threading.Thread):
continue continue
if camera_config.onvif.autotracking.enabled: if camera_config.onvif.autotracking.enabled:
self.ptz_autotracker.camera_maintenance(camera) future = asyncio.run_coroutine_threadsafe(
self.ptz_autotracker.camera_maintenance(camera),
self.ptz_autotracker.onvif.loop,
)
# Wait for the coroutine to complete
future.result()
else: else:
# disabled dynamically by mqtt # disabled dynamically by mqtt
if self.ptz_autotracker.tracked_object.get(camera): if self.ptz_autotracker.tracked_object.get(camera):
@ -219,9 +222,13 @@ class PtzAutoTracker:
camera_config.onvif.autotracking.enabled camera_config.onvif.autotracking.enabled
and camera_config.onvif.autotracking.enabled_in_config and camera_config.onvif.autotracking.enabled_in_config
): ):
self._autotracker_setup(camera_config, camera) future = asyncio.run_coroutine_threadsafe(
self._autotracker_setup(camera_config, camera), self.onvif.loop
)
# Wait for the coroutine to complete
future.result()
def _autotracker_setup(self, camera_config: CameraConfig, camera: str): async def _autotracker_setup(self, camera_config: CameraConfig, camera: str):
logger.debug(f"{camera}: Autotracker init") logger.debug(f"{camera}: Autotracker init")
self.object_types[camera] = camera_config.onvif.autotracking.track self.object_types[camera] = camera_config.onvif.autotracking.track
@ -242,8 +249,8 @@ class PtzAutoTracker:
self.intercept[camera] = None self.intercept[camera] = None
self.move_coefficients[camera] = [] self.move_coefficients[camera] = []
self.move_queues[camera] = queue.Queue() self.move_queues[camera] = asyncio.Queue()
self.move_queue_locks[camera] = threading.Lock() self.move_queue_locks[camera] = asyncio.Lock()
# handle onvif constructor failing due to no connection # handle onvif constructor failing due to no connection
if camera not in self.onvif.cams: if camera not in self.onvif.cams:
@ -255,7 +262,7 @@ class PtzAutoTracker:
return return
if not self.onvif.cams[camera]["init"]: if not self.onvif.cams[camera]["init"]:
if not asyncio.run(self.onvif._init_onvif(camera)): if not await self.onvif._init_onvif(camera):
logger.warning( logger.warning(
f"Disabling autotracking for {camera}: Unable to initialize onvif" f"Disabling autotracking for {camera}: Unable to initialize onvif"
) )
@ -271,7 +278,7 @@ class PtzAutoTracker:
self.ptz_metrics[camera].autotracker_enabled.value = False self.ptz_metrics[camera].autotracker_enabled.value = False
return return
move_status_supported = self.onvif.get_service_capabilities(camera) move_status_supported = await self.onvif.get_service_capabilities(camera)
if not ( if not (
isinstance(move_status_supported, bool) and move_status_supported isinstance(move_status_supported, bool) and move_status_supported
@ -287,15 +294,12 @@ class PtzAutoTracker:
return return
if self.onvif.cams[camera]["init"]: if self.onvif.cams[camera]["init"]:
self.onvif.get_camera_status(camera) await self.onvif.get_camera_status(camera)
# movement thread per camera # movement queue with asyncio on OnvifController loop
self.move_threads[camera] = threading.Thread( asyncio.run_coroutine_threadsafe(
name=f"ptz_move_thread_{camera}", self._process_move_queue(camera), self.onvif.loop
target=partial(self._process_move_queue, camera),
) )
self.move_threads[camera].daemon = True
self.move_threads[camera].start()
if camera_config.onvif.autotracking.movement_weights: if camera_config.onvif.autotracking.movement_weights:
if len(camera_config.onvif.autotracking.movement_weights) == 6: if len(camera_config.onvif.autotracking.movement_weights) == 6:
@ -330,7 +334,7 @@ class PtzAutoTracker:
) )
if camera_config.onvif.autotracking.calibrate_on_startup: if camera_config.onvif.autotracking.calibrate_on_startup:
self._calibrate_camera(camera) await self._calibrate_camera(camera)
self.ptz_metrics[camera].tracking_active.clear() self.ptz_metrics[camera].tracking_active.clear()
self.dispatcher.publish(f"{camera}/ptz_autotracker/active", "OFF", retain=False) self.dispatcher.publish(f"{camera}/ptz_autotracker/active", "OFF", retain=False)
@ -349,7 +353,7 @@ class PtzAutoTracker:
self.config.cameras[camera].onvif.autotracking.movement_weights, self.config.cameras[camera].onvif.autotracking.movement_weights,
) )
def _calibrate_camera(self, camera): async def _calibrate_camera(self, camera):
# move the camera from the preset in steps and measure the time it takes to move that amount # move the camera from the preset in steps and measure the time it takes to move that amount
# this will allow us to predict movement times with a simple linear regression # this will allow us to predict movement times with a simple linear regression
# start with 0 so we can determine a baseline (to be used as the intercept in the regression calc) # start with 0 so we can determine a baseline (to be used as the intercept in the regression calc)
@ -373,25 +377,25 @@ class PtzAutoTracker:
for i in range(2): for i in range(2):
# absolute move to 0 - fully zoomed out # absolute move to 0 - fully zoomed out
self.onvif._zoom_absolute( await self.onvif._zoom_absolute(
camera, camera,
self.onvif.cams[camera]["absolute_zoom_range"]["XRange"]["Min"], self.onvif.cams[camera]["absolute_zoom_range"]["XRange"]["Min"],
1, 1,
) )
while not self.ptz_metrics[camera].motor_stopped.is_set(): while not self.ptz_metrics[camera].motor_stopped.is_set():
self.onvif.get_camera_status(camera) await self.onvif.get_camera_status(camera)
zoom_out_values.append(self.ptz_metrics[camera].zoom_level.value) zoom_out_values.append(self.ptz_metrics[camera].zoom_level.value)
self.onvif._zoom_absolute( await self.onvif._zoom_absolute(
camera, camera,
self.onvif.cams[camera]["absolute_zoom_range"]["XRange"]["Max"], self.onvif.cams[camera]["absolute_zoom_range"]["XRange"]["Max"],
1, 1,
) )
while not self.ptz_metrics[camera].motor_stopped.is_set(): while not self.ptz_metrics[camera].motor_stopped.is_set():
self.onvif.get_camera_status(camera) await self.onvif.get_camera_status(camera)
zoom_in_values.append(self.ptz_metrics[camera].zoom_level.value) zoom_in_values.append(self.ptz_metrics[camera].zoom_level.value)
@ -400,7 +404,7 @@ class PtzAutoTracker:
== ZoomingModeEnum.relative == ZoomingModeEnum.relative
): ):
# relative move to -0.01 # relative move to -0.01
self.onvif._move_relative( await self.onvif._move_relative(
camera, camera,
0, 0,
0, 0,
@ -409,13 +413,13 @@ class PtzAutoTracker:
) )
while not self.ptz_metrics[camera].motor_stopped.is_set(): while not self.ptz_metrics[camera].motor_stopped.is_set():
self.onvif.get_camera_status(camera) await self.onvif.get_camera_status(camera)
zoom_out_values.append(self.ptz_metrics[camera].zoom_level.value) zoom_out_values.append(self.ptz_metrics[camera].zoom_level.value)
zoom_start_time = time.time() zoom_start_time = time.time()
# relative move to 0.01 # relative move to 0.01
self.onvif._move_relative( await self.onvif._move_relative(
camera, camera,
0, 0,
0, 0,
@ -424,13 +428,13 @@ class PtzAutoTracker:
) )
while not self.ptz_metrics[camera].motor_stopped.is_set(): while not self.ptz_metrics[camera].motor_stopped.is_set():
self.onvif.get_camera_status(camera) await self.onvif.get_camera_status(camera)
zoom_stop_time = time.time() zoom_stop_time = time.time()
full_relative_start_time = time.time() full_relative_start_time = time.time()
self.onvif._move_relative( await self.onvif._move_relative(
camera, camera,
-1, -1,
-1, -1,
@ -439,11 +443,11 @@ class PtzAutoTracker:
) )
while not self.ptz_metrics[camera].motor_stopped.is_set(): while not self.ptz_metrics[camera].motor_stopped.is_set():
self.onvif.get_camera_status(camera) await self.onvif.get_camera_status(camera)
full_relative_stop_time = time.time() full_relative_stop_time = time.time()
self.onvif._move_relative( await self.onvif._move_relative(
camera, camera,
1, 1,
1, 1,
@ -452,7 +456,7 @@ class PtzAutoTracker:
) )
while not self.ptz_metrics[camera].motor_stopped.is_set(): while not self.ptz_metrics[camera].motor_stopped.is_set():
self.onvif.get_camera_status(camera) await self.onvif.get_camera_status(camera)
self.zoom_time[camera] = ( self.zoom_time[camera] = (
full_relative_stop_time - full_relative_start_time full_relative_stop_time - full_relative_start_time
@ -471,7 +475,7 @@ class PtzAutoTracker:
self.ptz_metrics[camera].max_zoom.value = 1 self.ptz_metrics[camera].max_zoom.value = 1
self.ptz_metrics[camera].min_zoom.value = 0 self.ptz_metrics[camera].min_zoom.value = 0
self.onvif._move_to_preset( await self.onvif._move_to_preset(
camera, camera,
self.config.cameras[camera].onvif.autotracking.return_preset.lower(), self.config.cameras[camera].onvif.autotracking.return_preset.lower(),
) )
@ -480,18 +484,18 @@ class PtzAutoTracker:
# Wait until the camera finishes moving # Wait until the camera finishes moving
while not self.ptz_metrics[camera].motor_stopped.is_set(): while not self.ptz_metrics[camera].motor_stopped.is_set():
self.onvif.get_camera_status(camera) await self.onvif.get_camera_status(camera)
for step in range(num_steps): for step in range(num_steps):
pan = step_sizes[step] pan = step_sizes[step]
tilt = step_sizes[step] tilt = step_sizes[step]
start_time = time.time() start_time = time.time()
self.onvif._move_relative(camera, pan, tilt, 0, 1) await self.onvif._move_relative(camera, pan, tilt, 0, 1)
# Wait until the camera finishes moving # Wait until the camera finishes moving
while not self.ptz_metrics[camera].motor_stopped.is_set(): while not self.ptz_metrics[camera].motor_stopped.is_set():
self.onvif.get_camera_status(camera) await self.onvif.get_camera_status(camera)
stop_time = time.time() stop_time = time.time()
self.move_metrics[camera].append( self.move_metrics[camera].append(
@ -503,7 +507,7 @@ class PtzAutoTracker:
} }
) )
self.onvif._move_to_preset( await self.onvif._move_to_preset(
camera, camera,
self.config.cameras[camera].onvif.autotracking.return_preset.lower(), self.config.cameras[camera].onvif.autotracking.return_preset.lower(),
) )
@ -512,7 +516,7 @@ class PtzAutoTracker:
# Wait until the camera finishes moving # Wait until the camera finishes moving
while not self.ptz_metrics[camera].motor_stopped.is_set(): while not self.ptz_metrics[camera].motor_stopped.is_set():
self.onvif.get_camera_status(camera) await self.onvif.get_camera_status(camera)
logger.info( logger.info(
f"Calibration for {camera} in progress: {round((step / num_steps) * 100)}% complete" f"Calibration for {camera} in progress: {round((step / num_steps) * 100)}% complete"
@ -709,18 +713,17 @@ class PtzAutoTracker:
centroid_distance < self.tracked_object_metrics[camera]["distance"] centroid_distance < self.tracked_object_metrics[camera]["distance"]
) )
def _process_move_queue(self, camera): async def _process_move_queue(self, camera):
camera_config = self.config.cameras[camera] move_queue = self.move_queues[camera]
camera_config.frame_shape[1]
camera_config.frame_shape[0]
while not self.stop_event.is_set(): while not self.stop_event.is_set():
try: try:
move_data = self.move_queues[camera].get(True, 0.1) # Asynchronously wait for move data with a timeout
except queue.Empty: move_data = await asyncio.wait_for(move_queue.get(), timeout=0.1)
except asyncio.TimeoutError:
continue continue
with self.move_queue_locks[camera]: async with self.move_queue_locks[camera]:
frame_time, pan, tilt, zoom = move_data frame_time, pan, tilt, zoom = move_data
# if we're receiving move requests during a PTZ move, ignore them # if we're receiving move requests during a PTZ move, ignore them
@ -729,8 +732,6 @@ class PtzAutoTracker:
self.ptz_metrics[camera].start_time.value, self.ptz_metrics[camera].start_time.value,
self.ptz_metrics[camera].stop_time.value, self.ptz_metrics[camera].stop_time.value,
): ):
# instead of dequeueing this might be a good place to preemptively move based
# on an estimate - for fast moving objects, etc.
logger.debug( logger.debug(
f"{camera}: Move queue: PTZ moving, dequeueing move request - frame time: {frame_time}, final pan: {pan}, final tilt: {tilt}, final zoom: {zoom}" f"{camera}: Move queue: PTZ moving, dequeueing move request - frame time: {frame_time}, final pan: {pan}, final tilt: {tilt}, final zoom: {zoom}"
) )
@ -741,25 +742,24 @@ class PtzAutoTracker:
self.config.cameras[camera].onvif.autotracking.zooming self.config.cameras[camera].onvif.autotracking.zooming
== ZoomingModeEnum.relative == ZoomingModeEnum.relative
): ):
self.onvif._move_relative(camera, pan, tilt, zoom, 1) await self.onvif._move_relative(camera, pan, tilt, zoom, 1)
else: else:
if pan != 0 or tilt != 0: if pan != 0 or tilt != 0:
self.onvif._move_relative(camera, pan, tilt, 0, 1) await self.onvif._move_relative(camera, pan, tilt, 0, 1)
# Wait until the camera finishes moving # Wait until the camera finishes moving
while not self.ptz_metrics[camera].motor_stopped.is_set(): while not self.ptz_metrics[camera].motor_stopped.is_set():
self.onvif.get_camera_status(camera) await self.onvif.get_camera_status(camera)
if ( if (
zoom > 0 zoom > 0
and self.ptz_metrics[camera].zoom_level.value != zoom and self.ptz_metrics[camera].zoom_level.value != zoom
): ):
self.onvif._zoom_absolute(camera, zoom, 1) await self.onvif._zoom_absolute(camera, zoom, 1)
# Wait until the camera finishes moving # Wait until the camera finishes moving
while not self.ptz_metrics[camera].motor_stopped.is_set(): while not self.ptz_metrics[camera].motor_stopped.is_set():
self.onvif.get_camera_status(camera) await self.onvif.get_camera_status(camera)
if self.config.cameras[camera].onvif.autotracking.movement_weights: if self.config.cameras[camera].onvif.autotracking.movement_weights:
logger.debug( logger.debug(
@ -796,6 +796,10 @@ class PtzAutoTracker:
# calculate new coefficients if we have enough data # calculate new coefficients if we have enough data
self._calculate_move_coefficients(camera) self._calculate_move_coefficients(camera)
# Clean up the queue on exit
while not move_queue.empty():
await move_queue.get()
def _enqueue_move(self, camera, frame_time, pan, tilt, zoom): def _enqueue_move(self, camera, frame_time, pan, tilt, zoom):
def split_value(value, suppress_diff=True): def split_value(value, suppress_diff=True):
clipped = np.clip(value, -1, 1) clipped = np.clip(value, -1, 1)
@ -824,7 +828,9 @@ class PtzAutoTracker:
f"{camera}: Enqueue movement for frame time: {frame_time} pan: {pan}, tilt: {tilt}, zoom: {zoom}" f"{camera}: Enqueue movement for frame time: {frame_time} pan: {pan}, tilt: {tilt}, zoom: {zoom}"
) )
move_data = (frame_time, pan, tilt, zoom) move_data = (frame_time, pan, tilt, zoom)
self.move_queues[camera].put(move_data) self.onvif.loop.call_soon_threadsafe(
self.move_queues[camera].put_nowait, move_data
)
# reset values to not split up large movements # reset values to not split up large movements
pan = 0 pan = 0
@ -1420,7 +1426,7 @@ class PtzAutoTracker:
** (1 / self.zoom_factor[camera]) ** (1 / self.zoom_factor[camera])
} }
def camera_maintenance(self, camera): async def camera_maintenance(self, camera):
# bail and don't check anything if we're calibrating or tracking an object # bail and don't check anything if we're calibrating or tracking an object
if ( if (
not self.autotracker_init[camera] not self.autotracker_init[camera]
@ -1437,7 +1443,7 @@ class PtzAutoTracker:
self._autotracker_setup(self.config.cameras[camera], camera) self._autotracker_setup(self.config.cameras[camera], camera)
# regularly update camera status # regularly update camera status
if not self.ptz_metrics[camera].motor_stopped.is_set(): if not self.ptz_metrics[camera].motor_stopped.is_set():
self.onvif.get_camera_status(camera) await self.onvif.get_camera_status(camera)
# return to preset if tracking is over # return to preset if tracking is over
if ( if (
@ -1455,22 +1461,18 @@ class PtzAutoTracker:
self.tracked_object[camera] = None self.tracked_object[camera] = None
self.tracked_object_history[camera].clear() self.tracked_object_history[camera].clear()
# empty move queue
while not self.move_queues[camera].empty():
self.move_queues[camera].get()
self.ptz_metrics[camera].motor_stopped.wait() self.ptz_metrics[camera].motor_stopped.wait()
logger.debug( logger.debug(
f"{camera}: Time is {self.ptz_metrics[camera].frame_time.value}, returning to preset: {autotracker_config.return_preset}" f"{camera}: Time is {self.ptz_metrics[camera].frame_time.value}, returning to preset: {autotracker_config.return_preset}"
) )
self.onvif._move_to_preset( await self.onvif._move_to_preset(
camera, camera,
autotracker_config.return_preset.lower(), autotracker_config.return_preset.lower(),
) )
# update stored zoom level from preset # update stored zoom level from preset
if not self.ptz_metrics[camera].motor_stopped.is_set(): if not self.ptz_metrics[camera].motor_stopped.is_set():
self.onvif.get_camera_status(camera) await self.onvif.get_camera_status(camera)
self.ptz_metrics[camera].tracking_active.clear() self.ptz_metrics[camera].tracking_active.clear()
self.dispatcher.publish( self.dispatcher.publish(

View File

@ -2,6 +2,7 @@
import asyncio import asyncio
import logging import logging
import threading
import time import time
from enum import Enum from enum import Enum
from importlib.util import find_spec from importlib.util import find_spec
@ -39,27 +40,56 @@ class OnvifController:
def __init__( def __init__(
self, config: FrigateConfig, ptz_metrics: dict[str, PTZMetrics] self, config: FrigateConfig, ptz_metrics: dict[str, PTZMetrics]
) -> None: ) -> None:
self.cams: dict[str, ONVIFCamera] = {} self.cams: dict[str, dict] = {}
self.failed_cams: dict[str, dict] = {} self.failed_cams: dict[str, dict] = {}
self.max_retries = 5 self.max_retries = 5
self.reset_timeout = 900 # 15 minutes self.reset_timeout = 900 # 15 minutes
self.config = config self.config = config
self.ptz_metrics = ptz_metrics self.ptz_metrics = ptz_metrics
# Create a dedicated event loop and run it in a separate thread
self.loop = asyncio.new_event_loop()
self.loop_thread = threading.Thread(target=self._run_event_loop, daemon=True)
self.loop_thread.start()
self.camera_configs = {}
for cam_name, cam in config.cameras.items(): for cam_name, cam in config.cameras.items():
if not cam.enabled: if not cam.enabled:
continue continue
if cam.onvif.host: if cam.onvif.host:
result = self._create_onvif_camera(cam_name, cam) self.camera_configs[cam_name] = cam
if result:
self.cams[cam_name] = result
def _create_onvif_camera(self, cam_name: str, cam) -> dict | None: asyncio.run_coroutine_threadsafe(self._init_cameras(), self.loop)
"""Create an ONVIF camera instance and handle failures."""
def _run_event_loop(self) -> None:
"""Run the event loop in a separate thread."""
asyncio.set_event_loop(self.loop)
try: try:
return { self.loop.run_forever()
except Exception as e:
logger.error(f"Onvif event loop terminated unexpectedly: {e}")
async def _init_cameras(self) -> None:
"""Initialize all configured cameras."""
for cam_name in self.camera_configs:
await self._init_single_camera(cam_name)
async def _init_single_camera(self, cam_name: str) -> bool:
"""Initialize a single camera by name.
Args:
cam_name: The name of the camera to initialize
Returns:
bool: True if initialization succeeded, False otherwise
"""
if cam_name not in self.camera_configs:
logger.error(f"No configuration found for camera {cam_name}")
return False
cam = self.camera_configs[cam_name]
try:
self.cams[cam_name] = {
"onvif": ONVIFCamera( "onvif": ONVIFCamera(
cam.onvif.host, cam.onvif.host,
cam.onvif.port, cam.onvif.port,
@ -74,6 +104,7 @@ class OnvifController:
"features": [], "features": [],
"presets": {}, "presets": {},
} }
return True
except (Fault, ONVIFError, TransportError, Exception) as e: except (Fault, ONVIFError, TransportError, Exception) as e:
logger.error(f"Failed to create ONVIF camera instance for {cam_name}: {e}") logger.error(f"Failed to create ONVIF camera instance for {cam_name}: {e}")
# track initial failures # track initial failures
@ -82,7 +113,7 @@ class OnvifController:
"last_error": str(e), "last_error": str(e),
"last_attempt": time.time(), "last_attempt": time.time(),
} }
return None return False
async def _init_onvif(self, camera_name: str) -> bool: async def _init_onvif(self, camera_name: str) -> bool:
onvif: ONVIFCamera = self.cams[camera_name]["onvif"] onvif: ONVIFCamera = self.cams[camera_name]["onvif"]
@ -240,12 +271,12 @@ class OnvifController:
logger.debug( logger.debug(
f"{camera_name}: Relative move request after deleting zoom: {move_request}" f"{camera_name}: Relative move request after deleting zoom: {move_request}"
) )
except Exception: except Exception as e:
self.config.cameras[ self.config.cameras[
camera_name camera_name
].onvif.autotracking.zooming = ZoomingModeEnum.disabled ].onvif.autotracking.zooming = ZoomingModeEnum.disabled
logger.warning( logger.warning(
f"Disabling autotracking zooming for {camera_name}: Relative zoom not supported" f"Disabling autotracking zooming for {camera_name}: Relative zoom not supported. Exception: {e}"
) )
if move_request.Speed is None: if move_request.Speed is None:
@ -295,7 +326,7 @@ class OnvifController:
self.cams[camera_name]["relative_zoom_range"] = ( self.cams[camera_name]["relative_zoom_range"] = (
ptz_config.Spaces.RelativeZoomTranslationSpace[0] ptz_config.Spaces.RelativeZoomTranslationSpace[0]
) )
except Exception: except Exception as e:
if ( if (
self.config.cameras[camera_name].onvif.autotracking.zooming self.config.cameras[camera_name].onvif.autotracking.zooming
== ZoomingModeEnum.relative == ZoomingModeEnum.relative
@ -304,7 +335,7 @@ class OnvifController:
camera_name camera_name
].onvif.autotracking.zooming = ZoomingModeEnum.disabled ].onvif.autotracking.zooming = ZoomingModeEnum.disabled
logger.warning( logger.warning(
f"Disabling autotracking zooming for {camera_name}: Relative zoom not supported" f"Disabling autotracking zooming for {camera_name}: Relative zoom not supported. Exception: {e}"
) )
if configs.DefaultAbsoluteZoomPositionSpace: if configs.DefaultAbsoluteZoomPositionSpace:
@ -319,13 +350,13 @@ class OnvifController:
ptz_config.Spaces.AbsoluteZoomPositionSpace[0] ptz_config.Spaces.AbsoluteZoomPositionSpace[0]
) )
self.cams[camera_name]["zoom_limits"] = configs.ZoomLimits self.cams[camera_name]["zoom_limits"] = configs.ZoomLimits
except Exception: except Exception as e:
if self.config.cameras[camera_name].onvif.autotracking.zooming: if self.config.cameras[camera_name].onvif.autotracking.zooming:
self.config.cameras[ self.config.cameras[
camera_name camera_name
].onvif.autotracking.zooming = ZoomingModeEnum.disabled ].onvif.autotracking.zooming = ZoomingModeEnum.disabled
logger.warning( logger.warning(
f"Disabling autotracking zooming for {camera_name}: Absolute zoom not supported" f"Disabling autotracking zooming for {camera_name}: Absolute zoom not supported. Exception: {e}"
) )
# set relative pan/tilt space for autotracker # set relative pan/tilt space for autotracker
@ -344,25 +375,23 @@ class OnvifController:
self.cams[camera_name]["init"] = True self.cams[camera_name]["init"] = True
return True return True
def _stop(self, camera_name: str) -> None: async def _stop(self, camera_name: str) -> None:
move_request = self.cams[camera_name]["move_request"] move_request = self.cams[camera_name]["move_request"]
asyncio.run( await self.cams[camera_name]["ptz"].Stop(
self.cams[camera_name]["ptz"].Stop( {
{ "ProfileToken": move_request.ProfileToken,
"ProfileToken": move_request.ProfileToken, "PanTilt": True,
"PanTilt": True, "Zoom": True,
"Zoom": True, }
}
)
) )
self.cams[camera_name]["active"] = False self.cams[camera_name]["active"] = False
def _move(self, camera_name: str, command: OnvifCommandEnum) -> None: async def _move(self, camera_name: str, command: OnvifCommandEnum) -> None:
if self.cams[camera_name]["active"]: if self.cams[camera_name]["active"]:
logger.warning( logger.warning(
f"{camera_name} is already performing an action, stopping..." f"{camera_name} is already performing an action, stopping..."
) )
self._stop(camera_name) await self._stop(camera_name)
if "pt" not in self.cams[camera_name]["features"]: if "pt" not in self.cams[camera_name]["features"]:
logger.error(f"{camera_name} does not support ONVIF pan/tilt movement.") logger.error(f"{camera_name} does not support ONVIF pan/tilt movement.")
@ -391,11 +420,11 @@ class OnvifController:
} }
try: try:
asyncio.run(self.cams[camera_name]["ptz"].ContinuousMove(move_request)) await self.cams[camera_name]["ptz"].ContinuousMove(move_request)
except (Fault, ONVIFError, TransportError, Exception) as e: except (Fault, ONVIFError, TransportError, Exception) as e:
logger.warning(f"Onvif sending move request to {camera_name} failed: {e}") logger.warning(f"Onvif sending move request to {camera_name} failed: {e}")
def _move_relative(self, camera_name: str, pan, tilt, zoom, speed) -> None: async def _move_relative(self, camera_name: str, pan, tilt, zoom, speed) -> None:
if "pt-r-fov" not in self.cams[camera_name]["features"]: if "pt-r-fov" not in self.cams[camera_name]["features"]:
logger.error(f"{camera_name} does not support ONVIF RelativeMove (FOV).") logger.error(f"{camera_name} does not support ONVIF RelativeMove (FOV).")
return return
@ -464,7 +493,7 @@ class OnvifController:
} }
move_request.Translation.Zoom.x = zoom move_request.Translation.Zoom.x = zoom
asyncio.run(self.cams[camera_name]["ptz"].RelativeMove(move_request)) await self.cams[camera_name]["ptz"].RelativeMove(move_request)
# reset after the move request # reset after the move request
move_request.Translation.PanTilt.x = 0 move_request.Translation.PanTilt.x = 0
@ -479,7 +508,7 @@ class OnvifController:
self.cams[camera_name]["active"] = False self.cams[camera_name]["active"] = False
def _move_to_preset(self, camera_name: str, preset: str) -> None: async def _move_to_preset(self, camera_name: str, preset: str) -> None:
if preset not in self.cams[camera_name]["presets"]: if preset not in self.cams[camera_name]["presets"]:
logger.error(f"{preset} is not a valid preset for {camera_name}") logger.error(f"{preset} is not a valid preset for {camera_name}")
return return
@ -489,23 +518,22 @@ class OnvifController:
self.ptz_metrics[camera_name].stop_time.value = 0 self.ptz_metrics[camera_name].stop_time.value = 0
move_request = self.cams[camera_name]["move_request"] move_request = self.cams[camera_name]["move_request"]
preset_token = self.cams[camera_name]["presets"][preset] preset_token = self.cams[camera_name]["presets"][preset]
asyncio.run(
self.cams[camera_name]["ptz"].GotoPreset( await self.cams[camera_name]["ptz"].GotoPreset(
{ {
"ProfileToken": move_request.ProfileToken, "ProfileToken": move_request.ProfileToken,
"PresetToken": preset_token, "PresetToken": preset_token,
} }
)
) )
self.cams[camera_name]["active"] = False self.cams[camera_name]["active"] = False
def _zoom(self, camera_name: str, command: OnvifCommandEnum) -> None: async def _zoom(self, camera_name: str, command: OnvifCommandEnum) -> None:
if self.cams[camera_name]["active"]: if self.cams[camera_name]["active"]:
logger.warning( logger.warning(
f"{camera_name} is already performing an action, stopping..." f"{camera_name} is already performing an action, stopping..."
) )
self._stop(camera_name) await self._stop(camera_name)
if "zoom" not in self.cams[camera_name]["features"]: if "zoom" not in self.cams[camera_name]["features"]:
logger.error(f"{camera_name} does not support ONVIF zooming.") logger.error(f"{camera_name} does not support ONVIF zooming.")
@ -519,9 +547,9 @@ class OnvifController:
elif command == OnvifCommandEnum.zoom_out: elif command == OnvifCommandEnum.zoom_out:
move_request.Velocity = {"Zoom": {"x": -0.5}} move_request.Velocity = {"Zoom": {"x": -0.5}}
asyncio.run(self.cams[camera_name]["ptz"].ContinuousMove(move_request)) await self.cams[camera_name]["ptz"].ContinuousMove(move_request)
def _zoom_absolute(self, camera_name: str, zoom, speed) -> None: async def _zoom_absolute(self, camera_name: str, zoom, speed) -> None:
if "zoom-a" not in self.cams[camera_name]["features"]: if "zoom-a" not in self.cams[camera_name]["features"]:
logger.error(f"{camera_name} does not support ONVIF AbsoluteMove zooming.") logger.error(f"{camera_name} does not support ONVIF AbsoluteMove zooming.")
return return
@ -560,19 +588,20 @@ class OnvifController:
logger.debug(f"{camera_name}: Absolute zoom: {zoom}") logger.debug(f"{camera_name}: Absolute zoom: {zoom}")
asyncio.run(self.cams[camera_name]["ptz"].AbsoluteMove(move_request)) await self.cams[camera_name]["ptz"].AbsoluteMove(move_request)
self.cams[camera_name]["active"] = False self.cams[camera_name]["active"] = False
def handle_command( async def handle_command_async(
self, camera_name: str, command: OnvifCommandEnum, param: str = "" self, camera_name: str, command: OnvifCommandEnum, param: str = ""
) -> None: ) -> None:
"""Handle ONVIF commands asynchronously"""
if camera_name not in self.cams.keys(): if camera_name not in self.cams.keys():
logger.error(f"ONVIF is not configured for {camera_name}") logger.error(f"ONVIF is not configured for {camera_name}")
return return
if not self.cams[camera_name]["init"]: if not self.cams[camera_name]["init"]:
if not asyncio.run(self._init_onvif(camera_name)): if not await self._init_onvif(camera_name):
return return
try: try:
@ -580,22 +609,43 @@ class OnvifController:
# already init # already init
return return
elif command == OnvifCommandEnum.stop: elif command == OnvifCommandEnum.stop:
self._stop(camera_name) await self._stop(camera_name)
elif command == OnvifCommandEnum.preset: elif command == OnvifCommandEnum.preset:
self._move_to_preset(camera_name, param) await self._move_to_preset(camera_name, param)
elif command == OnvifCommandEnum.move_relative: elif command == OnvifCommandEnum.move_relative:
_, pan, tilt = param.split("_") _, pan, tilt = param.split("_")
self._move_relative(camera_name, float(pan), float(tilt), 0, 1) await self._move_relative(camera_name, float(pan), float(tilt), 0, 1)
elif ( elif (
command == OnvifCommandEnum.zoom_in command == OnvifCommandEnum.zoom_in
or command == OnvifCommandEnum.zoom_out or command == OnvifCommandEnum.zoom_out
): ):
self._zoom(camera_name, command) await self._zoom(camera_name, command)
else: else:
self._move(camera_name, command) await self._move(camera_name, command)
except (Fault, ONVIFError, TransportError, Exception) as e: except (Fault, ONVIFError, TransportError, Exception) as e:
logger.error(f"Unable to handle onvif command: {e}") logger.error(f"Unable to handle onvif command: {e}")
def handle_command(
self, camera_name: str, command: OnvifCommandEnum, param: str = ""
) -> None:
"""
Handle ONVIF commands by scheduling them in the event loop.
This is the synchronous interface that schedules async work.
"""
future = asyncio.run_coroutine_threadsafe(
self.handle_command_async(camera_name, command, param), self.loop
)
try:
# Wait with a timeout to prevent blocking indefinitely
future.result(timeout=10)
except asyncio.TimeoutError:
logger.error(f"Command {command} timed out for camera {camera_name}")
except Exception as e:
logger.error(
f"Error executing command {command} for camera {camera_name}: {e}"
)
async def get_camera_info(self, camera_name: str) -> dict[str, any]: async def get_camera_info(self, camera_name: str) -> dict[str, any]:
""" """
Get ptz capabilities and presets, attempting to reconnect if ONVIF is configured Get ptz capabilities and presets, attempting to reconnect if ONVIF is configured
@ -609,26 +659,23 @@ class OnvifController:
) )
return {} return {}
if camera_name not in self.cams and ( if camera_name not in self.cams.keys() and (
camera_name not in self.config.cameras camera_name not in self.config.cameras
or not self.config.cameras[camera_name].onvif.host or not self.config.cameras[camera_name].onvif.host
): ):
logger.debug(f"ONVIF is not configured for {camera_name}") logger.debug(f"ONVIF is not configured for {camera_name}")
return {} return {}
if camera_name in self.cams and self.cams[camera_name]["init"]: if camera_name in self.cams.keys() and self.cams[camera_name]["init"]:
return { return {
"name": camera_name, "name": camera_name,
"features": self.cams[camera_name]["features"], "features": self.cams[camera_name]["features"],
"presets": list(self.cams[camera_name]["presets"].keys()), "presets": list(self.cams[camera_name]["presets"].keys()),
} }
if camera_name not in self.cams and camera_name in self.config.cameras: if camera_name not in self.cams.keys() and camera_name in self.config.cameras:
cam = self.config.cameras[camera_name] success = await self._init_single_camera(camera_name)
result = self._create_onvif_camera(camera_name, cam) if not success:
if result:
self.cams[camera_name] = result
else:
return {} return {}
# Reset retry count after timeout # Reset retry count after timeout
@ -681,23 +728,21 @@ class OnvifController:
logger.debug(f"Could not initialize ONVIF for {camera_name}") logger.debug(f"Could not initialize ONVIF for {camera_name}")
return {} return {}
def get_service_capabilities(self, camera_name: str) -> None: async def get_service_capabilities(self, camera_name: str) -> None:
if camera_name not in self.cams.keys(): if camera_name not in self.cams.keys():
logger.error(f"ONVIF is not configured for {camera_name}") logger.error(f"ONVIF is not configured for {camera_name}")
return {} return {}
if not self.cams[camera_name]["init"]: if not self.cams[camera_name]["init"]:
asyncio.run(self._init_onvif(camera_name)) await self._init_onvif(camera_name)
service_capabilities_request = self.cams[camera_name][ service_capabilities_request = self.cams[camera_name][
"service_capabilities_request" "service_capabilities_request"
] ]
try: try:
service_capabilities = asyncio.run( service_capabilities = await self.cams[camera_name][
self.cams[camera_name]["ptz"].GetServiceCapabilities( "ptz"
service_capabilities_request ].GetServiceCapabilities(service_capabilities_request)
)
)
logger.debug( logger.debug(
f"Onvif service capabilities for {camera_name}: {service_capabilities}" f"Onvif service capabilities for {camera_name}: {service_capabilities}"
@ -705,25 +750,24 @@ class OnvifController:
# MoveStatus is required for autotracking - should return "true" if supported # MoveStatus is required for autotracking - should return "true" if supported
return find_by_key(vars(service_capabilities), "MoveStatus") return find_by_key(vars(service_capabilities), "MoveStatus")
except Exception: except Exception as e:
logger.warning( logger.warning(
f"Camera {camera_name} does not support the ONVIF GetServiceCapabilities method. Autotracking will not function correctly and must be disabled in your config." f"Camera {camera_name} does not support the ONVIF GetServiceCapabilities method. Autotracking will not function correctly and must be disabled in your config. Exception: {e}"
) )
return False return False
def get_camera_status(self, camera_name: str) -> None: async def get_camera_status(self, camera_name: str) -> None:
if camera_name not in self.cams.keys(): if camera_name not in self.cams.keys():
logger.error(f"ONVIF is not configured for {camera_name}") logger.error(f"ONVIF is not configured for {camera_name}")
return {} return
if not self.cams[camera_name]["init"]: if not self.cams[camera_name]["init"]:
asyncio.run(self._init_onvif(camera_name)) if not await self._init_onvif(camera_name):
return
status_request = self.cams[camera_name]["status_request"] status_request = self.cams[camera_name]["status_request"]
try: try:
status = asyncio.run( status = await self.cams[camera_name]["ptz"].GetStatus(status_request)
self.cams[camera_name]["ptz"].GetStatus(status_request)
)
except Exception: except Exception:
pass # We're unsupported, that'll be reported in the next check. pass # We're unsupported, that'll be reported in the next check.
@ -807,3 +851,22 @@ class OnvifController:
camera_name camera_name
].frame_time.value ].frame_time.value
logger.warning(f"Camera {camera_name} is still in ONVIF 'MOVING' status.") logger.warning(f"Camera {camera_name} is still in ONVIF 'MOVING' status.")
def close(self) -> None:
"""Gracefully shut down the ONVIF controller."""
if not hasattr(self, "loop") or self.loop.is_closed():
logger.debug("ONVIF controller already closed")
return
logger.info("Exiting ONVIF controller...")
def stop_and_cleanup():
try:
self.loop.stop()
except Exception as e:
logger.error(f"Error during loop cleanup: {e}")
# Schedule stop and cleanup in the loop thread
self.loop.call_soon_threadsafe(stop_and_cleanup)
self.loop_thread.join()