mirror of
				https://github.com/blakeblackshear/frigate.git
				synced 2025-10-27 10:52:11 +01:00 
			
		
		
		
	switch to a thread for object detection
This commit is contained in:
		
							parent
							
								
									a074945394
								
							
						
					
					
						commit
						0514eeac03
					
				@ -75,22 +75,12 @@ def main():
 | 
				
			|||||||
    frame_lock = mp.Lock()
 | 
					    frame_lock = mp.Lock()
 | 
				
			||||||
    # Condition for notifying that a new frame is ready
 | 
					    # Condition for notifying that a new frame is ready
 | 
				
			||||||
    frame_ready = mp.Condition()
 | 
					    frame_ready = mp.Condition()
 | 
				
			||||||
    # Shared memory array for passing prepped frame to tensorflow
 | 
					 | 
				
			||||||
    prepped_frame_array = mp.Array(ctypes.c_uint8, 300*300*3)
 | 
					 | 
				
			||||||
    # create shared value for storing the frame_time
 | 
					 | 
				
			||||||
    prepped_frame_time = mp.Value('d', 0.0)
 | 
					 | 
				
			||||||
    # Event for notifying that object detection needs a new frame
 | 
					 | 
				
			||||||
    prepped_frame_grabbed = mp.Event()
 | 
					 | 
				
			||||||
    # Event for notifying that new frame is ready for detection
 | 
					 | 
				
			||||||
    prepped_frame_ready = mp.Event()
 | 
					 | 
				
			||||||
    # Condition for notifying that objects were parsed
 | 
					    # Condition for notifying that objects were parsed
 | 
				
			||||||
    objects_parsed = mp.Condition()
 | 
					    objects_parsed = mp.Condition()
 | 
				
			||||||
    # Queue for detected objects
 | 
					    # Queue for detected objects
 | 
				
			||||||
    object_queue = mp.Queue()
 | 
					    object_queue = queue.Queue()
 | 
				
			||||||
    # Queue for prepped frames
 | 
					    # Queue for prepped frames
 | 
				
			||||||
    prepped_frame_queue = queue.Queue(len(regions)*2)
 | 
					    prepped_frame_queue = queue.Queue(len(regions)*2)
 | 
				
			||||||
    # Array for passing original region box to compute object bounding box
 | 
					 | 
				
			||||||
    prepped_frame_box = mp.Array(ctypes.c_uint16, 3)
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
    # shape current frame so it can be treated as an image
 | 
					    # shape current frame so it can be treated as an image
 | 
				
			||||||
    frame_arr = tonumpyarray(shared_arr).reshape(frame_shape)
 | 
					    frame_arr = tonumpyarray(shared_arr).reshape(frame_shape)
 | 
				
			||||||
@ -113,28 +103,11 @@ def main():
 | 
				
			|||||||
        ))
 | 
					        ))
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    prepped_queue_processor = PreppedQueueProcessor(
 | 
					    prepped_queue_processor = PreppedQueueProcessor(
 | 
				
			||||||
        prepped_frame_array,
 | 
					        prepped_frame_queue,
 | 
				
			||||||
        prepped_frame_time,
 | 
					        object_queue
 | 
				
			||||||
        prepped_frame_ready,
 | 
					 | 
				
			||||||
        prepped_frame_grabbed,
 | 
					 | 
				
			||||||
        prepped_frame_box,
 | 
					 | 
				
			||||||
        prepped_frame_queue
 | 
					 | 
				
			||||||
    )
 | 
					    )
 | 
				
			||||||
    prepped_queue_processor.start()
 | 
					    prepped_queue_processor.start()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    # create a process for object detection
 | 
					 | 
				
			||||||
    # if the coprocessor is doing the work, can this run as a thread
 | 
					 | 
				
			||||||
    # since it is waiting for IO?
 | 
					 | 
				
			||||||
    detection_process = mp.Process(target=detect_objects, args=(
 | 
					 | 
				
			||||||
        prepped_frame_array,
 | 
					 | 
				
			||||||
        prepped_frame_time,
 | 
					 | 
				
			||||||
        prepped_frame_ready,
 | 
					 | 
				
			||||||
        prepped_frame_grabbed,
 | 
					 | 
				
			||||||
        prepped_frame_box,
 | 
					 | 
				
			||||||
        object_queue, DEBUG
 | 
					 | 
				
			||||||
    ))
 | 
					 | 
				
			||||||
    detection_process.daemon = True
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    # start a thread to store recent motion frames for processing
 | 
					    # start a thread to store recent motion frames for processing
 | 
				
			||||||
    frame_tracker = FrameTracker(frame_arr, shared_frame_time, frame_ready, frame_lock, 
 | 
					    frame_tracker = FrameTracker(frame_arr, shared_frame_time, frame_ready, frame_lock, 
 | 
				
			||||||
        recent_frames)
 | 
					        recent_frames)
 | 
				
			||||||
@ -177,9 +150,6 @@ def main():
 | 
				
			|||||||
    for detection_prep_thread in detection_prep_threads:
 | 
					    for detection_prep_thread in detection_prep_threads:
 | 
				
			||||||
        detection_prep_thread.start()
 | 
					        detection_prep_thread.start()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    detection_process.start()
 | 
					 | 
				
			||||||
    print("detection_process pid ", detection_process.pid)
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    # create a flask app that encodes frames a mjpeg on demand
 | 
					    # create a flask app that encodes frames a mjpeg on demand
 | 
				
			||||||
    app = Flask(__name__)
 | 
					    app = Flask(__name__)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@ -237,7 +207,6 @@ def main():
 | 
				
			|||||||
    capture_process.join()
 | 
					    capture_process.join()
 | 
				
			||||||
    for detection_prep_thread in detection_prep_threads:
 | 
					    for detection_prep_thread in detection_prep_threads:
 | 
				
			||||||
        detection_prep_thread.join()
 | 
					        detection_prep_thread.join()
 | 
				
			||||||
    detection_process.join()
 | 
					 | 
				
			||||||
    frame_tracker.join()
 | 
					    frame_tracker.join()
 | 
				
			||||||
    best_person_frame.join()
 | 
					    best_person_frame.join()
 | 
				
			||||||
    object_parser.join()
 | 
					    object_parser.join()
 | 
				
			||||||
 | 
				
			|||||||
@ -21,30 +21,24 @@ def ReadLabelFile(file_path):
 | 
				
			|||||||
        ret[int(pair[0])] = pair[1].strip()
 | 
					        ret[int(pair[0])] = pair[1].strip()
 | 
				
			||||||
    return ret
 | 
					    return ret
 | 
				
			||||||
 | 
					
 | 
				
			||||||
def detect_objects(prepped_frame_array, prepped_frame_time,
 | 
					class PreppedQueueProcessor(threading.Thread):
 | 
				
			||||||
                   prepped_frame_ready, prepped_frame_grabbed,
 | 
					    def __init__(self, prepped_frame_queue, object_queue):
 | 
				
			||||||
                   prepped_frame_box, object_queue, debug):
 | 
					
 | 
				
			||||||
    prepped_frame_np = tonumpyarray(prepped_frame_array)
 | 
					        threading.Thread.__init__(self)
 | 
				
			||||||
 | 
					        self.prepped_frame_queue = prepped_frame_queue
 | 
				
			||||||
 | 
					        self.object_queue = object_queue
 | 
				
			||||||
        
 | 
					        
 | 
				
			||||||
        # Load the edgetpu engine and labels
 | 
					        # Load the edgetpu engine and labels
 | 
				
			||||||
    engine = DetectionEngine(PATH_TO_CKPT)
 | 
					        self.engine = DetectionEngine(PATH_TO_CKPT)
 | 
				
			||||||
    labels = ReadLabelFile(PATH_TO_LABELS)
 | 
					        self.labels = ReadLabelFile(PATH_TO_LABELS)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    frame_time = 0.0
 | 
					    def run(self):
 | 
				
			||||||
    region_box = [0,0,0]
 | 
					        # process queue...
 | 
				
			||||||
        while True:
 | 
					        while True:
 | 
				
			||||||
        # wait until a frame is ready
 | 
					            frame = self.prepped_frame_queue.get()
 | 
				
			||||||
        prepped_frame_ready.wait()
 | 
					            # print(self.prepped_frame_queue.qsize())
 | 
				
			||||||
 | 
					 | 
				
			||||||
        prepped_frame_copy = prepped_frame_np.copy()
 | 
					 | 
				
			||||||
        frame_time = prepped_frame_time.value
 | 
					 | 
				
			||||||
        region_box[:] = prepped_frame_box
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
        prepped_frame_grabbed.set()
 | 
					 | 
				
			||||||
        # print("Grabbed " + str(region_box[1]) + "," + str(region_box[2]))
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
            # Actual detection.
 | 
					            # Actual detection.
 | 
				
			||||||
        objects = engine.DetectWithInputTensor(prepped_frame_copy, threshold=0.5, top_k=3)
 | 
					            objects = self.engine.DetectWithInputTensor(frame['frame'], threshold=0.5, top_k=3)
 | 
				
			||||||
            # time.sleep(0.1)
 | 
					            # time.sleep(0.1)
 | 
				
			||||||
            # objects = []
 | 
					            # objects = []
 | 
				
			||||||
            # print(engine.get_inference_time())
 | 
					            # print(engine.get_inference_time())
 | 
				
			||||||
@ -52,58 +46,15 @@ def detect_objects(prepped_frame_array, prepped_frame_time,
 | 
				
			|||||||
            if objects:
 | 
					            if objects:
 | 
				
			||||||
                for obj in objects:
 | 
					                for obj in objects:
 | 
				
			||||||
                    box = obj.bounding_box.flatten().tolist()
 | 
					                    box = obj.bounding_box.flatten().tolist()
 | 
				
			||||||
                object_queue.put({
 | 
					                    self.object_queue.put({
 | 
				
			||||||
                            'frame_time': frame_time,
 | 
					                                'frame_time': frame['frame_time'],
 | 
				
			||||||
                            'name': str(labels[obj.label_id]),
 | 
					                                'name': str(self.labels[obj.label_id]),
 | 
				
			||||||
                                'score': float(obj.score),
 | 
					                                'score': float(obj.score),
 | 
				
			||||||
                            'xmin': int((box[0] * region_box[0]) + region_box[1]),
 | 
					                                'xmin': int((box[0] * frame['region_size']) + frame['region_x_offset']),
 | 
				
			||||||
                            'ymin': int((box[1] * region_box[0]) + region_box[2]),
 | 
					                                'ymin': int((box[1] * frame['region_size']) + frame['region_y_offset']),
 | 
				
			||||||
                            'xmax': int((box[2] * region_box[0]) + region_box[1]),
 | 
					                                'xmax': int((box[2] * frame['region_size']) + frame['region_x_offset']),
 | 
				
			||||||
                            'ymax': int((box[3] * region_box[0]) + region_box[2])
 | 
					                                'ymax': int((box[3] * frame['region_size']) + frame['region_y_offset'])
 | 
				
			||||||
                            })
 | 
					                            })
 | 
				
			||||||
        # else:
 | 
					 | 
				
			||||||
        #     object_queue.put({
 | 
					 | 
				
			||||||
        #                     'frame_time': frame_time,
 | 
					 | 
				
			||||||
        #                     'name': 'dummy',
 | 
					 | 
				
			||||||
        #                     'score': 0.99,
 | 
					 | 
				
			||||||
        #                     'xmin': int(0 + region_box[1]),
 | 
					 | 
				
			||||||
        #                     'ymin': int(0 + region_box[2]),
 | 
					 | 
				
			||||||
        #                     'xmax': int(10 + region_box[1]),
 | 
					 | 
				
			||||||
        #                     'ymax': int(10 + region_box[2])
 | 
					 | 
				
			||||||
        #                 })
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
class PreppedQueueProcessor(threading.Thread):
 | 
					 | 
				
			||||||
    def __init__(self, prepped_frame_array,
 | 
					 | 
				
			||||||
        prepped_frame_time,
 | 
					 | 
				
			||||||
        prepped_frame_ready,
 | 
					 | 
				
			||||||
        prepped_frame_grabbed,
 | 
					 | 
				
			||||||
        prepped_frame_box,
 | 
					 | 
				
			||||||
        prepped_frame_queue):
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
        threading.Thread.__init__(self)
 | 
					 | 
				
			||||||
        self.prepped_frame_array = prepped_frame_array
 | 
					 | 
				
			||||||
        self.prepped_frame_time = prepped_frame_time
 | 
					 | 
				
			||||||
        self.prepped_frame_ready = prepped_frame_ready
 | 
					 | 
				
			||||||
        self.prepped_frame_grabbed = prepped_frame_grabbed
 | 
					 | 
				
			||||||
        self.prepped_frame_box = prepped_frame_box
 | 
					 | 
				
			||||||
        self.prepped_frame_queue = prepped_frame_queue
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    def run(self):
 | 
					 | 
				
			||||||
        prepped_frame_np = tonumpyarray(self.prepped_frame_array)
 | 
					 | 
				
			||||||
        # process queue...
 | 
					 | 
				
			||||||
        while True:
 | 
					 | 
				
			||||||
            frame = self.prepped_frame_queue.get()
 | 
					 | 
				
			||||||
            # print(self.prepped_frame_queue.qsize())
 | 
					 | 
				
			||||||
            prepped_frame_np[:] = frame['frame']
 | 
					 | 
				
			||||||
            self.prepped_frame_time.value = frame['frame_time']
 | 
					 | 
				
			||||||
            self.prepped_frame_box[0] = frame['region_size']
 | 
					 | 
				
			||||||
            self.prepped_frame_box[1] = frame['region_x_offset']
 | 
					 | 
				
			||||||
            self.prepped_frame_box[2] = frame['region_y_offset']
 | 
					 | 
				
			||||||
            # print("Passed " + str(frame['region_x_offset']) + "," + str(frame['region_x_offset']))
 | 
					 | 
				
			||||||
            self.prepped_frame_ready.set()
 | 
					 | 
				
			||||||
            self.prepped_frame_grabbed.wait()
 | 
					 | 
				
			||||||
            self.prepped_frame_grabbed.clear()
 | 
					 | 
				
			||||||
            self.prepped_frame_ready.clear()
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
# should this be a region class?
 | 
					# should this be a region class?
 | 
				
			||||||
@ -156,5 +107,5 @@ class FramePrepper(threading.Thread):
 | 
				
			|||||||
                    'region_x_offset': self.region_x_offset,
 | 
					                    'region_x_offset': self.region_x_offset,
 | 
				
			||||||
                    'region_y_offset': self.region_y_offset
 | 
					                    'region_y_offset': self.region_y_offset
 | 
				
			||||||
                })
 | 
					                })
 | 
				
			||||||
            # else:
 | 
					            else:
 | 
				
			||||||
            #     print("queue full. moving on")
 | 
					                print("queue full. moving on")
 | 
				
			||||||
 | 
				
			|||||||
		Loading…
	
		Reference in New Issue
	
	Block a user