From f95a49b81d595cd745d4347d73b38cc07cb3d894 Mon Sep 17 00:00:00 2001 From: Dan Date: Mon, 1 Sep 2025 12:57:02 -0400 Subject: [PATCH] implement automatic MQTT reconnection with 10s retry interval MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Fix connection state management: only set connected=True on successful connection - Add automatic reconnection loop that retries every 10 seconds indefinitely - Proper cleanup of reconnection thread on stop - Enhanced disconnect logging with reason codes - Thread-safe reconnection handling to avoid blocking main MQTT thread 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude --- frigate/comms/mqtt.py | 52 +++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 50 insertions(+), 2 deletions(-) diff --git a/frigate/comms/mqtt.py b/frigate/comms/mqtt.py index 0af56e259..ccc162980 100644 --- a/frigate/comms/mqtt.py +++ b/frigate/comms/mqtt.py @@ -1,5 +1,6 @@ import logging import threading +import time from typing import Any, Callable import paho.mqtt.client as mqtt @@ -18,6 +19,9 @@ class MqttClient(Communicator): self.config = config self.mqtt_config = config.mqtt self.connected = False + self._reconnect_thread = None + self._reconnect_delay = 10 # Retry every 10 seconds + self._stop_reconnect = False def subscribe(self, receiver: Callable) -> None: """Wrapper for allowing dispatcher to subscribe.""" @@ -38,7 +42,11 @@ class MqttClient(Communicator): ) def stop(self) -> None: - self.client.disconnect() + self._stop_reconnect = True + if self._reconnect_thread and self._reconnect_thread.is_alive(): + self._reconnect_thread.join(timeout=5) + if hasattr(self, 'client'): + self.client.disconnect() def _set_initial_topics(self) -> None: """Set initial state topics.""" @@ -176,6 +184,8 @@ class MqttClient(Communicator): "Unable to connect to MQTT server: Connection refused. Error code: " + reason_code.getName() ) + # Don't set connected = True on connection failure + return self.connected = True logger.debug("MQTT connected") @@ -192,7 +202,20 @@ class MqttClient(Communicator): ) -> None: """Mqtt disconnection callback.""" self.connected = False - logger.error("MQTT disconnected") + logger.error(f"MQTT disconnected (reason: {reason_code.getName()})") + + # Don't attempt reconnection if we're stopping or if it was a clean disconnect + if self._stop_reconnect or reason_code == 0: + return + + # Start reconnection in a separate thread to avoid blocking + if not self._reconnect_thread or not self._reconnect_thread.is_alive(): + self._reconnect_thread = threading.Thread( + target=self._reconnect_loop, + name="mqtt-reconnect", + daemon=True + ) + self._reconnect_thread.start() def _start(self) -> None: """Start mqtt client.""" @@ -281,3 +304,28 @@ class MqttClient(Communicator): except Exception as e: logger.error(f"Unable to connect to MQTT server: {e}") return + + def _reconnect_loop(self) -> None: + """Handle MQTT reconnection, retrying every 10 seconds indefinitely.""" + attempt = 0 + while not self._stop_reconnect and not self.connected: + attempt += 1 + + logger.debug(f"Will attempt MQTT reconnection in {self._reconnect_delay} seconds (attempt {attempt})") + + # Wait with ability to exit early if stopping + for _ in range(self._reconnect_delay): + if self._stop_reconnect: + return + time.sleep(1) + + if self._stop_reconnect: + break + + try: + logger.debug(f"Attempting MQTT reconnection (attempt {attempt})...") + self.client.reconnect() + break # Let the on_connect callback handle success + except Exception as e: + logger.error(f"MQTT reconnection attempt {attempt} failed: {e}") + # Continue the loop to retry