Shutdown hang (#11793)

* intentionally handle queues during shutdown and carefully manage shutdown order

* more carefully manage shutdown to avoid threadlocks

* use debug for signal logging

* ensure disabled cameras dont break shutdown

* typo
This commit is contained in:
Blake Blackshear 2024-06-06 18:54:38 -05:00 committed by GitHub
parent 5b42c91a91
commit b4384a1be3
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 93 additions and 33 deletions

View File

@ -2,7 +2,7 @@ blank_issues_enabled: false
contact_links: contact_links:
- name: Frigate Support - name: Frigate Support
url: https://github.com/blakeblackshear/frigate/discussions/new/choose url: https://github.com/blakeblackshear/frigate/discussions/new/choose
about: Get support for setting up or troubelshooting Frigate. about: Get support for setting up or troubleshooting Frigate.
- name: Frigate Bug Report - name: Frigate Bug Report
url: https://github.com/blakeblackshear/frigate/discussions/new/choose url: https://github.com/blakeblackshear/frigate/discussions/new/choose
about: Report a specific UI or backend bug. about: Report a specific UI or backend bug.

View File

@ -68,7 +68,7 @@ from frigate.stats.util import stats_init
from frigate.storage import StorageMaintainer from frigate.storage import StorageMaintainer
from frigate.timeline import TimelineProcessor from frigate.timeline import TimelineProcessor
from frigate.types import CameraMetricsTypes, PTZMetricsTypes from frigate.types import CameraMetricsTypes, PTZMetricsTypes
from frigate.util.builtin import save_default_config from frigate.util.builtin import empty_and_close_queue, save_default_config
from frigate.util.config import migrate_frigate_config from frigate.util.config import migrate_frigate_config
from frigate.util.object import get_camera_regions_grid from frigate.util.object import get_camera_regions_grid
from frigate.version import VERSION from frigate.version import VERSION
@ -521,8 +521,9 @@ class FrigateApp:
logger.info(f"Capture process started for {name}: {capture_process.pid}") logger.info(f"Capture process started for {name}: {capture_process.pid}")
def start_audio_processors(self) -> None: def start_audio_processors(self) -> None:
self.audio_process = None
if len([c for c in self.config.cameras.values() if c.audio.enabled]) > 0: if len([c for c in self.config.cameras.values() if c.audio.enabled]) > 0:
audio_process = mp.Process( self.audio_process = mp.Process(
target=listen_to_audio, target=listen_to_audio,
name="audio_capture", name="audio_capture",
args=( args=(
@ -530,10 +531,10 @@ class FrigateApp:
self.camera_metrics, self.camera_metrics,
), ),
) )
audio_process.daemon = True self.audio_process.daemon = True
audio_process.start() self.audio_process.start()
self.processes["audio_detector"] = audio_process.pid or 0 self.processes["audio_detector"] = self.audio_process.pid or 0
logger.info(f"Audio process started: {audio_process.pid}") logger.info(f"Audio process started: {self.audio_process.pid}")
def start_timeline_processor(self) -> None: def start_timeline_processor(self) -> None:
self.timeline_processor = TimelineProcessor( self.timeline_processor = TimelineProcessor(
@ -706,9 +707,9 @@ class FrigateApp:
self.check_shm() self.check_shm()
self.init_auth() self.init_auth()
# Flask only listens for SIGINT, so we need to catch SIGTERM and send SIGINT
def receiveSignal(signalNumber: int, frame: Optional[FrameType]) -> None: def receiveSignal(signalNumber: int, frame: Optional[FrameType]) -> None:
self.stop() os.kill(os.getpid(), signal.SIGINT)
sys.exit()
signal.signal(signal.SIGTERM, receiveSignal) signal.signal(signal.SIGTERM, receiveSignal)
@ -717,10 +718,13 @@ class FrigateApp:
except KeyboardInterrupt: except KeyboardInterrupt:
pass pass
logger.info("Flask has exited...")
self.stop() self.stop()
def stop(self) -> None: def stop(self) -> None:
logger.info("Stopping...") logger.info("Stopping...")
self.stop_event.set() self.stop_event.set()
# set an end_time on entries without an end_time before exiting # set an end_time on entries without an end_time before exiting
@ -731,43 +735,76 @@ class FrigateApp:
ReviewSegment.end_time == None ReviewSegment.end_time == None
).execute() ).execute()
# Stop Communicators # stop the audio process
self.inter_process_communicator.stop() if self.audio_process is not None:
self.inter_config_updater.stop() self.audio_process.terminate()
self.inter_detection_proxy.stop() self.audio_process.join()
# ensure the capture processes are done
for camera in self.camera_metrics.keys():
capture_process = self.camera_metrics[camera]["capture_process"]
if capture_process is not None:
logger.info(f"Waiting for capture process for {camera} to stop")
capture_process.terminate()
capture_process.join()
# ensure the camera processors are done
for camera in self.camera_metrics.keys():
camera_process = self.camera_metrics[camera]["process"]
if camera_process is not None:
logger.info(f"Waiting for process for {camera} to stop")
camera_process.terminate()
camera_process.join()
logger.info(f"Closing frame queue for {camera}")
frame_queue = self.camera_metrics[camera]["frame_queue"]
empty_and_close_queue(frame_queue)
# ensure the detectors are done
for detector in self.detectors.values(): for detector in self.detectors.values():
detector.stop() detector.stop()
# Empty the detection queue and set the events for all requests empty_and_close_queue(self.detection_queue)
while not self.detection_queue.empty(): logger.info("Detection queue closed")
connection_id = self.detection_queue.get(timeout=1)
self.detection_out_events[connection_id].set() self.detected_frames_processor.join()
self.detection_queue.close() empty_and_close_queue(self.detected_frames_queue)
self.detection_queue.join_thread() logger.info("Detected frames queue closed")
self.timeline_processor.join()
self.event_processor.join()
empty_and_close_queue(self.timeline_queue)
logger.info("Timeline queue closed")
self.output_processor.terminate()
self.output_processor.join()
self.recording_process.terminate()
self.recording_process.join()
self.review_segment_process.terminate()
self.review_segment_process.join()
self.external_event_processor.stop() self.external_event_processor.stop()
self.dispatcher.stop() self.dispatcher.stop()
self.detected_frames_processor.join()
self.ptz_autotracker_thread.join() self.ptz_autotracker_thread.join()
self.event_processor.join()
self.event_cleanup.join() self.event_cleanup.join()
self.record_cleanup.join() self.record_cleanup.join()
self.stats_emitter.join() self.stats_emitter.join()
self.frigate_watchdog.join() self.frigate_watchdog.join()
self.db.stop() self.db.stop()
# Stop Communicators
self.inter_process_communicator.stop()
self.inter_config_updater.stop()
self.inter_detection_proxy.stop()
while len(self.detection_shms) > 0: while len(self.detection_shms) > 0:
shm = self.detection_shms.pop() shm = self.detection_shms.pop()
shm.close() shm.close()
shm.unlink() shm.unlink()
for queue in [ self.log_process.terminate()
self.detected_frames_queue, self.log_process.join()
self.log_queue,
]: os._exit(os.EX_OK)
if queue is not None:
while not queue.empty():
queue.get_nowait()
queue.close()
queue.join_thread()

View File

@ -83,6 +83,7 @@ def listen_to_audio(
logger.info("Exiting audio detector...") logger.info("Exiting audio detector...")
def receiveSignal(signalNumber: int, frame: Optional[FrameType]) -> None: def receiveSignal(signalNumber: int, frame: Optional[FrameType]) -> None:
logger.debug(f"Audio process received signal {signalNumber}")
stop_event.set() stop_event.set()
exit_process() exit_process()

View File

@ -57,8 +57,8 @@ def log_process(log_queue: Queue) -> None:
while True: while True:
try: try:
record = log_queue.get(timeout=1) record = log_queue.get(block=True, timeout=1.0)
except (queue.Empty, KeyboardInterrupt): except queue.Empty:
if stop_event.is_set(): if stop_event.is_set():
break break
continue continue

View File

@ -38,6 +38,7 @@ def output_frames(
stop_event = mp.Event() stop_event = mp.Event()
def receiveSignal(signalNumber, frame): def receiveSignal(signalNumber, frame):
logger.debug(f"Output frames process received signal {signalNumber}")
stop_event.set() stop_event.set()
signal.signal(signal.SIGTERM, receiveSignal) signal.signal(signal.SIGTERM, receiveSignal)

View File

@ -22,6 +22,7 @@ def manage_recordings(config: FrigateConfig) -> None:
stop_event = mp.Event() stop_event = mp.Event()
def receiveSignal(signalNumber: int, frame: Optional[FrameType]) -> None: def receiveSignal(signalNumber: int, frame: Optional[FrameType]) -> None:
logger.debug(f"Recording manager process received signal {signalNumber}")
stop_event.set() stop_event.set()
signal.signal(signal.SIGTERM, receiveSignal) signal.signal(signal.SIGTERM, receiveSignal)

View File

@ -20,6 +20,7 @@ def manage_review_segments(config: FrigateConfig) -> None:
stop_event = mp.Event() stop_event = mp.Event()
def receiveSignal(signalNumber: int, frame: Optional[FrameType]) -> None: def receiveSignal(signalNumber: int, frame: Optional[FrameType]) -> None:
logger.debug(f"Manage review segments process received signal {signalNumber}")
stop_event.set() stop_event.set()
signal.signal(signal.SIGTERM, receiveSignal) signal.signal(signal.SIGTERM, receiveSignal)

View File

@ -3,6 +3,8 @@
import copy import copy
import datetime import datetime
import logging import logging
import multiprocessing as mp
import queue
import re import re
import shlex import shlex
import urllib.parse import urllib.parse
@ -337,3 +339,13 @@ def clear_and_unlink(file: Path, missing_ok: bool = True) -> None:
pass pass
file.unlink(missing_ok=missing_ok) file.unlink(missing_ok=missing_ok)
def empty_and_close_queue(q: mp.Queue):
while True:
try:
q.get(block=True, timeout=0.5)
except queue.Empty:
q.close()
q.join_thread()
return

View File

@ -33,7 +33,7 @@ def restart_frigate():
proc.terminate() proc.terminate()
# otherwise, just try and exit frigate # otherwise, just try and exit frigate
else: else:
os.kill(os.getpid(), signal.SIGTERM) os.kill(os.getpid(), signal.SIGINT)
def print_stack(sig, frame): def print_stack(sig, frame):

View File

@ -360,6 +360,7 @@ def capture_camera(name, config: CameraConfig, process_info):
stop_event = mp.Event() stop_event = mp.Event()
def receiveSignal(signalNumber, frame): def receiveSignal(signalNumber, frame):
logger.debug(f"Capture camera received signal {signalNumber}")
stop_event.set() stop_event.set()
signal.signal(signal.SIGTERM, receiveSignal) signal.signal(signal.SIGTERM, receiveSignal)
@ -446,6 +447,12 @@ def track_camera(
region_grid, region_grid,
) )
# empty the frame queue
logger.info(f"{name}: emptying frame queue")
while not frame_queue.empty():
frame_time = frame_queue.get(False)
frame_manager.delete(f"{name}{frame_time}")
logger.info(f"{name}: exiting subprocess") logger.info(f"{name}: exiting subprocess")