diff --git a/benchmark.py b/benchmark.py
index d669ec97c..f8d432d83 100755
--- a/benchmark.py
+++ b/benchmark.py
@@ -11,7 +11,7 @@ labels = load_labels('/labelmap.txt')
 ######
 # Minimal same process runner
 ######
-# object_detector = ObjectDetector()
+# object_detector = LocalObjectDetector()
 # tensor_input = np.expand_dims(np.full((300,300,3), 0, np.uint8), axis=0)
 
 # start = datetime.datetime.now().timestamp()
@@ -40,8 +40,8 @@ labels = load_labels('/labelmap.txt')
 ######
 # Separate process runner
 ######
-def start(id, num_detections, detection_queue):
-    object_detector = RemoteObjectDetector(str(id), '/labelmap.txt', detection_queue)
+def start(id, num_detections, detection_queue, event):
+    object_detector = RemoteObjectDetector(str(id), '/labelmap.txt', detection_queue, event)
     start = datetime.datetime.now().timestamp()
     frame_times = []
 
@@ -54,26 +54,35 @@ def start(id, num_detections, detection_queue):
     print(f"{id} - Processed for {duration:.2f} seconds.")
     print(f"{id} - Average frame processing time: {mean(frame_times)*1000:.2f}ms")
 
-edgetpu_process = EdgeTPUProcess()
+event = mp.Event()
+edgetpu_process = EdgeTPUProcess({'1': event})
 
-# start(1, 1000, edgetpu_process.detect_lock, edgetpu_process.detect_ready, edgetpu_process.frame_ready)
+start(1, 1000, edgetpu_process.detection_queue, event)
+print(f"Average raw inference speed: {edgetpu_process.avg_inference_speed.value*1000:.2f}ms")
 
 ####
 # Multiple camera processes
 ####
-camera_processes = []
-for x in range(0, 10):
-    camera_process = mp.Process(target=start, args=(x, 100, edgetpu_process.detection_queue))
-    camera_process.daemon = True
-    camera_processes.append(camera_process)
+# camera_processes = []
 
-start = datetime.datetime.now().timestamp()
+# pipes = {}
+# for x in range(0, 10):
+#     pipes[x] = mp.Pipe(duplex=False)
 
-for p in camera_processes:
-    p.start()
+# edgetpu_process = EdgeTPUProcess({str(key): value[1] for (key, value) in pipes.items()})
 
-for p in camera_processes:
-    p.join()
+# for x in range(0, 10):
+#     camera_process = mp.Process(target=start, args=(x, 100, edgetpu_process.detection_queue, pipes[x][0]))
+#     camera_process.daemon = True
+#     camera_processes.append(camera_process)
 
-duration = datetime.datetime.now().timestamp()-start
-print(f"Total - Processed for {duration:.2f} seconds.")
\ No newline at end of file
+# start = datetime.datetime.now().timestamp()
+
+# for p in camera_processes:
+#     p.start()
+
+# for p in camera_processes:
+#     p.join()
+
+# duration = datetime.datetime.now().timestamp()-start
+# print(f"Total - Processed for {duration:.2f} seconds.")
\ No newline at end of file
diff --git a/frigate/edgetpu.py b/frigate/edgetpu.py
index 52ef4be80..b95cd05b7 100644
--- a/frigate/edgetpu.py
+++ b/frigate/edgetpu.py
@@ -102,12 +102,21 @@ class LocalObjectDetector(ObjectDetector):
 
         return detections
 
-def run_detector(detection_queue, result_connections: Dict[str, Connection], avg_speed, start, tf_device):
+def run_detector(detection_queue, out_events: Dict[str, mp.Event], avg_speed, start, tf_device):
     print(f"Starting detection process: {os.getpid()}")
     listen()
     frame_manager = SharedMemoryFrameManager()
     object_detector = LocalObjectDetector(tf_device=tf_device)
 
+    outputs = {}
+    for name in out_events.keys():
+        out_shm = mp.shared_memory.SharedMemory(name=f"out-{name}", create=False)
+        out_np = np.ndarray((20,6), dtype=np.float32, buffer=out_shm.buf)
+        outputs[name] = {
+            'shm': out_shm,
+            'np': out_np
+        }
+
     while True:
         connection_id = detection_queue.get()
         input_frame = frame_manager.get(connection_id, (1,300,300,3))
@@ -115,20 +124,21 @@ def run_detector(detection_queue, result_connections: Dict[str, Connection], avg
         if input_frame is None:
            continue
 
-        # detect and put the output in the plasma store
+        # detect and send the output
         start.value = datetime.datetime.now().timestamp()
         # TODO: what is the overhead for pickling this result vs writing back to shared memory?
         # I could try using an Event() and waiting in the other process before looking in memory...
         detections = object_detector.detect_raw(input_frame)
-        result_connections[connection_id].send(detections)
         duration = datetime.datetime.now().timestamp()-start.value
+        outputs[connection_id]['np'][:] = detections[:]
+        out_events[connection_id].set()
         start.value = 0.0
         avg_speed.value = (avg_speed.value*9 + duration)/10
 
 
 class EdgeTPUProcess():
-    def __init__(self, result_connections, tf_device=None):
-        self.result_connections = result_connections
+    def __init__(self, out_events, tf_device=None):
+        self.out_events = out_events
         self.detection_queue = mp.Queue()
         self.avg_inference_speed = mp.Value('d', 0.01)
         self.detection_start = mp.Value('d', 0.0)
@@ -149,19 +159,21 @@ class EdgeTPUProcess():
         self.detection_start.value = 0.0
         if (not self.detect_process is None) and self.detect_process.is_alive():
             self.stop()
-        self.detect_process = mp.Process(target=run_detector, args=(self.detection_queue, self.result_connections, self.avg_inference_speed, self.detection_start, self.tf_device))
+        self.detect_process = mp.Process(target=run_detector, args=(self.detection_queue, self.out_events, self.avg_inference_speed, self.detection_start, self.tf_device))
         self.detect_process.daemon = True
         self.detect_process.start()
 
 class RemoteObjectDetector():
-    def __init__(self, name, labels, detection_queue, result_connection: Connection):
+    def __init__(self, name, labels, detection_queue, event):
         self.labels = load_labels(labels)
         self.name = name
         self.fps = EventsPerSecond()
         self.detection_queue = detection_queue
-        self.result_connection = result_connection
+        self.event = event
         self.shm = mp.shared_memory.SharedMemory(name=self.name, create=True, size=300*300*3)
         self.np_shm = np.ndarray((1,300,300,3), dtype=np.uint8, buffer=self.shm.buf)
+        self.out_shm = mp.shared_memory.SharedMemory(name=f"out-{self.name}", create=True, size=20*6*4)
+        self.out_np_shm = np.ndarray((20,6), dtype=np.float32, buffer=self.out_shm.buf)
 
     def detect(self, tensor_input, threshold=.4):
         detections = []
@@ -169,13 +181,16 @@ class RemoteObjectDetector():
         # copy input to shared memory
         # TODO: what if I just write it there in the first place?
         self.np_shm[:] = tensor_input[:]
+        self.event.clear()
         self.detection_queue.put(self.name)
-        if self.result_connection.poll(10):
-            raw_detections = self.result_connection.recv()
-        else:
-            return detections
+        self.event.wait()
+
+        # if self.result_connection.poll(10):
+        #     raw_detections = self.result_connection.recv()
+        # else:
+        #     return detections
 
-        for d in raw_detections:
+        for d in self.out_np_shm:
             if d[1] < threshold:
                 break
             detections.append((