Add service manager infrastructure (#14150)

* Add service manager infrastructure

The changes are (This will be a bit long):
- A ServiceManager class that spawns a background thread and deals with
  service lifecycle management. The idea is that service lifecycle code
  will run in async functions, so a single thread is enough to manage
  any (reasonable) amount of services.

- A Service class, that offers start(), stop() and restart() methods
  that simply notify the service manager to... well. Start, stop or
  restart a service.

(!) Warning: Note that this differs from mp.Process.start/stop in that
  the service commands are sent asynchronously and will complete
  "eventually". This is good because it means that business logic is
  fast when booting up and shutting down, but we need to make sure
  that code does not rely on start() and stop() being instant
  (Mainly pid assignments).

  Subclasses of the Service class should use the on_start and on_stop
  methods to monitor for service events. These will be run by the
  service manager thread, so we need to be careful not to block
  execution here. Standard async stuff.

(!) Note on service names: Service names should be unique within a
  ServiceManager. Make sure that you pass the name you want to
  super().__init__(name="...") if you plan to spawn multiple instances
  of a service.

- A ServiceProcess class: A Service that wraps a multiprocessing.Process
  into a Service. It offers a run() method subclasses can override and
  can support in-place restarting using the service manager.

And finally, I lied a bit about this whole thing using a single thread.
I can't find any way to run python multiprocessing in async, so there is
a MultiprocessingWaiter thread that waits for multiprocessing events and
notifies any pending futures. This was uhhh... fun? No, not really.
But it works. Using this part of the code just involves calling the
provided wait method. See the implementation of ServiceProcess for more
details.

Mirror util.Process hooks onto service process

Remove Service.__name attribute

Do not serialize process object on ServiceProcess start.

asd

* Update frigate dictionary

* Convert AudioProcessor to service process
This commit is contained in:
gtsiam 2024-10-21 18:00:38 +03:00 committed by GitHub
parent 560dc68120
commit 4bb420d049
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 782 additions and 12 deletions

View File

@ -42,6 +42,7 @@ codeproject
colormap colormap
colorspace colorspace
comms comms
coro
ctypeslib ctypeslib
CUDA CUDA
Cuvid Cuvid
@ -59,6 +60,7 @@ dsize
dtype dtype
ECONNRESET ECONNRESET
edgetpu edgetpu
fastapi
faststart faststart
fflags fflags
ffprobe ffprobe
@ -237,6 +239,7 @@ sleeptime
SNDMORE SNDMORE
socs socs
sqliteq sqliteq
sqlitevecq
ssdlite ssdlite
statm statm
stimeout stimeout
@ -271,6 +274,7 @@ unraid
unreviewed unreviewed
userdata userdata
usermod usermod
uvicorn
vaapi vaapi
vainfo vainfo
variations variations

View File

@ -63,6 +63,7 @@ from frigate.record.cleanup import RecordingCleanup
from frigate.record.export import migrate_exports from frigate.record.export import migrate_exports
from frigate.record.record import manage_recordings from frigate.record.record import manage_recordings
from frigate.review.review import manage_review_segments from frigate.review.review import manage_review_segments
from frigate.service_manager import ServiceManager
from frigate.stats.emitter import StatsEmitter from frigate.stats.emitter import StatsEmitter
from frigate.stats.util import stats_init from frigate.stats.util import stats_init
from frigate.storage import StorageMaintainer from frigate.storage import StorageMaintainer
@ -78,7 +79,6 @@ logger = logging.getLogger(__name__)
class FrigateApp: class FrigateApp:
def __init__(self, config: FrigateConfig) -> None: def __init__(self, config: FrigateConfig) -> None:
self.audio_process: Optional[mp.Process] = None
self.stop_event: MpEvent = mp.Event() self.stop_event: MpEvent = mp.Event()
self.detection_queue: Queue = mp.Queue() self.detection_queue: Queue = mp.Queue()
self.detectors: dict[str, ObjectDetectProcess] = {} self.detectors: dict[str, ObjectDetectProcess] = {}
@ -449,9 +449,8 @@ class FrigateApp:
] ]
if audio_cameras: if audio_cameras:
self.audio_process = AudioProcessor(audio_cameras, self.camera_metrics) proc = AudioProcessor(audio_cameras, self.camera_metrics).start(wait=True)
self.audio_process.start() self.processes["audio_detector"] = proc.pid or 0
self.processes["audio_detector"] = self.audio_process.pid or 0
def start_timeline_processor(self) -> None: def start_timeline_processor(self) -> None:
self.timeline_processor = TimelineProcessor( self.timeline_processor = TimelineProcessor(
@ -639,11 +638,6 @@ class FrigateApp:
ReviewSegment.end_time == None ReviewSegment.end_time == None
).execute() ).execute()
# stop the audio process
if self.audio_process:
self.audio_process.terminate()
self.audio_process.join()
# ensure the capture processes are done # ensure the capture processes are done
for camera, metrics in self.camera_metrics.items(): for camera, metrics in self.camera_metrics.items():
capture_process = metrics.capture_process capture_process = metrics.capture_process
@ -712,4 +706,6 @@ class FrigateApp:
shm.close() shm.close()
shm.unlink() shm.unlink()
ServiceManager.current().shutdown(wait=True)
os._exit(os.EX_OK) os._exit(os.EX_OK)

View File

@ -9,7 +9,6 @@ from typing import Tuple
import numpy as np import numpy as np
import requests import requests
import frigate.util as util
from frigate.camera import CameraMetrics from frigate.camera import CameraMetrics
from frigate.comms.config_updater import ConfigSubscriber from frigate.comms.config_updater import ConfigSubscriber
from frigate.comms.detections_updater import DetectionPublisher, DetectionTypeEnum from frigate.comms.detections_updater import DetectionPublisher, DetectionTypeEnum
@ -26,6 +25,7 @@ from frigate.const import (
from frigate.ffmpeg_presets import parse_preset_input from frigate.ffmpeg_presets import parse_preset_input
from frigate.log import LogPipe from frigate.log import LogPipe
from frigate.object_detection import load_labels from frigate.object_detection import load_labels
from frigate.service_manager import ServiceProcess
from frigate.util.builtin import get_ffmpeg_arg_list from frigate.util.builtin import get_ffmpeg_arg_list
from frigate.video import start_or_restart_ffmpeg, stop_ffmpeg from frigate.video import start_or_restart_ffmpeg, stop_ffmpeg
@ -63,13 +63,15 @@ def get_ffmpeg_command(ffmpeg: FfmpegConfig) -> list[str]:
) )
class AudioProcessor(util.Process): class AudioProcessor(ServiceProcess):
name = "frigate.audio_manager"
def __init__( def __init__(
self, self,
cameras: list[CameraConfig], cameras: list[CameraConfig],
camera_metrics: dict[str, CameraMetrics], camera_metrics: dict[str, CameraMetrics],
): ):
super().__init__(name="frigate.audio_manager", daemon=True) super().__init__()
self.camera_metrics = camera_metrics self.camera_metrics = camera_metrics
self.cameras = cameras self.cameras = cameras

View File

@ -59,3 +59,7 @@ ignore_errors = false
[mypy-frigate.watchdog] [mypy-frigate.watchdog]
ignore_errors = false ignore_errors = false
disallow_untyped_calls = false disallow_untyped_calls = false
[mypy-frigate.service_manager.*]
ignore_errors = false

View File

@ -0,0 +1,4 @@
from .multiprocessing import ServiceProcess
from .service import Service, ServiceManager
__all__ = ["Service", "ServiceProcess", "ServiceManager"]

View File

@ -0,0 +1,164 @@
import asyncio
import faulthandler
import logging
import multiprocessing as mp
import signal
import sys
import threading
from abc import ABC, abstractmethod
from asyncio.exceptions import TimeoutError
from logging.handlers import QueueHandler
from types import FrameType
from typing import Optional
import frigate.log
from .multiprocessing_waiter import wait as mp_wait
from .service import Service, ServiceManager
DEFAULT_STOP_TIMEOUT = 10 # seconds
class BaseServiceProcess(Service, ABC):
"""A Service the manages a multiprocessing.Process."""
_process: Optional[mp.Process]
def __init__(
self,
*,
name: Optional[str] = None,
manager: Optional[ServiceManager] = None,
) -> None:
super().__init__(name=name, manager=manager)
self._process = None
async def on_start(self) -> None:
if self._process is not None:
if self._process.is_alive():
return # Already started.
else:
self._process.close()
# At this point, the process is either stopped or dead, so we can recreate it.
self._process = mp.Process(target=self._run)
self._process.name = self.name
self._process.daemon = True
self.before_start()
self._process.start()
self.after_start()
self.manager.logger.info(f"Started {self.name} (pid: {self._process.pid})")
async def on_stop(
self,
*,
force: bool = False,
timeout: Optional[float] = None,
) -> None:
if timeout is None:
timeout = DEFAULT_STOP_TIMEOUT
if self._process is None:
return # Already stopped.
running = True
if not force:
self._process.terminate()
try:
await asyncio.wait_for(mp_wait(self._process), timeout)
running = False
except TimeoutError:
self.manager.logger.warning(
f"{self.name} is still running after "
f"{timeout} seconds. Killing."
)
if running:
self._process.kill()
await mp_wait(self._process)
self._process.close()
self._process = None
self.manager.logger.info(f"{self.name} stopped")
@property
def pid(self) -> Optional[int]:
return self._process.pid if self._process else None
def _run(self) -> None:
self.before_run()
self.run()
self.after_run()
def before_start(self) -> None:
pass
def after_start(self) -> None:
pass
def before_run(self) -> None:
pass
def after_run(self) -> None:
pass
@abstractmethod
def run(self) -> None:
pass
def __getstate__(self) -> dict:
return {
k: v
for k, v in self.__dict__.items()
if not (k.startswith("_Service__") or k == "_process")
}
class ServiceProcess(BaseServiceProcess):
logger: logging.Logger
@property
def stop_event(self) -> threading.Event:
# Lazily create the stop_event. This allows the signal handler to tell if anyone is
# monitoring the stop event, and to raise a SystemExit if not.
if "stop_event" not in self.__dict__:
stop_event = threading.Event()
self.__dict__["stop_event"] = stop_event
else:
stop_event = self.__dict__["stop_event"]
assert isinstance(stop_event, threading.Event)
return stop_event
def before_start(self) -> None:
if frigate.log.log_listener is None:
raise RuntimeError("Logging has not yet been set up.")
self.__log_queue = frigate.log.log_listener.queue
def before_run(self) -> None:
super().before_run()
faulthandler.enable()
def receiveSignal(signalNumber: int, frame: Optional[FrameType]) -> None:
# Get the stop_event through the dict to bypass lazy initialization.
stop_event = self.__dict__.get("stop_event")
if stop_event is not None:
# Someone is monitoring stop_event. We should set it.
stop_event.set()
else:
# Nobody is monitoring stop_event. We should raise SystemExit.
sys.exit()
signal.signal(signal.SIGTERM, receiveSignal)
signal.signal(signal.SIGINT, receiveSignal)
self.logger = logging.getLogger(self.name)
logging.basicConfig(handlers=[], force=True)
logging.getLogger().addHandler(QueueHandler(self.__log_queue))
del self.__log_queue

View File

@ -0,0 +1,150 @@
import asyncio
import functools
import logging
import multiprocessing as mp
import queue
import threading
from multiprocessing.connection import Connection
from multiprocessing.connection import wait as mp_wait
from socket import socket
from typing import Any, Optional, Union
logger = logging.getLogger(__name__)
class MultiprocessingWaiter(threading.Thread):
"""A background thread that manages futures for the multiprocessing.connection.wait() method."""
def __init__(self) -> None:
super().__init__(daemon=True)
# Queue of objects to wait for and futures to set results for.
self._queue: queue.Queue[tuple[Any, asyncio.Future[None]]] = queue.Queue()
# This is required to get mp_wait() to wake up when new objects to wait for are received.
receive, send = mp.Pipe(duplex=False)
self._receive_connection = receive
self._send_connection = send
def wait_for_sentinel(self, sentinel: Any) -> asyncio.Future[None]:
"""Create an asyncio.Future tracking a sentinel for multiprocessing.connection.wait()
Warning: This method is NOT thread-safe.
"""
# This would be incredibly stupid, but you never know.
assert sentinel != self._receive_connection
# Send the future to the background thread for processing.
future = asyncio.get_running_loop().create_future()
self._queue.put((sentinel, future))
# Notify the background thread.
#
# This is the non-thread-safe part, but since this method is not really meant to be called
# by users, we can get away with not adding a lock at this point (to avoid adding 2 locks).
self._send_connection.send_bytes(b".")
return future
def run(self) -> None:
logger.debug("Started background thread")
wait_dict: dict[Any, set[asyncio.Future[None]]] = {
self._receive_connection: set()
}
while True:
for ready_obj in mp_wait(wait_dict.keys()):
# Make sure we never remove the receive connection from the wait dict
if ready_obj is self._receive_connection:
continue
logger.debug(
f"Sentinel {ready_obj!r} is ready. "
f"Notifying {len(wait_dict[ready_obj])} future(s)."
)
# Go over all the futures attached to this object and mark them as ready.
for fut in wait_dict.pop(ready_obj):
if fut.cancelled():
logger.debug(
f"A future for sentinel {ready_obj!r} is ready, "
"but the future is cancelled. Skipping."
)
else:
fut.get_loop().call_soon_threadsafe(
# Note: We need to check fut.cancelled() again, since it might
# have been set before the event loop's definition of "soon".
functools.partial(
lambda fut: fut.cancelled() or fut.set_result(None), fut
)
)
# Check for cancellations in the remaining futures.
done_objects = []
for obj, fut_set in wait_dict.items():
if obj is self._receive_connection:
continue
# Find any cancelled futures and remove them.
cancelled = [fut for fut in fut_set if fut.cancelled()]
fut_set.difference_update(cancelled)
logger.debug(
f"Removing {len(cancelled)} future(s) from sentinel: {obj!r}"
)
# Mark objects with no remaining futures for removal.
if len(fut_set) == 0:
done_objects.append(obj)
# Remove any objects that are done after removing cancelled futures.
for obj in done_objects:
logger.debug(
f"Sentinel {obj!r} no longer has any futures waiting for it."
)
del wait_dict[obj]
# Get new objects to wait for from the queue.
while True:
try:
obj, fut = self._queue.get_nowait()
self._receive_connection.recv_bytes(maxlength=1)
self._queue.task_done()
logger.debug(f"Received new sentinel: {obj!r}")
wait_dict.setdefault(obj, set()).add(fut)
except queue.Empty:
break
waiter_lock = threading.Lock()
waiter_thread: Optional[MultiprocessingWaiter] = None
async def wait(object: Union[mp.Process, Connection, socket]) -> None:
"""Wait for the supplied object to be ready.
Under the hood, this uses multiprocessing.connection.wait() and a background thread manage the
returned futures.
"""
global waiter_thread, waiter_lock
sentinel: Union[Connection, socket, int]
if isinstance(object, mp.Process):
sentinel = object.sentinel
elif isinstance(object, Connection) or isinstance(object, socket):
sentinel = object
else:
raise ValueError(f"Cannot wait for object of type {type(object).__qualname__}")
with waiter_lock:
if waiter_thread is None:
# Start a new waiter thread.
waiter_thread = MultiprocessingWaiter()
waiter_thread.start()
# Create the future while still holding the lock,
# since wait_for_sentinel() is not thread safe.
fut = waiter_thread.wait_for_sentinel(sentinel)
await fut

View File

@ -0,0 +1,446 @@
from __future__ import annotations
import asyncio
import atexit
import logging
import threading
from abc import ABC, abstractmethod
from contextvars import ContextVar
from dataclasses import dataclass
from functools import partial
from typing import Coroutine, Optional, Union, cast
from typing_extensions import Self
class Service(ABC):
"""An abstract service instance."""
def __init__(
self,
*,
name: Optional[str] = None,
manager: Optional[ServiceManager] = None,
):
if name:
self.__dict__["name"] = name
self.__manager = manager or ServiceManager.current()
self.__lock = asyncio.Lock(loop=self.__manager._event_loop)
self.__manager._register(self)
@property
def name(self) -> str:
try:
return cast(str, self.__dict__["name"])
except KeyError:
return type(self).__qualname__
@property
def manager(self) -> ServiceManager:
"""The service manager this service is registered with."""
try:
return self.__manager
except AttributeError:
raise RuntimeError("Cannot access associated service manager")
def start(
self,
*,
wait: bool = False,
wait_timeout: Optional[float] = None,
) -> Self:
"""Start this service.
:param wait: If set, this function will block until the task is complete.
:param wait_timeout: If set, this function will not return until the task is complete or the
specified timeout has elapsed.
"""
self.manager.run_task(
self.on_start(),
wait=wait,
wait_timeout=wait_timeout,
lock=self.__lock,
)
return self
def stop(
self,
*,
force: bool = False,
timeout: Optional[float] = None,
wait: bool = False,
wait_timeout: Optional[float] = None,
) -> Self:
"""Stop this service.
:param force: If set, the service will be killed immediately.
:param timeout: Maximum amount of time to wait before force-killing the service.
:param wait: If set, this function will block until the task is complete.
:param wait_timeout: If set, this function will not return until the task is complete or the
specified timeout has elapsed.
"""
self.manager.run_task(
self.on_stop(force=force, timeout=timeout),
wait=wait,
wait_timeout=wait_timeout,
lock=self.__lock,
)
return self
def restart(
self,
*,
force: bool = False,
stop_timeout: Optional[float] = None,
wait: bool = False,
wait_timeout: Optional[float] = None,
) -> Self:
"""Restart this service.
:param force: If set, the service will be killed immediately.
:param timeout: Maximum amount of time to wait before force-killing the service.
:param wait: If set, this function will block until the task is complete.
:param wait_timeout: If set, this function will not return until the task is complete or the
specified timeout has elapsed.
"""
self.manager.run_task(
self.on_restart(force=force, stop_timeout=stop_timeout),
wait=wait,
wait_timeout=wait_timeout,
lock=self.__lock,
)
return self
@abstractmethod
async def on_start(self) -> None:
pass
@abstractmethod
async def on_stop(
self,
*,
force: bool = False,
timeout: Optional[float] = None,
) -> None:
pass
async def on_restart(
self,
*,
force: bool = False,
stop_timeout: Optional[float] = None,
) -> None:
await self.on_stop(force=force, timeout=stop_timeout)
await self.on_start()
default_service_manager_lock = threading.Lock()
default_service_manager: Optional[ServiceManager] = None
current_service_manager: ContextVar[ServiceManager] = ContextVar(
"current_service_manager"
)
@dataclass
class Command:
"""A coroutine to execute in the service manager thread.
Attributes:
coro: The coroutine to execute.
lock: An async lock to acquire before calling the coroutine.
done: If specified, the service manager will set this event after the command completes.
"""
coro: Coroutine
lock: Optional[asyncio.Lock] = None
done: Optional[threading.Event] = None
class ServiceManager:
"""A set of services, along with the global state required to manage them efficiently.
Typically users of the service infrastructure will not interact with a service manager directly,
but rather through individual Service subclasses that will automatically manage a service
manager instance.
Each service manager instance has a background thread in which service lifecycle tasks are
executed in an async executor. This is done to avoid head-of-line blocking in the business logic
that spins up individual services. This thread is automatically started when the service manager
is created and stopped either manually, or on application exit.
All (public) service manager methods are thread-safe.
"""
_name: str
_logger: logging.Logger
# The set of services this service manager knows about.
_services: dict[str, Service]
_services_lock: threading.Lock
# Commands will be queued with associated event loop. Queueing `None` signals shutdown.
_command_queue: asyncio.Queue[Union[Command, None]]
_event_loop: asyncio.AbstractEventLoop
# The pending command counter is used to ensure all commands have been queued before shutdown.
_pending_commands: AtomicCounter
# The set of pending tasks after they have been received by the background thread and spawned.
_tasks: set
# Fired after the async runtime starts. Object initialization completes after this is set.
_setup_event: threading.Event
# Will be acquired to ensure the shutdown sentinel is sent only once. Never released.
_shutdown_lock: threading.Lock
def __init__(self, *, name: Optional[str] = None):
self._name = name if name is not None else (__package__ or __name__)
self._logger = logging.getLogger(self.name)
self._services = dict()
self._services_lock = threading.Lock()
self._pending_commands = AtomicCounter()
self._tasks = set()
self._shutdown_lock = threading.Lock()
# --- Start the manager thread and wait for it to be ready. ---
self._setup_event = threading.Event()
async def start_manager() -> None:
self._event_loop = asyncio.get_running_loop()
self._command_queue = asyncio.Queue()
self._setup_event.set()
await self._monitor_command_queue()
self._manager_thread = threading.Thread(
name=self.name,
target=lambda: asyncio.run(start_manager()),
daemon=True,
)
self._manager_thread.start()
atexit.register(partial(self.shutdown, wait=True))
self._setup_event.wait()
@property
def name(self) -> str:
"""The name of this service manager. Primarily intended for logging purposes."""
return self._name
@property
def logger(self) -> logging.Logger:
"""The logger used by this service manager."""
return self._logger
@classmethod
def current(cls) -> ServiceManager:
"""The service manager set in the current context (async task or thread).
A global default service manager will be automatically created on first access."""
global default_service_manager
current = current_service_manager.get(None)
if current is None:
with default_service_manager_lock:
if default_service_manager is None:
default_service_manager = cls()
current = default_service_manager
current_service_manager.set(current)
return current
def make_current(self) -> None:
"""Make this the current service manager."""
current_service_manager.set(self)
def run_task(
self,
coro: Coroutine,
*,
wait: bool = False,
wait_timeout: Optional[float] = None,
lock: Optional[asyncio.Lock] = None,
) -> None:
"""Run an async task in the service manager thread.
:param wait: If set, this function will block until the task is complete.
:param wait_timeout: If set, this function will not return until the task is complete or the
specified timeout has elapsed.
"""
if not isinstance(coro, Coroutine):
raise TypeError(f"Cannot schedule task for object of type {type(coro)}")
cmd = Command(coro=coro, lock=lock)
if wait or wait_timeout is not None:
cmd.done = threading.Event()
self._send_command(cmd)
if cmd.done is not None:
cmd.done.wait(timeout=wait_timeout)
def shutdown(
self, *, wait: bool = False, wait_timeout: Optional[float] = None
) -> None:
"""Shutdown the service manager thread.
After the shutdown process completes, any subsequent calls to the service manager will
produce an error.
:param wait: If set, this function will block until the shutdown process is complete.
:param wait_timeout: If set, this function will not return until the shutdown process is
complete or the specified timeout has elapsed.
"""
if self._shutdown_lock.acquire(blocking=False):
self._send_command(None)
if wait:
self._manager_thread.join(timeout=wait_timeout)
def _ensure_running(self) -> None:
self._setup_event.wait()
if not self._manager_thread.is_alive():
raise RuntimeError(f"ServiceManager {self.name} is not running")
def _send_command(self, command: Union[Command, None]) -> None:
self._ensure_running()
async def queue_command() -> None:
await self._command_queue.put(command)
self._pending_commands.sub()
self._pending_commands.add()
asyncio.run_coroutine_threadsafe(queue_command(), self._event_loop)
def _register(self, service: Service) -> None:
"""Register a service with the service manager. This is done by the service constructor."""
self._ensure_running()
with self._services_lock:
name_conflict: Optional[Service] = next(
(
existing
for name, existing in self._services.items()
if name == service.name
),
None,
)
if name_conflict is service:
raise RuntimeError(f"Attempt to re-register service: {service.name}")
elif name_conflict is not None:
raise RuntimeError(f"Duplicate service name: {service.name}")
self.logger.debug(f"Registering service: {service.name}")
self._services[service.name] = service
def _run_command(self, command: Command) -> None:
"""Execute a command and add it to the tasks set."""
def task_done(task: asyncio.Task) -> None:
exc = task.exception()
if exc:
self.logger.exception("Exception in service manager task", exc_info=exc)
self._tasks.discard(task)
if command.done is not None:
command.done.set()
async def task_harness() -> None:
if command.lock is not None:
async with command.lock:
await command.coro
else:
await command.coro
task = asyncio.create_task(task_harness())
task.add_done_callback(task_done)
self._tasks.add(task)
async def _monitor_command_queue(self) -> None:
"""The main function of the background thread."""
self.logger.info("Started service manager")
# Main command processing loop.
while (command := await self._command_queue.get()) is not None:
self._run_command(command)
# Send a stop command to all services. We don't have a status command yet, so we can just
# stop everything and be done with it.
with self._services_lock:
self.logger.debug(f"Stopping {len(self._services)} services")
for service in self._services.values():
service.stop()
# Wait for all commands to finish executing.
await self._shutdown()
self.logger.info("Exiting service manager")
async def _shutdown(self) -> None:
"""Ensure all commands have been queued & executed."""
while True:
command = None
try:
# Try and get a command from the queue.
command = self._command_queue.get_nowait()
except asyncio.QueueEmpty:
if self._pending_commands.value > 0:
# If there are pending commands to queue, await them.
command = await self._command_queue.get()
elif self._tasks:
# If there are still pending tasks, wait for them. These tasks might queue
# commands though, so we have to loop again.
await asyncio.wait(self._tasks)
else:
# Nothing is pending at this point, so we're done here.
break
# If we got a command, run it.
if command is not None:
self._run_command(command)
class AtomicCounter:
"""A lock-protected atomic counter."""
# Modern CPUs have atomics, but python doesn't seem to include them in the standard library.
# Besides, the performance penalty is negligible compared to, well, using python.
# So this will do just fine.
def __init__(self, initial: int = 0):
self._lock = threading.Lock()
self._value = initial
def add(self, value: int = 1) -> None:
with self._lock:
self._value += value
def sub(self, value: int = 1) -> None:
with self._lock:
self._value -= value
@property
def value(self) -> int:
with self._lock:
return self._value