From 4bb420d049b31fa9cd6807df254e7caf85e6672c Mon Sep 17 00:00:00 2001
From: gtsiam
Date: Mon, 21 Oct 2024 18:00:38 +0300
Subject: [PATCH] Add service manager infrastructure (#14150)

* Add service manager infrastructure

The changes are (this will be a bit long):

- A ServiceManager class that spawns a background thread and handles service
  lifecycle management. The idea is that service lifecycle code runs in async
  functions, so a single thread is enough to manage any (reasonable) number
  of services.

- A Service class that offers start(), stop() and restart() methods, which
  simply notify the service manager to... well, start, stop or restart a
  service.

  (!) Warning: Note that this differs from mp.Process.start()/terminate() in
  that the service commands are sent asynchronously and complete "eventually".
  This keeps the business logic fast when booting up and shutting down, but
  we need to make sure that code does not rely on start() and stop() being
  instant (mainly pid assignments).

  Subclasses of the Service class should use the on_start and on_stop methods
  to monitor for service events. These are run by the service manager thread,
  so we need to be careful not to block execution there. Standard async
  stuff.

  (!) Note on service names: service names should be unique within a
  ServiceManager. Make sure to pass the name you want to
  super().__init__(name="...") if you plan to spawn multiple instances of a
  service.

- A ServiceProcess class: a Service that wraps a multiprocessing.Process. It
  offers a run() method that subclasses can override, and it supports
  in-place restarting through the service manager.

And finally, I lied a bit about this whole thing using a single thread. I
can't find any way to drive python multiprocessing from async code, so there
is a MultiprocessingWaiter thread that waits for multiprocessing events and
notifies any pending futures. This was uhhh... fun? No, not really. But it
works. Using this part of the code just involves calling the provided wait()
method. See the implementation of ServiceProcess for more details.

Mirror util.Process hooks onto service process

Remove Service.__name attribute

Do not serialize process object on ServiceProcess start.
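For reference, a minimal usage sketch of the new API, assuming Frigate's log
listener has already been set up (before_start() requires it). The
HeartbeatService class, its interval field and the log line are hypothetical,
invented for illustration; start(), stop(), run(), stop_event, logger and pid
are the interface added by this patch.

    from frigate.service_manager import ServiceProcess

    class HeartbeatService(ServiceProcess):
        # Unique name within the ServiceManager (same pattern AudioProcessor uses).
        name = "frigate.heartbeat"

        def __init__(self, interval: float = 5.0) -> None:
            super().__init__()
            self.interval = interval

        def run(self) -> None:
            # Runs in the child process. stop_event is set by the SIGTERM/SIGINT
            # handler installed in before_run(), so this loop exits on stop().
            while not self.stop_event.wait(self.interval):
                self.logger.info("still alive")

    # start()/stop() only notify the service manager and return immediately.
    # Pass wait=True when the caller needs the result right away, e.g. the pid
    # (this is what app.py does for the audio detector).
    service = HeartbeatService().start(wait=True)
    print(service.pid)
    service.stop(wait=True)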
asd * Update frigate dictionary * Convert AudioProcessor to service process --- .cspell/frigate-dictionary.txt | 4 + frigate/app.py | 14 +- frigate/events/audio.py | 8 +- frigate/mypy.ini | 4 + frigate/service_manager/__init__.py | 4 + frigate/service_manager/multiprocessing.py | 164 +++++++ .../service_manager/multiprocessing_waiter.py | 150 ++++++ frigate/service_manager/service.py | 446 ++++++++++++++++++ 8 files changed, 782 insertions(+), 12 deletions(-) create mode 100644 frigate/service_manager/__init__.py create mode 100644 frigate/service_manager/multiprocessing.py create mode 100644 frigate/service_manager/multiprocessing_waiter.py create mode 100644 frigate/service_manager/service.py diff --git a/.cspell/frigate-dictionary.txt b/.cspell/frigate-dictionary.txt index 0cbcc4beb..b019f8492 100644 --- a/.cspell/frigate-dictionary.txt +++ b/.cspell/frigate-dictionary.txt @@ -42,6 +42,7 @@ codeproject colormap colorspace comms +coro ctypeslib CUDA Cuvid @@ -59,6 +60,7 @@ dsize dtype ECONNRESET edgetpu +fastapi faststart fflags ffprobe @@ -237,6 +239,7 @@ sleeptime SNDMORE socs sqliteq +sqlitevecq ssdlite statm stimeout @@ -271,6 +274,7 @@ unraid unreviewed userdata usermod +uvicorn vaapi vainfo variations diff --git a/frigate/app.py b/frigate/app.py index 0cf76699c..bc4f626e0 100644 --- a/frigate/app.py +++ b/frigate/app.py @@ -63,6 +63,7 @@ from frigate.record.cleanup import RecordingCleanup from frigate.record.export import migrate_exports from frigate.record.record import manage_recordings from frigate.review.review import manage_review_segments +from frigate.service_manager import ServiceManager from frigate.stats.emitter import StatsEmitter from frigate.stats.util import stats_init from frigate.storage import StorageMaintainer @@ -78,7 +79,6 @@ logger = logging.getLogger(__name__) class FrigateApp: def __init__(self, config: FrigateConfig) -> None: - self.audio_process: Optional[mp.Process] = None self.stop_event: MpEvent = mp.Event() self.detection_queue: Queue = mp.Queue() self.detectors: dict[str, ObjectDetectProcess] = {} @@ -449,9 +449,8 @@ class FrigateApp: ] if audio_cameras: - self.audio_process = AudioProcessor(audio_cameras, self.camera_metrics) - self.audio_process.start() - self.processes["audio_detector"] = self.audio_process.pid or 0 + proc = AudioProcessor(audio_cameras, self.camera_metrics).start(wait=True) + self.processes["audio_detector"] = proc.pid or 0 def start_timeline_processor(self) -> None: self.timeline_processor = TimelineProcessor( @@ -639,11 +638,6 @@ class FrigateApp: ReviewSegment.end_time == None ).execute() - # stop the audio process - if self.audio_process: - self.audio_process.terminate() - self.audio_process.join() - # ensure the capture processes are done for camera, metrics in self.camera_metrics.items(): capture_process = metrics.capture_process @@ -712,4 +706,6 @@ class FrigateApp: shm.close() shm.unlink() + ServiceManager.current().shutdown(wait=True) + os._exit(os.EX_OK) diff --git a/frigate/events/audio.py b/frigate/events/audio.py index 66a27fcd0..45706dcc8 100644 --- a/frigate/events/audio.py +++ b/frigate/events/audio.py @@ -9,7 +9,6 @@ from typing import Tuple import numpy as np import requests -import frigate.util as util from frigate.camera import CameraMetrics from frigate.comms.config_updater import ConfigSubscriber from frigate.comms.detections_updater import DetectionPublisher, DetectionTypeEnum @@ -26,6 +25,7 @@ from frigate.const import ( from frigate.ffmpeg_presets import parse_preset_input from frigate.log import LogPipe 
from frigate.object_detection import load_labels +from frigate.service_manager import ServiceProcess from frigate.util.builtin import get_ffmpeg_arg_list from frigate.video import start_or_restart_ffmpeg, stop_ffmpeg @@ -63,13 +63,15 @@ def get_ffmpeg_command(ffmpeg: FfmpegConfig) -> list[str]: ) -class AudioProcessor(util.Process): +class AudioProcessor(ServiceProcess): + name = "frigate.audio_manager" + def __init__( self, cameras: list[CameraConfig], camera_metrics: dict[str, CameraMetrics], ): - super().__init__(name="frigate.audio_manager", daemon=True) + super().__init__() self.camera_metrics = camera_metrics self.cameras = cameras diff --git a/frigate/mypy.ini b/frigate/mypy.ini index d8f849334..dd726f454 100644 --- a/frigate/mypy.ini +++ b/frigate/mypy.ini @@ -59,3 +59,7 @@ ignore_errors = false [mypy-frigate.watchdog] ignore_errors = false disallow_untyped_calls = false + + +[mypy-frigate.service_manager.*] +ignore_errors = false diff --git a/frigate/service_manager/__init__.py b/frigate/service_manager/__init__.py new file mode 100644 index 000000000..2da23b8b0 --- /dev/null +++ b/frigate/service_manager/__init__.py @@ -0,0 +1,4 @@ +from .multiprocessing import ServiceProcess +from .service import Service, ServiceManager + +__all__ = ["Service", "ServiceProcess", "ServiceManager"] diff --git a/frigate/service_manager/multiprocessing.py b/frigate/service_manager/multiprocessing.py new file mode 100644 index 000000000..d0b169275 --- /dev/null +++ b/frigate/service_manager/multiprocessing.py @@ -0,0 +1,164 @@ +import asyncio +import faulthandler +import logging +import multiprocessing as mp +import signal +import sys +import threading +from abc import ABC, abstractmethod +from asyncio.exceptions import TimeoutError +from logging.handlers import QueueHandler +from types import FrameType +from typing import Optional + +import frigate.log + +from .multiprocessing_waiter import wait as mp_wait +from .service import Service, ServiceManager + +DEFAULT_STOP_TIMEOUT = 10 # seconds + + +class BaseServiceProcess(Service, ABC): + """A Service the manages a multiprocessing.Process.""" + + _process: Optional[mp.Process] + + def __init__( + self, + *, + name: Optional[str] = None, + manager: Optional[ServiceManager] = None, + ) -> None: + super().__init__(name=name, manager=manager) + + self._process = None + + async def on_start(self) -> None: + if self._process is not None: + if self._process.is_alive(): + return # Already started. + else: + self._process.close() + + # At this point, the process is either stopped or dead, so we can recreate it. + self._process = mp.Process(target=self._run) + self._process.name = self.name + self._process.daemon = True + self.before_start() + self._process.start() + self.after_start() + + self.manager.logger.info(f"Started {self.name} (pid: {self._process.pid})") + + async def on_stop( + self, + *, + force: bool = False, + timeout: Optional[float] = None, + ) -> None: + if timeout is None: + timeout = DEFAULT_STOP_TIMEOUT + + if self._process is None: + return # Already stopped. + + running = True + + if not force: + self._process.terminate() + try: + await asyncio.wait_for(mp_wait(self._process), timeout) + running = False + except TimeoutError: + self.manager.logger.warning( + f"{self.name} is still running after " + f"{timeout} seconds. Killing." 
+ ) + + if running: + self._process.kill() + await mp_wait(self._process) + + self._process.close() + self._process = None + + self.manager.logger.info(f"{self.name} stopped") + + @property + def pid(self) -> Optional[int]: + return self._process.pid if self._process else None + + def _run(self) -> None: + self.before_run() + self.run() + self.after_run() + + def before_start(self) -> None: + pass + + def after_start(self) -> None: + pass + + def before_run(self) -> None: + pass + + def after_run(self) -> None: + pass + + @abstractmethod + def run(self) -> None: + pass + + def __getstate__(self) -> dict: + return { + k: v + for k, v in self.__dict__.items() + if not (k.startswith("_Service__") or k == "_process") + } + + +class ServiceProcess(BaseServiceProcess): + logger: logging.Logger + + @property + def stop_event(self) -> threading.Event: + # Lazily create the stop_event. This allows the signal handler to tell if anyone is + # monitoring the stop event, and to raise a SystemExit if not. + if "stop_event" not in self.__dict__: + stop_event = threading.Event() + self.__dict__["stop_event"] = stop_event + else: + stop_event = self.__dict__["stop_event"] + assert isinstance(stop_event, threading.Event) + + return stop_event + + def before_start(self) -> None: + if frigate.log.log_listener is None: + raise RuntimeError("Logging has not yet been set up.") + self.__log_queue = frigate.log.log_listener.queue + + def before_run(self) -> None: + super().before_run() + + faulthandler.enable() + + def receiveSignal(signalNumber: int, frame: Optional[FrameType]) -> None: + # Get the stop_event through the dict to bypass lazy initialization. + stop_event = self.__dict__.get("stop_event") + if stop_event is not None: + # Someone is monitoring stop_event. We should set it. + stop_event.set() + else: + # Nobody is monitoring stop_event. We should raise SystemExit. + sys.exit() + + signal.signal(signal.SIGTERM, receiveSignal) + signal.signal(signal.SIGINT, receiveSignal) + + self.logger = logging.getLogger(self.name) + + logging.basicConfig(handlers=[], force=True) + logging.getLogger().addHandler(QueueHandler(self.__log_queue)) + del self.__log_queue diff --git a/frigate/service_manager/multiprocessing_waiter.py b/frigate/service_manager/multiprocessing_waiter.py new file mode 100644 index 000000000..8acdf583c --- /dev/null +++ b/frigate/service_manager/multiprocessing_waiter.py @@ -0,0 +1,150 @@ +import asyncio +import functools +import logging +import multiprocessing as mp +import queue +import threading +from multiprocessing.connection import Connection +from multiprocessing.connection import wait as mp_wait +from socket import socket +from typing import Any, Optional, Union + +logger = logging.getLogger(__name__) + + +class MultiprocessingWaiter(threading.Thread): + """A background thread that manages futures for the multiprocessing.connection.wait() method.""" + + def __init__(self) -> None: + super().__init__(daemon=True) + + # Queue of objects to wait for and futures to set results for. + self._queue: queue.Queue[tuple[Any, asyncio.Future[None]]] = queue.Queue() + + # This is required to get mp_wait() to wake up when new objects to wait for are received. + receive, send = mp.Pipe(duplex=False) + self._receive_connection = receive + self._send_connection = send + + def wait_for_sentinel(self, sentinel: Any) -> asyncio.Future[None]: + """Create an asyncio.Future tracking a sentinel for multiprocessing.connection.wait() + + Warning: This method is NOT thread-safe. 
+ """ + # This would be incredibly stupid, but you never know. + assert sentinel != self._receive_connection + + # Send the future to the background thread for processing. + future = asyncio.get_running_loop().create_future() + self._queue.put((sentinel, future)) + + # Notify the background thread. + # + # This is the non-thread-safe part, but since this method is not really meant to be called + # by users, we can get away with not adding a lock at this point (to avoid adding 2 locks). + self._send_connection.send_bytes(b".") + + return future + + def run(self) -> None: + logger.debug("Started background thread") + + wait_dict: dict[Any, set[asyncio.Future[None]]] = { + self._receive_connection: set() + } + while True: + for ready_obj in mp_wait(wait_dict.keys()): + # Make sure we never remove the receive connection from the wait dict + if ready_obj is self._receive_connection: + continue + + logger.debug( + f"Sentinel {ready_obj!r} is ready. " + f"Notifying {len(wait_dict[ready_obj])} future(s)." + ) + + # Go over all the futures attached to this object and mark them as ready. + for fut in wait_dict.pop(ready_obj): + if fut.cancelled(): + logger.debug( + f"A future for sentinel {ready_obj!r} is ready, " + "but the future is cancelled. Skipping." + ) + else: + fut.get_loop().call_soon_threadsafe( + # Note: We need to check fut.cancelled() again, since it might + # have been set before the event loop's definition of "soon". + functools.partial( + lambda fut: fut.cancelled() or fut.set_result(None), fut + ) + ) + + # Check for cancellations in the remaining futures. + done_objects = [] + for obj, fut_set in wait_dict.items(): + if obj is self._receive_connection: + continue + + # Find any cancelled futures and remove them. + cancelled = [fut for fut in fut_set if fut.cancelled()] + fut_set.difference_update(cancelled) + logger.debug( + f"Removing {len(cancelled)} future(s) from sentinel: {obj!r}" + ) + + # Mark objects with no remaining futures for removal. + if len(fut_set) == 0: + done_objects.append(obj) + + # Remove any objects that are done after removing cancelled futures. + for obj in done_objects: + logger.debug( + f"Sentinel {obj!r} no longer has any futures waiting for it." + ) + del wait_dict[obj] + + # Get new objects to wait for from the queue. + while True: + try: + obj, fut = self._queue.get_nowait() + self._receive_connection.recv_bytes(maxlength=1) + self._queue.task_done() + + logger.debug(f"Received new sentinel: {obj!r}") + + wait_dict.setdefault(obj, set()).add(fut) + except queue.Empty: + break + + +waiter_lock = threading.Lock() +waiter_thread: Optional[MultiprocessingWaiter] = None + + +async def wait(object: Union[mp.Process, Connection, socket]) -> None: + """Wait for the supplied object to be ready. + + Under the hood, this uses multiprocessing.connection.wait() and a background thread manage the + returned futures. + """ + global waiter_thread, waiter_lock + + sentinel: Union[Connection, socket, int] + if isinstance(object, mp.Process): + sentinel = object.sentinel + elif isinstance(object, Connection) or isinstance(object, socket): + sentinel = object + else: + raise ValueError(f"Cannot wait for object of type {type(object).__qualname__}") + + with waiter_lock: + if waiter_thread is None: + # Start a new waiter thread. + waiter_thread = MultiprocessingWaiter() + waiter_thread.start() + + # Create the future while still holding the lock, + # since wait_for_sentinel() is not thread safe. 
+ fut = waiter_thread.wait_for_sentinel(sentinel) + + await fut diff --git a/frigate/service_manager/service.py b/frigate/service_manager/service.py new file mode 100644 index 000000000..62be6205b --- /dev/null +++ b/frigate/service_manager/service.py @@ -0,0 +1,446 @@ +from __future__ import annotations + +import asyncio +import atexit +import logging +import threading +from abc import ABC, abstractmethod +from contextvars import ContextVar +from dataclasses import dataclass +from functools import partial +from typing import Coroutine, Optional, Union, cast + +from typing_extensions import Self + + +class Service(ABC): + """An abstract service instance.""" + + def __init__( + self, + *, + name: Optional[str] = None, + manager: Optional[ServiceManager] = None, + ): + if name: + self.__dict__["name"] = name + + self.__manager = manager or ServiceManager.current() + self.__lock = asyncio.Lock(loop=self.__manager._event_loop) + self.__manager._register(self) + + @property + def name(self) -> str: + try: + return cast(str, self.__dict__["name"]) + except KeyError: + return type(self).__qualname__ + + @property + def manager(self) -> ServiceManager: + """The service manager this service is registered with.""" + try: + return self.__manager + except AttributeError: + raise RuntimeError("Cannot access associated service manager") + + def start( + self, + *, + wait: bool = False, + wait_timeout: Optional[float] = None, + ) -> Self: + """Start this service. + + :param wait: If set, this function will block until the task is complete. + :param wait_timeout: If set, this function will not return until the task is complete or the + specified timeout has elapsed. + """ + + self.manager.run_task( + self.on_start(), + wait=wait, + wait_timeout=wait_timeout, + lock=self.__lock, + ) + + return self + + def stop( + self, + *, + force: bool = False, + timeout: Optional[float] = None, + wait: bool = False, + wait_timeout: Optional[float] = None, + ) -> Self: + """Stop this service. + + :param force: If set, the service will be killed immediately. + :param timeout: Maximum amount of time to wait before force-killing the service. + + :param wait: If set, this function will block until the task is complete. + :param wait_timeout: If set, this function will not return until the task is complete or the + specified timeout has elapsed. + """ + + self.manager.run_task( + self.on_stop(force=force, timeout=timeout), + wait=wait, + wait_timeout=wait_timeout, + lock=self.__lock, + ) + + return self + + def restart( + self, + *, + force: bool = False, + stop_timeout: Optional[float] = None, + wait: bool = False, + wait_timeout: Optional[float] = None, + ) -> Self: + """Restart this service. + + :param force: If set, the service will be killed immediately. + :param timeout: Maximum amount of time to wait before force-killing the service. + + :param wait: If set, this function will block until the task is complete. + :param wait_timeout: If set, this function will not return until the task is complete or the + specified timeout has elapsed. 
+ """ + + self.manager.run_task( + self.on_restart(force=force, stop_timeout=stop_timeout), + wait=wait, + wait_timeout=wait_timeout, + lock=self.__lock, + ) + + return self + + @abstractmethod + async def on_start(self) -> None: + pass + + @abstractmethod + async def on_stop( + self, + *, + force: bool = False, + timeout: Optional[float] = None, + ) -> None: + pass + + async def on_restart( + self, + *, + force: bool = False, + stop_timeout: Optional[float] = None, + ) -> None: + await self.on_stop(force=force, timeout=stop_timeout) + await self.on_start() + + +default_service_manager_lock = threading.Lock() +default_service_manager: Optional[ServiceManager] = None + +current_service_manager: ContextVar[ServiceManager] = ContextVar( + "current_service_manager" +) + + +@dataclass +class Command: + """A coroutine to execute in the service manager thread. + + Attributes: + coro: The coroutine to execute. + lock: An async lock to acquire before calling the coroutine. + done: If specified, the service manager will set this event after the command completes. + """ + + coro: Coroutine + lock: Optional[asyncio.Lock] = None + done: Optional[threading.Event] = None + + +class ServiceManager: + """A set of services, along with the global state required to manage them efficiently. + + Typically users of the service infrastructure will not interact with a service manager directly, + but rather through individual Service subclasses that will automatically manage a service + manager instance. + + Each service manager instance has a background thread in which service lifecycle tasks are + executed in an async executor. This is done to avoid head-of-line blocking in the business logic + that spins up individual services. This thread is automatically started when the service manager + is created and stopped either manually, or on application exit. + + All (public) service manager methods are thread-safe. + """ + + _name: str + _logger: logging.Logger + + # The set of services this service manager knows about. + _services: dict[str, Service] + _services_lock: threading.Lock + + # Commands will be queued with associated event loop. Queueing `None` signals shutdown. + _command_queue: asyncio.Queue[Union[Command, None]] + _event_loop: asyncio.AbstractEventLoop + + # The pending command counter is used to ensure all commands have been queued before shutdown. + _pending_commands: AtomicCounter + + # The set of pending tasks after they have been received by the background thread and spawned. + _tasks: set + + # Fired after the async runtime starts. Object initialization completes after this is set. + _setup_event: threading.Event + + # Will be acquired to ensure the shutdown sentinel is sent only once. Never released. + _shutdown_lock: threading.Lock + + def __init__(self, *, name: Optional[str] = None): + self._name = name if name is not None else (__package__ or __name__) + self._logger = logging.getLogger(self.name) + + self._services = dict() + self._services_lock = threading.Lock() + + self._pending_commands = AtomicCounter() + self._tasks = set() + + self._shutdown_lock = threading.Lock() + + # --- Start the manager thread and wait for it to be ready. 
--- + + self._setup_event = threading.Event() + + async def start_manager() -> None: + self._event_loop = asyncio.get_running_loop() + self._command_queue = asyncio.Queue() + + self._setup_event.set() + await self._monitor_command_queue() + + self._manager_thread = threading.Thread( + name=self.name, + target=lambda: asyncio.run(start_manager()), + daemon=True, + ) + + self._manager_thread.start() + atexit.register(partial(self.shutdown, wait=True)) + + self._setup_event.wait() + + @property + def name(self) -> str: + """The name of this service manager. Primarily intended for logging purposes.""" + return self._name + + @property + def logger(self) -> logging.Logger: + """The logger used by this service manager.""" + return self._logger + + @classmethod + def current(cls) -> ServiceManager: + """The service manager set in the current context (async task or thread). + + A global default service manager will be automatically created on first access.""" + + global default_service_manager + + current = current_service_manager.get(None) + if current is None: + with default_service_manager_lock: + if default_service_manager is None: + default_service_manager = cls() + + current = default_service_manager + current_service_manager.set(current) + return current + + def make_current(self) -> None: + """Make this the current service manager.""" + + current_service_manager.set(self) + + def run_task( + self, + coro: Coroutine, + *, + wait: bool = False, + wait_timeout: Optional[float] = None, + lock: Optional[asyncio.Lock] = None, + ) -> None: + """Run an async task in the service manager thread. + + :param wait: If set, this function will block until the task is complete. + :param wait_timeout: If set, this function will not return until the task is complete or the + specified timeout has elapsed. + """ + + if not isinstance(coro, Coroutine): + raise TypeError(f"Cannot schedule task for object of type {type(coro)}") + + cmd = Command(coro=coro, lock=lock) + if wait or wait_timeout is not None: + cmd.done = threading.Event() + + self._send_command(cmd) + + if cmd.done is not None: + cmd.done.wait(timeout=wait_timeout) + + def shutdown( + self, *, wait: bool = False, wait_timeout: Optional[float] = None + ) -> None: + """Shutdown the service manager thread. + + After the shutdown process completes, any subsequent calls to the service manager will + produce an error. + + :param wait: If set, this function will block until the shutdown process is complete. + :param wait_timeout: If set, this function will not return until the shutdown process is + complete or the specified timeout has elapsed. + """ + + if self._shutdown_lock.acquire(blocking=False): + self._send_command(None) + if wait: + self._manager_thread.join(timeout=wait_timeout) + + def _ensure_running(self) -> None: + self._setup_event.wait() + if not self._manager_thread.is_alive(): + raise RuntimeError(f"ServiceManager {self.name} is not running") + + def _send_command(self, command: Union[Command, None]) -> None: + self._ensure_running() + + async def queue_command() -> None: + await self._command_queue.put(command) + self._pending_commands.sub() + + self._pending_commands.add() + asyncio.run_coroutine_threadsafe(queue_command(), self._event_loop) + + def _register(self, service: Service) -> None: + """Register a service with the service manager. 
This is done by the service constructor.""" + + self._ensure_running() + with self._services_lock: + name_conflict: Optional[Service] = next( + ( + existing + for name, existing in self._services.items() + if name == service.name + ), + None, + ) + + if name_conflict is service: + raise RuntimeError(f"Attempt to re-register service: {service.name}") + elif name_conflict is not None: + raise RuntimeError(f"Duplicate service name: {service.name}") + + self.logger.debug(f"Registering service: {service.name}") + self._services[service.name] = service + + def _run_command(self, command: Command) -> None: + """Execute a command and add it to the tasks set.""" + + def task_done(task: asyncio.Task) -> None: + exc = task.exception() + if exc: + self.logger.exception("Exception in service manager task", exc_info=exc) + self._tasks.discard(task) + if command.done is not None: + command.done.set() + + async def task_harness() -> None: + if command.lock is not None: + async with command.lock: + await command.coro + else: + await command.coro + + task = asyncio.create_task(task_harness()) + task.add_done_callback(task_done) + self._tasks.add(task) + + async def _monitor_command_queue(self) -> None: + """The main function of the background thread.""" + + self.logger.info("Started service manager") + + # Main command processing loop. + while (command := await self._command_queue.get()) is not None: + self._run_command(command) + + # Send a stop command to all services. We don't have a status command yet, so we can just + # stop everything and be done with it. + with self._services_lock: + self.logger.debug(f"Stopping {len(self._services)} services") + for service in self._services.values(): + service.stop() + + # Wait for all commands to finish executing. + await self._shutdown() + + self.logger.info("Exiting service manager") + + async def _shutdown(self) -> None: + """Ensure all commands have been queued & executed.""" + + while True: + command = None + try: + # Try and get a command from the queue. + command = self._command_queue.get_nowait() + except asyncio.QueueEmpty: + if self._pending_commands.value > 0: + # If there are pending commands to queue, await them. + command = await self._command_queue.get() + elif self._tasks: + # If there are still pending tasks, wait for them. These tasks might queue + # commands though, so we have to loop again. + await asyncio.wait(self._tasks) + else: + # Nothing is pending at this point, so we're done here. + break + + # If we got a command, run it. + if command is not None: + self._run_command(command) + + +class AtomicCounter: + """A lock-protected atomic counter.""" + + # Modern CPUs have atomics, but python doesn't seem to include them in the standard library. + # Besides, the performance penalty is negligible compared to, well, using python. + # So this will do just fine. + + def __init__(self, initial: int = 0): + self._lock = threading.Lock() + self._value = initial + + def add(self, value: int = 1) -> None: + with self._lock: + self._value += value + + def sub(self, value: int = 1) -> None: + with self._lock: + self._value -= value + + @property + def value(self) -> int: + with self._lock: + return self._value
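As a closing note, a rough sketch of how the multiprocessing_waiter.wait()
helper can be awaited on its own. The worker() function and the standalone
asyncio.run() driver below are made up for illustration; wait() itself
(accepting an mp.Process, Connection or socket) is what this patch adds, and
it is the same mechanism BaseServiceProcess.on_stop() uses to await the child
process without blocking the service manager's event loop.

    import asyncio
    import multiprocessing as mp

    from frigate.service_manager.multiprocessing_waiter import wait as mp_wait

    def worker() -> None:
        pass  # placeholder child process; exits immediately

    async def main() -> None:
        proc = mp.Process(target=worker)
        proc.start()
        # Suspends this coroutine until the process sentinel becomes ready.
        # The shared MultiprocessingWaiter thread resolves the future via
        # call_soon_threadsafe(), so the event loop is never blocked.
        await mp_wait(proc)
        print(proc.exitcode)

    if __name__ == "__main__":
        asyncio.run(main())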