relay messages from sockets to mqtt

This commit is contained in:
Blake Blackshear 2021-02-13 09:54:20 -06:00
parent 718b4f3fd7
commit eed8463832
2 changed files with 23 additions and 11 deletions

View File

@ -38,18 +38,28 @@ class MqttBackend():
"""Register a WebSocket connection for Mqtt updates.""" """Register a WebSocket connection for Mqtt updates."""
self.clients.append(client) self.clients.append(client)
def publish(self, message):
json_message = json.loads(message)
self.mqtt_client.publish(json_message['topic'], json_message['payload'], retain=json_message['retain'])
def run(self): def run(self):
def send(client, userdata, message): def send(client, userdata, message):
"""Sends mqtt messages to clients.""" """Sends mqtt messages to clients."""
logger.info(f"Sending mqtt to ws clients {len(self.clients)}") try:
ws_message = json.dumps({ ws_message = json.dumps({
'topic': message.topic, 'topic': message.topic,
'payload': message.payload.decode() 'payload': message.payload.decode()
}) })
for client in self.clients: except:
client.send(ws_message) # if the payload can't be decoded don't relay to clients
return
for client in self.clients:
try:
client.send(ws_message)
except:
self.clients.remove(client)
logger.info(f"Subscribing to {self.topic_prefix}/#")
self.mqtt_client.message_callback_add(f"{self.topic_prefix}/#", send) self.mqtt_client.message_callback_add(f"{self.topic_prefix}/#", send)
def start(self): def start(self):
@ -349,5 +359,9 @@ def echo_socket(socket):
current_app.mqtt_backend.register(socket) current_app.mqtt_backend.register(socket)
while not socket.closed: while not socket.closed:
# Context switch while `ChatBackend.start` is running in the background. # Sleep to prevent *constant* context-switches.
gevent.sleep(0.1) gevent.sleep(0.1)
message = socket.receive()
if message:
current_app.mqtt_backend.publish(message)

View File

@ -118,8 +118,6 @@ def create_mqtt_client(config: FrigateConfig, camera_metrics):
client.publish(f"{mqtt_config.topic_prefix}/{name}/snapshots/state", 'ON' if config.cameras[name].snapshots.enabled else 'OFF', retain=True) client.publish(f"{mqtt_config.topic_prefix}/{name}/snapshots/state", 'ON' if config.cameras[name].snapshots.enabled else 'OFF', retain=True)
client.publish(f"{mqtt_config.topic_prefix}/{name}/detect/state", 'ON' if config.cameras[name].detect.enabled else 'OFF', retain=True) client.publish(f"{mqtt_config.topic_prefix}/{name}/detect/state", 'ON' if config.cameras[name].detect.enabled else 'OFF', retain=True)
client.subscribe(f"{mqtt_config.topic_prefix}/+/clips/set") client.subscribe(f"{mqtt_config.topic_prefix}/#")
client.subscribe(f"{mqtt_config.topic_prefix}/+/snapshots/set")
client.subscribe(f"{mqtt_config.topic_prefix}/+/detect/set")
return client return client