diff --git a/detect_objects.py b/detect_objects.py index bc1b2f778..097068f40 100644 --- a/detect_objects.py +++ b/detect_objects.py @@ -120,23 +120,41 @@ class ObjectParser(threading.Thread): }) object_index += 6 DETECTED_OBJECTS = detected_objects -class MqttPublisher(threading.Thread): - def __init__(self, host, topic_prefix, object_classes, motion_flags): + +class MqttMotionPublisher(threading.Thread): + def __init__(self, client, topic_prefix, motion_changed, motion_flags): threading.Thread.__init__(self) - self.client = mqtt.Client() - self.client.will_set(topic_prefix+'/available', payload='offline', qos=1, retain=True) - self.client.connect(host, 1883, 60) - self.client.loop_start() - self.client.publish(topic_prefix+'/available', 'online', retain=True) + self.client = client + self.topic_prefix = topic_prefix + self.motion_changed = motion_changed + self.motion_flags = motion_flags + + def run(self): + last_sent_motion = "" + while True: + with self.motion_changed: + self.motion_changed.wait() + + # send message for motion + motion_status = 'OFF' + if any(obj.is_set() for obj in self.motion_flags): + motion_status = 'ON' + + if last_sent_motion != motion_status: + last_sent_motion = motion_status + self.client.publish(self.topic_prefix+'/motion', motion_status, retain=False) + +class MqttPublisher(threading.Thread): + def __init__(self, client, topic_prefix, object_classes): + threading.Thread.__init__(self) + self.client = client self.topic_prefix = topic_prefix self.object_classes = object_classes - self.motion_flags = motion_flags def run(self): global DETECTED_OBJECTS last_sent_payload = "" - last_motion = "" while True: # initialize the payload payload = {} @@ -154,15 +172,6 @@ class MqttPublisher(threading.Thread): if new_payload != last_sent_payload: last_sent_payload = new_payload self.client.publish(self.topic_prefix+'/objects', new_payload, retain=False) - - # send message for motion - motion_status = 'OFF' - if any(obj.is_set() for obj in self.motion_flags): - motion_status = 'ON' - - if motion_status != last_motion: - last_motion = motion_status - self.client.publish(self.topic_prefix+'/motion', motion_status, retain=False) time.sleep(0.1) @@ -205,6 +214,8 @@ def main(): frame_lock = mp.Lock() # Condition for notifying that a new frame is ready frame_ready = mp.Condition() + # Condition for notifying that motion status changed globally + motion_changed = mp.Condition() # Condition for notifying that object detection ran objects_changed = mp.Condition() # shape current frame so it can be treated as an image @@ -232,6 +243,7 @@ def main(): shared_frame_time, frame_lock, frame_ready, region['motion_detected'], + motion_changed, frame_shape, region['size'], region['x_offset'], region['y_offset'], region['min_object_size'], @@ -242,11 +254,20 @@ def main(): object_parser = ObjectParser(objects_changed, [region['output_array'] for region in regions]) object_parser.start() - mqtt_publisher = MqttPublisher(MQTT_HOST, MQTT_TOPIC_PREFIX, - MQTT_OBJECT_CLASSES.split(','), - [region['motion_detected'] for region in regions]) + client = mqtt.Client() + client.will_set(MQTT_TOPIC_PREFIX+'/available', payload='offline', qos=1, retain=True) + client.connect(MQTT_HOST, 1883, 60) + client.loop_start() + client.publish(MQTT_TOPIC_PREFIX+'/available', 'online', retain=True) + + mqtt_publisher = MqttPublisher(client, MQTT_TOPIC_PREFIX, + MQTT_OBJECT_CLASSES.split(',')) mqtt_publisher.start() + mqtt_motion_publisher = MqttMotionPublisher(client, MQTT_TOPIC_PREFIX, motion_changed, + [region['motion_detected'] for region in regions]) + mqtt_motion_publisher.start() + capture_process.start() print("capture_process pid ", capture_process.pid) for detection_process in detection_processes: @@ -392,7 +413,8 @@ def process_frames(shared_arr, shared_output_arr, shared_frame_time, frame_lock, objects_changed.notify_all() # do the actual motion detection -def detect_motion(shared_arr, shared_frame_time, frame_lock, frame_ready, motion_detected, frame_shape, region_size, region_x_offset, region_y_offset, min_motion_area, debug): +def detect_motion(shared_arr, shared_frame_time, frame_lock, frame_ready, motion_detected, motion_changed, + frame_shape, region_size, region_x_offset, region_y_offset, min_motion_area, debug): # shape shared input array into frame for processing arr = tonumpyarray(shared_arr).reshape(frame_shape) @@ -406,6 +428,8 @@ def detect_motion(shared_arr, shared_frame_time, frame_lock, frame_ready, motion if last_motion > 0 and (now - last_motion) > 2: last_motion = -1 motion_detected.clear() + with motion_changed: + motion_changed.notify_all() with frame_ready: # if there isnt a frame ready for processing or it is old, wait for a signal @@ -464,6 +488,8 @@ def detect_motion(shared_arr, shared_frame_time, frame_lock, frame_ready, motion # if there have been enough consecutive motion frames, report motion if motion_frames >= 3: motion_detected.set() + with motion_changed: + motion_changed.notify_all() last_motion = now else: motion_frames = 0