diff --git a/README.md b/README.md index 63d522054..a501ce7f7 100644 --- a/README.md +++ b/README.md @@ -95,6 +95,9 @@ automation: caption: A person was detected. ``` +## Disabling Detection +You can disable or enable detection via mqtt by publishing to `topic_prefix/detection` then you want to send a payload of either 'enable' or 'disable'. + ## Tips - Lower the framerate of the video feed on the camera to reduce the CPU usage for capturing the feed @@ -111,7 +114,7 @@ automation: - [ ] See if motion detection is even worth running - [ ] Scan for people across entire image rather than specfic regions - [ ] Dynamically resize detection area and follow people -- [ ] Add ability to turn detection on and off via MQTT +- [x] Add ability to turn detection on and off via MQTT - [ ] Output movie clips of people for notifications, etc. - [ ] Integrate with homeassistant push camera - [ ] Merge bounding boxes that span multiple regions diff --git a/detect_objects.py b/detect_objects.py index 565f36f30..eb197c953 100644 --- a/detect_objects.py +++ b/detect_objects.py @@ -9,6 +9,8 @@ import paho.mqtt.client as mqtt from frigate.video import Camera from frigate.object_detection import PreppedQueueProcessor +from frigate.mqtt import MqttObjectConsumer + with open('/config/config.yml') as f: CONFIG = yaml.safe_load(f) @@ -62,13 +64,25 @@ def main(): print ("Unable to connect to MQTT: Connection refused. Error code: " + str(rc)) # publish a message to signal that the service is running client.publish(MQTT_TOPIC_PREFIX+'/available', 'online', retain=True) + + # start a thread to listen for responses over mqtt + listen_queue = queue.Queue(10) # since we are listening for messages shouldn't need a very large queue + def on_message(client, obj, msg): + if msg.topic.startswith(MQTT_TOPIC_PREFIX): + payload = str(msg.payload.decode("utf-8")) + listen_queue.put({ + 'topic': msg.topic, + 'payload': payload + }) client = mqtt.Client(client_id=MQTT_CLIENT_ID) client.on_connect = on_connect + client.on_message = on_message client.will_set(MQTT_TOPIC_PREFIX+'/available', payload='offline', qos=1, retain=True) if not MQTT_USER is None: client.username_pw_set(MQTT_USER, password=MQTT_PASS) client.connect(MQTT_HOST, MQTT_PORT, 60) client.loop_start() + client.subscribe("frigate/detection") # Queue for prepped frames, max size set to (number of cameras * 5) max_queue_size = len(CONFIG['cameras'].items())*5 @@ -78,6 +92,9 @@ def main(): for name, config in CONFIG['cameras'].items(): cameras[name] = Camera(name, FFMPEG_DEFAULT_CONFIG, GLOBAL_OBJECT_CONFIG, config, prepped_frame_queue, client, MQTT_TOPIC_PREFIX) + mqtt_listener = MqttObjectConsumer(client, MQTT_TOPIC_PREFIX, listen_queue, cameras.values()) + mqtt_listener.start() + prepped_queue_processor = PreppedQueueProcessor( cameras, prepped_frame_queue diff --git a/frigate/mqtt.py b/frigate/mqtt.py index c053172fa..0787a5712 100644 --- a/frigate/mqtt.py +++ b/frigate/mqtt.py @@ -2,6 +2,7 @@ import json import cv2 import threading from collections import Counter, defaultdict +import subprocess as sp class MqttObjectPublisher(threading.Thread): def __init__(self, client, topic_prefix, objects_parsed, detected_objects, best_frames): @@ -44,4 +45,28 @@ class MqttObjectPublisher(threading.Thread): expired_objects = [obj_name for obj_name, status in current_object_status.items() if status == 'ON' and not obj_name in obj_counter] for obj_name in expired_objects: current_object_status[obj_name] = 'OFF' - self.client.publish(self.topic_prefix+'/'+obj_name, 'OFF', retain=False) \ No newline at end of file + self.client.publish(self.topic_prefix+'/'+obj_name, 'OFF', retain=False) + +class MqttObjectConsumer(threading.Thread): + def __init__(self, client, topic_prefix, listen_queue, cameras): + threading.Thread.__init__(self) + self.client = client + self.topic_prefix = topic_prefix + self.listen_queue = listen_queue + self.cameras = cameras + + def run(self): + while True: + # Now we want to see if there are more messages and then process them + item = self.listen_queue.get() + topic = item['topic'] + payload = item['payload'] + if topic == ("%s/detection" % self.topic_prefix): + if payload == 'enable': + for camera in self.cameras: + camera.start_or_restart_capture() + elif payload == 'disable': + for camera in self.cameras: + camera.disable_capture() + camera.watchdog.disable = True + camera.watchdog = None diff --git a/frigate/video.py b/frigate/video.py index e8532f6d3..b6cc930ba 100644 --- a/frigate/video.py +++ b/frigate/video.py @@ -8,6 +8,7 @@ import multiprocessing as mp import subprocess as sp import numpy as np from collections import defaultdict +import queue from . util import tonumpyarray, draw_box_with_label from . object_detection import FramePrepper from . objects import ObjectCleaner, BestFrames @@ -64,10 +65,11 @@ class CameraWatchdog(threading.Thread): def __init__(self, camera): threading.Thread.__init__(self) self.camera = camera + self.disable = False def run(self): - while True: + while not self.disable: # wait a bit before checking time.sleep(10) @@ -206,9 +208,10 @@ class Camera: self.mask = np.zeros((self.frame_shape[0], self.frame_shape[1], 1), np.uint8) self.mask[:] = 255 - - def start_or_restart_capture(self): - if not self.ffmpeg_process is None: + def disable_capture(self): + if self.ffmpeg_process is None: + print("Attempted to disable capture but was already disabled") + else: print("Terminating the existing ffmpeg process...") self.ffmpeg_process.terminate() try: @@ -223,6 +226,10 @@ class Camera: self.capture_thread.join() self.ffmpeg_process = None self.capture_thread = None + + def start_or_restart_capture(self): + if not self.ffmpeg_process is None: + self.disable_capture() # create the process to capture frames from the input stream and store in a shared array print("Creating a new ffmpeg process...") @@ -232,7 +239,10 @@ class Camera: self.capture_thread = CameraCapture(self) print("Starting a new capture thread...") self.capture_thread.start() - + + if self.watchdog == None: + self.watchdog = CameraWatchdog(self) + def start_ffmpeg(self): ffmpeg_cmd = (['ffmpeg'] + self.ffmpeg_global_args +