diff --git a/detect_objects.py b/detect_objects.py index df8c9db54..5518baa4d 100644 --- a/detect_objects.py +++ b/detect_objects.py @@ -81,51 +81,63 @@ def detect_objects(cropped_frame, sess, detection_graph, region_size, region_x_o score = scores[0, index] if score > 0.5: box = boxes[0, index].tolist() - box[0] = (box[0] * region_size) + region_y_offset - box[1] = (box[1] * region_size) + region_x_offset - box[2] = (box[2] * region_size) + region_y_offset - box[3] = (box[3] * region_size) + region_x_offset - objects += [value, scores[0, index]] + box - # only get the first 10 objects - if len(objects) == 60: - break + objects.append({ + 'name': str(category_index.get(value).get('name')), + 'score': float(score), + 'ymin': int((box[0] * region_size) + region_y_offset), + 'xmin': int((box[1] * region_size) + region_x_offset), + 'ymax': int((box[2] * region_size) + region_y_offset), + 'xmax': int((box[3] * region_size) + region_x_offset) + }) return objects class ObjectParser(threading.Thread): - def __init__(self, objects_changed, objects_parsed, object_arrays): + def __init__(self, object_queue, objects_parsed): threading.Thread.__init__(self) - self._objects_changed = objects_changed + self._object_queue = object_queue self._objects_parsed = objects_parsed - self._object_arrays = object_arrays def run(self): global DETECTED_OBJECTS while True: - detected_objects = [] - # wait until object detection has run - # TODO: what if something else changed while I was processing??? - with self._objects_changed: - self._objects_changed.wait() + obj = self._object_queue.get() + print(obj) + DETECTED_OBJECTS.append(obj) - for object_array in self._object_arrays: - object_index = 0 - while(object_index < 60 and object_array[object_index] > 0): - object_class = object_array[object_index] - detected_objects.append({ - 'name': str(category_index.get(object_class).get('name')), - 'score': object_array[object_index+1], - 'ymin': int(object_array[object_index+2]), - 'xmin': int(object_array[object_index+3]), - 'ymax': int(object_array[object_index+4]), - 'xmax': int(object_array[object_index+5]) - }) - object_index += 6 - DETECTED_OBJECTS = detected_objects # notify that objects were parsed with self._objects_parsed: self._objects_parsed.notify_all() +class ObjectCleaner(threading.Thread): + def __init__(self, objects_parsed): + threading.Thread.__init__(self) + self._objects_parsed = objects_parsed + + def run(self): + global DETECTED_OBJECTS + while True: + + # expire the objects that are more than 1 second old + now = datetime.datetime.now().timestamp() + # look for the first object found within the last second + # (newest objects are appended to the end) + detected_objects = DETECTED_OBJECTS.copy() + num_to_delete = 0 + for obj in detected_objects: + if now-obj['frame_time']<1: + break + num_to_delete += 1 + if num_to_delete > 0: + del DETECTED_OBJECTS[:num_to_delete] + + # notify that parsed objects were changed + with self._objects_parsed: + self._objects_parsed.notify_all() + + # wait a bit before checking for more expired frames + time.sleep(0.2) + class MqttMotionPublisher(threading.Thread): def __init__(self, client, topic_prefix, motion_changed, motion_flags): threading.Thread.__init__(self) @@ -165,20 +177,17 @@ class MqttObjectPublisher(threading.Thread): # initialize the payload payload = {} - for obj in self.object_classes: - payload[obj] = [] # wait until objects have been parsed with self.objects_parsed: self.objects_parsed.wait() - # loop over detected objects and populate - # the payload + # add all the person scores in detected objects and + # average over past 1 seconds (5fps) detected_objects = DETECTED_OBJECTS.copy() - for obj in detected_objects: - if obj['name'] in self.object_classes: - payload[obj['name']].append(obj) - + avg_person_score = sum([obj['score'] for obj in detected_objects if obj['name'] == 'person'])/5 + payload['person'] = int(avg_person_score*100) + # send message for objects if different new_payload = json.dumps(payload, sort_keys=True) if new_payload != last_sent_payload: @@ -229,10 +238,10 @@ def main(): frame_ready = mp.Condition() # Condition for notifying that motion status changed globally motion_changed = mp.Condition() - # Condition for notifying that object detection ran - objects_changed = mp.Condition() # Condition for notifying that objects were parsed objects_parsed = mp.Condition() + # Queue for detected objects + object_queue = mp.Queue() # shape current frame so it can be treated as an image frame_arr = tonumpyarray(shared_arr).reshape(frame_shape) @@ -244,11 +253,10 @@ def main(): motion_processes = [] for region in regions: detection_process = mp.Process(target=process_frames, args=(shared_arr, - region['output_array'], + object_queue, shared_frame_time, frame_lock, frame_ready, region['motion_detected'], - objects_changed, frame_shape, region['size'], region['x_offset'], region['y_offset'], False)) @@ -267,8 +275,10 @@ def main(): motion_process.daemon = True motion_processes.append(motion_process) - object_parser = ObjectParser(objects_changed, objects_parsed, [region['output_array'] for region in regions]) + object_parser = ObjectParser(object_queue, objects_parsed) object_parser.start() + object_cleaner = ObjectCleaner(objects_parsed) + object_cleaner.start() client = mqtt.Client() client.will_set(MQTT_TOPIC_PREFIX+'/available', payload='offline', qos=1, retain=True) @@ -347,6 +357,7 @@ def main(): for motion_process in motion_processes: motion_process.join() object_parser.join() + object_cleaner.join() mqtt_publisher.join() # convert shared memory array into numpy array @@ -391,8 +402,8 @@ def fetch_frames(shared_arr, shared_frame_time, frame_lock, frame_ready, frame_s video.release() # do the actual object detection -def process_frames(shared_arr, shared_output_arr, shared_frame_time, frame_lock, frame_ready, - motion_detected, objects_changed, frame_shape, region_size, region_x_offset, region_y_offset, +def process_frames(shared_arr, object_queue, shared_frame_time, frame_lock, frame_ready, + motion_detected, frame_shape, region_size, region_x_offset, region_y_offset, debug): debug = True # shape shared input array into frame for processing @@ -429,10 +440,10 @@ def process_frames(shared_arr, shared_output_arr, shared_frame_time, frame_lock, cropped_frame_rgb = cv2.cvtColor(cropped_frame, cv2.COLOR_BGR2RGB) # do the object detection objects = detect_objects(cropped_frame_rgb, sess, detection_graph, region_size, region_x_offset, region_y_offset, debug) - # copy the detected objects to the output array, filling the array when needed - shared_output_arr[:] = objects + [0.0] * (60-len(objects)) - with objects_changed: - objects_changed.notify_all() + for obj in objects: + obj['frame_time'] = frame_time + object_queue.put(obj) + # do the actual motion detection def detect_motion(shared_arr, shared_frame_time, frame_lock, frame_ready, motion_detected, motion_changed,