diff --git a/frigate/http.py b/frigate/http.py index 4232fa3eb..752279a96 100644 --- a/frigate/http.py +++ b/frigate/http.py @@ -39,25 +39,39 @@ class MqttBackend(): self.clients.append(client) def publish(self, message): - json_message = json.loads(message) - self.mqtt_client.publish(f"{self.topic_prefix}/{json_message['topic']}", json_message['payload'], retain=json_message['retain']) + try: + json_message = json.loads(message) + json_message = { + 'topic': f"{self.topic_prefix}/{json_message['topic']}", + 'payload': json_message.get['payload'], + 'retain': json_message.get('retain', False) + } + except: + logger.warning("Unable to parse websocket message as valid json.") + return + + logger.debug(f"Publishing mqtt message from websockets at {json_message['topic']}.") + self.mqtt_client.publish(json_message['topic'], json_message['payload'], retain=json_message['retain']) def run(self): def send(client, userdata, message): """Sends mqtt messages to clients.""" try: + logger.debug(f"Received mqtt message on {message.topic}.") ws_message = json.dumps({ 'topic': message.topic.replace(f"{self.topic_prefix}/",""), 'payload': message.payload.decode() }) except: # if the payload can't be decoded don't relay to clients + logger.debug(f"MQTT payload for {message.topic} wasn't text. Skipping...") return for client in self.clients: try: client.send(ws_message) except: + logger.debug("Removing websocket client due to a closed connection.") self.clients.remove(client) self.mqtt_client.message_callback_add(f"{self.topic_prefix}/#", send)