graceful exit of subprocesses

This commit is contained in:
Blake Blackshear 2020-11-29 16:19:59 -06:00
parent 4e0cf3681e
commit d62aec7287
3 changed files with 73 additions and 16 deletions

View File

@ -5,6 +5,7 @@ import multiprocessing as mp
import os import os
import queue import queue
import threading import threading
import signal
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
from multiprocessing.connection import Connection from multiprocessing.connection import Connection
from typing import Dict from typing import Dict
@ -109,6 +110,14 @@ def run_detector(name: str, detection_queue: mp.Queue, out_events: Dict[str, mp.
threading.current_thread().name = f"detector:{name}" threading.current_thread().name = f"detector:{name}"
logging.info(f"Starting detection process: {os.getpid()}") logging.info(f"Starting detection process: {os.getpid()}")
listen() listen()
stop_event = mp.Event()
def receiveSignal(signalNumber, frame):
stop_event.set()
signal.signal(signal.SIGTERM, receiveSignal)
signal.signal(signal.SIGINT, receiveSignal)
frame_manager = SharedMemoryFrameManager() frame_manager = SharedMemoryFrameManager()
object_detector = LocalObjectDetector(tf_device=tf_device) object_detector = LocalObjectDetector(tf_device=tf_device)
@ -122,7 +131,13 @@ def run_detector(name: str, detection_queue: mp.Queue, out_events: Dict[str, mp.
} }
while True: while True:
connection_id = detection_queue.get() if stop_event.is_set():
break
try:
connection_id = detection_queue.get(timeout=5)
except queue.Empty:
continue
input_frame = frame_manager.get(connection_id, (1,300,300,3)) input_frame = frame_manager.get(connection_id, (1,300,300,3))
if input_frame is None: if input_frame is None:

View File

@ -1,6 +1,8 @@
# adapted from https://medium.com/@jonathonbao/python3-logging-with-multiprocessing-f51f460b8778 # adapted from https://medium.com/@jonathonbao/python3-logging-with-multiprocessing-f51f460b8778
import logging import logging
import threading import threading
import signal
import multiprocessing as mp
from logging import handlers from logging import handlers
@ -19,9 +21,21 @@ def root_configurer(queue):
root.setLevel(logging.INFO) root.setLevel(logging.INFO)
def log_process(queue): def log_process(queue):
stop_event = mp.Event()
def receiveSignal(signalNumber, frame):
stop_event.set()
signal.signal(signal.SIGTERM, receiveSignal)
signal.signal(signal.SIGINT, receiveSignal)
threading.current_thread().name = f"logger" threading.current_thread().name = f"logger"
listener_configurer() listener_configurer()
while True: while True:
record = queue.get() if stop_event.is_set() and queue.empty():
break
try:
record = queue.get(timeout=5)
except queue.Empty:
continue
logger = logging.getLogger(record.name) logger = logging.getLogger(record.name)
logger.handle(record) logger.handle(record)

View File

@ -9,6 +9,7 @@ import multiprocessing as mp
import os import os
import queue import queue
import subprocess as sp import subprocess as sp
import signal
import threading import threading
import time import time
from collections import defaultdict from collections import defaultdict
@ -72,8 +73,7 @@ def create_tensor_input(frame, region):
# Expand dimensions since the model expects images to have shape: [1, 300, 300, 3] # Expand dimensions since the model expects images to have shape: [1, 300, 300, 3]
return np.expand_dims(cropped_frame, axis=0) return np.expand_dims(cropped_frame, axis=0)
def start_or_restart_ffmpeg(ffmpeg_cmd, frame_size=None, ffmpeg_process=None): def stop_ffmpeg(ffmpeg_process):
if not ffmpeg_process is None:
logger.info("Terminating the existing ffmpeg process...") logger.info("Terminating the existing ffmpeg process...")
ffmpeg_process.terminate() ffmpeg_process.terminate()
try: try:
@ -85,6 +85,10 @@ def start_or_restart_ffmpeg(ffmpeg_cmd, frame_size=None, ffmpeg_process=None):
ffmpeg_process.communicate() ffmpeg_process.communicate()
ffmpeg_process = None ffmpeg_process = None
def start_or_restart_ffmpeg(ffmpeg_cmd, frame_size=None, ffmpeg_process=None):
if not ffmpeg_process is None:
stop_ffmpeg(ffmpeg_process)
if frame_size is None: if frame_size is None:
process = sp.Popen(ffmpeg_cmd, stdout = sp.DEVNULL, stdin = sp.DEVNULL, start_new_session=True) process = sp.Popen(ffmpeg_cmd, stdout = sp.DEVNULL, stdin = sp.DEVNULL, start_new_session=True)
else: else:
@ -133,7 +137,7 @@ def capture_frames(ffmpeg_process, camera_name, frame_shape, frame_manager: Fram
frame_queue.put(current_frame.value) frame_queue.put(current_frame.value)
class CameraWatchdog(threading.Thread): class CameraWatchdog(threading.Thread):
def __init__(self, camera_name, config, frame_queue, camera_fps, ffmpeg_pid): def __init__(self, camera_name, config, frame_queue, camera_fps, ffmpeg_pid, stop_event):
threading.Thread.__init__(self) threading.Thread.__init__(self)
self.name = f"watchdog:{camera_name}" self.name = f"watchdog:{camera_name}"
self.camera_name = camera_name self.camera_name = camera_name
@ -146,6 +150,7 @@ class CameraWatchdog(threading.Thread):
self.frame_queue = frame_queue self.frame_queue = frame_queue
self.frame_shape = self.config.frame_shape_yuv self.frame_shape = self.config.frame_shape_yuv
self.frame_size = self.frame_shape[0] * self.frame_shape[1] self.frame_size = self.frame_shape[0] * self.frame_shape[1]
self.stop_event = stop_event
def run(self): def run(self):
self.start_ffmpeg_detect() self.start_ffmpeg_detect()
@ -160,6 +165,12 @@ class CameraWatchdog(threading.Thread):
time.sleep(10) time.sleep(10)
while True: while True:
if self.stop_event.is_set():
stop_ffmpeg(self.ffmpeg_detect_process)
for p in self.ffmpeg_other_processes:
stop_ffmpeg(p['process'])
break
now = datetime.datetime.now().timestamp() now = datetime.datetime.now().timestamp()
if not self.capture_thread.is_alive(): if not self.capture_thread.is_alive():
@ -212,12 +223,26 @@ class CameraCapture(threading.Thread):
self.fps, self.skipped_fps, self.current_frame) self.fps, self.skipped_fps, self.current_frame)
def capture_camera(name, config: CameraConfig, process_info): def capture_camera(name, config: CameraConfig, process_info):
stop_event = mp.Event()
def receiveSignal(signalNumber, frame):
stop_event.set()
signal.signal(signal.SIGTERM, receiveSignal)
signal.signal(signal.SIGINT, receiveSignal)
frame_queue = process_info['frame_queue'] frame_queue = process_info['frame_queue']
camera_watchdog = CameraWatchdog(name, config, frame_queue, process_info['camera_fps'], process_info['ffmpeg_pid']) camera_watchdog = CameraWatchdog(name, config, frame_queue, process_info['camera_fps'], process_info['ffmpeg_pid'], stop_event)
camera_watchdog.start() camera_watchdog.start()
camera_watchdog.join() camera_watchdog.join()
def track_camera(name, config: CameraConfig, detection_queue, result_connection, detected_objects_queue, process_info): def track_camera(name, config: CameraConfig, detection_queue, result_connection, detected_objects_queue, process_info):
stop_event = mp.Event()
def receiveSignal(signalNumber, frame):
stop_event.set()
signal.signal(signal.SIGTERM, receiveSignal)
signal.signal(signal.SIGINT, receiveSignal)
threading.current_thread().name = f"process:{name}" threading.current_thread().name = f"process:{name}"
listen() listen()
@ -236,7 +261,7 @@ def track_camera(name, config: CameraConfig, detection_queue, result_connection,
frame_manager = SharedMemoryFrameManager() frame_manager = SharedMemoryFrameManager()
process_frames(name, frame_queue, frame_shape, frame_manager, motion_detector, object_detector, process_frames(name, frame_queue, frame_shape, frame_manager, motion_detector, object_detector,
object_tracker, detected_objects_queue, process_info, objects_to_track, object_filters, mask) object_tracker, detected_objects_queue, process_info, objects_to_track, object_filters, mask, stop_event)
logger.info(f"{name}: exiting subprocess") logger.info(f"{name}: exiting subprocess")
@ -273,7 +298,7 @@ def process_frames(camera_name: str, frame_queue: mp.Queue, frame_shape,
frame_manager: FrameManager, motion_detector: MotionDetector, frame_manager: FrameManager, motion_detector: MotionDetector,
object_detector: RemoteObjectDetector, object_tracker: ObjectTracker, object_detector: RemoteObjectDetector, object_tracker: ObjectTracker,
detected_objects_queue: mp.Queue, process_info: Dict, detected_objects_queue: mp.Queue, process_info: Dict,
objects_to_track: List[str], object_filters, mask, objects_to_track: List[str], object_filters, mask, stop_event,
exit_on_empty: bool = False): exit_on_empty: bool = False):
fps = process_info['process_fps'] fps = process_info['process_fps']
@ -284,6 +309,9 @@ def process_frames(camera_name: str, frame_queue: mp.Queue, frame_shape,
fps_tracker.start() fps_tracker.start()
while True: while True:
if stop_event.is_set():
break
if exit_on_empty and frame_queue.empty(): if exit_on_empty and frame_queue.empty():
logger.info(f"Exiting track_objects...") logger.info(f"Exiting track_objects...")
break break