From 3a9781c4f81056e6cc2ad98424a792afe50e15aa Mon Sep 17 00:00:00 2001 From: Blake Blackshear Date: Mon, 9 Mar 2020 21:12:19 -0500 Subject: [PATCH] handle various scenarios with external process failures --- Dockerfile | 6 ++ detect_objects.py | 101 +++++++++++++++---- frigate/edgetpu.py | 5 +- frigate/object_processing.py | 190 ++++++++++++++++++----------------- frigate/util.py | 8 ++ frigate/video.py | 41 ++++---- 6 files changed, 218 insertions(+), 133 deletions(-) diff --git a/Dockerfile b/Dockerfile index d50ee9bd3..e8674da55 100755 --- a/Dockerfile +++ b/Dockerfile @@ -52,6 +52,12 @@ RUN wget -q https://storage.googleapis.com/download.tensorflow.org/models/tflite mv /detect.tflite /cpu_model.tflite && \ rm /cpu_model.zip +RUN apt -qq update && apt -qq install --no-install-recommends -y \ + gdb \ + python3.7-dbg \ + && rm -rf /var/lib/apt/lists/* \ + && (apt-get autoremove -y; apt-get autoclean -y) + WORKDIR /opt/frigate/ ADD frigate frigate/ COPY detect_objects.py . diff --git a/detect_objects.py b/detect_objects.py index c2d7a114f..71d82e805 100644 --- a/detect_objects.py +++ b/detect_objects.py @@ -1,4 +1,7 @@ import os +import sys +import traceback +import signal import cv2 import time import datetime @@ -58,27 +61,50 @@ GLOBAL_OBJECT_CONFIG = CONFIG.get('objects', {}) WEB_PORT = CONFIG.get('web_port', 5000) DEBUG = (CONFIG.get('debug', '0') == '1') +def start_plasma_store(): + plasma_cmd = ['plasma_store', '-m', '400000000', '-s', '/tmp/plasma'] + plasma_process = sp.Popen(plasma_cmd, stdout=sp.DEVNULL) + time.sleep(1) + rc = plasma_process.poll() + if rc is not None: + return None + return plasma_process + class CameraWatchdog(threading.Thread): - def __init__(self, camera_processes, config, tflite_process, tracked_objects_queue, object_processor): + def __init__(self, camera_processes, config, tflite_process, tracked_objects_queue, object_processor, plasma_process): threading.Thread.__init__(self) self.camera_processes = camera_processes self.config = config self.tflite_process = tflite_process self.tracked_objects_queue = tracked_objects_queue self.object_processor = object_processor + self.plasma_process = plasma_process def run(self): time.sleep(10) while True: # wait a bit before checking time.sleep(30) + + # check the plasma process + rc = self.plasma_process.poll() + if rc != None: + print(f"plasma_process exited unexpectedly with {rc}") + self.plasma_process = start_plasma_store() + time.sleep(10) + # check the detection process if (self.tflite_process.detection_start.value > 0.0 and datetime.datetime.now().timestamp() - self.tflite_process.detection_start.value > 10): print("Detection appears to be stuck. Restarting detection process") self.tflite_process.start_or_restart() time.sleep(30) + elif not self.tflite_process.detect_process.is_alive(): + print("Detection appears to have stopped. Restarting detection process") + self.tflite_process.start_or_restart() + time.sleep(30) + # check the camera processes for name, camera_process in self.camera_processes.items(): process = camera_process['process'] if not process.is_alive(): @@ -86,14 +112,33 @@ class CameraWatchdog(threading.Thread): camera_process['fps'].value = float(self.config[name]['fps']) camera_process['skipped_fps'].value = 0.0 camera_process['detection_fps'].value = 0.0 + camera_process['read_start'].value = 0.0 + camera_process['ffmpeg_pid'].value = 0 process = mp.Process(target=track_camera, args=(name, self.config[name], FFMPEG_DEFAULT_CONFIG, GLOBAL_OBJECT_CONFIG, self.tflite_process.detection_queue, self.tracked_objects_queue, - camera_process['fps'], camera_process['skipped_fps'], camera_process['detection_fps'])) + camera_process['fps'], camera_process['skipped_fps'], camera_process['detection_fps'], + camera_process['read_start'], camera_process['ffmpeg_pid'])) process.daemon = True camera_process['process'] = process process.start() print(f"Camera_process started for {name}: {process.pid}") + if (camera_process['read_start'].value > 0.0 and + datetime.datetime.now().timestamp() - camera_process['read_start'].value > 10): + print(f"Process for {name} has been reading from ffmpeg for over 10 seconds long. Killing ffmpeg...") + ffmpeg_pid = camera_process['ffmpeg_pid'].value + if ffmpeg_pid != 0: + try: + os.kill(ffmpeg_pid, signal.SIGTERM) + except OSError: + print(f"Unable to terminate ffmpeg with pid {ffmpeg_pid}") + time.sleep(10) + try: + os.kill(ffmpeg_pid, signal.SIGKILL) + print(f"Unable to kill ffmpeg with pid {ffmpeg_pid}") + except OSError: + pass + def main(): # connect to mqtt and setup last will def on_connect(client, userdata, flags, rc): @@ -117,14 +162,7 @@ def main(): client.connect(MQTT_HOST, MQTT_PORT, 60) client.loop_start() - # start plasma store - plasma_cmd = ['plasma_store', '-m', '400000000', '-s', '/tmp/plasma'] - plasma_process = sp.Popen(plasma_cmd, stdout=sp.DEVNULL) - time.sleep(1) - rc = plasma_process.poll() - if rc is not None: - raise RuntimeError("plasma_store exited unexpectedly with " - "code %d" % (rc,)) + plasma_process = start_plasma_store() ## # Setup config defaults for cameras @@ -135,7 +173,7 @@ def main(): } # Queue for cameras to push tracked objects to - tracked_objects_queue = mp.Queue() + tracked_objects_queue = mp.SimpleQueue() # Start the shared tflite process tflite_process = EdgeTPUProcess() @@ -146,11 +184,14 @@ def main(): camera_processes[name] = { 'fps': mp.Value('d', float(config['fps'])), 'skipped_fps': mp.Value('d', 0.0), - 'detection_fps': mp.Value('d', 0.0) + 'detection_fps': mp.Value('d', 0.0), + 'read_start': mp.Value('d', 0.0), + 'ffmpeg_pid': mp.Value('i', 0) } camera_process = mp.Process(target=track_camera, args=(name, config, FFMPEG_DEFAULT_CONFIG, GLOBAL_OBJECT_CONFIG, - tflite_process.detection_queue, tracked_objects_queue, - camera_processes[name]['fps'], camera_processes[name]['skipped_fps'], camera_processes[name]['detection_fps'])) + tflite_process.detection_queue, tracked_objects_queue, camera_processes[name]['fps'], + camera_processes[name]['skipped_fps'], camera_processes[name]['detection_fps'], + camera_processes[name]['read_start'], camera_processes[name]['ffmpeg_pid'])) camera_process.daemon = True camera_processes[name]['process'] = camera_process @@ -161,7 +202,7 @@ def main(): object_processor = TrackedObjectProcessor(CONFIG['cameras'], client, MQTT_TOPIC_PREFIX, tracked_objects_queue) object_processor.start() - camera_watchdog = CameraWatchdog(camera_processes, CONFIG['cameras'], tflite_process, tracked_objects_queue, object_processor) + camera_watchdog = CameraWatchdog(camera_processes, CONFIG['cameras'], tflite_process, tracked_objects_queue, object_processor, plasma_process) camera_watchdog.start() # create a flask app that encodes frames a mjpeg on demand @@ -174,6 +215,23 @@ def main(): # return a healh return "Frigate is running. Alive and healthy!" + @app.route('/debug/stack') + def processor_stack(): + frame = sys._current_frames().get(object_processor.ident, None) + if frame: + return "
".join(traceback.format_stack(frame)), 200 + else: + return "no frame found", 200 + + @app.route('/debug/print_stack') + def print_stack(): + pid = int(request.args.get('pid', 0)) + if pid == 0: + return "missing pid", 200 + else: + os.kill(pid, signal.SIGUSR1) + return "check logs", 200 + @app.route('/debug/stats') def stats(): stats = {} @@ -185,21 +243,22 @@ def main(): stats[name] = { 'fps': round(camera_stats['fps'].value, 2), 'skipped_fps': round(camera_stats['skipped_fps'].value, 2), - 'detection_fps': round(camera_stats['detection_fps'].value, 2) + 'detection_fps': round(camera_stats['detection_fps'].value, 2), + 'read_start': camera_stats['read_start'].value, + 'pid': camera_stats['process'].pid, + 'ffmpeg_pid': camera_stats['ffmpeg_pid'].value } stats['coral'] = { 'fps': round(total_detection_fps, 2), 'inference_speed': round(tflite_process.avg_inference_speed.value*1000, 2), - 'detection_queue': tflite_process.detection_queue.qsize(), - 'detection_start': tflite_process.detection_start.value + 'detection_start': tflite_process.detection_start.value, + 'pid': tflite_process.detect_process.pid } - rc = plasma_process.poll() + rc = camera_watchdog.plasma_process.poll() stats['plasma_store_rc'] = rc - stats['tracked_objects_queue'] = tracked_objects_queue.qsize() - return jsonify(stats) @app.route('//