diff --git a/detect_objects.py b/detect_objects.py index 16d706e84..a0dd52e3b 100644 --- a/detect_objects.py +++ b/detect_objects.py @@ -31,7 +31,7 @@ FFMPEG_CONFIG = CONFIG.get('ffmpeg', {}) FFMPEG_DEFAULT_CONFIG = { 'global_args': FFMPEG_CONFIG.get('global_args', ['-hide_banner','-loglevel','panic']), - 'hwaccel_args': FFMPEG_CONFIG.get('hwaccel_args', + 'hwaccel_args': FFMPEG_CONFIG.get('hwaccel_args', []), 'input_args': FFMPEG_CONFIG.get('input_args', ['-avoid_negative_ts', 'make_zero', @@ -68,6 +68,11 @@ class CameraWatchdog(threading.Thread): # wait a bit before checking time.sleep(30) + if (self.tflite_process.detection_start.value > 0.0 and + datetime.datetime.now().timestamp() - self.tflite_process.detection_start.value > 10): + print("Detection appears to be stuck. Restarting detection process") + time.sleep(30) + for name, camera_process in self.camera_processes.items(): process = camera_process['process'] if not process.is_alive(): @@ -75,9 +80,8 @@ class CameraWatchdog(threading.Thread): camera_process['fps'].value = float(self.config[name]['fps']) camera_process['skipped_fps'].value = 0.0 camera_process['detection_fps'].value = 0.0 - self.object_processor.camera_data[name]['current_frame_time'] = None process = mp.Process(target=track_camera, args=(name, self.config[name], FFMPEG_DEFAULT_CONFIG, GLOBAL_OBJECT_CONFIG, - self.tflite_process.detect_lock, self.tflite_process.detect_ready, self.tflite_process.frame_ready, self.tracked_objects_queue, + self.tflite_process.detection_queue, self.tracked_objects_queue, camera_process['fps'], camera_process['skipped_fps'], camera_process['detection_fps'])) process.daemon = True camera_process['process'] = process @@ -139,7 +143,7 @@ def main(): 'detection_fps': mp.Value('d', 0.0) } camera_process = mp.Process(target=track_camera, args=(name, config, FFMPEG_DEFAULT_CONFIG, GLOBAL_OBJECT_CONFIG, - tflite_process.detect_lock, tflite_process.detect_ready, tflite_process.frame_ready, tracked_objects_queue, + tflite_process.detection_queue, tracked_objects_queue, camera_processes[name]['fps'], camera_processes[name]['skipped_fps'], camera_processes[name]['detection_fps'])) camera_process.daemon = True camera_processes[name]['process'] = camera_process @@ -173,14 +177,16 @@ def main(): for name, camera_stats in camera_processes.items(): total_detection_fps += camera_stats['detection_fps'].value stats[name] = { - 'fps': camera_stats['fps'].value, - 'skipped_fps': camera_stats['skipped_fps'].value, - 'detection_fps': camera_stats['detection_fps'].value + 'fps': round(camera_stats['fps'].value, 2), + 'skipped_fps': round(camera_stats['skipped_fps'].value, 2), + 'detection_fps': round(camera_stats['detection_fps'].value, 2) } stats['coral'] = { - 'fps': total_detection_fps, - 'inference_speed': round(tflite_process.avg_inference_speed.value*1000, 2) + 'fps': round(total_detection_fps, 2), + 'inference_speed': round(tflite_process.avg_inference_speed.value*1000, 2), + 'detection_queue': tflite_process.detection_queue.qsize(), + 'detection_start': tflite_process.detection_start.value } rc = plasma_process.poll() diff --git a/frigate/edgetpu.py b/frigate/edgetpu.py index 5471f7b51..846522d50 100644 --- a/frigate/edgetpu.py +++ b/frigate/edgetpu.py @@ -1,8 +1,10 @@ import os import datetime +import hashlib import multiprocessing as mp import numpy as np import SharedArray as sa +import pyarrow.plasma as plasma import tflite_runtime.interpreter as tflite from tflite_runtime.interpreter import load_delegate from frigate.util import EventsPerSecond @@ -60,77 +62,75 @@ class ObjectDetector(): return detections +def run_detector(detection_queue, avg_speed, start): + print(f"Starting detection process: {os.getpid()}") + plasma_client = plasma.connect("/tmp/plasma") + object_detector = ObjectDetector() + + while True: + object_id_str = detection_queue.get() + object_id_hash = hashlib.sha1(str.encode(object_id_str)) + object_id = plasma.ObjectID(object_id_hash.digest()) + input_frame = plasma_client.get(object_id, timeout_ms=0) + + start.value = datetime.datetime.now().timestamp() + + # detect and put the output in the plasma store + object_id_out = hashlib.sha1(str.encode(f"out-{object_id_str}")).digest() + plasma_client.put(object_detector.detect_raw(input_frame), plasma.ObjectID(object_id_out)) + + duration = datetime.datetime.now().timestamp()-start.value + start.value = 0.0 + avg_speed.value = (avg_speed.value*9 + duration)/10 + class EdgeTPUProcess(): def __init__(self): - # TODO: see if we can use the plasma store with a queue and maintain the same speeds - try: - sa.delete("frame") - except: - pass - try: - sa.delete("detections") - except: - pass - - self.input_frame = sa.create("frame", shape=(1,300,300,3), dtype=np.uint8) - self.detections = sa.create("detections", shape=(20,6), dtype=np.float32) - - self.detect_lock = mp.Lock() - self.detect_ready = mp.Event() - self.frame_ready = mp.Event() + self.detection_queue = mp.Queue() self.avg_inference_speed = mp.Value('d', 0.01) + self.detection_start = mp.Value('d', 0.0) + self.detect_process = None + self.start_or_restart() - def run_detector(detect_ready, frame_ready, avg_speed): - print(f"Starting detection process: {os.getpid()}") - object_detector = ObjectDetector() - input_frame = sa.attach("frame") - detections = sa.attach("detections") - - while True: - # wait until a frame is ready - frame_ready.wait() - start = datetime.datetime.now().timestamp() - # signal that the process is busy - frame_ready.clear() - detections[:] = object_detector.detect_raw(input_frame) - # signal that the process is ready to detect - detect_ready.set() - duration = datetime.datetime.now().timestamp()-start - avg_speed.value = (avg_speed.value*9 + duration)/10 - - self.detect_process = mp.Process(target=run_detector, args=(self.detect_ready, self.frame_ready, self.avg_inference_speed)) + def start_or_restart(self): + self.detection_start.value = 0.0 + if (not self.detect_process is None) and self.detect_process.is_alive(): + self.detect_process.terminate() + print("Waiting for detection process to exit gracefully...") + self.detect_process.join(timeout=30) + if self.detect_process.exitcode is None: + print("Detection process didnt exit. Force killing...") + self.detect_process.kill() + self.detect_process.join() + self.detect_process = mp.Process(target=run_detector, args=(self.detection_queue, self.avg_inference_speed, self.detection_start)) self.detect_process.daemon = True self.detect_process.start() class RemoteObjectDetector(): - def __init__(self, labels, detect_lock, detect_ready, frame_ready): + def __init__(self, name, labels, detection_queue): self.labels = load_labels(labels) - - self.input_frame = sa.attach("frame") - self.detections = sa.attach("detections") - + self.name = name self.fps = EventsPerSecond() - - self.detect_lock = detect_lock - self.detect_ready = detect_ready - self.frame_ready = frame_ready + self.plasma_client = plasma.connect("/tmp/plasma") + self.detection_queue = detection_queue def detect(self, tensor_input, threshold=.4): detections = [] - with self.detect_lock: - self.input_frame[:] = tensor_input - # unset detections and signal that a frame is ready - self.detect_ready.clear() - self.frame_ready.set() - # wait until the detection process is finished, - self.detect_ready.wait() - for d in self.detections: - if d[1] < threshold: - break - detections.append(( - self.labels[int(d[0])], - float(d[1]), - (d[2], d[3], d[4], d[5]) - )) + + now = f"{self.name}-{str(datetime.datetime.now().timestamp())}" + object_id_frame = plasma.ObjectID(hashlib.sha1(str.encode(now)).digest()) + object_id_detections = plasma.ObjectID(hashlib.sha1(str.encode(f"out-{now}")).digest()) + self.plasma_client.put(tensor_input, object_id_frame) + self.detection_queue.put(now) + raw_detections = self.plasma_client.get(object_id_detections) + + for d in raw_detections: + if d[1] < threshold: + break + detections.append(( + self.labels[int(d[0])], + float(d[1]), + (d[2], d[3], d[4], d[5]) + )) + self.plasma_client.delete([object_id_frame, object_id_detections]) self.fps.update() return detections \ No newline at end of file diff --git a/frigate/object_processing.py b/frigate/object_processing.py index 06d704f10..c9f901fc9 100644 --- a/frigate/object_processing.py +++ b/frigate/object_processing.py @@ -34,7 +34,6 @@ class TrackedObjectProcessor(threading.Thread): 'best_objects': {}, 'object_status': defaultdict(lambda: defaultdict(lambda: 'OFF')), 'tracked_objects': {}, - 'current_frame_time': None, 'current_frame': np.zeros((720,1280,3), np.uint8), 'object_id': None }) @@ -47,9 +46,6 @@ class TrackedObjectProcessor(threading.Thread): def get_current_frame(self, camera): return self.camera_data[camera]['current_frame'] - - def get_current_frame_time(self, camera): - return self.camera_data[camera]['current_frame_time'] def run(self): while True: @@ -93,7 +89,6 @@ class TrackedObjectProcessor(threading.Thread): # Set the current frame as ready ### self.camera_data[camera]['current_frame'] = current_frame - self.camera_data[camera]['current_frame_time'] = frame_time # store the object id, so you can delete it at the next loop previous_object_id = self.camera_data[camera]['object_id'] diff --git a/frigate/video.py b/frigate/video.py index 466654263..1e6c98249 100755 --- a/frigate/video.py +++ b/frigate/video.py @@ -114,7 +114,7 @@ def start_or_restart_ffmpeg(ffmpeg_cmd, frame_size, ffmpeg_process=None): print(" ".join(ffmpeg_cmd)) return sp.Popen(ffmpeg_cmd, stdout = sp.PIPE, bufsize=frame_size*10) -def track_camera(name, config, ffmpeg_global_config, global_objects_config, detect_lock, detect_ready, frame_ready, detected_objects_queue, fps, skipped_fps, detection_fps): +def track_camera(name, config, ffmpeg_global_config, global_objects_config, detection_queue, detected_objects_queue, fps, skipped_fps, detection_fps): print(f"Starting process for {name}: {os.getpid()}") # Merge the ffmpeg config with the global config @@ -172,7 +172,7 @@ def track_camera(name, config, ffmpeg_global_config, global_objects_config, dete mask[:] = 255 motion_detector = MotionDetector(frame_shape, mask, resize_factor=6) - object_detector = RemoteObjectDetector('/labelmap.txt', detect_lock, detect_ready, frame_ready) + object_detector = RemoteObjectDetector(name, '/labelmap.txt', detection_queue) object_tracker = ObjectTracker(10) @@ -196,8 +196,8 @@ def track_camera(name, config, ffmpeg_global_config, global_objects_config, dete rc = ffmpeg_process.poll() if rc is not None: print(f"{name}: ffmpeg_process exited unexpectedly with {rc}") - time.sleep(10) ffmpeg_process = start_or_restart_ffmpeg(ffmpeg_cmd, frame_size, ffmpeg_process) + time.sleep(10) else: print(f"{name}: ffmpeg_process is still running but didnt return any bytes") continue