switch to a queue for detected objects and expire objects after 1 second

This commit is contained in:
blakeblackshear 2019-02-24 08:13:36 -06:00
parent 122a1666ca
commit b4e5c812ce

View File

@ -81,51 +81,63 @@ def detect_objects(cropped_frame, sess, detection_graph, region_size, region_x_o
score = scores[0, index] score = scores[0, index]
if score > 0.5: if score > 0.5:
box = boxes[0, index].tolist() box = boxes[0, index].tolist()
box[0] = (box[0] * region_size) + region_y_offset objects.append({
box[1] = (box[1] * region_size) + region_x_offset 'name': str(category_index.get(value).get('name')),
box[2] = (box[2] * region_size) + region_y_offset 'score': float(score),
box[3] = (box[3] * region_size) + region_x_offset 'ymin': int((box[0] * region_size) + region_y_offset),
objects += [value, scores[0, index]] + box 'xmin': int((box[1] * region_size) + region_x_offset),
# only get the first 10 objects 'ymax': int((box[2] * region_size) + region_y_offset),
if len(objects) == 60: 'xmax': int((box[3] * region_size) + region_x_offset)
break })
return objects return objects
class ObjectParser(threading.Thread): class ObjectParser(threading.Thread):
def __init__(self, objects_changed, objects_parsed, object_arrays): def __init__(self, object_queue, objects_parsed):
threading.Thread.__init__(self) threading.Thread.__init__(self)
self._objects_changed = objects_changed self._object_queue = object_queue
self._objects_parsed = objects_parsed self._objects_parsed = objects_parsed
self._object_arrays = object_arrays
def run(self): def run(self):
global DETECTED_OBJECTS global DETECTED_OBJECTS
while True: while True:
detected_objects = [] obj = self._object_queue.get()
# wait until object detection has run print(obj)
# TODO: what if something else changed while I was processing??? DETECTED_OBJECTS.append(obj)
with self._objects_changed:
self._objects_changed.wait()
for object_array in self._object_arrays:
object_index = 0
while(object_index < 60 and object_array[object_index] > 0):
object_class = object_array[object_index]
detected_objects.append({
'name': str(category_index.get(object_class).get('name')),
'score': object_array[object_index+1],
'ymin': int(object_array[object_index+2]),
'xmin': int(object_array[object_index+3]),
'ymax': int(object_array[object_index+4]),
'xmax': int(object_array[object_index+5])
})
object_index += 6
DETECTED_OBJECTS = detected_objects
# notify that objects were parsed # notify that objects were parsed
with self._objects_parsed: with self._objects_parsed:
self._objects_parsed.notify_all() self._objects_parsed.notify_all()
class ObjectCleaner(threading.Thread):
def __init__(self, objects_parsed):
threading.Thread.__init__(self)
self._objects_parsed = objects_parsed
def run(self):
global DETECTED_OBJECTS
while True:
# expire the objects that are more than 1 second old
now = datetime.datetime.now().timestamp()
# look for the first object found within the last second
# (newest objects are appended to the end)
detected_objects = DETECTED_OBJECTS.copy()
num_to_delete = 0
for obj in detected_objects:
if now-obj['frame_time']<1:
break
num_to_delete += 1
if num_to_delete > 0:
del DETECTED_OBJECTS[:num_to_delete]
# notify that parsed objects were changed
with self._objects_parsed:
self._objects_parsed.notify_all()
# wait a bit before checking for more expired frames
time.sleep(0.2)
class MqttMotionPublisher(threading.Thread): class MqttMotionPublisher(threading.Thread):
def __init__(self, client, topic_prefix, motion_changed, motion_flags): def __init__(self, client, topic_prefix, motion_changed, motion_flags):
threading.Thread.__init__(self) threading.Thread.__init__(self)
@ -165,20 +177,17 @@ class MqttObjectPublisher(threading.Thread):
# initialize the payload # initialize the payload
payload = {} payload = {}
for obj in self.object_classes:
payload[obj] = []
# wait until objects have been parsed # wait until objects have been parsed
with self.objects_parsed: with self.objects_parsed:
self.objects_parsed.wait() self.objects_parsed.wait()
# loop over detected objects and populate # add all the person scores in detected objects and
# the payload # average over past 1 seconds (5fps)
detected_objects = DETECTED_OBJECTS.copy() detected_objects = DETECTED_OBJECTS.copy()
for obj in detected_objects: avg_person_score = sum([obj['score'] for obj in detected_objects if obj['name'] == 'person'])/5
if obj['name'] in self.object_classes: payload['person'] = int(avg_person_score*100)
payload[obj['name']].append(obj)
# send message for objects if different # send message for objects if different
new_payload = json.dumps(payload, sort_keys=True) new_payload = json.dumps(payload, sort_keys=True)
if new_payload != last_sent_payload: if new_payload != last_sent_payload:
@ -229,10 +238,10 @@ def main():
frame_ready = mp.Condition() frame_ready = mp.Condition()
# Condition for notifying that motion status changed globally # Condition for notifying that motion status changed globally
motion_changed = mp.Condition() motion_changed = mp.Condition()
# Condition for notifying that object detection ran
objects_changed = mp.Condition()
# Condition for notifying that objects were parsed # Condition for notifying that objects were parsed
objects_parsed = mp.Condition() objects_parsed = mp.Condition()
# Queue for detected objects
object_queue = mp.Queue()
# shape current frame so it can be treated as an image # shape current frame so it can be treated as an image
frame_arr = tonumpyarray(shared_arr).reshape(frame_shape) frame_arr = tonumpyarray(shared_arr).reshape(frame_shape)
@ -244,11 +253,10 @@ def main():
motion_processes = [] motion_processes = []
for region in regions: for region in regions:
detection_process = mp.Process(target=process_frames, args=(shared_arr, detection_process = mp.Process(target=process_frames, args=(shared_arr,
region['output_array'], object_queue,
shared_frame_time, shared_frame_time,
frame_lock, frame_ready, frame_lock, frame_ready,
region['motion_detected'], region['motion_detected'],
objects_changed,
frame_shape, frame_shape,
region['size'], region['x_offset'], region['y_offset'], region['size'], region['x_offset'], region['y_offset'],
False)) False))
@ -267,8 +275,10 @@ def main():
motion_process.daemon = True motion_process.daemon = True
motion_processes.append(motion_process) motion_processes.append(motion_process)
object_parser = ObjectParser(objects_changed, objects_parsed, [region['output_array'] for region in regions]) object_parser = ObjectParser(object_queue, objects_parsed)
object_parser.start() object_parser.start()
object_cleaner = ObjectCleaner(objects_parsed)
object_cleaner.start()
client = mqtt.Client() client = mqtt.Client()
client.will_set(MQTT_TOPIC_PREFIX+'/available', payload='offline', qos=1, retain=True) client.will_set(MQTT_TOPIC_PREFIX+'/available', payload='offline', qos=1, retain=True)
@ -347,6 +357,7 @@ def main():
for motion_process in motion_processes: for motion_process in motion_processes:
motion_process.join() motion_process.join()
object_parser.join() object_parser.join()
object_cleaner.join()
mqtt_publisher.join() mqtt_publisher.join()
# convert shared memory array into numpy array # convert shared memory array into numpy array
@ -391,8 +402,8 @@ def fetch_frames(shared_arr, shared_frame_time, frame_lock, frame_ready, frame_s
video.release() video.release()
# do the actual object detection # do the actual object detection
def process_frames(shared_arr, shared_output_arr, shared_frame_time, frame_lock, frame_ready, def process_frames(shared_arr, object_queue, shared_frame_time, frame_lock, frame_ready,
motion_detected, objects_changed, frame_shape, region_size, region_x_offset, region_y_offset, motion_detected, frame_shape, region_size, region_x_offset, region_y_offset,
debug): debug):
debug = True debug = True
# shape shared input array into frame for processing # shape shared input array into frame for processing
@ -429,10 +440,10 @@ def process_frames(shared_arr, shared_output_arr, shared_frame_time, frame_lock,
cropped_frame_rgb = cv2.cvtColor(cropped_frame, cv2.COLOR_BGR2RGB) cropped_frame_rgb = cv2.cvtColor(cropped_frame, cv2.COLOR_BGR2RGB)
# do the object detection # do the object detection
objects = detect_objects(cropped_frame_rgb, sess, detection_graph, region_size, region_x_offset, region_y_offset, debug) objects = detect_objects(cropped_frame_rgb, sess, detection_graph, region_size, region_x_offset, region_y_offset, debug)
# copy the detected objects to the output array, filling the array when needed for obj in objects:
shared_output_arr[:] = objects + [0.0] * (60-len(objects)) obj['frame_time'] = frame_time
with objects_changed: object_queue.put(obj)
objects_changed.notify_all()
# do the actual motion detection # do the actual motion detection
def detect_motion(shared_arr, shared_frame_time, frame_lock, frame_ready, motion_detected, motion_changed, def detect_motion(shared_arr, shared_frame_time, frame_lock, frame_ready, motion_detected, motion_changed,