use a condition to signal to the mqtt publisher that objects were parsed

This commit is contained in:
blakeblackshear 2019-02-17 13:12:27 -06:00
parent 42ff739cea
commit 03c57bf67d

View File

@ -93,9 +93,10 @@ def detect_objects(cropped_frame, sess, detection_graph, region_size, region_x_o
return objects return objects
class ObjectParser(threading.Thread): class ObjectParser(threading.Thread):
def __init__(self, objects_changed, object_arrays): def __init__(self, objects_changed, objects_parsed, object_arrays):
threading.Thread.__init__(self) threading.Thread.__init__(self)
self._objects_changed = objects_changed self._objects_changed = objects_changed
self._objects_parsed = objects_parsed
self._object_arrays = object_arrays self._object_arrays = object_arrays
def run(self): def run(self):
@ -103,6 +104,7 @@ class ObjectParser(threading.Thread):
while True: while True:
detected_objects = [] detected_objects = []
# wait until object detection has run # wait until object detection has run
# TODO: what if something else changed while I was processing???
with self._objects_changed: with self._objects_changed:
self._objects_changed.wait() self._objects_changed.wait()
@ -120,6 +122,9 @@ class ObjectParser(threading.Thread):
}) })
object_index += 6 object_index += 6
DETECTED_OBJECTS = detected_objects DETECTED_OBJECTS = detected_objects
# notify that objects were parsed
with self._objects_parsed:
self._objects_parsed.notify_all()
class MqttMotionPublisher(threading.Thread): class MqttMotionPublisher(threading.Thread):
def __init__(self, client, topic_prefix, motion_changed, motion_flags): def __init__(self, client, topic_prefix, motion_changed, motion_flags):
@ -139,16 +144,17 @@ class MqttMotionPublisher(threading.Thread):
motion_status = 'OFF' motion_status = 'OFF'
if any(obj.is_set() for obj in self.motion_flags): if any(obj.is_set() for obj in self.motion_flags):
motion_status = 'ON' motion_status = 'ON'
if last_sent_motion != motion_status: if last_sent_motion != motion_status:
last_sent_motion = motion_status last_sent_motion = motion_status
self.client.publish(self.topic_prefix+'/motion', motion_status, retain=False) self.client.publish(self.topic_prefix+'/motion', motion_status, retain=False)
class MqttPublisher(threading.Thread): class MqttObjectPublisher(threading.Thread):
def __init__(self, client, topic_prefix, object_classes): def __init__(self, client, topic_prefix, objects_parsed, object_classes):
threading.Thread.__init__(self) threading.Thread.__init__(self)
self.client = client self.client = client
self.topic_prefix = topic_prefix self.topic_prefix = topic_prefix
self.objects_parsed = objects_parsed
self.object_classes = object_classes self.object_classes = object_classes
def run(self): def run(self):
@ -156,10 +162,16 @@ class MqttPublisher(threading.Thread):
last_sent_payload = "" last_sent_payload = ""
while True: while True:
# initialize the payload # initialize the payload
payload = {} payload = {}
for obj in self.object_classes: for obj in self.object_classes:
payload[obj] = [] payload[obj] = []
# wait until objects have been parsed
with self.objects_parsed:
self.objects_parsed.wait()
# loop over detected objects and populate # loop over detected objects and populate
# the payload # the payload
detected_objects = DETECTED_OBJECTS.copy() detected_objects = DETECTED_OBJECTS.copy()
@ -167,14 +179,12 @@ class MqttPublisher(threading.Thread):
if obj['name'] in self.object_classes: if obj['name'] in self.object_classes:
payload[obj['name']].append(obj) payload[obj['name']].append(obj)
# send message for objects # send message for objects if different
new_payload = json.dumps(payload, sort_keys=True) new_payload = json.dumps(payload, sort_keys=True)
if new_payload != last_sent_payload: if new_payload != last_sent_payload:
last_sent_payload = new_payload last_sent_payload = new_payload
self.client.publish(self.topic_prefix+'/objects', new_payload, retain=False) self.client.publish(self.topic_prefix+'/objects', new_payload, retain=False)
time.sleep(0.1)
def main(): def main():
# Parse selected regions # Parse selected regions
regions = [] regions = []
@ -218,6 +228,8 @@ def main():
motion_changed = mp.Condition() motion_changed = mp.Condition()
# Condition for notifying that object detection ran # Condition for notifying that object detection ran
objects_changed = mp.Condition() objects_changed = mp.Condition()
# Condition for notifying that objects were parsed
objects_parsed = mp.Condition()
# shape current frame so it can be treated as an image # shape current frame so it can be treated as an image
frame_arr = tonumpyarray(shared_arr).reshape(frame_shape) frame_arr = tonumpyarray(shared_arr).reshape(frame_shape)
@ -251,7 +263,7 @@ def main():
motion_process.daemon = True motion_process.daemon = True
motion_processes.append(motion_process) motion_processes.append(motion_process)
object_parser = ObjectParser(objects_changed, [region['output_array'] for region in regions]) object_parser = ObjectParser(objects_changed, objects_parsed, [region['output_array'] for region in regions])
object_parser.start() object_parser.start()
client = mqtt.Client() client = mqtt.Client()
@ -260,7 +272,7 @@ def main():
client.loop_start() client.loop_start()
client.publish(MQTT_TOPIC_PREFIX+'/available', 'online', retain=True) client.publish(MQTT_TOPIC_PREFIX+'/available', 'online', retain=True)
mqtt_publisher = MqttPublisher(client, MQTT_TOPIC_PREFIX, mqtt_publisher = MqttObjectPublisher(client, MQTT_TOPIC_PREFIX, objects_parsed,
MQTT_OBJECT_CLASSES.split(',')) MQTT_OBJECT_CLASSES.split(','))
mqtt_publisher.start() mqtt_publisher.start()