diff --git a/frigate/http.py b/frigate/http.py index ae4587ad5..c34292b2a 100644 --- a/frigate/http.py +++ b/frigate/http.py @@ -38,18 +38,28 @@ class MqttBackend(): """Register a WebSocket connection for Mqtt updates.""" self.clients.append(client) + def publish(self, message): + json_message = json.loads(message) + self.mqtt_client.publish(json_message['topic'], json_message['payload'], retain=json_message['retain']) + def run(self): def send(client, userdata, message): """Sends mqtt messages to clients.""" - logger.info(f"Sending mqtt to ws clients {len(self.clients)}") - ws_message = json.dumps({ - 'topic': message.topic, - 'payload': message.payload.decode() - }) + try: + ws_message = json.dumps({ + 'topic': message.topic, + 'payload': message.payload.decode() + }) + except: + # if the payload can't be decoded don't relay to clients + return + for client in self.clients: - client.send(ws_message) + try: + client.send(ws_message) + except: + self.clients.remove(client) - logger.info(f"Subscribing to {self.topic_prefix}/#") self.mqtt_client.message_callback_add(f"{self.topic_prefix}/#", send) def start(self): @@ -349,5 +359,9 @@ def echo_socket(socket): current_app.mqtt_backend.register(socket) while not socket.closed: - # Context switch while `ChatBackend.start` is running in the background. + # Sleep to prevent *constant* context-switches. gevent.sleep(0.1) + + message = socket.receive() + if message: + current_app.mqtt_backend.publish(message) diff --git a/frigate/mqtt.py b/frigate/mqtt.py index 7052025dc..13ca35163 100644 --- a/frigate/mqtt.py +++ b/frigate/mqtt.py @@ -118,8 +118,6 @@ def create_mqtt_client(config: FrigateConfig, camera_metrics): client.publish(f"{mqtt_config.topic_prefix}/{name}/snapshots/state", 'ON' if config.cameras[name].snapshots.enabled else 'OFF', retain=True) client.publish(f"{mqtt_config.topic_prefix}/{name}/detect/state", 'ON' if config.cameras[name].detect.enabled else 'OFF', retain=True) - client.subscribe(f"{mqtt_config.topic_prefix}/+/clips/set") - client.subscribe(f"{mqtt_config.topic_prefix}/+/snapshots/set") - client.subscribe(f"{mqtt_config.topic_prefix}/+/detect/set") + client.subscribe(f"{mqtt_config.topic_prefix}/#") return client