diff --git a/README.md b/README.md
index 4f0003c50..cdecea352 100644
--- a/README.md
+++ b/README.md
@@ -25,7 +25,11 @@ docker run -it --rm \
 -v :/label_map.pbtext:ro \
 -p 5000:5000 \
 -e RTSP_URL='' \
--e REGIONS=',,:,,' \
+-e REGIONS=',,,:,,,' \
+-e MQTT_HOST='your.mqtthost.com' \
+-e MQTT_MOTION_TOPIC='cameras/1/motion' \
+-e MQTT_OBJECT_TOPIC='cameras/1/objects' \
+-e MQTT_OBJECT_CLASSES='person,car,truck' \
 realtime-od:latest
 ```
 
@@ -36,11 +40,14 @@ Access the mjpeg stream at http://localhost:5000
 - Use SSDLite models
 
 ## Future improvements
-- [ ] Look for a subset of object types
+- [ ] Add a max size for motion and objects
+- [ ] Filter out detected objects that are not the right size
+- [ ] Change color of bounding box if motion detected
+- [x] Look for a subset of object types
 - [ ] Try and simplify the tensorflow model to just look for the objects we care about
-- [ ] MQTT messages when detected objects change
-- [ ] Implement basic motion detection with opencv and only look for objects in the regions with detected motion
-- [ ] Dynamic changes to processing speed, ie. only process 1FPS unless motion detected
+- [x] MQTT messages when detected objects change
+- [x] Implement basic motion detection with opencv and only look for objects in the regions with detected motion
+- [x] Dynamic changes to processing speed, ie. only process 1FPS unless motion detected
 - [x] Parallel processing to increase FPS
 - [ ] Look into GPU accelerated decoding of RTSP stream
 - [ ] Send video over a socket and use JSMPEG
diff --git a/detect_objects.py b/detect_objects.py
index 8e6be85e1..9c583032f 100644
--- a/detect_objects.py
+++ b/detect_objects.py
@@ -7,12 +7,14 @@ import ctypes
 import logging
 import multiprocessing as mp
 import threading
+import json
 from contextlib import closing
 import numpy as np
 import tensorflow as tf
 from object_detection.utils import label_map_util
 from object_detection.utils import visualization_utils as vis_util
 from flask import Flask, Response, make_response
+import paho.mqtt.client as mqtt
 
 RTSP_URL = os.getenv('RTSP_URL')
 
@@ -22,6 +24,11 @@ PATH_TO_CKPT = '/frozen_inference_graph.pb'
 # List of the strings that is used to add correct label for each box.
 PATH_TO_LABELS = '/label_map.pbtext'
 
+MQTT_HOST = os.getenv('MQTT_HOST')
+MQTT_MOTION_TOPIC = os.getenv('MQTT_MOTION_TOPIC')
+MQTT_OBJECT_TOPIC = os.getenv('MQTT_OBJECT_TOPIC')
+MQTT_OBJECT_CLASSES = os.getenv('MQTT_OBJECT_CLASSES')
+
 # TODO: make dynamic?
 NUM_CLASSES = 90
 
@@ -96,7 +103,50 @@ class ObjectParser(threading.Thread):
                     })
                     object_index += 6
             DETECTED_OBJECTS = detected_objects
-            time.sleep(0.01)
+            time.sleep(0.1)
+class MqttPublisher(threading.Thread):
+    def __init__(self, host, motion_topic, object_topic, object_classes, motion_flags):
+        threading.Thread.__init__(self)
+        self.client = mqtt.Client()
+        self.client.connect(host, 1883, 60)
+        self.client.loop_start()
+        self.motion_topic = motion_topic
+        self.object_topic = object_topic
+        self.object_classes = object_classes
+        self.motion_flags = motion_flags
+
+    def run(self):
+        global DETECTED_OBJECTS
+
+        last_sent_payload = ""
+        last_motion = ""
+        while True:
+            # initialize the payload
+            payload = {}
+            for obj in self.object_classes:
+                payload[obj] = []
+            # loop over detected objects and populate
+            # the payload
+            detected_objects = DETECTED_OBJECTS.copy()
+            for obj in detected_objects:
+                if obj['name'] in self.object_classes:
+                    payload[obj['name']].append(obj)
+
+            new_payload = json.dumps(payload, sort_keys=True)
+            if new_payload != last_sent_payload:
+                last_sent_payload = new_payload
+                self.client.publish(self.object_topic, new_payload, retain=False)
+
+            motion_status = 'OFF'
+            if any(obj.value == 1 for obj in self.motion_flags):
+                motion_status = 'ON'
+
+            if motion_status != last_motion:
+                last_motion = motion_status
+                self.client.publish(self.motion_topic, motion_status, retain=False)
+
+
+            time.sleep(0.1)
 
 def main():
     # Parse selected regions
@@ -173,6 +223,11 @@ def main():
     object_parser = ObjectParser([obj['output_array'] for obj in shared_memory_objects])
     object_parser.start()
 
+    mqtt_publisher = MqttPublisher(MQTT_HOST, MQTT_MOTION_TOPIC, MQTT_OBJECT_TOPIC,
+        MQTT_OBJECT_CLASSES.split(','),
+        [obj['motion_detected'] for obj in shared_memory_objects])
+    mqtt_publisher.start()
+
     capture_process.start()
     print("capture_process pid ", capture_process.pid)
     for detection_process in detection_processes:
@@ -237,6 +292,7 @@ def main():
     for motion_process in motion_processes:
         motion_process.join()
     object_parser.join()
+    mqtt_publisher.join()
 
 # convert shared memory array into numpy array
 def tonumpyarray(mp_arr):
@@ -272,7 +328,7 @@ def fetch_frames(shared_arr, shared_frame_time, ready_for_frame_flags, frame_sha
             flag.value = 0
         else:
             # sleep a little to reduce CPU usage
-            time.sleep(0.01)
+            time.sleep(0.1)
 
     video.release()
 
@@ -297,7 +353,7 @@ def process_frames(shared_arr, shared_output_arr, shared_frame_time, shared_moti
         now = datetime.datetime.now().timestamp()
         # if there is no motion detected
        if shared_motion.value == 0:
-            time.sleep(0.01)
+            time.sleep(0.1)
            continue
 
        # if there isnt a new frame ready for processing
@@ -312,7 +368,7 @@ def process_frames(shared_arr, shared_output_arr, shared_frame_time, shared_moti
                print("sleeping because no frames have been available in a while")
            else:
                # rest a little bit to avoid maxing out the CPU
-                time.sleep(0.01)
+                time.sleep(0.1)
            continue
 
        # we got a valid frame, so reset the timer
@@ -321,7 +377,7 @@ def process_frames(shared_arr, shared_output_arr, shared_frame_time, shared_moti
        # if the frame is more than 0.5 second old, ignore it
        if (now - shared_frame_time.value) > 0.5:
            # rest a little bit to avoid maxing out the CPU
-            time.sleep(0.01)
+            time.sleep(0.1)
            continue
 
        # make a copy of the cropped frame
@@ -362,7 +418,7 @@ def detect_motion(shared_arr, shared_frame_time, ready_for_frame, shared_motion,
                print("sleeping because no frames have been available in a while")
            else:
                # rest a little bit to avoid maxing out the CPU
-                time.sleep(0.01)
+                time.sleep(0.1)
            if ready_for_frame.value == 0:
                ready_for_frame.value = 1
            continue
@@ -375,7 +431,7 @@ def detect_motion(shared_arr, shared_frame_time, ready_for_frame, shared_motion,
            # signal that we need a new frame
            ready_for_frame.value = 1
            # rest a little bit to avoid maxing out the CPU
-            time.sleep(0.01)
+            time.sleep(0.1)
            continue
 
        # make a copy of the cropped frame
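
Not part of the patch above, but useful for checking it end to end: a minimal subscriber sketch that listens on the two new topics. The broker host and topic names are taken from the README example (`your.mqtthost.com`, `cameras/1/motion`, `cameras/1/objects`) and would need to match your own environment; the payload shapes (a JSON object keyed by class name on the object topic, plain `ON`/`OFF` strings on the motion topic) follow from the `MqttPublisher` code in the diff.

```python
# Hypothetical verification script, not included in this change: subscribe to the
# topics that MqttPublisher publishes to and print whatever arrives.
import json
import paho.mqtt.client as mqtt

MQTT_HOST = 'your.mqtthost.com'                       # same broker as the docker run example
TOPICS = ['cameras/1/motion', 'cameras/1/objects']    # MQTT_MOTION_TOPIC / MQTT_OBJECT_TOPIC

def on_connect(client, userdata, flags, rc):
    # subscribe once the connection is established
    for topic in TOPICS:
        client.subscribe(topic)

def on_message(client, userdata, msg):
    payload = msg.payload.decode('utf-8')
    if msg.topic.endswith('objects'):
        # object messages are JSON keyed by class name ('person', 'car', ...)
        print(msg.topic, json.dumps(json.loads(payload), indent=2))
    else:
        # motion messages are plain 'ON'/'OFF' strings
        print(msg.topic, payload)

client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message
client.connect(MQTT_HOST, 1883, 60)
client.loop_forever()
```

Because the publisher only sends when the object payload or motion state actually changes, expect silence until something is detected or motion starts/stops.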