improve detection processing and restart when stuck

This commit is contained in:
Blake Blackshear 2020-03-01 07:16:49 -06:00
parent d8aa73d26e
commit a5bef89123
4 changed files with 77 additions and 76 deletions

View File

@ -31,7 +31,7 @@ FFMPEG_CONFIG = CONFIG.get('ffmpeg', {})
FFMPEG_DEFAULT_CONFIG = { FFMPEG_DEFAULT_CONFIG = {
'global_args': FFMPEG_CONFIG.get('global_args', 'global_args': FFMPEG_CONFIG.get('global_args',
['-hide_banner','-loglevel','panic']), ['-hide_banner','-loglevel','panic']),
'hwaccel_args': FFMPEG_CONFIG.get('hwaccel_args', 'hwaccel_args': FFMPEG_CONFIG.get('hwaccel_args',
[]), []),
'input_args': FFMPEG_CONFIG.get('input_args', 'input_args': FFMPEG_CONFIG.get('input_args',
['-avoid_negative_ts', 'make_zero', ['-avoid_negative_ts', 'make_zero',
@ -68,6 +68,11 @@ class CameraWatchdog(threading.Thread):
# wait a bit before checking # wait a bit before checking
time.sleep(30) time.sleep(30)
if (self.tflite_process.detection_start.value > 0.0 and
datetime.datetime.now().timestamp() - self.tflite_process.detection_start.value > 10):
print("Detection appears to be stuck. Restarting detection process")
time.sleep(30)
for name, camera_process in self.camera_processes.items(): for name, camera_process in self.camera_processes.items():
process = camera_process['process'] process = camera_process['process']
if not process.is_alive(): if not process.is_alive():
@ -75,9 +80,8 @@ class CameraWatchdog(threading.Thread):
camera_process['fps'].value = float(self.config[name]['fps']) camera_process['fps'].value = float(self.config[name]['fps'])
camera_process['skipped_fps'].value = 0.0 camera_process['skipped_fps'].value = 0.0
camera_process['detection_fps'].value = 0.0 camera_process['detection_fps'].value = 0.0
self.object_processor.camera_data[name]['current_frame_time'] = None
process = mp.Process(target=track_camera, args=(name, self.config[name], FFMPEG_DEFAULT_CONFIG, GLOBAL_OBJECT_CONFIG, process = mp.Process(target=track_camera, args=(name, self.config[name], FFMPEG_DEFAULT_CONFIG, GLOBAL_OBJECT_CONFIG,
self.tflite_process.detect_lock, self.tflite_process.detect_ready, self.tflite_process.frame_ready, self.tracked_objects_queue, self.tflite_process.detection_queue, self.tracked_objects_queue,
camera_process['fps'], camera_process['skipped_fps'], camera_process['detection_fps'])) camera_process['fps'], camera_process['skipped_fps'], camera_process['detection_fps']))
process.daemon = True process.daemon = True
camera_process['process'] = process camera_process['process'] = process
@ -139,7 +143,7 @@ def main():
'detection_fps': mp.Value('d', 0.0) 'detection_fps': mp.Value('d', 0.0)
} }
camera_process = mp.Process(target=track_camera, args=(name, config, FFMPEG_DEFAULT_CONFIG, GLOBAL_OBJECT_CONFIG, camera_process = mp.Process(target=track_camera, args=(name, config, FFMPEG_DEFAULT_CONFIG, GLOBAL_OBJECT_CONFIG,
tflite_process.detect_lock, tflite_process.detect_ready, tflite_process.frame_ready, tracked_objects_queue, tflite_process.detection_queue, tracked_objects_queue,
camera_processes[name]['fps'], camera_processes[name]['skipped_fps'], camera_processes[name]['detection_fps'])) camera_processes[name]['fps'], camera_processes[name]['skipped_fps'], camera_processes[name]['detection_fps']))
camera_process.daemon = True camera_process.daemon = True
camera_processes[name]['process'] = camera_process camera_processes[name]['process'] = camera_process
@ -173,14 +177,16 @@ def main():
for name, camera_stats in camera_processes.items(): for name, camera_stats in camera_processes.items():
total_detection_fps += camera_stats['detection_fps'].value total_detection_fps += camera_stats['detection_fps'].value
stats[name] = { stats[name] = {
'fps': camera_stats['fps'].value, 'fps': round(camera_stats['fps'].value, 2),
'skipped_fps': camera_stats['skipped_fps'].value, 'skipped_fps': round(camera_stats['skipped_fps'].value, 2),
'detection_fps': camera_stats['detection_fps'].value 'detection_fps': round(camera_stats['detection_fps'].value, 2)
} }
stats['coral'] = { stats['coral'] = {
'fps': total_detection_fps, 'fps': round(total_detection_fps, 2),
'inference_speed': round(tflite_process.avg_inference_speed.value*1000, 2) 'inference_speed': round(tflite_process.avg_inference_speed.value*1000, 2),
'detection_queue': tflite_process.detection_queue.qsize(),
'detection_start': tflite_process.detection_start.value
} }
rc = plasma_process.poll() rc = plasma_process.poll()

View File

@ -1,8 +1,10 @@
import os import os
import datetime import datetime
import hashlib
import multiprocessing as mp import multiprocessing as mp
import numpy as np import numpy as np
import SharedArray as sa import SharedArray as sa
import pyarrow.plasma as plasma
import tflite_runtime.interpreter as tflite import tflite_runtime.interpreter as tflite
from tflite_runtime.interpreter import load_delegate from tflite_runtime.interpreter import load_delegate
from frigate.util import EventsPerSecond from frigate.util import EventsPerSecond
@ -60,77 +62,75 @@ class ObjectDetector():
return detections return detections
def run_detector(detection_queue, avg_speed, start):
print(f"Starting detection process: {os.getpid()}")
plasma_client = plasma.connect("/tmp/plasma")
object_detector = ObjectDetector()
while True:
object_id_str = detection_queue.get()
object_id_hash = hashlib.sha1(str.encode(object_id_str))
object_id = plasma.ObjectID(object_id_hash.digest())
input_frame = plasma_client.get(object_id, timeout_ms=0)
start.value = datetime.datetime.now().timestamp()
# detect and put the output in the plasma store
object_id_out = hashlib.sha1(str.encode(f"out-{object_id_str}")).digest()
plasma_client.put(object_detector.detect_raw(input_frame), plasma.ObjectID(object_id_out))
duration = datetime.datetime.now().timestamp()-start.value
start.value = 0.0
avg_speed.value = (avg_speed.value*9 + duration)/10
class EdgeTPUProcess(): class EdgeTPUProcess():
def __init__(self): def __init__(self):
# TODO: see if we can use the plasma store with a queue and maintain the same speeds self.detection_queue = mp.Queue()
try:
sa.delete("frame")
except:
pass
try:
sa.delete("detections")
except:
pass
self.input_frame = sa.create("frame", shape=(1,300,300,3), dtype=np.uint8)
self.detections = sa.create("detections", shape=(20,6), dtype=np.float32)
self.detect_lock = mp.Lock()
self.detect_ready = mp.Event()
self.frame_ready = mp.Event()
self.avg_inference_speed = mp.Value('d', 0.01) self.avg_inference_speed = mp.Value('d', 0.01)
self.detection_start = mp.Value('d', 0.0)
self.detect_process = None
self.start_or_restart()
def run_detector(detect_ready, frame_ready, avg_speed): def start_or_restart(self):
print(f"Starting detection process: {os.getpid()}") self.detection_start.value = 0.0
object_detector = ObjectDetector() if (not self.detect_process is None) and self.detect_process.is_alive():
input_frame = sa.attach("frame") self.detect_process.terminate()
detections = sa.attach("detections") print("Waiting for detection process to exit gracefully...")
self.detect_process.join(timeout=30)
while True: if self.detect_process.exitcode is None:
# wait until a frame is ready print("Detection process didnt exit. Force killing...")
frame_ready.wait() self.detect_process.kill()
start = datetime.datetime.now().timestamp() self.detect_process.join()
# signal that the process is busy self.detect_process = mp.Process(target=run_detector, args=(self.detection_queue, self.avg_inference_speed, self.detection_start))
frame_ready.clear()
detections[:] = object_detector.detect_raw(input_frame)
# signal that the process is ready to detect
detect_ready.set()
duration = datetime.datetime.now().timestamp()-start
avg_speed.value = (avg_speed.value*9 + duration)/10
self.detect_process = mp.Process(target=run_detector, args=(self.detect_ready, self.frame_ready, self.avg_inference_speed))
self.detect_process.daemon = True self.detect_process.daemon = True
self.detect_process.start() self.detect_process.start()
class RemoteObjectDetector(): class RemoteObjectDetector():
def __init__(self, labels, detect_lock, detect_ready, frame_ready): def __init__(self, name, labels, detection_queue):
self.labels = load_labels(labels) self.labels = load_labels(labels)
self.name = name
self.input_frame = sa.attach("frame")
self.detections = sa.attach("detections")
self.fps = EventsPerSecond() self.fps = EventsPerSecond()
self.plasma_client = plasma.connect("/tmp/plasma")
self.detect_lock = detect_lock self.detection_queue = detection_queue
self.detect_ready = detect_ready
self.frame_ready = frame_ready
def detect(self, tensor_input, threshold=.4): def detect(self, tensor_input, threshold=.4):
detections = [] detections = []
with self.detect_lock:
self.input_frame[:] = tensor_input now = f"{self.name}-{str(datetime.datetime.now().timestamp())}"
# unset detections and signal that a frame is ready object_id_frame = plasma.ObjectID(hashlib.sha1(str.encode(now)).digest())
self.detect_ready.clear() object_id_detections = plasma.ObjectID(hashlib.sha1(str.encode(f"out-{now}")).digest())
self.frame_ready.set() self.plasma_client.put(tensor_input, object_id_frame)
# wait until the detection process is finished, self.detection_queue.put(now)
self.detect_ready.wait() raw_detections = self.plasma_client.get(object_id_detections)
for d in self.detections:
if d[1] < threshold: for d in raw_detections:
break if d[1] < threshold:
detections.append(( break
self.labels[int(d[0])], detections.append((
float(d[1]), self.labels[int(d[0])],
(d[2], d[3], d[4], d[5]) float(d[1]),
)) (d[2], d[3], d[4], d[5])
))
self.plasma_client.delete([object_id_frame, object_id_detections])
self.fps.update() self.fps.update()
return detections return detections

View File

@ -34,7 +34,6 @@ class TrackedObjectProcessor(threading.Thread):
'best_objects': {}, 'best_objects': {},
'object_status': defaultdict(lambda: defaultdict(lambda: 'OFF')), 'object_status': defaultdict(lambda: defaultdict(lambda: 'OFF')),
'tracked_objects': {}, 'tracked_objects': {},
'current_frame_time': None,
'current_frame': np.zeros((720,1280,3), np.uint8), 'current_frame': np.zeros((720,1280,3), np.uint8),
'object_id': None 'object_id': None
}) })
@ -47,9 +46,6 @@ class TrackedObjectProcessor(threading.Thread):
def get_current_frame(self, camera): def get_current_frame(self, camera):
return self.camera_data[camera]['current_frame'] return self.camera_data[camera]['current_frame']
def get_current_frame_time(self, camera):
return self.camera_data[camera]['current_frame_time']
def run(self): def run(self):
while True: while True:
@ -93,7 +89,6 @@ class TrackedObjectProcessor(threading.Thread):
# Set the current frame as ready # Set the current frame as ready
### ###
self.camera_data[camera]['current_frame'] = current_frame self.camera_data[camera]['current_frame'] = current_frame
self.camera_data[camera]['current_frame_time'] = frame_time
# store the object id, so you can delete it at the next loop # store the object id, so you can delete it at the next loop
previous_object_id = self.camera_data[camera]['object_id'] previous_object_id = self.camera_data[camera]['object_id']

View File

@ -114,7 +114,7 @@ def start_or_restart_ffmpeg(ffmpeg_cmd, frame_size, ffmpeg_process=None):
print(" ".join(ffmpeg_cmd)) print(" ".join(ffmpeg_cmd))
return sp.Popen(ffmpeg_cmd, stdout = sp.PIPE, bufsize=frame_size*10) return sp.Popen(ffmpeg_cmd, stdout = sp.PIPE, bufsize=frame_size*10)
def track_camera(name, config, ffmpeg_global_config, global_objects_config, detect_lock, detect_ready, frame_ready, detected_objects_queue, fps, skipped_fps, detection_fps): def track_camera(name, config, ffmpeg_global_config, global_objects_config, detection_queue, detected_objects_queue, fps, skipped_fps, detection_fps):
print(f"Starting process for {name}: {os.getpid()}") print(f"Starting process for {name}: {os.getpid()}")
# Merge the ffmpeg config with the global config # Merge the ffmpeg config with the global config
@ -172,7 +172,7 @@ def track_camera(name, config, ffmpeg_global_config, global_objects_config, dete
mask[:] = 255 mask[:] = 255
motion_detector = MotionDetector(frame_shape, mask, resize_factor=6) motion_detector = MotionDetector(frame_shape, mask, resize_factor=6)
object_detector = RemoteObjectDetector('/labelmap.txt', detect_lock, detect_ready, frame_ready) object_detector = RemoteObjectDetector(name, '/labelmap.txt', detection_queue)
object_tracker = ObjectTracker(10) object_tracker = ObjectTracker(10)
@ -196,8 +196,8 @@ def track_camera(name, config, ffmpeg_global_config, global_objects_config, dete
rc = ffmpeg_process.poll() rc = ffmpeg_process.poll()
if rc is not None: if rc is not None:
print(f"{name}: ffmpeg_process exited unexpectedly with {rc}") print(f"{name}: ffmpeg_process exited unexpectedly with {rc}")
time.sleep(10)
ffmpeg_process = start_or_restart_ffmpeg(ffmpeg_cmd, frame_size, ffmpeg_process) ffmpeg_process = start_or_restart_ffmpeg(ffmpeg_cmd, frame_size, ffmpeg_process)
time.sleep(10)
else: else:
print(f"{name}: ffmpeg_process is still running but didnt return any bytes") print(f"{name}: ffmpeg_process is still running but didnt return any bytes")
continue continue