Added stop_event to util.Process (#14142)

* Added stop_event to util.Process

util.Process will take care of receiving signals when the stop_event is
accessed in the subclass. If it never is, SystemExit is raised instead.

This has the effect of still behaving like multiprocessing.Process when
stop_event is not accessed, while still allowing subclasses to not deal
with the hassle of setting it up.

* Give each util.Process their own logger

This will help to reduce boilerplate in subclasses.

* Give explicit types to util.Process.__init__

This gives better type hinting in the editor.

* Use util.Process facilities in AudioProcessor

Boilerplate begone!

* Removed pointless check in util.Process

The log_listener.queue should never be None, unless something has gone
extremely wrong in the log setup code. If we're that far gone, crashing
is better.

* Make sure faulthandler is enabled in all processes

This has no effect currently since we're using the fork start_method.
However, when we inevidably switch to forkserver (either by choice, or
by upgrading to python 3.14+) not having this makes for some really fun
failure modes :D
This commit is contained in:
gtsiam 2024-10-03 20:03:43 +03:00 committed by GitHub
parent e725730982
commit a468ed316d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 68 additions and 34 deletions

View File

@ -2,8 +2,6 @@
import datetime import datetime
import logging import logging
import signal
import sys
import threading import threading
import time import time
from typing import Tuple from typing import Tuple
@ -73,36 +71,31 @@ class AudioProcessor(util.Process):
): ):
super().__init__(name="frigate.audio_manager", daemon=True) super().__init__(name="frigate.audio_manager", daemon=True)
self.logger = logging.getLogger(self.name)
self.camera_metrics = camera_metrics self.camera_metrics = camera_metrics
self.cameras = cameras self.cameras = cameras
def run(self) -> None: def run(self) -> None:
stop_event = threading.Event()
audio_threads: list[AudioEventMaintainer] = [] audio_threads: list[AudioEventMaintainer] = []
threading.current_thread().name = "process:audio_manager" threading.current_thread().name = "process:audio_manager"
signal.signal(signal.SIGTERM, lambda sig, frame: sys.exit())
if len(self.cameras) == 0: if len(self.cameras) == 0:
return return
try:
for camera in self.cameras: for camera in self.cameras:
audio_thread = AudioEventMaintainer( audio_thread = AudioEventMaintainer(
camera, camera,
self.camera_metrics, self.camera_metrics,
stop_event, self.stop_event,
) )
audio_threads.append(audio_thread) audio_threads.append(audio_thread)
audio_thread.start() audio_thread.start()
self.logger.info(f"Audio processor started (pid: {self.pid})") self.logger.info(f"Audio processor started (pid: {self.pid})")
while True: while not self.stop_event.wait():
signal.pause() pass
finally:
stop_event.set()
for thread in audio_threads: for thread in audio_threads:
thread.join(1) thread.join(1)
if thread.is_alive(): if thread.is_alive():
@ -112,6 +105,7 @@ class AudioProcessor(util.Process):
for thread in audio_threads: for thread in audio_threads:
if thread.is_alive(): if thread.is_alive():
self.logger.warning(f"Thread {thread.name} is still alive") self.logger.warning(f"Thread {thread.name} is still alive")
self.logger.info("Exiting audio processor") self.logger.info("Exiting audio processor")

View File

@ -1,15 +1,29 @@
import faulthandler
import logging import logging
import multiprocessing as mp import multiprocessing as mp
import signal
import sys
import threading
from functools import wraps from functools import wraps
from logging.handlers import QueueHandler from logging.handlers import QueueHandler
from typing import Any from typing import Any, Callable, Optional
import frigate.log import frigate.log
class BaseProcess(mp.Process): class BaseProcess(mp.Process):
def __init__(self, **kwargs): def __init__(
super().__init__(**kwargs) self,
*,
name: Optional[str] = None,
target: Optional[Callable] = None,
args: tuple = (),
kwargs: dict = {},
daemon: Optional[bool] = None,
):
super().__init__(
name=name, target=target, args=args, kwargs=kwargs, daemon=daemon
)
def start(self, *args, **kwargs): def start(self, *args, **kwargs):
self.before_start() self.before_start()
@ -46,10 +60,36 @@ class BaseProcess(mp.Process):
class Process(BaseProcess): class Process(BaseProcess):
logger: logging.Logger
@property
def stop_event(self) -> threading.Event:
# Lazily create the stop_event. This allows the signal handler to tell if anyone is
# monitoring the stop event, and to raise a SystemExit if not.
if "stop_event" not in self.__dict__:
self.__dict__["stop_event"] = threading.Event()
return self.__dict__["stop_event"]
def before_start(self) -> None: def before_start(self) -> None:
self.__log_queue = frigate.log.log_listener.queue self.__log_queue = frigate.log.log_listener.queue
def before_run(self) -> None: def before_run(self) -> None:
if self.__log_queue: faulthandler.enable()
def receiveSignal(signalNumber, frame):
# Get the stop_event through the dict to bypass lazy initialization.
stop_event = self.__dict__.get("stop_event")
if stop_event is not None:
# Someone is monitoring stop_event. We should set it.
stop_event.set()
else:
# Nobody is monitoring stop_event. We should raise SystemExit.
sys.exit()
signal.signal(signal.SIGTERM, receiveSignal)
signal.signal(signal.SIGINT, receiveSignal)
self.logger = logging.getLogger(self.name)
logging.basicConfig(handlers=[], force=True) logging.basicConfig(handlers=[], force=True)
logging.getLogger().addHandler(QueueHandler(self.__log_queue)) logging.getLogger().addHandler(QueueHandler(self.__log_queue))