diff --git a/.cspell/frigate-dictionary.txt b/.cspell/frigate-dictionary.txt index 0cbcc4beb..b019f8492 100644 --- a/.cspell/frigate-dictionary.txt +++ b/.cspell/frigate-dictionary.txt @@ -42,6 +42,7 @@ codeproject colormap colorspace comms +coro ctypeslib CUDA Cuvid @@ -59,6 +60,7 @@ dsize dtype ECONNRESET edgetpu +fastapi faststart fflags ffprobe @@ -237,6 +239,7 @@ sleeptime SNDMORE socs sqliteq +sqlitevecq ssdlite statm stimeout @@ -271,6 +274,7 @@ unraid unreviewed userdata usermod +uvicorn vaapi vainfo variations diff --git a/frigate/app.py b/frigate/app.py index 0cf76699c..bc4f626e0 100644 --- a/frigate/app.py +++ b/frigate/app.py @@ -63,6 +63,7 @@ from frigate.record.cleanup import RecordingCleanup from frigate.record.export import migrate_exports from frigate.record.record import manage_recordings from frigate.review.review import manage_review_segments +from frigate.service_manager import ServiceManager from frigate.stats.emitter import StatsEmitter from frigate.stats.util import stats_init from frigate.storage import StorageMaintainer @@ -78,7 +79,6 @@ logger = logging.getLogger(__name__) class FrigateApp: def __init__(self, config: FrigateConfig) -> None: - self.audio_process: Optional[mp.Process] = None self.stop_event: MpEvent = mp.Event() self.detection_queue: Queue = mp.Queue() self.detectors: dict[str, ObjectDetectProcess] = {} @@ -449,9 +449,8 @@ class FrigateApp: ] if audio_cameras: - self.audio_process = AudioProcessor(audio_cameras, self.camera_metrics) - self.audio_process.start() - self.processes["audio_detector"] = self.audio_process.pid or 0 + proc = AudioProcessor(audio_cameras, self.camera_metrics).start(wait=True) + self.processes["audio_detector"] = proc.pid or 0 def start_timeline_processor(self) -> None: self.timeline_processor = TimelineProcessor( @@ -639,11 +638,6 @@ class FrigateApp: ReviewSegment.end_time == None ).execute() - # stop the audio process - if self.audio_process: - self.audio_process.terminate() - 
self.audio_process.join() - # ensure the capture processes are done for camera, metrics in self.camera_metrics.items(): capture_process = metrics.capture_process @@ -712,4 +706,6 @@ class FrigateApp: shm.close() shm.unlink() + ServiceManager.current().shutdown(wait=True) + os._exit(os.EX_OK) diff --git a/frigate/events/audio.py b/frigate/events/audio.py index 66a27fcd0..45706dcc8 100644 --- a/frigate/events/audio.py +++ b/frigate/events/audio.py @@ -9,7 +9,6 @@ from typing import Tuple import numpy as np import requests -import frigate.util as util from frigate.camera import CameraMetrics from frigate.comms.config_updater import ConfigSubscriber from frigate.comms.detections_updater import DetectionPublisher, DetectionTypeEnum @@ -26,6 +25,7 @@ from frigate.const import ( from frigate.ffmpeg_presets import parse_preset_input from frigate.log import LogPipe from frigate.object_detection import load_labels +from frigate.service_manager import ServiceProcess from frigate.util.builtin import get_ffmpeg_arg_list from frigate.video import start_or_restart_ffmpeg, stop_ffmpeg @@ -63,13 +63,15 @@ def get_ffmpeg_command(ffmpeg: FfmpegConfig) -> list[str]: ) -class AudioProcessor(util.Process): +class AudioProcessor(ServiceProcess): + name = "frigate.audio_manager" + def __init__( self, cameras: list[CameraConfig], camera_metrics: dict[str, CameraMetrics], ): - super().__init__(name="frigate.audio_manager", daemon=True) + super().__init__() self.camera_metrics = camera_metrics self.cameras = cameras diff --git a/frigate/mypy.ini b/frigate/mypy.ini index d8f849334..dd726f454 100644 --- a/frigate/mypy.ini +++ b/frigate/mypy.ini @@ -59,3 +59,7 @@ ignore_errors = false [mypy-frigate.watchdog] ignore_errors = false disallow_untyped_calls = false + + +[mypy-frigate.service_manager.*] +ignore_errors = false diff --git a/frigate/service_manager/__init__.py b/frigate/service_manager/__init__.py new file mode 100644 index 000000000..2da23b8b0 --- /dev/null +++ 
b/frigate/service_manager/__init__.py @@ -0,0 +1,4 @@ +from .multiprocessing import ServiceProcess +from .service import Service, ServiceManager + +__all__ = ["Service", "ServiceProcess", "ServiceManager"] diff --git a/frigate/service_manager/multiprocessing.py b/frigate/service_manager/multiprocessing.py new file mode 100644 index 000000000..d0b169275 --- /dev/null +++ b/frigate/service_manager/multiprocessing.py @@ -0,0 +1,164 @@ +import asyncio +import faulthandler +import logging +import multiprocessing as mp +import signal +import sys +import threading +from abc import ABC, abstractmethod +from asyncio.exceptions import TimeoutError +from logging.handlers import QueueHandler +from types import FrameType +from typing import Optional + +import frigate.log + +from .multiprocessing_waiter import wait as mp_wait +from .service import Service, ServiceManager + +DEFAULT_STOP_TIMEOUT = 10 # seconds + + +class BaseServiceProcess(Service, ABC): + """A Service that manages a multiprocessing.Process.""" + + _process: Optional[mp.Process] + + def __init__( + self, + *, + name: Optional[str] = None, + manager: Optional[ServiceManager] = None, + ) -> None: + super().__init__(name=name, manager=manager) + + self._process = None + + async def on_start(self) -> None: + if self._process is not None: + if self._process.is_alive(): + return # Already started. + else: + self._process.close() + + # At this point, the process is either stopped or dead, so we can recreate it. + self._process = mp.Process(target=self._run) + self._process.name = self.name + self._process.daemon = True + self.before_start() + self._process.start() + self.after_start() + + self.manager.logger.info(f"Started {self.name} (pid: {self._process.pid})") + + async def on_stop( + self, + *, + force: bool = False, + timeout: Optional[float] = None, + ) -> None: + if timeout is None: + timeout = DEFAULT_STOP_TIMEOUT + + if self._process is None: + return # Already stopped. 
+ + running = True + + if not force: + self._process.terminate() + try: + await asyncio.wait_for(mp_wait(self._process), timeout) + running = False + except TimeoutError: + self.manager.logger.warning( + f"{self.name} is still running after " + f"{timeout} seconds. Killing." + ) + + if running: + self._process.kill() + await mp_wait(self._process) + + self._process.close() + self._process = None + + self.manager.logger.info(f"{self.name} stopped") + + @property + def pid(self) -> Optional[int]: + return self._process.pid if self._process else None + + def _run(self) -> None: + self.before_run() + self.run() + self.after_run() + + def before_start(self) -> None: + pass + + def after_start(self) -> None: + pass + + def before_run(self) -> None: + pass + + def after_run(self) -> None: + pass + + @abstractmethod + def run(self) -> None: + pass + + def __getstate__(self) -> dict: + return { + k: v + for k, v in self.__dict__.items() + if not (k.startswith("_Service__") or k == "_process") + } + + +class ServiceProcess(BaseServiceProcess): + logger: logging.Logger + + @property + def stop_event(self) -> threading.Event: + # Lazily create the stop_event. This allows the signal handler to tell if anyone is + # monitoring the stop event, and to raise a SystemExit if not. + if "stop_event" not in self.__dict__: + stop_event = threading.Event() + self.__dict__["stop_event"] = stop_event + else: + stop_event = self.__dict__["stop_event"] + assert isinstance(stop_event, threading.Event) + + return stop_event + + def before_start(self) -> None: + if frigate.log.log_listener is None: + raise RuntimeError("Logging has not yet been set up.") + self.__log_queue = frigate.log.log_listener.queue + + def before_run(self) -> None: + super().before_run() + + faulthandler.enable() + + def receiveSignal(signalNumber: int, frame: Optional[FrameType]) -> None: + # Get the stop_event through the dict to bypass lazy initialization. 
+ stop_event = self.__dict__.get("stop_event") + if stop_event is not None: + # Someone is monitoring stop_event. We should set it. + stop_event.set() + else: + # Nobody is monitoring stop_event. We should raise SystemExit. + sys.exit() + + signal.signal(signal.SIGTERM, receiveSignal) + signal.signal(signal.SIGINT, receiveSignal) + + self.logger = logging.getLogger(self.name) + + logging.basicConfig(handlers=[], force=True) + logging.getLogger().addHandler(QueueHandler(self.__log_queue)) + del self.__log_queue diff --git a/frigate/service_manager/multiprocessing_waiter.py b/frigate/service_manager/multiprocessing_waiter.py new file mode 100644 index 000000000..8acdf583c --- /dev/null +++ b/frigate/service_manager/multiprocessing_waiter.py @@ -0,0 +1,150 @@ +import asyncio +import functools +import logging +import multiprocessing as mp +import queue +import threading +from multiprocessing.connection import Connection +from multiprocessing.connection import wait as mp_wait +from socket import socket +from typing import Any, Optional, Union + +logger = logging.getLogger(__name__) + + +class MultiprocessingWaiter(threading.Thread): + """A background thread that manages futures for the multiprocessing.connection.wait() method.""" + + def __init__(self) -> None: + super().__init__(daemon=True) + + # Queue of objects to wait for and futures to set results for. + self._queue: queue.Queue[tuple[Any, asyncio.Future[None]]] = queue.Queue() + + # This is required to get mp_wait() to wake up when new objects to wait for are received. + receive, send = mp.Pipe(duplex=False) + self._receive_connection = receive + self._send_connection = send + + def wait_for_sentinel(self, sentinel: Any) -> asyncio.Future[None]: + """Create an asyncio.Future tracking a sentinel for multiprocessing.connection.wait() + + Warning: This method is NOT thread-safe. + """ + # This would be incredibly stupid, but you never know. 
+ assert sentinel != self._receive_connection + + # Send the future to the background thread for processing. + future = asyncio.get_running_loop().create_future() + self._queue.put((sentinel, future)) + + # Notify the background thread. + # + # This is the non-thread-safe part, but since this method is not really meant to be called + # by users, we can get away with not adding a lock at this point (to avoid adding 2 locks). + self._send_connection.send_bytes(b".") + + return future + + def run(self) -> None: + logger.debug("Started background thread") + + wait_dict: dict[Any, set[asyncio.Future[None]]] = { + self._receive_connection: set() + } + while True: + for ready_obj in mp_wait(wait_dict.keys()): + # Make sure we never remove the receive connection from the wait dict + if ready_obj is self._receive_connection: + continue + + logger.debug( + f"Sentinel {ready_obj!r} is ready. " + f"Notifying {len(wait_dict[ready_obj])} future(s)." + ) + + # Go over all the futures attached to this object and mark them as ready. + for fut in wait_dict.pop(ready_obj): + if fut.cancelled(): + logger.debug( + f"A future for sentinel {ready_obj!r} is ready, " + "but the future is cancelled. Skipping." + ) + else: + fut.get_loop().call_soon_threadsafe( + # Note: We need to check fut.cancelled() again, since it might + # have been set before the event loop's definition of "soon". + functools.partial( + lambda fut: fut.cancelled() or fut.set_result(None), fut + ) + ) + + # Check for cancellations in the remaining futures. + done_objects = [] + for obj, fut_set in wait_dict.items(): + if obj is self._receive_connection: + continue + + # Find any cancelled futures and remove them. + cancelled = [fut for fut in fut_set if fut.cancelled()] + fut_set.difference_update(cancelled) + logger.debug( + f"Removing {len(cancelled)} future(s) from sentinel: {obj!r}" + ) + + # Mark objects with no remaining futures for removal. 
+ if len(fut_set) == 0: + done_objects.append(obj) + + # Remove any objects that are done after removing cancelled futures. + for obj in done_objects: + logger.debug( + f"Sentinel {obj!r} no longer has any futures waiting for it." + ) + del wait_dict[obj] + + # Get new objects to wait for from the queue. + while True: + try: + obj, fut = self._queue.get_nowait() + self._receive_connection.recv_bytes(maxlength=1) + self._queue.task_done() + + logger.debug(f"Received new sentinel: {obj!r}") + + wait_dict.setdefault(obj, set()).add(fut) + except queue.Empty: + break + + +waiter_lock = threading.Lock() +waiter_thread: Optional[MultiprocessingWaiter] = None + + +async def wait(object: Union[mp.Process, Connection, socket]) -> None: + """Wait for the supplied object to be ready. + + Under the hood, this uses multiprocessing.connection.wait() and a background thread to manage + the returned futures. + """ + global waiter_thread, waiter_lock + + sentinel: Union[Connection, socket, int] + if isinstance(object, mp.Process): + sentinel = object.sentinel + elif isinstance(object, Connection) or isinstance(object, socket): + sentinel = object + else: + raise ValueError(f"Cannot wait for object of type {type(object).__qualname__}") + + with waiter_lock: + if waiter_thread is None: + # Start a new waiter thread. + waiter_thread = MultiprocessingWaiter() + waiter_thread.start() + + # Create the future while still holding the lock, + # since wait_for_sentinel() is not thread safe. 
+ fut = waiter_thread.wait_for_sentinel(sentinel) + + await fut diff --git a/frigate/service_manager/service.py b/frigate/service_manager/service.py new file mode 100644 index 000000000..62be6205b --- /dev/null +++ b/frigate/service_manager/service.py @@ -0,0 +1,446 @@ +from __future__ import annotations + +import asyncio +import atexit +import logging +import threading +from abc import ABC, abstractmethod +from contextvars import ContextVar +from dataclasses import dataclass +from functools import partial +from typing import Coroutine, Optional, Union, cast + +from typing_extensions import Self + + +class Service(ABC): + """An abstract service instance.""" + + def __init__( + self, + *, + name: Optional[str] = None, + manager: Optional[ServiceManager] = None, + ): + if name: + self.__dict__["name"] = name + + self.__manager = manager or ServiceManager.current() + self.__lock = asyncio.Lock(loop=self.__manager._event_loop) + self.__manager._register(self) + + @property + def name(self) -> str: + try: + return cast(str, self.__dict__["name"]) + except KeyError: + return type(self).__qualname__ + + @property + def manager(self) -> ServiceManager: + """The service manager this service is registered with.""" + try: + return self.__manager + except AttributeError: + raise RuntimeError("Cannot access associated service manager") + + def start( + self, + *, + wait: bool = False, + wait_timeout: Optional[float] = None, + ) -> Self: + """Start this service. + + :param wait: If set, this function will block until the task is complete. + :param wait_timeout: If set, this function will not return until the task is complete or the + specified timeout has elapsed. + """ + + self.manager.run_task( + self.on_start(), + wait=wait, + wait_timeout=wait_timeout, + lock=self.__lock, + ) + + return self + + def stop( + self, + *, + force: bool = False, + timeout: Optional[float] = None, + wait: bool = False, + wait_timeout: Optional[float] = None, + ) -> Self: + """Stop this service. 
+ + :param force: If set, the service will be killed immediately. + :param timeout: Maximum amount of time to wait before force-killing the service. + + :param wait: If set, this function will block until the task is complete. + :param wait_timeout: If set, this function will not return until the task is complete or the + specified timeout has elapsed. + """ + + self.manager.run_task( + self.on_stop(force=force, timeout=timeout), + wait=wait, + wait_timeout=wait_timeout, + lock=self.__lock, + ) + + return self + + def restart( + self, + *, + force: bool = False, + stop_timeout: Optional[float] = None, + wait: bool = False, + wait_timeout: Optional[float] = None, + ) -> Self: + """Restart this service. + + :param force: If set, the service will be killed immediately. + :param stop_timeout: Maximum amount of time to wait before force-killing the service. + + :param wait: If set, this function will block until the task is complete. + :param wait_timeout: If set, this function will not return until the task is complete or the + specified timeout has elapsed. + """ + + self.manager.run_task( + self.on_restart(force=force, stop_timeout=stop_timeout), + wait=wait, + wait_timeout=wait_timeout, + lock=self.__lock, + ) + + return self + + @abstractmethod + async def on_start(self) -> None: + pass + + @abstractmethod + async def on_stop( + self, + *, + force: bool = False, + timeout: Optional[float] = None, + ) -> None: + pass + + async def on_restart( + self, + *, + force: bool = False, + stop_timeout: Optional[float] = None, + ) -> None: + await self.on_stop(force=force, timeout=stop_timeout) + await self.on_start() + + +default_service_manager_lock = threading.Lock() +default_service_manager: Optional[ServiceManager] = None + +current_service_manager: ContextVar[ServiceManager] = ContextVar( + "current_service_manager" +) + + +@dataclass +class Command: + """A coroutine to execute in the service manager thread. + + Attributes: + coro: The coroutine to execute. 
+ lock: An async lock to acquire before calling the coroutine. + done: If specified, the service manager will set this event after the command completes. + """ + + coro: Coroutine + lock: Optional[asyncio.Lock] = None + done: Optional[threading.Event] = None + + +class ServiceManager: + """A set of services, along with the global state required to manage them efficiently. + + Typically users of the service infrastructure will not interact with a service manager directly, + but rather through individual Service subclasses that will automatically manage a service + manager instance. + + Each service manager instance has a background thread in which service lifecycle tasks are + executed in an async executor. This is done to avoid head-of-line blocking in the business logic + that spins up individual services. This thread is automatically started when the service manager + is created and stopped either manually, or on application exit. + + All (public) service manager methods are thread-safe. + """ + + _name: str + _logger: logging.Logger + + # The set of services this service manager knows about. + _services: dict[str, Service] + _services_lock: threading.Lock + + # Commands will be queued with associated event loop. Queueing `None` signals shutdown. + _command_queue: asyncio.Queue[Union[Command, None]] + _event_loop: asyncio.AbstractEventLoop + + # The pending command counter is used to ensure all commands have been queued before shutdown. + _pending_commands: AtomicCounter + + # The set of pending tasks after they have been received by the background thread and spawned. + _tasks: set + + # Fired after the async runtime starts. Object initialization completes after this is set. + _setup_event: threading.Event + + # Will be acquired to ensure the shutdown sentinel is sent only once. Never released. 
+ _shutdown_lock: threading.Lock + + def __init__(self, *, name: Optional[str] = None): + self._name = name if name is not None else (__package__ or __name__) + self._logger = logging.getLogger(self.name) + + self._services = dict() + self._services_lock = threading.Lock() + + self._pending_commands = AtomicCounter() + self._tasks = set() + + self._shutdown_lock = threading.Lock() + + # --- Start the manager thread and wait for it to be ready. --- + + self._setup_event = threading.Event() + + async def start_manager() -> None: + self._event_loop = asyncio.get_running_loop() + self._command_queue = asyncio.Queue() + + self._setup_event.set() + await self._monitor_command_queue() + + self._manager_thread = threading.Thread( + name=self.name, + target=lambda: asyncio.run(start_manager()), + daemon=True, + ) + + self._manager_thread.start() + atexit.register(partial(self.shutdown, wait=True)) + + self._setup_event.wait() + + @property + def name(self) -> str: + """The name of this service manager. Primarily intended for logging purposes.""" + return self._name + + @property + def logger(self) -> logging.Logger: + """The logger used by this service manager.""" + return self._logger + + @classmethod + def current(cls) -> ServiceManager: + """The service manager set in the current context (async task or thread). 
+ + A global default service manager will be automatically created on first access.""" + + global default_service_manager + + current = current_service_manager.get(None) + if current is None: + with default_service_manager_lock: + if default_service_manager is None: + default_service_manager = cls() + + current = default_service_manager + current_service_manager.set(current) + return current + + def make_current(self) -> None: + """Make this the current service manager.""" + + current_service_manager.set(self) + + def run_task( + self, + coro: Coroutine, + *, + wait: bool = False, + wait_timeout: Optional[float] = None, + lock: Optional[asyncio.Lock] = None, + ) -> None: + """Run an async task in the service manager thread. + + :param wait: If set, this function will block until the task is complete. + :param wait_timeout: If set, this function will not return until the task is complete or the + specified timeout has elapsed. + """ + + if not isinstance(coro, Coroutine): + raise TypeError(f"Cannot schedule task for object of type {type(coro)}") + + cmd = Command(coro=coro, lock=lock) + if wait or wait_timeout is not None: + cmd.done = threading.Event() + + self._send_command(cmd) + + if cmd.done is not None: + cmd.done.wait(timeout=wait_timeout) + + def shutdown( + self, *, wait: bool = False, wait_timeout: Optional[float] = None + ) -> None: + """Shutdown the service manager thread. + + After the shutdown process completes, any subsequent calls to the service manager will + produce an error. + + :param wait: If set, this function will block until the shutdown process is complete. + :param wait_timeout: If set, this function will not return until the shutdown process is + complete or the specified timeout has elapsed. 
+ """ + + if self._shutdown_lock.acquire(blocking=False): + self._send_command(None) + if wait: + self._manager_thread.join(timeout=wait_timeout) + + def _ensure_running(self) -> None: + self._setup_event.wait() + if not self._manager_thread.is_alive(): + raise RuntimeError(f"ServiceManager {self.name} is not running") + + def _send_command(self, command: Union[Command, None]) -> None: + self._ensure_running() + + async def queue_command() -> None: + await self._command_queue.put(command) + self._pending_commands.sub() + + self._pending_commands.add() + asyncio.run_coroutine_threadsafe(queue_command(), self._event_loop) + + def _register(self, service: Service) -> None: + """Register a service with the service manager. This is done by the service constructor.""" + + self._ensure_running() + with self._services_lock: + name_conflict: Optional[Service] = next( + ( + existing + for name, existing in self._services.items() + if name == service.name + ), + None, + ) + + if name_conflict is service: + raise RuntimeError(f"Attempt to re-register service: {service.name}") + elif name_conflict is not None: + raise RuntimeError(f"Duplicate service name: {service.name}") + + self.logger.debug(f"Registering service: {service.name}") + self._services[service.name] = service + + def _run_command(self, command: Command) -> None: + """Execute a command and add it to the tasks set.""" + + def task_done(task: asyncio.Task) -> None: + exc = task.exception() + if exc: + self.logger.exception("Exception in service manager task", exc_info=exc) + self._tasks.discard(task) + if command.done is not None: + command.done.set() + + async def task_harness() -> None: + if command.lock is not None: + async with command.lock: + await command.coro + else: + await command.coro + + task = asyncio.create_task(task_harness()) + task.add_done_callback(task_done) + self._tasks.add(task) + + async def _monitor_command_queue(self) -> None: + """The main function of the background thread.""" + + 
self.logger.info("Started service manager") + + # Main command processing loop. + while (command := await self._command_queue.get()) is not None: + self._run_command(command) + + # Send a stop command to all services. We don't have a status command yet, so we can just + # stop everything and be done with it. + with self._services_lock: + self.logger.debug(f"Stopping {len(self._services)} services") + for service in self._services.values(): + service.stop() + + # Wait for all commands to finish executing. + await self._shutdown() + + self.logger.info("Exiting service manager") + + async def _shutdown(self) -> None: + """Ensure all commands have been queued & executed.""" + + while True: + command = None + try: + # Try and get a command from the queue. + command = self._command_queue.get_nowait() + except asyncio.QueueEmpty: + if self._pending_commands.value > 0: + # If there are pending commands to queue, await them. + command = await self._command_queue.get() + elif self._tasks: + # If there are still pending tasks, wait for them. These tasks might queue + # commands though, so we have to loop again. + await asyncio.wait(self._tasks) + else: + # Nothing is pending at this point, so we're done here. + break + + # If we got a command, run it. + if command is not None: + self._run_command(command) + + +class AtomicCounter: + """A lock-protected atomic counter.""" + + # Modern CPUs have atomics, but python doesn't seem to include them in the standard library. + # Besides, the performance penalty is negligible compared to, well, using python. + # So this will do just fine. + + def __init__(self, initial: int = 0): + self._lock = threading.Lock() + self._value = initial + + def add(self, value: int = 1) -> None: + with self._lock: + self._value += value + + def sub(self, value: int = 1) -> None: + with self._lock: + self._value -= value + + @property + def value(self) -> int: + with self._lock: + return self._value