move mqtt motion publishing into separate thread

This commit is contained in:
blakeblackshear 2019-02-17 12:59:51 -06:00
parent 92e1833def
commit 42ff739cea

View File

@ -120,23 +120,41 @@ class ObjectParser(threading.Thread):
}) })
object_index += 6 object_index += 6
DETECTED_OBJECTS = detected_objects DETECTED_OBJECTS = detected_objects
class MqttPublisher(threading.Thread):
def __init__(self, host, topic_prefix, object_classes, motion_flags): class MqttMotionPublisher(threading.Thread):
def __init__(self, client, topic_prefix, motion_changed, motion_flags):
threading.Thread.__init__(self) threading.Thread.__init__(self)
self.client = mqtt.Client() self.client = client
self.client.will_set(topic_prefix+'/available', payload='offline', qos=1, retain=True) self.topic_prefix = topic_prefix
self.client.connect(host, 1883, 60) self.motion_changed = motion_changed
self.client.loop_start() self.motion_flags = motion_flags
self.client.publish(topic_prefix+'/available', 'online', retain=True)
def run(self):
last_sent_motion = ""
while True:
with self.motion_changed:
self.motion_changed.wait()
# send message for motion
motion_status = 'OFF'
if any(obj.is_set() for obj in self.motion_flags):
motion_status = 'ON'
if last_sent_motion != motion_status:
last_sent_motion = motion_status
self.client.publish(self.topic_prefix+'/motion', motion_status, retain=False)
class MqttPublisher(threading.Thread):
def __init__(self, client, topic_prefix, object_classes):
threading.Thread.__init__(self)
self.client = client
self.topic_prefix = topic_prefix self.topic_prefix = topic_prefix
self.object_classes = object_classes self.object_classes = object_classes
self.motion_flags = motion_flags
def run(self): def run(self):
global DETECTED_OBJECTS global DETECTED_OBJECTS
last_sent_payload = "" last_sent_payload = ""
last_motion = ""
while True: while True:
# initialize the payload # initialize the payload
payload = {} payload = {}
@ -155,15 +173,6 @@ class MqttPublisher(threading.Thread):
last_sent_payload = new_payload last_sent_payload = new_payload
self.client.publish(self.topic_prefix+'/objects', new_payload, retain=False) self.client.publish(self.topic_prefix+'/objects', new_payload, retain=False)
# send message for motion
motion_status = 'OFF'
if any(obj.is_set() for obj in self.motion_flags):
motion_status = 'ON'
if motion_status != last_motion:
last_motion = motion_status
self.client.publish(self.topic_prefix+'/motion', motion_status, retain=False)
time.sleep(0.1) time.sleep(0.1)
def main(): def main():
@ -205,6 +214,8 @@ def main():
frame_lock = mp.Lock() frame_lock = mp.Lock()
# Condition for notifying that a new frame is ready # Condition for notifying that a new frame is ready
frame_ready = mp.Condition() frame_ready = mp.Condition()
# Condition for notifying that motion status changed globally
motion_changed = mp.Condition()
# Condition for notifying that object detection ran # Condition for notifying that object detection ran
objects_changed = mp.Condition() objects_changed = mp.Condition()
# shape current frame so it can be treated as an image # shape current frame so it can be treated as an image
@ -232,6 +243,7 @@ def main():
shared_frame_time, shared_frame_time,
frame_lock, frame_ready, frame_lock, frame_ready,
region['motion_detected'], region['motion_detected'],
motion_changed,
frame_shape, frame_shape,
region['size'], region['x_offset'], region['y_offset'], region['size'], region['x_offset'], region['y_offset'],
region['min_object_size'], region['min_object_size'],
@ -242,11 +254,20 @@ def main():
object_parser = ObjectParser(objects_changed, [region['output_array'] for region in regions]) object_parser = ObjectParser(objects_changed, [region['output_array'] for region in regions])
object_parser.start() object_parser.start()
mqtt_publisher = MqttPublisher(MQTT_HOST, MQTT_TOPIC_PREFIX, client = mqtt.Client()
MQTT_OBJECT_CLASSES.split(','), client.will_set(MQTT_TOPIC_PREFIX+'/available', payload='offline', qos=1, retain=True)
[region['motion_detected'] for region in regions]) client.connect(MQTT_HOST, 1883, 60)
client.loop_start()
client.publish(MQTT_TOPIC_PREFIX+'/available', 'online', retain=True)
mqtt_publisher = MqttPublisher(client, MQTT_TOPIC_PREFIX,
MQTT_OBJECT_CLASSES.split(','))
mqtt_publisher.start() mqtt_publisher.start()
mqtt_motion_publisher = MqttMotionPublisher(client, MQTT_TOPIC_PREFIX, motion_changed,
[region['motion_detected'] for region in regions])
mqtt_motion_publisher.start()
capture_process.start() capture_process.start()
print("capture_process pid ", capture_process.pid) print("capture_process pid ", capture_process.pid)
for detection_process in detection_processes: for detection_process in detection_processes:
@ -392,7 +413,8 @@ def process_frames(shared_arr, shared_output_arr, shared_frame_time, frame_lock,
objects_changed.notify_all() objects_changed.notify_all()
# do the actual motion detection # do the actual motion detection
def detect_motion(shared_arr, shared_frame_time, frame_lock, frame_ready, motion_detected, frame_shape, region_size, region_x_offset, region_y_offset, min_motion_area, debug): def detect_motion(shared_arr, shared_frame_time, frame_lock, frame_ready, motion_detected, motion_changed,
frame_shape, region_size, region_x_offset, region_y_offset, min_motion_area, debug):
# shape shared input array into frame for processing # shape shared input array into frame for processing
arr = tonumpyarray(shared_arr).reshape(frame_shape) arr = tonumpyarray(shared_arr).reshape(frame_shape)
@ -406,6 +428,8 @@ def detect_motion(shared_arr, shared_frame_time, frame_lock, frame_ready, motion
if last_motion > 0 and (now - last_motion) > 2: if last_motion > 0 and (now - last_motion) > 2:
last_motion = -1 last_motion = -1
motion_detected.clear() motion_detected.clear()
with motion_changed:
motion_changed.notify_all()
with frame_ready: with frame_ready:
# if there isnt a frame ready for processing or it is old, wait for a signal # if there isnt a frame ready for processing or it is old, wait for a signal
@ -464,6 +488,8 @@ def detect_motion(shared_arr, shared_frame_time, frame_lock, frame_ready, motion
# if there have been enough consecutive motion frames, report motion # if there have been enough consecutive motion frames, report motion
if motion_frames >= 3: if motion_frames >= 3:
motion_detected.set() motion_detected.set()
with motion_changed:
motion_changed.notify_all()
last_motion = now last_motion = now
else: else:
motion_frames = 0 motion_frames = 0