Merge pull request #3 from blakeblackshear/mqtt

Mqtt
This commit is contained in:
Blake Blackshear 2019-02-10 12:01:47 -06:00 committed by GitHub
commit 30c223711d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 77 additions and 13 deletions

View File

@ -41,7 +41,8 @@ RUN pip install -U pip \
keras \ keras \
autovizwidget \ autovizwidget \
Flask \ Flask \
imutils imutils \
paho-mqtt
# Install tensorflow models object detection # Install tensorflow models object detection
RUN GIT_SSL_NO_VERIFY=true git clone -q https://github.com/tensorflow/models /usr/local/lib/python3.5/dist-packages/tensorflow/models RUN GIT_SSL_NO_VERIFY=true git clone -q https://github.com/tensorflow/models /usr/local/lib/python3.5/dist-packages/tensorflow/models

View File

@ -25,7 +25,11 @@ docker run -it --rm \
-v <path_to_labelmap.pbtext>:/label_map.pbtext:ro \ -v <path_to_labelmap.pbtext>:/label_map.pbtext:ro \
-p 5000:5000 \ -p 5000:5000 \
-e RTSP_URL='<rtsp_url>' \ -e RTSP_URL='<rtsp_url>' \
-e REGIONS='<box_size_1>,<x_offset_1>,<y_offset_1>:<box_size_2>,<x_offset_2>,<y_offset_2>' \ -e REGIONS='<box_size_1>,<x_offset_1>,<y_offset_1>,<min_object_size_1>:<box_size_2>,<x_offset_2>,<y_offset_2>,<min_object_size_2>' \
-e MQTT_HOST='your.mqtthost.com' \
-e MQTT_MOTION_TOPIC='cameras/1/motion' \
-e MQTT_OBJECT_TOPIC='cameras/1/objects' \
-e MQTT_OBJECT_CLASSES='person,car,truck' \
realtime-od:latest realtime-od:latest
``` ```
@ -36,11 +40,14 @@ Access the mjpeg stream at http://localhost:5000
- Use SSDLite models - Use SSDLite models
## Future improvements ## Future improvements
- [ ] Look for a subset of object types - [ ] Add a max size for motion and objects
- [ ] Filter out detected objects that are not the right size
- [ ] Change color of bounding box if motion detected
- [x] Look for a subset of object types
- [ ] Try and simplify the tensorflow model to just look for the objects we care about - [ ] Try and simplify the tensorflow model to just look for the objects we care about
- [ ] MQTT messages when detected objects change - [x] MQTT messages when detected objects change
- [ ] Implement basic motion detection with opencv and only look for objects in the regions with detected motion - [x] Implement basic motion detection with opencv and only look for objects in the regions with detected motion
- [ ] Dynamic changes to processing speed, ie. only process 1FPS unless motion detected - [x] Dynamic changes to processing speed, ie. only process 1FPS unless motion detected
- [x] Parallel processing to increase FPS - [x] Parallel processing to increase FPS
- [ ] Look into GPU accelerated decoding of RTSP stream - [ ] Look into GPU accelerated decoding of RTSP stream
- [ ] Send video over a socket and use JSMPEG - [ ] Send video over a socket and use JSMPEG

View File

@ -7,12 +7,14 @@ import ctypes
import logging import logging
import multiprocessing as mp import multiprocessing as mp
import threading import threading
import json
from contextlib import closing from contextlib import closing
import numpy as np import numpy as np
import tensorflow as tf import tensorflow as tf
from object_detection.utils import label_map_util from object_detection.utils import label_map_util
from object_detection.utils import visualization_utils as vis_util from object_detection.utils import visualization_utils as vis_util
from flask import Flask, Response, make_response from flask import Flask, Response, make_response
import paho.mqtt.client as mqtt
RTSP_URL = os.getenv('RTSP_URL') RTSP_URL = os.getenv('RTSP_URL')
@ -22,6 +24,11 @@ PATH_TO_CKPT = '/frozen_inference_graph.pb'
# List of the strings that is used to add correct label for each box. # List of the strings that is used to add correct label for each box.
PATH_TO_LABELS = '/label_map.pbtext' PATH_TO_LABELS = '/label_map.pbtext'
MQTT_HOST = os.getenv('MQTT_HOST')
MQTT_MOTION_TOPIC = os.getenv('MQTT_MOTION_TOPIC')
MQTT_OBJECT_TOPIC = os.getenv('MQTT_OBJECT_TOPIC')
MQTT_OBJECT_CLASSES = os.getenv('MQTT_OBJECT_CLASSES')
# TODO: make dynamic? # TODO: make dynamic?
NUM_CLASSES = 90 NUM_CLASSES = 90
@ -96,7 +103,50 @@ class ObjectParser(threading.Thread):
}) })
object_index += 6 object_index += 6
DETECTED_OBJECTS = detected_objects DETECTED_OBJECTS = detected_objects
time.sleep(0.01) time.sleep(0.1)
class MqttPublisher(threading.Thread):
def __init__(self, host, motion_topic, object_topic, object_classes, motion_flags):
threading.Thread.__init__(self)
self.client = mqtt.Client()
self.client.connect(host, 1883, 60)
self.client.loop_start()
self.motion_topic = motion_topic
self.object_topic = object_topic
self.object_classes = object_classes
self.motion_flags = motion_flags
def run(self):
global DETECTED_OBJECTS
last_sent_payload = ""
last_motion = ""
while True:
# initialize the payload
payload = {}
for obj in self.object_classes:
payload[obj] = []
# loop over detected objects and populate
# the payload
detected_objects = DETECTED_OBJECTS.copy()
for obj in detected_objects:
if obj['name'] in self.object_classes:
payload[obj['name']].append(obj)
new_payload = json.dumps(payload, sort_keys=True)
if new_payload != last_sent_payload:
last_sent_payload = new_payload
self.client.publish(self.object_topic, new_payload, retain=False)
motion_status = 'OFF'
if any(obj.value == 1 for obj in self.motion_flags):
motion_status = 'ON'
if motion_status != last_motion:
last_motion = motion_status
self.client.publish(self.motion_topic, motion_status, retain=False)
time.sleep(0.1)
def main(): def main():
# Parse selected regions # Parse selected regions
@ -173,6 +223,11 @@ def main():
object_parser = ObjectParser([obj['output_array'] for obj in shared_memory_objects]) object_parser = ObjectParser([obj['output_array'] for obj in shared_memory_objects])
object_parser.start() object_parser.start()
mqtt_publisher = MqttPublisher(MQTT_HOST, MQTT_MOTION_TOPIC, MQTT_OBJECT_TOPIC,
MQTT_OBJECT_CLASSES.split(','),
[obj['motion_detected'] for obj in shared_memory_objects])
mqtt_publisher.start()
capture_process.start() capture_process.start()
print("capture_process pid ", capture_process.pid) print("capture_process pid ", capture_process.pid)
for detection_process in detection_processes: for detection_process in detection_processes:
@ -237,6 +292,7 @@ def main():
for motion_process in motion_processes: for motion_process in motion_processes:
motion_process.join() motion_process.join()
object_parser.join() object_parser.join()
mqtt_publisher.join()
# convert shared memory array into numpy array # convert shared memory array into numpy array
def tonumpyarray(mp_arr): def tonumpyarray(mp_arr):
@ -272,7 +328,7 @@ def fetch_frames(shared_arr, shared_frame_time, ready_for_frame_flags, frame_sha
flag.value = 0 flag.value = 0
else: else:
# sleep a little to reduce CPU usage # sleep a little to reduce CPU usage
time.sleep(0.01) time.sleep(0.1)
video.release() video.release()
@ -297,7 +353,7 @@ def process_frames(shared_arr, shared_output_arr, shared_frame_time, shared_moti
now = datetime.datetime.now().timestamp() now = datetime.datetime.now().timestamp()
# if there is no motion detected # if there is no motion detected
if shared_motion.value == 0: if shared_motion.value == 0:
time.sleep(0.01) time.sleep(0.1)
continue continue
# if there isnt a new frame ready for processing # if there isnt a new frame ready for processing
@ -312,7 +368,7 @@ def process_frames(shared_arr, shared_output_arr, shared_frame_time, shared_moti
print("sleeping because no frames have been available in a while") print("sleeping because no frames have been available in a while")
else: else:
# rest a little bit to avoid maxing out the CPU # rest a little bit to avoid maxing out the CPU
time.sleep(0.01) time.sleep(0.1)
continue continue
# we got a valid frame, so reset the timer # we got a valid frame, so reset the timer
@ -321,7 +377,7 @@ def process_frames(shared_arr, shared_output_arr, shared_frame_time, shared_moti
# if the frame is more than 0.5 second old, ignore it # if the frame is more than 0.5 second old, ignore it
if (now - shared_frame_time.value) > 0.5: if (now - shared_frame_time.value) > 0.5:
# rest a little bit to avoid maxing out the CPU # rest a little bit to avoid maxing out the CPU
time.sleep(0.01) time.sleep(0.1)
continue continue
# make a copy of the cropped frame # make a copy of the cropped frame
@ -362,7 +418,7 @@ def detect_motion(shared_arr, shared_frame_time, ready_for_frame, shared_motion,
print("sleeping because no frames have been available in a while") print("sleeping because no frames have been available in a while")
else: else:
# rest a little bit to avoid maxing out the CPU # rest a little bit to avoid maxing out the CPU
time.sleep(0.01) time.sleep(0.1)
if ready_for_frame.value == 0: if ready_for_frame.value == 0:
ready_for_frame.value = 1 ready_for_frame.value = 1
continue continue
@ -375,7 +431,7 @@ def detect_motion(shared_arr, shared_frame_time, ready_for_frame, shared_motion,
# signal that we need a new frame # signal that we need a new frame
ready_for_frame.value = 1 ready_for_frame.value = 1
# rest a little bit to avoid maxing out the CPU # rest a little bit to avoid maxing out the CPU
time.sleep(0.01) time.sleep(0.1)
continue continue
# make a copy of the cropped frame # make a copy of the cropped frame