mirror of
https://github.com/blakeblackshear/frigate.git
synced 2024-12-19 19:06:16 +01:00
4bb420d049
* Add service manager infrastructure The changes are (This will be a bit long): - A ServiceManager class that spawns a background thread and deals with service lifecycle management. The idea is that service lifecycle code will run in async functions, so a single thread is enough to manage any (reasonable) amount of services. - A Service class, that offers start(), stop() and restart() methods that simply notify the service manager to... well. Start, stop or restart a service. (!) Warning: Note that this differs from mp.Process.start/stop in that the service commands are sent asynchronously and will complete "eventually". This is good because it means that business logic is fast when booting up and shutting down, but we need to make sure that code does not rely on start() and stop() being instant (Mainly pid assignments). Subclasses of the Service class should use the on_start and on_stop methods to monitor for service events. These will be run by the service manager thread, so we need to be careful not to block execution here. Standard async stuff. (!) Note on service names: Service names should be unique within a ServiceManager. Make sure that you pass the name you want to super().__init__(name="...") if you plan to spawn multiple instances of a service. - A ServiceProcess class: A Service that wraps a multiprocessing.Process into a Service. It offers a run() method subclasses can override and can support in-place restarting using the service manager. And finally, I lied a bit about this whole thing using a single thread. I can't find any way to run python multiprocessing in async, so there is a MultiprocessingWaiter thread that waits for multiprocessing events and notifies any pending futures. This was uhhh... fun? No, not really. But it works. Using this part of the code just involves calling the provided wait method. See the implementation of ServiceProcess for more details. Mirror util.Process hooks onto service process Remove Service.__name attribute Do not serialize process object on ServiceProcess start. asd * Update frigate dictionary * Convert AudioProcessor to service process
447 lines
14 KiB
Python
447 lines
14 KiB
Python
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import atexit
|
|
import logging
|
|
import threading
|
|
from abc import ABC, abstractmethod
|
|
from contextvars import ContextVar
|
|
from dataclasses import dataclass
|
|
from functools import partial
|
|
from typing import Coroutine, Optional, Union, cast
|
|
|
|
from typing_extensions import Self
|
|
|
|
|
|
class Service(ABC):
|
|
"""An abstract service instance."""
|
|
|
|
def __init__(
|
|
self,
|
|
*,
|
|
name: Optional[str] = None,
|
|
manager: Optional[ServiceManager] = None,
|
|
):
|
|
if name:
|
|
self.__dict__["name"] = name
|
|
|
|
self.__manager = manager or ServiceManager.current()
|
|
self.__lock = asyncio.Lock(loop=self.__manager._event_loop)
|
|
self.__manager._register(self)
|
|
|
|
@property
|
|
def name(self) -> str:
|
|
try:
|
|
return cast(str, self.__dict__["name"])
|
|
except KeyError:
|
|
return type(self).__qualname__
|
|
|
|
@property
|
|
def manager(self) -> ServiceManager:
|
|
"""The service manager this service is registered with."""
|
|
try:
|
|
return self.__manager
|
|
except AttributeError:
|
|
raise RuntimeError("Cannot access associated service manager")
|
|
|
|
def start(
|
|
self,
|
|
*,
|
|
wait: bool = False,
|
|
wait_timeout: Optional[float] = None,
|
|
) -> Self:
|
|
"""Start this service.
|
|
|
|
:param wait: If set, this function will block until the task is complete.
|
|
:param wait_timeout: If set, this function will not return until the task is complete or the
|
|
specified timeout has elapsed.
|
|
"""
|
|
|
|
self.manager.run_task(
|
|
self.on_start(),
|
|
wait=wait,
|
|
wait_timeout=wait_timeout,
|
|
lock=self.__lock,
|
|
)
|
|
|
|
return self
|
|
|
|
def stop(
|
|
self,
|
|
*,
|
|
force: bool = False,
|
|
timeout: Optional[float] = None,
|
|
wait: bool = False,
|
|
wait_timeout: Optional[float] = None,
|
|
) -> Self:
|
|
"""Stop this service.
|
|
|
|
:param force: If set, the service will be killed immediately.
|
|
:param timeout: Maximum amount of time to wait before force-killing the service.
|
|
|
|
:param wait: If set, this function will block until the task is complete.
|
|
:param wait_timeout: If set, this function will not return until the task is complete or the
|
|
specified timeout has elapsed.
|
|
"""
|
|
|
|
self.manager.run_task(
|
|
self.on_stop(force=force, timeout=timeout),
|
|
wait=wait,
|
|
wait_timeout=wait_timeout,
|
|
lock=self.__lock,
|
|
)
|
|
|
|
return self
|
|
|
|
def restart(
|
|
self,
|
|
*,
|
|
force: bool = False,
|
|
stop_timeout: Optional[float] = None,
|
|
wait: bool = False,
|
|
wait_timeout: Optional[float] = None,
|
|
) -> Self:
|
|
"""Restart this service.
|
|
|
|
:param force: If set, the service will be killed immediately.
|
|
:param timeout: Maximum amount of time to wait before force-killing the service.
|
|
|
|
:param wait: If set, this function will block until the task is complete.
|
|
:param wait_timeout: If set, this function will not return until the task is complete or the
|
|
specified timeout has elapsed.
|
|
"""
|
|
|
|
self.manager.run_task(
|
|
self.on_restart(force=force, stop_timeout=stop_timeout),
|
|
wait=wait,
|
|
wait_timeout=wait_timeout,
|
|
lock=self.__lock,
|
|
)
|
|
|
|
return self
|
|
|
|
@abstractmethod
|
|
async def on_start(self) -> None:
|
|
pass
|
|
|
|
@abstractmethod
|
|
async def on_stop(
|
|
self,
|
|
*,
|
|
force: bool = False,
|
|
timeout: Optional[float] = None,
|
|
) -> None:
|
|
pass
|
|
|
|
async def on_restart(
|
|
self,
|
|
*,
|
|
force: bool = False,
|
|
stop_timeout: Optional[float] = None,
|
|
) -> None:
|
|
await self.on_stop(force=force, timeout=stop_timeout)
|
|
await self.on_start()
|
|
|
|
|
|
default_service_manager_lock = threading.Lock()
|
|
default_service_manager: Optional[ServiceManager] = None
|
|
|
|
current_service_manager: ContextVar[ServiceManager] = ContextVar(
|
|
"current_service_manager"
|
|
)
|
|
|
|
|
|
@dataclass
|
|
class Command:
|
|
"""A coroutine to execute in the service manager thread.
|
|
|
|
Attributes:
|
|
coro: The coroutine to execute.
|
|
lock: An async lock to acquire before calling the coroutine.
|
|
done: If specified, the service manager will set this event after the command completes.
|
|
"""
|
|
|
|
coro: Coroutine
|
|
lock: Optional[asyncio.Lock] = None
|
|
done: Optional[threading.Event] = None
|
|
|
|
|
|
class ServiceManager:
|
|
"""A set of services, along with the global state required to manage them efficiently.
|
|
|
|
Typically users of the service infrastructure will not interact with a service manager directly,
|
|
but rather through individual Service subclasses that will automatically manage a service
|
|
manager instance.
|
|
|
|
Each service manager instance has a background thread in which service lifecycle tasks are
|
|
executed in an async executor. This is done to avoid head-of-line blocking in the business logic
|
|
that spins up individual services. This thread is automatically started when the service manager
|
|
is created and stopped either manually, or on application exit.
|
|
|
|
All (public) service manager methods are thread-safe.
|
|
"""
|
|
|
|
_name: str
|
|
_logger: logging.Logger
|
|
|
|
# The set of services this service manager knows about.
|
|
_services: dict[str, Service]
|
|
_services_lock: threading.Lock
|
|
|
|
# Commands will be queued with associated event loop. Queueing `None` signals shutdown.
|
|
_command_queue: asyncio.Queue[Union[Command, None]]
|
|
_event_loop: asyncio.AbstractEventLoop
|
|
|
|
# The pending command counter is used to ensure all commands have been queued before shutdown.
|
|
_pending_commands: AtomicCounter
|
|
|
|
# The set of pending tasks after they have been received by the background thread and spawned.
|
|
_tasks: set
|
|
|
|
# Fired after the async runtime starts. Object initialization completes after this is set.
|
|
_setup_event: threading.Event
|
|
|
|
# Will be acquired to ensure the shutdown sentinel is sent only once. Never released.
|
|
_shutdown_lock: threading.Lock
|
|
|
|
def __init__(self, *, name: Optional[str] = None):
|
|
self._name = name if name is not None else (__package__ or __name__)
|
|
self._logger = logging.getLogger(self.name)
|
|
|
|
self._services = dict()
|
|
self._services_lock = threading.Lock()
|
|
|
|
self._pending_commands = AtomicCounter()
|
|
self._tasks = set()
|
|
|
|
self._shutdown_lock = threading.Lock()
|
|
|
|
# --- Start the manager thread and wait for it to be ready. ---
|
|
|
|
self._setup_event = threading.Event()
|
|
|
|
async def start_manager() -> None:
|
|
self._event_loop = asyncio.get_running_loop()
|
|
self._command_queue = asyncio.Queue()
|
|
|
|
self._setup_event.set()
|
|
await self._monitor_command_queue()
|
|
|
|
self._manager_thread = threading.Thread(
|
|
name=self.name,
|
|
target=lambda: asyncio.run(start_manager()),
|
|
daemon=True,
|
|
)
|
|
|
|
self._manager_thread.start()
|
|
atexit.register(partial(self.shutdown, wait=True))
|
|
|
|
self._setup_event.wait()
|
|
|
|
@property
|
|
def name(self) -> str:
|
|
"""The name of this service manager. Primarily intended for logging purposes."""
|
|
return self._name
|
|
|
|
@property
|
|
def logger(self) -> logging.Logger:
|
|
"""The logger used by this service manager."""
|
|
return self._logger
|
|
|
|
@classmethod
|
|
def current(cls) -> ServiceManager:
|
|
"""The service manager set in the current context (async task or thread).
|
|
|
|
A global default service manager will be automatically created on first access."""
|
|
|
|
global default_service_manager
|
|
|
|
current = current_service_manager.get(None)
|
|
if current is None:
|
|
with default_service_manager_lock:
|
|
if default_service_manager is None:
|
|
default_service_manager = cls()
|
|
|
|
current = default_service_manager
|
|
current_service_manager.set(current)
|
|
return current
|
|
|
|
def make_current(self) -> None:
|
|
"""Make this the current service manager."""
|
|
|
|
current_service_manager.set(self)
|
|
|
|
def run_task(
|
|
self,
|
|
coro: Coroutine,
|
|
*,
|
|
wait: bool = False,
|
|
wait_timeout: Optional[float] = None,
|
|
lock: Optional[asyncio.Lock] = None,
|
|
) -> None:
|
|
"""Run an async task in the service manager thread.
|
|
|
|
:param wait: If set, this function will block until the task is complete.
|
|
:param wait_timeout: If set, this function will not return until the task is complete or the
|
|
specified timeout has elapsed.
|
|
"""
|
|
|
|
if not isinstance(coro, Coroutine):
|
|
raise TypeError(f"Cannot schedule task for object of type {type(coro)}")
|
|
|
|
cmd = Command(coro=coro, lock=lock)
|
|
if wait or wait_timeout is not None:
|
|
cmd.done = threading.Event()
|
|
|
|
self._send_command(cmd)
|
|
|
|
if cmd.done is not None:
|
|
cmd.done.wait(timeout=wait_timeout)
|
|
|
|
def shutdown(
|
|
self, *, wait: bool = False, wait_timeout: Optional[float] = None
|
|
) -> None:
|
|
"""Shutdown the service manager thread.
|
|
|
|
After the shutdown process completes, any subsequent calls to the service manager will
|
|
produce an error.
|
|
|
|
:param wait: If set, this function will block until the shutdown process is complete.
|
|
:param wait_timeout: If set, this function will not return until the shutdown process is
|
|
complete or the specified timeout has elapsed.
|
|
"""
|
|
|
|
if self._shutdown_lock.acquire(blocking=False):
|
|
self._send_command(None)
|
|
if wait:
|
|
self._manager_thread.join(timeout=wait_timeout)
|
|
|
|
def _ensure_running(self) -> None:
|
|
self._setup_event.wait()
|
|
if not self._manager_thread.is_alive():
|
|
raise RuntimeError(f"ServiceManager {self.name} is not running")
|
|
|
|
def _send_command(self, command: Union[Command, None]) -> None:
|
|
self._ensure_running()
|
|
|
|
async def queue_command() -> None:
|
|
await self._command_queue.put(command)
|
|
self._pending_commands.sub()
|
|
|
|
self._pending_commands.add()
|
|
asyncio.run_coroutine_threadsafe(queue_command(), self._event_loop)
|
|
|
|
def _register(self, service: Service) -> None:
|
|
"""Register a service with the service manager. This is done by the service constructor."""
|
|
|
|
self._ensure_running()
|
|
with self._services_lock:
|
|
name_conflict: Optional[Service] = next(
|
|
(
|
|
existing
|
|
for name, existing in self._services.items()
|
|
if name == service.name
|
|
),
|
|
None,
|
|
)
|
|
|
|
if name_conflict is service:
|
|
raise RuntimeError(f"Attempt to re-register service: {service.name}")
|
|
elif name_conflict is not None:
|
|
raise RuntimeError(f"Duplicate service name: {service.name}")
|
|
|
|
self.logger.debug(f"Registering service: {service.name}")
|
|
self._services[service.name] = service
|
|
|
|
def _run_command(self, command: Command) -> None:
|
|
"""Execute a command and add it to the tasks set."""
|
|
|
|
def task_done(task: asyncio.Task) -> None:
|
|
exc = task.exception()
|
|
if exc:
|
|
self.logger.exception("Exception in service manager task", exc_info=exc)
|
|
self._tasks.discard(task)
|
|
if command.done is not None:
|
|
command.done.set()
|
|
|
|
async def task_harness() -> None:
|
|
if command.lock is not None:
|
|
async with command.lock:
|
|
await command.coro
|
|
else:
|
|
await command.coro
|
|
|
|
task = asyncio.create_task(task_harness())
|
|
task.add_done_callback(task_done)
|
|
self._tasks.add(task)
|
|
|
|
async def _monitor_command_queue(self) -> None:
|
|
"""The main function of the background thread."""
|
|
|
|
self.logger.info("Started service manager")
|
|
|
|
# Main command processing loop.
|
|
while (command := await self._command_queue.get()) is not None:
|
|
self._run_command(command)
|
|
|
|
# Send a stop command to all services. We don't have a status command yet, so we can just
|
|
# stop everything and be done with it.
|
|
with self._services_lock:
|
|
self.logger.debug(f"Stopping {len(self._services)} services")
|
|
for service in self._services.values():
|
|
service.stop()
|
|
|
|
# Wait for all commands to finish executing.
|
|
await self._shutdown()
|
|
|
|
self.logger.info("Exiting service manager")
|
|
|
|
async def _shutdown(self) -> None:
|
|
"""Ensure all commands have been queued & executed."""
|
|
|
|
while True:
|
|
command = None
|
|
try:
|
|
# Try and get a command from the queue.
|
|
command = self._command_queue.get_nowait()
|
|
except asyncio.QueueEmpty:
|
|
if self._pending_commands.value > 0:
|
|
# If there are pending commands to queue, await them.
|
|
command = await self._command_queue.get()
|
|
elif self._tasks:
|
|
# If there are still pending tasks, wait for them. These tasks might queue
|
|
# commands though, so we have to loop again.
|
|
await asyncio.wait(self._tasks)
|
|
else:
|
|
# Nothing is pending at this point, so we're done here.
|
|
break
|
|
|
|
# If we got a command, run it.
|
|
if command is not None:
|
|
self._run_command(command)
|
|
|
|
|
|
class AtomicCounter:
|
|
"""A lock-protected atomic counter."""
|
|
|
|
# Modern CPUs have atomics, but python doesn't seem to include them in the standard library.
|
|
# Besides, the performance penalty is negligible compared to, well, using python.
|
|
# So this will do just fine.
|
|
|
|
def __init__(self, initial: int = 0):
|
|
self._lock = threading.Lock()
|
|
self._value = initial
|
|
|
|
def add(self, value: int = 1) -> None:
|
|
with self._lock:
|
|
self._value += value
|
|
|
|
def sub(self, value: int = 1) -> None:
|
|
with self._lock:
|
|
self._value -= value
|
|
|
|
@property
|
|
def value(self) -> int:
|
|
with self._lock:
|
|
return self._value
|