From da1fb935b4aeb011e64587203121800a0999186d Mon Sep 17 00:00:00 2001 From: Josh Hawkins <32435876+hawkeye217@users.noreply.github.com> Date: Wed, 7 May 2025 08:53:29 -0500 Subject: [PATCH] Refactor async ONVIF (#18093) * use async/await instead of asyncio.run() * fix autotracking * create cameras in same event loop that will use them * more debug * try using existing event loop instead of creating a new one * merge dev * fixes * run get_camera_info onvifcontroller calls in dedicated loop * move coroutine call with loop to api * use asyncio for autotracking move queues * clean up * fix calibration * improve exception logging --- frigate/api/media.py | 8 +- frigate/app.py | 4 + frigate/ptz/autotrack.py | 122 ++++++++++++----------- frigate/ptz/onvif.py | 209 +++++++++++++++++++++++++-------------- 4 files changed, 208 insertions(+), 135 deletions(-) diff --git a/frigate/api/media.py b/frigate/api/media.py index 7dab6ae60..27d87770a 100644 --- a/frigate/api/media.py +++ b/frigate/api/media.py @@ -1,5 +1,6 @@ """Image and video apis.""" +import asyncio import glob import logging import math @@ -110,9 +111,12 @@ def imagestream( @router.get("/{camera_name}/ptz/info") async def camera_ptz_info(request: Request, camera_name: str): if camera_name in request.app.frigate_config.cameras: - return JSONResponse( - content=await request.app.onvif.get_camera_info(camera_name), + # Schedule get_camera_info in the OnvifController's event loop + future = asyncio.run_coroutine_threadsafe( + request.app.onvif.get_camera_info(camera_name), request.app.onvif.loop ) + result = future.result() + return JSONResponse(content=result) else: return JSONResponse( content={"success": False, "message": "Camera not found"}, diff --git a/frigate/app.py b/frigate/app.py index 683ff7ab5..ac3e6d7da 100644 --- a/frigate/app.py +++ b/frigate/app.py @@ -699,6 +699,10 @@ class FrigateApp: self.audio_process.terminate() self.audio_process.join() + # stop the onvif controller + if self.onvif_controller: + self.onvif_controller.close() + # ensure the capture processes are done for camera, metrics in self.camera_metrics.items(): capture_process = metrics.capture_process diff --git a/frigate/ptz/autotrack.py b/frigate/ptz/autotrack.py index c6db85a6c..709165c24 100644 --- a/frigate/ptz/autotrack.py +++ b/frigate/ptz/autotrack.py @@ -3,11 +3,9 @@ import asyncio import copy import logging -import queue import threading import time from collections import deque -from functools import partial from multiprocessing.synchronize import Event as MpEvent import cv2 @@ -169,7 +167,12 @@ class PtzAutoTrackerThread(threading.Thread): continue if camera_config.onvif.autotracking.enabled: - self.ptz_autotracker.camera_maintenance(camera) + future = asyncio.run_coroutine_threadsafe( + self.ptz_autotracker.camera_maintenance(camera), + self.ptz_autotracker.onvif.loop, + ) + # Wait for the coroutine to complete + future.result() else: # disabled dynamically by mqtt if self.ptz_autotracker.tracked_object.get(camera): @@ -219,9 +222,13 @@ class PtzAutoTracker: camera_config.onvif.autotracking.enabled and camera_config.onvif.autotracking.enabled_in_config ): - self._autotracker_setup(camera_config, camera) + future = asyncio.run_coroutine_threadsafe( + self._autotracker_setup(camera_config, camera), self.onvif.loop + ) + # Wait for the coroutine to complete + future.result() - def _autotracker_setup(self, camera_config: CameraConfig, camera: str): + async def _autotracker_setup(self, camera_config: CameraConfig, camera: str): logger.debug(f"{camera}: Autotracker init") self.object_types[camera] = camera_config.onvif.autotracking.track @@ -242,8 +249,8 @@ class PtzAutoTracker: self.intercept[camera] = None self.move_coefficients[camera] = [] - self.move_queues[camera] = queue.Queue() - self.move_queue_locks[camera] = threading.Lock() + self.move_queues[camera] = asyncio.Queue() + self.move_queue_locks[camera] = asyncio.Lock() # handle onvif constructor failing due to no connection if camera not in self.onvif.cams: @@ -255,7 +262,7 @@ class PtzAutoTracker: return if not self.onvif.cams[camera]["init"]: - if not asyncio.run(self.onvif._init_onvif(camera)): + if not await self.onvif._init_onvif(camera): logger.warning( f"Disabling autotracking for {camera}: Unable to initialize onvif" ) @@ -271,7 +278,7 @@ class PtzAutoTracker: self.ptz_metrics[camera].autotracker_enabled.value = False return - move_status_supported = self.onvif.get_service_capabilities(camera) + move_status_supported = await self.onvif.get_service_capabilities(camera) if not ( isinstance(move_status_supported, bool) and move_status_supported @@ -287,15 +294,12 @@ class PtzAutoTracker: return if self.onvif.cams[camera]["init"]: - self.onvif.get_camera_status(camera) + await self.onvif.get_camera_status(camera) - # movement thread per camera - self.move_threads[camera] = threading.Thread( - name=f"ptz_move_thread_{camera}", - target=partial(self._process_move_queue, camera), + # movement queue with asyncio on OnvifController loop + asyncio.run_coroutine_threadsafe( + self._process_move_queue(camera), self.onvif.loop ) - self.move_threads[camera].daemon = True - self.move_threads[camera].start() if camera_config.onvif.autotracking.movement_weights: if len(camera_config.onvif.autotracking.movement_weights) == 6: @@ -330,7 +334,7 @@ class PtzAutoTracker: ) if camera_config.onvif.autotracking.calibrate_on_startup: - self._calibrate_camera(camera) + await self._calibrate_camera(camera) self.ptz_metrics[camera].tracking_active.clear() self.dispatcher.publish(f"{camera}/ptz_autotracker/active", "OFF", retain=False) @@ -349,7 +353,7 @@ class PtzAutoTracker: self.config.cameras[camera].onvif.autotracking.movement_weights, ) - def _calibrate_camera(self, camera): + async def _calibrate_camera(self, camera): # move the camera from the preset in steps and measure the time it takes to move that amount # this will allow us to predict movement times with a simple linear regression # start with 0 so we can determine a baseline (to be used as the intercept in the regression calc) @@ -373,25 +377,25 @@ class PtzAutoTracker: for i in range(2): # absolute move to 0 - fully zoomed out - self.onvif._zoom_absolute( + await self.onvif._zoom_absolute( camera, self.onvif.cams[camera]["absolute_zoom_range"]["XRange"]["Min"], 1, ) while not self.ptz_metrics[camera].motor_stopped.is_set(): - self.onvif.get_camera_status(camera) + await self.onvif.get_camera_status(camera) zoom_out_values.append(self.ptz_metrics[camera].zoom_level.value) - self.onvif._zoom_absolute( + await self.onvif._zoom_absolute( camera, self.onvif.cams[camera]["absolute_zoom_range"]["XRange"]["Max"], 1, ) while not self.ptz_metrics[camera].motor_stopped.is_set(): - self.onvif.get_camera_status(camera) + await self.onvif.get_camera_status(camera) zoom_in_values.append(self.ptz_metrics[camera].zoom_level.value) @@ -400,7 +404,7 @@ class PtzAutoTracker: == ZoomingModeEnum.relative ): # relative move to -0.01 - self.onvif._move_relative( + await self.onvif._move_relative( camera, 0, 0, @@ -409,13 +413,13 @@ class PtzAutoTracker: ) while not self.ptz_metrics[camera].motor_stopped.is_set(): - self.onvif.get_camera_status(camera) + await self.onvif.get_camera_status(camera) zoom_out_values.append(self.ptz_metrics[camera].zoom_level.value) zoom_start_time = time.time() # relative move to 0.01 - self.onvif._move_relative( + await self.onvif._move_relative( camera, 0, 0, @@ -424,13 +428,13 @@ class PtzAutoTracker: ) while not self.ptz_metrics[camera].motor_stopped.is_set(): - self.onvif.get_camera_status(camera) + await self.onvif.get_camera_status(camera) zoom_stop_time = time.time() full_relative_start_time = time.time() - self.onvif._move_relative( + await self.onvif._move_relative( camera, -1, -1, @@ -439,11 +443,11 @@ class PtzAutoTracker: ) while not self.ptz_metrics[camera].motor_stopped.is_set(): - self.onvif.get_camera_status(camera) + await self.onvif.get_camera_status(camera) full_relative_stop_time = time.time() - self.onvif._move_relative( + await self.onvif._move_relative( camera, 1, 1, @@ -452,7 +456,7 @@ class PtzAutoTracker: ) while not self.ptz_metrics[camera].motor_stopped.is_set(): - self.onvif.get_camera_status(camera) + await self.onvif.get_camera_status(camera) self.zoom_time[camera] = ( full_relative_stop_time - full_relative_start_time @@ -471,7 +475,7 @@ class PtzAutoTracker: self.ptz_metrics[camera].max_zoom.value = 1 self.ptz_metrics[camera].min_zoom.value = 0 - self.onvif._move_to_preset( + await self.onvif._move_to_preset( camera, self.config.cameras[camera].onvif.autotracking.return_preset.lower(), ) @@ -480,18 +484,18 @@ class PtzAutoTracker: # Wait until the camera finishes moving while not self.ptz_metrics[camera].motor_stopped.is_set(): - self.onvif.get_camera_status(camera) + await self.onvif.get_camera_status(camera) for step in range(num_steps): pan = step_sizes[step] tilt = step_sizes[step] start_time = time.time() - self.onvif._move_relative(camera, pan, tilt, 0, 1) + await self.onvif._move_relative(camera, pan, tilt, 0, 1) # Wait until the camera finishes moving while not self.ptz_metrics[camera].motor_stopped.is_set(): - self.onvif.get_camera_status(camera) + await self.onvif.get_camera_status(camera) stop_time = time.time() self.move_metrics[camera].append( @@ -503,7 +507,7 @@ class PtzAutoTracker: } ) - self.onvif._move_to_preset( + await self.onvif._move_to_preset( camera, self.config.cameras[camera].onvif.autotracking.return_preset.lower(), ) @@ -512,7 +516,7 @@ class PtzAutoTracker: # Wait until the camera finishes moving while not self.ptz_metrics[camera].motor_stopped.is_set(): - self.onvif.get_camera_status(camera) + await self.onvif.get_camera_status(camera) logger.info( f"Calibration for {camera} in progress: {round((step / num_steps) * 100)}% complete" @@ -709,18 +713,17 @@ class PtzAutoTracker: centroid_distance < self.tracked_object_metrics[camera]["distance"] ) - def _process_move_queue(self, camera): - camera_config = self.config.cameras[camera] - camera_config.frame_shape[1] - camera_config.frame_shape[0] + async def _process_move_queue(self, camera): + move_queue = self.move_queues[camera] while not self.stop_event.is_set(): try: - move_data = self.move_queues[camera].get(True, 0.1) - except queue.Empty: + # Asynchronously wait for move data with a timeout + move_data = await asyncio.wait_for(move_queue.get(), timeout=0.1) + except asyncio.TimeoutError: continue - with self.move_queue_locks[camera]: + async with self.move_queue_locks[camera]: frame_time, pan, tilt, zoom = move_data # if we're receiving move requests during a PTZ move, ignore them @@ -729,8 +732,6 @@ class PtzAutoTracker: self.ptz_metrics[camera].start_time.value, self.ptz_metrics[camera].stop_time.value, ): - # instead of dequeueing this might be a good place to preemptively move based - # on an estimate - for fast moving objects, etc. logger.debug( f"{camera}: Move queue: PTZ moving, dequeueing move request - frame time: {frame_time}, final pan: {pan}, final tilt: {tilt}, final zoom: {zoom}" ) @@ -741,25 +742,24 @@ class PtzAutoTracker: self.config.cameras[camera].onvif.autotracking.zooming == ZoomingModeEnum.relative ): - self.onvif._move_relative(camera, pan, tilt, zoom, 1) - + await self.onvif._move_relative(camera, pan, tilt, zoom, 1) else: if pan != 0 or tilt != 0: - self.onvif._move_relative(camera, pan, tilt, 0, 1) + await self.onvif._move_relative(camera, pan, tilt, 0, 1) # Wait until the camera finishes moving while not self.ptz_metrics[camera].motor_stopped.is_set(): - self.onvif.get_camera_status(camera) + await self.onvif.get_camera_status(camera) if ( zoom > 0 and self.ptz_metrics[camera].zoom_level.value != zoom ): - self.onvif._zoom_absolute(camera, zoom, 1) + await self.onvif._zoom_absolute(camera, zoom, 1) # Wait until the camera finishes moving while not self.ptz_metrics[camera].motor_stopped.is_set(): - self.onvif.get_camera_status(camera) + await self.onvif.get_camera_status(camera) if self.config.cameras[camera].onvif.autotracking.movement_weights: logger.debug( @@ -796,6 +796,10 @@ class PtzAutoTracker: # calculate new coefficients if we have enough data self._calculate_move_coefficients(camera) + # Clean up the queue on exit + while not move_queue.empty(): + await move_queue.get() + def _enqueue_move(self, camera, frame_time, pan, tilt, zoom): def split_value(value, suppress_diff=True): clipped = np.clip(value, -1, 1) @@ -824,7 +828,9 @@ class PtzAutoTracker: f"{camera}: Enqueue movement for frame time: {frame_time} pan: {pan}, tilt: {tilt}, zoom: {zoom}" ) move_data = (frame_time, pan, tilt, zoom) - self.move_queues[camera].put(move_data) + self.onvif.loop.call_soon_threadsafe( + self.move_queues[camera].put_nowait, move_data + ) # reset values to not split up large movements pan = 0 @@ -1420,7 +1426,7 @@ class PtzAutoTracker: ** (1 / self.zoom_factor[camera]) } - def camera_maintenance(self, camera): + async def camera_maintenance(self, camera): # bail and don't check anything if we're calibrating or tracking an object if ( not self.autotracker_init[camera] @@ -1437,7 +1443,7 @@ class PtzAutoTracker: self._autotracker_setup(self.config.cameras[camera], camera) # regularly update camera status if not self.ptz_metrics[camera].motor_stopped.is_set(): - self.onvif.get_camera_status(camera) + await self.onvif.get_camera_status(camera) # return to preset if tracking is over if ( @@ -1455,22 +1461,18 @@ class PtzAutoTracker: self.tracked_object[camera] = None self.tracked_object_history[camera].clear() - # empty move queue - while not self.move_queues[camera].empty(): - self.move_queues[camera].get() - self.ptz_metrics[camera].motor_stopped.wait() logger.debug( f"{camera}: Time is {self.ptz_metrics[camera].frame_time.value}, returning to preset: {autotracker_config.return_preset}" ) - self.onvif._move_to_preset( + await self.onvif._move_to_preset( camera, autotracker_config.return_preset.lower(), ) # update stored zoom level from preset if not self.ptz_metrics[camera].motor_stopped.is_set(): - self.onvif.get_camera_status(camera) + await self.onvif.get_camera_status(camera) self.ptz_metrics[camera].tracking_active.clear() self.dispatcher.publish( diff --git a/frigate/ptz/onvif.py b/frigate/ptz/onvif.py index 8b133469c..b01d9ed96 100644 --- a/frigate/ptz/onvif.py +++ b/frigate/ptz/onvif.py @@ -2,6 +2,7 @@ import asyncio import logging +import threading import time from enum import Enum from importlib.util import find_spec @@ -39,27 +40,56 @@ class OnvifController: def __init__( self, config: FrigateConfig, ptz_metrics: dict[str, PTZMetrics] ) -> None: - self.cams: dict[str, ONVIFCamera] = {} + self.cams: dict[str, dict] = {} self.failed_cams: dict[str, dict] = {} self.max_retries = 5 self.reset_timeout = 900 # 15 minutes - self.config = config self.ptz_metrics = ptz_metrics + # Create a dedicated event loop and run it in a separate thread + self.loop = asyncio.new_event_loop() + self.loop_thread = threading.Thread(target=self._run_event_loop, daemon=True) + self.loop_thread.start() + + self.camera_configs = {} for cam_name, cam in config.cameras.items(): if not cam.enabled: continue - if cam.onvif.host: - result = self._create_onvif_camera(cam_name, cam) - if result: - self.cams[cam_name] = result + self.camera_configs[cam_name] = cam - def _create_onvif_camera(self, cam_name: str, cam) -> dict | None: - """Create an ONVIF camera instance and handle failures.""" + asyncio.run_coroutine_threadsafe(self._init_cameras(), self.loop) + + def _run_event_loop(self) -> None: + """Run the event loop in a separate thread.""" + asyncio.set_event_loop(self.loop) try: - return { + self.loop.run_forever() + except Exception as e: + logger.error(f"Onvif event loop terminated unexpectedly: {e}") + + async def _init_cameras(self) -> None: + """Initialize all configured cameras.""" + for cam_name in self.camera_configs: + await self._init_single_camera(cam_name) + + async def _init_single_camera(self, cam_name: str) -> bool: + """Initialize a single camera by name. + + Args: + cam_name: The name of the camera to initialize + + Returns: + bool: True if initialization succeeded, False otherwise + """ + if cam_name not in self.camera_configs: + logger.error(f"No configuration found for camera {cam_name}") + return False + + cam = self.camera_configs[cam_name] + try: + self.cams[cam_name] = { "onvif": ONVIFCamera( cam.onvif.host, cam.onvif.port, @@ -74,6 +104,7 @@ class OnvifController: "features": [], "presets": {}, } + return True except (Fault, ONVIFError, TransportError, Exception) as e: logger.error(f"Failed to create ONVIF camera instance for {cam_name}: {e}") # track initial failures @@ -82,7 +113,7 @@ class OnvifController: "last_error": str(e), "last_attempt": time.time(), } - return None + return False async def _init_onvif(self, camera_name: str) -> bool: onvif: ONVIFCamera = self.cams[camera_name]["onvif"] @@ -240,12 +271,12 @@ class OnvifController: logger.debug( f"{camera_name}: Relative move request after deleting zoom: {move_request}" ) - except Exception: + except Exception as e: self.config.cameras[ camera_name ].onvif.autotracking.zooming = ZoomingModeEnum.disabled logger.warning( - f"Disabling autotracking zooming for {camera_name}: Relative zoom not supported" + f"Disabling autotracking zooming for {camera_name}: Relative zoom not supported. Exception: {e}" ) if move_request.Speed is None: @@ -295,7 +326,7 @@ class OnvifController: self.cams[camera_name]["relative_zoom_range"] = ( ptz_config.Spaces.RelativeZoomTranslationSpace[0] ) - except Exception: + except Exception as e: if ( self.config.cameras[camera_name].onvif.autotracking.zooming == ZoomingModeEnum.relative @@ -304,7 +335,7 @@ class OnvifController: camera_name ].onvif.autotracking.zooming = ZoomingModeEnum.disabled logger.warning( - f"Disabling autotracking zooming for {camera_name}: Relative zoom not supported" + f"Disabling autotracking zooming for {camera_name}: Relative zoom not supported. Exception: {e}" ) if configs.DefaultAbsoluteZoomPositionSpace: @@ -319,13 +350,13 @@ class OnvifController: ptz_config.Spaces.AbsoluteZoomPositionSpace[0] ) self.cams[camera_name]["zoom_limits"] = configs.ZoomLimits - except Exception: + except Exception as e: if self.config.cameras[camera_name].onvif.autotracking.zooming: self.config.cameras[ camera_name ].onvif.autotracking.zooming = ZoomingModeEnum.disabled logger.warning( - f"Disabling autotracking zooming for {camera_name}: Absolute zoom not supported" + f"Disabling autotracking zooming for {camera_name}: Absolute zoom not supported. Exception: {e}" ) # set relative pan/tilt space for autotracker @@ -344,25 +375,23 @@ class OnvifController: self.cams[camera_name]["init"] = True return True - def _stop(self, camera_name: str) -> None: + async def _stop(self, camera_name: str) -> None: move_request = self.cams[camera_name]["move_request"] - asyncio.run( - self.cams[camera_name]["ptz"].Stop( - { - "ProfileToken": move_request.ProfileToken, - "PanTilt": True, - "Zoom": True, - } - ) + await self.cams[camera_name]["ptz"].Stop( + { + "ProfileToken": move_request.ProfileToken, + "PanTilt": True, + "Zoom": True, + } ) self.cams[camera_name]["active"] = False - def _move(self, camera_name: str, command: OnvifCommandEnum) -> None: + async def _move(self, camera_name: str, command: OnvifCommandEnum) -> None: if self.cams[camera_name]["active"]: logger.warning( f"{camera_name} is already performing an action, stopping..." ) - self._stop(camera_name) + await self._stop(camera_name) if "pt" not in self.cams[camera_name]["features"]: logger.error(f"{camera_name} does not support ONVIF pan/tilt movement.") @@ -391,11 +420,11 @@ class OnvifController: } try: - asyncio.run(self.cams[camera_name]["ptz"].ContinuousMove(move_request)) + await self.cams[camera_name]["ptz"].ContinuousMove(move_request) except (Fault, ONVIFError, TransportError, Exception) as e: logger.warning(f"Onvif sending move request to {camera_name} failed: {e}") - def _move_relative(self, camera_name: str, pan, tilt, zoom, speed) -> None: + async def _move_relative(self, camera_name: str, pan, tilt, zoom, speed) -> None: if "pt-r-fov" not in self.cams[camera_name]["features"]: logger.error(f"{camera_name} does not support ONVIF RelativeMove (FOV).") return @@ -464,7 +493,7 @@ class OnvifController: } move_request.Translation.Zoom.x = zoom - asyncio.run(self.cams[camera_name]["ptz"].RelativeMove(move_request)) + await self.cams[camera_name]["ptz"].RelativeMove(move_request) # reset after the move request move_request.Translation.PanTilt.x = 0 @@ -479,7 +508,7 @@ class OnvifController: self.cams[camera_name]["active"] = False - def _move_to_preset(self, camera_name: str, preset: str) -> None: + async def _move_to_preset(self, camera_name: str, preset: str) -> None: if preset not in self.cams[camera_name]["presets"]: logger.error(f"{preset} is not a valid preset for {camera_name}") return @@ -489,23 +518,22 @@ class OnvifController: self.ptz_metrics[camera_name].stop_time.value = 0 move_request = self.cams[camera_name]["move_request"] preset_token = self.cams[camera_name]["presets"][preset] - asyncio.run( - self.cams[camera_name]["ptz"].GotoPreset( - { - "ProfileToken": move_request.ProfileToken, - "PresetToken": preset_token, - } - ) + + await self.cams[camera_name]["ptz"].GotoPreset( + { + "ProfileToken": move_request.ProfileToken, + "PresetToken": preset_token, + } ) self.cams[camera_name]["active"] = False - def _zoom(self, camera_name: str, command: OnvifCommandEnum) -> None: + async def _zoom(self, camera_name: str, command: OnvifCommandEnum) -> None: if self.cams[camera_name]["active"]: logger.warning( f"{camera_name} is already performing an action, stopping..." ) - self._stop(camera_name) + await self._stop(camera_name) if "zoom" not in self.cams[camera_name]["features"]: logger.error(f"{camera_name} does not support ONVIF zooming.") @@ -519,9 +547,9 @@ class OnvifController: elif command == OnvifCommandEnum.zoom_out: move_request.Velocity = {"Zoom": {"x": -0.5}} - asyncio.run(self.cams[camera_name]["ptz"].ContinuousMove(move_request)) + await self.cams[camera_name]["ptz"].ContinuousMove(move_request) - def _zoom_absolute(self, camera_name: str, zoom, speed) -> None: + async def _zoom_absolute(self, camera_name: str, zoom, speed) -> None: if "zoom-a" not in self.cams[camera_name]["features"]: logger.error(f"{camera_name} does not support ONVIF AbsoluteMove zooming.") return @@ -560,19 +588,20 @@ class OnvifController: logger.debug(f"{camera_name}: Absolute zoom: {zoom}") - asyncio.run(self.cams[camera_name]["ptz"].AbsoluteMove(move_request)) + await self.cams[camera_name]["ptz"].AbsoluteMove(move_request) self.cams[camera_name]["active"] = False - def handle_command( + async def handle_command_async( self, camera_name: str, command: OnvifCommandEnum, param: str = "" ) -> None: + """Handle ONVIF commands asynchronously""" if camera_name not in self.cams.keys(): logger.error(f"ONVIF is not configured for {camera_name}") return if not self.cams[camera_name]["init"]: - if not asyncio.run(self._init_onvif(camera_name)): + if not await self._init_onvif(camera_name): return try: @@ -580,22 +609,43 @@ class OnvifController: # already init return elif command == OnvifCommandEnum.stop: - self._stop(camera_name) + await self._stop(camera_name) elif command == OnvifCommandEnum.preset: - self._move_to_preset(camera_name, param) + await self._move_to_preset(camera_name, param) elif command == OnvifCommandEnum.move_relative: _, pan, tilt = param.split("_") - self._move_relative(camera_name, float(pan), float(tilt), 0, 1) + await self._move_relative(camera_name, float(pan), float(tilt), 0, 1) elif ( command == OnvifCommandEnum.zoom_in or command == OnvifCommandEnum.zoom_out ): - self._zoom(camera_name, command) + await self._zoom(camera_name, command) else: - self._move(camera_name, command) + await self._move(camera_name, command) except (Fault, ONVIFError, TransportError, Exception) as e: logger.error(f"Unable to handle onvif command: {e}") + def handle_command( + self, camera_name: str, command: OnvifCommandEnum, param: str = "" + ) -> None: + """ + Handle ONVIF commands by scheduling them in the event loop. + This is the synchronous interface that schedules async work. + """ + future = asyncio.run_coroutine_threadsafe( + self.handle_command_async(camera_name, command, param), self.loop + ) + + try: + # Wait with a timeout to prevent blocking indefinitely + future.result(timeout=10) + except asyncio.TimeoutError: + logger.error(f"Command {command} timed out for camera {camera_name}") + except Exception as e: + logger.error( + f"Error executing command {command} for camera {camera_name}: {e}" + ) + async def get_camera_info(self, camera_name: str) -> dict[str, any]: """ Get ptz capabilities and presets, attempting to reconnect if ONVIF is configured @@ -609,26 +659,23 @@ class OnvifController: ) return {} - if camera_name not in self.cams and ( + if camera_name not in self.cams.keys() and ( camera_name not in self.config.cameras or not self.config.cameras[camera_name].onvif.host ): logger.debug(f"ONVIF is not configured for {camera_name}") return {} - if camera_name in self.cams and self.cams[camera_name]["init"]: + if camera_name in self.cams.keys() and self.cams[camera_name]["init"]: return { "name": camera_name, "features": self.cams[camera_name]["features"], "presets": list(self.cams[camera_name]["presets"].keys()), } - if camera_name not in self.cams and camera_name in self.config.cameras: - cam = self.config.cameras[camera_name] - result = self._create_onvif_camera(camera_name, cam) - if result: - self.cams[camera_name] = result - else: + if camera_name not in self.cams.keys() and camera_name in self.config.cameras: + success = await self._init_single_camera(camera_name) + if not success: return {} # Reset retry count after timeout @@ -681,23 +728,21 @@ class OnvifController: logger.debug(f"Could not initialize ONVIF for {camera_name}") return {} - def get_service_capabilities(self, camera_name: str) -> None: + async def get_service_capabilities(self, camera_name: str) -> None: if camera_name not in self.cams.keys(): logger.error(f"ONVIF is not configured for {camera_name}") return {} if not self.cams[camera_name]["init"]: - asyncio.run(self._init_onvif(camera_name)) + await self._init_onvif(camera_name) service_capabilities_request = self.cams[camera_name][ "service_capabilities_request" ] try: - service_capabilities = asyncio.run( - self.cams[camera_name]["ptz"].GetServiceCapabilities( - service_capabilities_request - ) - ) + service_capabilities = await self.cams[camera_name][ + "ptz" + ].GetServiceCapabilities(service_capabilities_request) logger.debug( f"Onvif service capabilities for {camera_name}: {service_capabilities}" @@ -705,25 +750,24 @@ class OnvifController: # MoveStatus is required for autotracking - should return "true" if supported return find_by_key(vars(service_capabilities), "MoveStatus") - except Exception: + except Exception as e: logger.warning( - f"Camera {camera_name} does not support the ONVIF GetServiceCapabilities method. Autotracking will not function correctly and must be disabled in your config." + f"Camera {camera_name} does not support the ONVIF GetServiceCapabilities method. Autotracking will not function correctly and must be disabled in your config. Exception: {e}" ) return False - def get_camera_status(self, camera_name: str) -> None: + async def get_camera_status(self, camera_name: str) -> None: if camera_name not in self.cams.keys(): logger.error(f"ONVIF is not configured for {camera_name}") - return {} + return if not self.cams[camera_name]["init"]: - asyncio.run(self._init_onvif(camera_name)) + if not await self._init_onvif(camera_name): + return status_request = self.cams[camera_name]["status_request"] try: - status = asyncio.run( - self.cams[camera_name]["ptz"].GetStatus(status_request) - ) + status = await self.cams[camera_name]["ptz"].GetStatus(status_request) except Exception: pass # We're unsupported, that'll be reported in the next check. @@ -807,3 +851,22 @@ class OnvifController: camera_name ].frame_time.value logger.warning(f"Camera {camera_name} is still in ONVIF 'MOVING' status.") + + def close(self) -> None: + """Gracefully shut down the ONVIF controller.""" + if not hasattr(self, "loop") or self.loop.is_closed(): + logger.debug("ONVIF controller already closed") + return + + logger.info("Exiting ONVIF controller...") + + def stop_and_cleanup(): + try: + self.loop.stop() + except Exception as e: + logger.error(f"Error during loop cleanup: {e}") + + # Schedule stop and cleanup in the loop thread + self.loop.call_soon_threadsafe(stop_and_cleanup) + + self.loop_thread.join()