From 03c57bf67d0ddd02570b122f471a7d133c3b08a0 Mon Sep 17 00:00:00 2001 From: blakeblackshear Date: Sun, 17 Feb 2019 13:12:27 -0600 Subject: [PATCH] use a condition to signal to the mqtt publisher that objects were parsed --- detect_objects.py | 30 +++++++++++++++++++++--------- 1 file changed, 21 insertions(+), 9 deletions(-) diff --git a/detect_objects.py b/detect_objects.py index 097068f40..d69ff4c9e 100644 --- a/detect_objects.py +++ b/detect_objects.py @@ -93,9 +93,10 @@ def detect_objects(cropped_frame, sess, detection_graph, region_size, region_x_o return objects class ObjectParser(threading.Thread): - def __init__(self, objects_changed, object_arrays): + def __init__(self, objects_changed, objects_parsed, object_arrays): threading.Thread.__init__(self) self._objects_changed = objects_changed + self._objects_parsed = objects_parsed self._object_arrays = object_arrays def run(self): @@ -103,6 +104,7 @@ class ObjectParser(threading.Thread): while True: detected_objects = [] # wait until object detection has run + # TODO: what if something else changed while I was processing??? with self._objects_changed: self._objects_changed.wait() @@ -120,6 +122,9 @@ class ObjectParser(threading.Thread): }) object_index += 6 DETECTED_OBJECTS = detected_objects + # notify that objects were parsed + with self._objects_parsed: + self._objects_parsed.notify_all() class MqttMotionPublisher(threading.Thread): def __init__(self, client, topic_prefix, motion_changed, motion_flags): @@ -139,16 +144,17 @@ class MqttMotionPublisher(threading.Thread): motion_status = 'OFF' if any(obj.is_set() for obj in self.motion_flags): motion_status = 'ON' - + if last_sent_motion != motion_status: last_sent_motion = motion_status self.client.publish(self.topic_prefix+'/motion', motion_status, retain=False) -class MqttPublisher(threading.Thread): - def __init__(self, client, topic_prefix, object_classes): +class MqttObjectPublisher(threading.Thread): + def __init__(self, client, topic_prefix, objects_parsed, object_classes): threading.Thread.__init__(self) self.client = client self.topic_prefix = topic_prefix + self.objects_parsed = objects_parsed self.object_classes = object_classes def run(self): @@ -156,10 +162,16 @@ class MqttPublisher(threading.Thread): last_sent_payload = "" while True: + # initialize the payload payload = {} for obj in self.object_classes: payload[obj] = [] + + # wait until objects have been parsed + with self.objects_parsed: + self.objects_parsed.wait() + # loop over detected objects and populate # the payload detected_objects = DETECTED_OBJECTS.copy() @@ -167,14 +179,12 @@ class MqttPublisher(threading.Thread): if obj['name'] in self.object_classes: payload[obj['name']].append(obj) - # send message for objects + # send message for objects if different new_payload = json.dumps(payload, sort_keys=True) if new_payload != last_sent_payload: last_sent_payload = new_payload self.client.publish(self.topic_prefix+'/objects', new_payload, retain=False) - time.sleep(0.1) - def main(): # Parse selected regions regions = [] @@ -218,6 +228,8 @@ def main(): motion_changed = mp.Condition() # Condition for notifying that object detection ran objects_changed = mp.Condition() + # Condition for notifying that objects were parsed + objects_parsed = mp.Condition() # shape current frame so it can be treated as an image frame_arr = tonumpyarray(shared_arr).reshape(frame_shape) @@ -251,7 +263,7 @@ def main(): motion_process.daemon = True motion_processes.append(motion_process) - object_parser = ObjectParser(objects_changed, [region['output_array'] for region in regions]) + object_parser = ObjectParser(objects_changed, objects_parsed, [region['output_array'] for region in regions]) object_parser.start() client = mqtt.Client() @@ -260,7 +272,7 @@ def main(): client.loop_start() client.publish(MQTT_TOPIC_PREFIX+'/available', 'online', retain=True) - mqtt_publisher = MqttPublisher(client, MQTT_TOPIC_PREFIX, + mqtt_publisher = MqttObjectPublisher(client, MQTT_TOPIC_PREFIX, objects_parsed, MQTT_OBJECT_CLASSES.split(',')) mqtt_publisher.start()