From a468ed316d59818752d1ba0951539b089a554228 Mon Sep 17 00:00:00 2001 From: gtsiam Date: Thu, 3 Oct 2024 20:03:43 +0300 Subject: [PATCH] Added stop_event to util.Process (#14142) * Added stop_event to util.Process util.Process will take care of receiving signals when the stop_event is accessed in the subclass. If it never is, SystemExit is raised instead. This has the effect of still behaving like multiprocessing.Process when stop_event is not accessed, while still allowing subclasses to not deal with the hassle of setting it up. * Give each util.Process their own logger This will help to reduce boilerplate in subclasses. * Give explicit types to util.Process.__init__ This gives better type hinting in the editor. * Use util.Process facilities in AudioProcessor Boilerplate begone! * Removed pointless check in util.Process The log_listener.queue should never be None, unless something has gone extremely wrong in the log setup code. If we're that far gone, crashing is better. * Make sure faulthandler is enabled in all processes This has no effect currently since we're using the fork start_method. However, when we inevidably switch to forkserver (either by choice, or by upgrading to python 3.14+) not having this makes for some really fun failure modes :D --- frigate/events/audio.py | 50 +++++++++++++++++---------------------- frigate/util/process.py | 52 ++++++++++++++++++++++++++++++++++++----- 2 files changed, 68 insertions(+), 34 deletions(-) diff --git a/frigate/events/audio.py b/frigate/events/audio.py index f4b382eba..66a27fcd0 100644 --- a/frigate/events/audio.py +++ b/frigate/events/audio.py @@ -2,8 +2,6 @@ import datetime import logging -import signal -import sys import threading import time from typing import Tuple @@ -73,46 +71,42 @@ class AudioProcessor(util.Process): ): super().__init__(name="frigate.audio_manager", daemon=True) - self.logger = logging.getLogger(self.name) self.camera_metrics = camera_metrics self.cameras = cameras def run(self) -> None: - stop_event = threading.Event() audio_threads: list[AudioEventMaintainer] = [] threading.current_thread().name = "process:audio_manager" - signal.signal(signal.SIGTERM, lambda sig, frame: sys.exit()) if len(self.cameras) == 0: return - try: - for camera in self.cameras: - audio_thread = AudioEventMaintainer( - camera, - self.camera_metrics, - stop_event, - ) - audio_threads.append(audio_thread) - audio_thread.start() + for camera in self.cameras: + audio_thread = AudioEventMaintainer( + camera, + self.camera_metrics, + self.stop_event, + ) + audio_threads.append(audio_thread) + audio_thread.start() - self.logger.info(f"Audio processor started (pid: {self.pid})") + self.logger.info(f"Audio processor started (pid: {self.pid})") - while True: - signal.pause() - finally: - stop_event.set() - for thread in audio_threads: - thread.join(1) - if thread.is_alive(): - self.logger.info(f"Waiting for thread {thread.name:s} to exit") - thread.join(10) + while not self.stop_event.wait(): + pass - for thread in audio_threads: - if thread.is_alive(): - self.logger.warning(f"Thread {thread.name} is still alive") - self.logger.info("Exiting audio processor") + for thread in audio_threads: + thread.join(1) + if thread.is_alive(): + self.logger.info(f"Waiting for thread {thread.name:s} to exit") + thread.join(10) + + for thread in audio_threads: + if thread.is_alive(): + self.logger.warning(f"Thread {thread.name} is still alive") + + self.logger.info("Exiting audio processor") class AudioEventMaintainer(threading.Thread): diff --git a/frigate/util/process.py b/frigate/util/process.py index 1a14cae58..ac15539fe 100644 --- a/frigate/util/process.py +++ b/frigate/util/process.py @@ -1,15 +1,29 @@ +import faulthandler import logging import multiprocessing as mp +import signal +import sys +import threading from functools import wraps from logging.handlers import QueueHandler -from typing import Any +from typing import Any, Callable, Optional import frigate.log class BaseProcess(mp.Process): - def __init__(self, **kwargs): - super().__init__(**kwargs) + def __init__( + self, + *, + name: Optional[str] = None, + target: Optional[Callable] = None, + args: tuple = (), + kwargs: dict = {}, + daemon: Optional[bool] = None, + ): + super().__init__( + name=name, target=target, args=args, kwargs=kwargs, daemon=daemon + ) def start(self, *args, **kwargs): self.before_start() @@ -46,10 +60,36 @@ class BaseProcess(mp.Process): class Process(BaseProcess): + logger: logging.Logger + + @property + def stop_event(self) -> threading.Event: + # Lazily create the stop_event. This allows the signal handler to tell if anyone is + # monitoring the stop event, and to raise a SystemExit if not. + if "stop_event" not in self.__dict__: + self.__dict__["stop_event"] = threading.Event() + return self.__dict__["stop_event"] + def before_start(self) -> None: self.__log_queue = frigate.log.log_listener.queue def before_run(self) -> None: - if self.__log_queue: - logging.basicConfig(handlers=[], force=True) - logging.getLogger().addHandler(QueueHandler(self.__log_queue)) + faulthandler.enable() + + def receiveSignal(signalNumber, frame): + # Get the stop_event through the dict to bypass lazy initialization. + stop_event = self.__dict__.get("stop_event") + if stop_event is not None: + # Someone is monitoring stop_event. We should set it. + stop_event.set() + else: + # Nobody is monitoring stop_event. We should raise SystemExit. + sys.exit() + + signal.signal(signal.SIGTERM, receiveSignal) + signal.signal(signal.SIGINT, receiveSignal) + + self.logger = logging.getLogger(self.name) + + logging.basicConfig(handlers=[], force=True) + logging.getLogger().addHandler(QueueHandler(self.__log_queue))