Handle SIGINT with forkserver (#18860)

* Pass stopevent from main start

* Share stop event across processes

* preload modules

* remove explicit os._exit call

---------

Co-authored-by: Josh Hawkins <32435876+hawkeye217@users.noreply.github.com>
This commit is contained in:
Nicolas Mowen 2025-06-24 11:41:11 -06:00 committed by GitHub
parent a22e24f24b
commit 47a0097e95
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
11 changed files with 68 additions and 46 deletions

View File

@ -23,6 +23,10 @@ def main() -> None:
setup_logging(manager)
threading.current_thread().name = "frigate"
stop_event = mp.Event()
# send stop event on SIGINT
signal.signal(signal.SIGINT, lambda sig, frame: stop_event.set())
# Make sure we exit cleanly on SIGTERM.
signal.signal(signal.SIGTERM, lambda sig, frame: sys.exit())
@ -110,9 +114,23 @@ def main() -> None:
sys.exit(0)
# Run the main application.
FrigateApp(config, manager).start()
FrigateApp(config, manager, stop_event).start()
if __name__ == "__main__":
mp.set_forkserver_preload(
[
# Standard library and core dependencies
"sqlite3",
# Third-party libraries commonly used in Frigate
"numpy",
"cv2",
"peewee",
"zmq",
"ruamel.yaml",
# Frigate core modules
"frigate.camera.maintainer",
]
)
mp.set_start_method("forkserver", force=True)
main()

View File

@ -79,10 +79,12 @@ logger = logging.getLogger(__name__)
class FrigateApp:
def __init__(self, config: FrigateConfig, manager: SyncManager) -> None:
def __init__(
self, config: FrigateConfig, manager: SyncManager, stop_event: MpEvent
) -> None:
self.metrics_manager = manager
self.audio_process: Optional[mp.Process] = None
self.stop_event: MpEvent = mp.Event()
self.stop_event = stop_event
self.detection_queue: Queue = mp.Queue()
self.detectors: dict[str, ObjectDetectProcess] = {}
self.detection_shms: list[mp.shared_memory.SharedMemory] = []
@ -223,14 +225,14 @@ class FrigateApp:
self.processes["go2rtc"] = proc.info["pid"]
def init_recording_manager(self) -> None:
recording_process = RecordProcess(self.config)
recording_process = RecordProcess(self.config, self.stop_event)
self.recording_process = recording_process
recording_process.start()
self.processes["recording"] = recording_process.pid or 0
logger.info(f"Recording process started: {recording_process.pid}")
def init_review_segment_manager(self) -> None:
review_segment_process = ReviewProcess(self.config)
review_segment_process = ReviewProcess(self.config, self.stop_event)
self.review_segment_process = review_segment_process
review_segment_process.start()
self.processes["review_segment"] = review_segment_process.pid or 0
@ -250,8 +252,7 @@ class FrigateApp:
return
embedding_process = EmbeddingProcess(
self.config,
self.embeddings_metrics,
self.config, self.embeddings_metrics, self.stop_event
)
self.embedding_process = embedding_process
embedding_process.start()
@ -387,6 +388,7 @@ class FrigateApp:
list(self.config.cameras.keys()),
self.config,
detector_config,
self.stop_event,
)
def start_ptz_autotracker(self) -> None:
@ -410,7 +412,7 @@ class FrigateApp:
self.detected_frames_processor.start()
def start_video_output_processor(self) -> None:
output_processor = OutputProcess(self.config)
output_processor = OutputProcess(self.config, self.stop_event)
self.output_processor = output_processor
output_processor.start()
logger.info(f"Output process started: {output_processor.pid}")
@ -436,7 +438,7 @@ class FrigateApp:
if audio_cameras:
self.audio_process = AudioProcessor(
self.config, audio_cameras, self.camera_metrics
self.config, audio_cameras, self.camera_metrics, self.stop_event
)
self.audio_process.start()
self.processes["audio_detector"] = self.audio_process.pid or 0
@ -658,4 +660,3 @@ class FrigateApp:
_stop_logging()
self.metrics_manager.shutdown()
os._exit(os.EX_OK)

View File

@ -165,6 +165,7 @@ class CameraMaintainer(threading.Thread):
self.camera_metrics[name],
self.ptz_metrics[name],
self.region_grids[name],
self.stop_event,
)
self.camera_processes[config.name] = camera_process
camera_process.start()
@ -184,7 +185,9 @@ class CameraMaintainer(threading.Thread):
frame_size = config.frame_shape_yuv[0] * config.frame_shape_yuv[1]
self.frame_manager.create(f"{config.name}_frame{i}", frame_size)
capture_process = CameraCapture(config, count, self.camera_metrics[name])
capture_process = CameraCapture(
config, count, self.camera_metrics[name], self.stop_event
)
capture_process.daemon = True
self.capture_processes[name] = capture_process
capture_process.start()

View File

@ -5,6 +5,7 @@ import json
import logging
import os
import threading
from multiprocessing.synchronize import Event as MpEvent
from typing import Any, Union
import regex
@ -28,9 +29,12 @@ logger = logging.getLogger(__name__)
class EmbeddingProcess(FrigateProcess):
def __init__(
self, config: FrigateConfig, metrics: DataProcessorMetrics | None
self,
config: FrigateConfig,
metrics: DataProcessorMetrics | None,
stop_event: MpEvent,
) -> None:
super().__init__(name="frigate.embeddings_manager", daemon=True)
super().__init__(stop_event, name="frigate.embeddings_manager", daemon=True)
self.config = config
self.metrics = metrics

View File

@ -7,6 +7,7 @@ import string
import threading
import time
from multiprocessing.managers import DictProxy
from multiprocessing.synchronize import Event as MpEvent
from typing import Any, Tuple
import numpy as np
@ -84,8 +85,9 @@ class AudioProcessor(FrigateProcess):
config: FrigateConfig,
cameras: list[CameraConfig],
camera_metrics: DictProxy,
stop_event: MpEvent,
):
super().__init__(name="frigate.audio_manager", daemon=True)
super().__init__(stop_event, name="frigate.audio_manager", daemon=True)
self.camera_metrics = camera_metrics
self.cameras = cameras

View File

@ -95,8 +95,9 @@ class DetectorRunner(FrigateProcess):
start_time: Value,
config: FrigateConfig,
detector_config: BaseDetectorConfig,
stop_event: MpEvent,
) -> None:
super().__init__(name=name, daemon=True)
super().__init__(stop_event, name=name, daemon=True)
self.detection_queue = detection_queue
self.cameras = cameras
self.avg_speed = avg_speed
@ -166,6 +167,7 @@ class ObjectDetectProcess:
cameras: list[str],
config: FrigateConfig,
detector_config: BaseDetectorConfig,
stop_event: MpEvent,
):
self.name = name
self.cameras = cameras
@ -175,6 +177,7 @@ class ObjectDetectProcess:
self.detect_process: FrigateProcess | None = None
self.config = config
self.detector_config = detector_config
self.stop_event = stop_event
self.start_or_restart()
def stop(self):
@ -202,6 +205,7 @@ class ObjectDetectProcess:
self.detection_start,
self.config,
self.detector_config,
self.stop_event,
)
self.detect_process.start()

View File

@ -5,6 +5,7 @@ import logging
import os
import shutil
import threading
from multiprocessing.synchronize import Event as MpEvent
from wsgiref.simple_server import make_server
from ws4py.server.wsgirefserver import (
@ -72,8 +73,8 @@ def check_disabled_camera_update(
class OutputProcess(FrigateProcess):
def __init__(self, config: FrigateConfig) -> None:
super().__init__(name="frigate.output", daemon=True)
def __init__(self, config: FrigateConfig, stop_event: MpEvent) -> None:
super().__init__(stop_event, name="frigate.output", daemon=True)
self.config = config
def run(self) -> None:

View File

@ -1,6 +1,7 @@
"""Run recording maintainer and cleanup."""
import logging
from multiprocessing.synchronize import Event as MpEvent
from playhouse.sqliteq import SqliteQueueDatabase
@ -13,8 +14,8 @@ logger = logging.getLogger(__name__)
class RecordProcess(FrigateProcess):
def __init__(self, config: FrigateConfig) -> None:
super().__init__(name="frigate.recording_manager", daemon=True)
def __init__(self, config: FrigateConfig, stop_event: MpEvent) -> None:
super().__init__(stop_event, name="frigate.recording_manager", daemon=True)
self.config = config
def run(self) -> None:

View File

@ -1,6 +1,7 @@
"""Run recording maintainer and cleanup."""
import logging
from multiprocessing.synchronize import Event as MpEvent
from frigate.config import FrigateConfig
from frigate.review.maintainer import ReviewSegmentMaintainer
@ -10,8 +11,8 @@ logger = logging.getLogger(__name__)
class ReviewProcess(FrigateProcess):
def __init__(self, config: FrigateConfig) -> None:
super().__init__(name="frigate.review_segment_manager", daemon=True)
def __init__(self, config: FrigateConfig, stop_event: MpEvent) -> None:
super().__init__(stop_event, name="frigate.review_segment_manager", daemon=True)
self.config = config
def run(self) -> None:

View File

@ -1,10 +1,9 @@
import faulthandler
import logging
import multiprocessing as mp
import signal
import sys
import threading
from logging.handlers import QueueHandler
from multiprocessing.synchronize import Event as MpEvent
from typing import Callable, Optional
from setproctitle import setproctitle
@ -16,6 +15,7 @@ from frigate.config.logger import LoggerConfig
class BaseProcess(mp.Process):
def __init__(
self,
stop_event: MpEvent,
*,
name: Optional[str] = None,
target: Optional[Callable] = None,
@ -23,6 +23,7 @@ class BaseProcess(mp.Process):
kwargs: dict = {},
daemon: Optional[bool] = None,
):
self.stop_event = stop_event
super().__init__(
name=name, target=target, args=args, kwargs=kwargs, daemon=daemon
)
@ -42,14 +43,6 @@ class BaseProcess(mp.Process):
class FrigateProcess(BaseProcess):
logger: logging.Logger
@property
def stop_event(self) -> threading.Event:
# Lazily create the stop_event. This allows the signal handler to tell if anyone is
# monitoring the stop event, and to raise a SystemExit if not.
if "stop_event" not in self.__dict__:
self.__dict__["stop_event"] = threading.Event()
return self.__dict__["stop_event"]
def before_start(self) -> None:
self.__log_queue = frigate.log.log_listener.queue
@ -58,18 +51,7 @@ class FrigateProcess(BaseProcess):
threading.current_thread().name = f"process:{self.name}"
faulthandler.enable()
def receiveSignal(signalNumber, frame):
# Get the stop_event through the dict to bypass lazy initialization.
stop_event = self.__dict__.get("stop_event")
if stop_event is not None:
# Someone is monitoring stop_event. We should set it.
stop_event.set()
else:
# Nobody is monitoring stop_event. We should raise SystemExit.
sys.exit()
signal.signal(signal.SIGTERM, receiveSignal)
signal.signal(signal.SIGINT, receiveSignal)
# setup logging
self.logger = logging.getLogger(self.name)
logging.basicConfig(handlers=[], force=True)
logging.getLogger().addHandler(QueueHandler(self.__log_queue))

View File

@ -439,9 +439,13 @@ class CameraCaptureRunner(threading.Thread):
class CameraCapture(FrigateProcess):
def __init__(
self, config: CameraConfig, shm_frame_count: int, camera_metrics: CameraMetrics
self,
config: CameraConfig,
shm_frame_count: int,
camera_metrics: CameraMetrics,
stop_event: MpEvent,
) -> None:
super().__init__(name=f"frigate.capture:{config.name}", daemon=True)
super().__init__(stop_event, name=f"frigate.capture:{config.name}", daemon=True)
self.config = config
self.shm_frame_count = shm_frame_count
self.camera_metrics = camera_metrics
@ -472,8 +476,9 @@ class CameraTracker(FrigateProcess):
camera_metrics: CameraMetrics,
ptz_metrics: PTZMetrics,
region_grid: list[list[dict[str, Any]]],
stop_event: MpEvent,
) -> None:
super().__init__(name=f"frigate.process:{config.name}", daemon=True)
super().__init__(stop_event, name=f"frigate.process:{config.name}", daemon=True)
self.config = config
self.model_config = model_config
self.labelmap = labelmap