From f95a49b81d595cd745d4347d73b38cc07cb3d894 Mon Sep 17 00:00:00 2001 From: Dan Date: Mon, 1 Sep 2025 12:57:02 -0400 Subject: [PATCH 1/5] implement automatic MQTT reconnection with 10s retry interval MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Fix connection state management: only set connected=True on successful connection - Add automatic reconnection loop that retries every 10 seconds indefinitely - Proper cleanup of reconnection thread on stop - Enhanced disconnect logging with reason codes - Thread-safe reconnection handling to avoid blocking main MQTT thread 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude --- frigate/comms/mqtt.py | 52 +++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 50 insertions(+), 2 deletions(-) diff --git a/frigate/comms/mqtt.py b/frigate/comms/mqtt.py index 0af56e259..ccc162980 100644 --- a/frigate/comms/mqtt.py +++ b/frigate/comms/mqtt.py @@ -1,5 +1,6 @@ import logging import threading +import time from typing import Any, Callable import paho.mqtt.client as mqtt @@ -18,6 +19,9 @@ class MqttClient(Communicator): self.config = config self.mqtt_config = config.mqtt self.connected = False + self._reconnect_thread = None + self._reconnect_delay = 10 # Retry every 10 seconds + self._stop_reconnect = False def subscribe(self, receiver: Callable) -> None: """Wrapper for allowing dispatcher to subscribe.""" @@ -38,7 +42,11 @@ class MqttClient(Communicator): ) def stop(self) -> None: - self.client.disconnect() + self._stop_reconnect = True + if self._reconnect_thread and self._reconnect_thread.is_alive(): + self._reconnect_thread.join(timeout=5) + if hasattr(self, 'client'): + self.client.disconnect() def _set_initial_topics(self) -> None: """Set initial state topics.""" @@ -176,6 +184,8 @@ class MqttClient(Communicator): "Unable to connect to MQTT server: Connection refused. Error code: " + reason_code.getName() ) + # Don't set connected = True on connection failure + return self.connected = True logger.debug("MQTT connected") @@ -192,7 +202,20 @@ class MqttClient(Communicator): ) -> None: """Mqtt disconnection callback.""" self.connected = False - logger.error("MQTT disconnected") + logger.error(f"MQTT disconnected (reason: {reason_code.getName()})") + + # Don't attempt reconnection if we're stopping or if it was a clean disconnect + if self._stop_reconnect or reason_code == 0: + return + + # Start reconnection in a separate thread to avoid blocking + if not self._reconnect_thread or not self._reconnect_thread.is_alive(): + self._reconnect_thread = threading.Thread( + target=self._reconnect_loop, + name="mqtt-reconnect", + daemon=True + ) + self._reconnect_thread.start() def _start(self) -> None: """Start mqtt client.""" @@ -281,3 +304,28 @@ class MqttClient(Communicator): except Exception as e: logger.error(f"Unable to connect to MQTT server: {e}") return + + def _reconnect_loop(self) -> None: + """Handle MQTT reconnection, retrying every 10 seconds indefinitely.""" + attempt = 0 + while not self._stop_reconnect and not self.connected: + attempt += 1 + + logger.debug(f"Will attempt MQTT reconnection in {self._reconnect_delay} seconds (attempt {attempt})") + + # Wait with ability to exit early if stopping + for _ in range(self._reconnect_delay): + if self._stop_reconnect: + return + time.sleep(1) + + if self._stop_reconnect: + break + + try: + logger.debug(f"Attempting MQTT reconnection (attempt {attempt})...") + self.client.reconnect() + break # Let the on_connect callback handle success + except Exception as e: + logger.error(f"MQTT reconnection attempt {attempt} failed: {e}") + # Continue the loop to retry From a45915517f2a55e24e3cb08c21a4847f38ac20bb Mon Sep 17 00:00:00 2001 From: Dan Date: Mon, 1 Sep 2025 15:32:04 -0400 Subject: [PATCH 2/5] improve MQTT reconnection with fresh client creation approach MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Replace client.reconnect() with fresh client creation for each retry attempt - Add proper cleanup of old client before creating new one - Enhanced logging with detailed debugging info for disconnect reasons - Use proven aiomqtt-style retry pattern for better reliability - Each reconnection attempt now creates completely new MQTT client instance 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude --- frigate/comms/mqtt.py | 52 +++++++++++++++++++++++++++++++++++-------- 1 file changed, 43 insertions(+), 9 deletions(-) diff --git a/frigate/comms/mqtt.py b/frigate/comms/mqtt.py index ccc162980..a47175163 100644 --- a/frigate/comms/mqtt.py +++ b/frigate/comms/mqtt.py @@ -202,12 +202,22 @@ class MqttClient(Communicator): ) -> None: """Mqtt disconnection callback.""" self.connected = False - logger.error(f"MQTT disconnected (reason: {reason_code.getName()})") + # Debug reason code thoroughly + reason_name = reason_code.getName() if hasattr(reason_code, 'getName') else str(reason_code) + reason_value = getattr(reason_code, 'value', reason_code) + logger.error(f"MQTT disconnected - reason: '{reason_name}', code: {reason_value}, type: {type(reason_code)}") # Don't attempt reconnection if we're stopping or if it was a clean disconnect - if self._stop_reconnect or reason_code == 0: + if self._stop_reconnect: + logger.error("MQTT not reconnecting - stop flag set") return + if reason_code == 0: + logger.error("MQTT not reconnecting - clean disconnect (code 0)") + return + + logger.error("MQTT will attempt reconnection...") + # Start reconnection in a separate thread to avoid blocking if not self._reconnect_thread or not self._reconnect_thread.is_alive(): self._reconnect_thread = threading.Thread( @@ -306,26 +316,50 @@ class MqttClient(Communicator): return def _reconnect_loop(self) -> None: - """Handle MQTT reconnection, retrying every 10 seconds indefinitely.""" + """Handle MQTT reconnection using fresh client creation, retrying every 10 seconds indefinitely.""" + logger.error("MQTT reconnection loop started") attempt = 0 while not self._stop_reconnect and not self.connected: attempt += 1 - logger.debug(f"Will attempt MQTT reconnection in {self._reconnect_delay} seconds (attempt {attempt})") + logger.error(f"Will attempt MQTT reconnection in {self._reconnect_delay} seconds (attempt {attempt})") # Wait with ability to exit early if stopping for _ in range(self._reconnect_delay): if self._stop_reconnect: + logger.error("MQTT reconnection stopped during delay") return time.sleep(1) if self._stop_reconnect: + logger.error("MQTT reconnection stopped after delay") break try: - logger.debug(f"Attempting MQTT reconnection (attempt {attempt})...") - self.client.reconnect() - break # Let the on_connect callback handle success - except Exception as e: - logger.error(f"MQTT reconnection attempt {attempt} failed: {e}") + logger.error(f"Creating fresh MQTT client for reconnection attempt {attempt}...") + + # Clean up old client if it exists + if hasattr(self, 'client'): + try: + self.client.disconnect() + self.client.loop_stop() + except Exception: + pass # Ignore cleanup errors + + # Create completely fresh client and attempt connection + self._start() + + # Give the connection attempt some time to complete + for _ in range(5): # Wait up to 5 seconds for connection + if self.connected: + logger.error(f"MQTT fresh connection successful on attempt {attempt}!") + return + time.sleep(1) + + logger.error(f"MQTT fresh connection attempt {attempt} timed out, will retry") # Continue the loop to retry + except Exception as e: + logger.error(f"MQTT fresh connection attempt {attempt} failed: {e}") + # Continue the loop to retry + + logger.error("MQTT reconnection loop finished") From 3952035579bd42d61746b2c2ad72d8ee05f10f66 Mon Sep 17 00:00:00 2001 From: Dan Date: Mon, 1 Sep 2025 15:54:48 -0400 Subject: [PATCH 3/5] format MQTT code with ruff --- frigate/comms/mqtt.py | 62 ++++++++++++++++++++++++++----------------- 1 file changed, 37 insertions(+), 25 deletions(-) diff --git a/frigate/comms/mqtt.py b/frigate/comms/mqtt.py index a47175163..84271066a 100644 --- a/frigate/comms/mqtt.py +++ b/frigate/comms/mqtt.py @@ -45,7 +45,7 @@ class MqttClient(Communicator): self._stop_reconnect = True if self._reconnect_thread and self._reconnect_thread.is_alive(): self._reconnect_thread.join(timeout=5) - if hasattr(self, 'client'): + if hasattr(self, "client"): self.client.disconnect() def _set_initial_topics(self) -> None: @@ -203,27 +203,31 @@ class MqttClient(Communicator): """Mqtt disconnection callback.""" self.connected = False # Debug reason code thoroughly - reason_name = reason_code.getName() if hasattr(reason_code, 'getName') else str(reason_code) - reason_value = getattr(reason_code, 'value', reason_code) - logger.error(f"MQTT disconnected - reason: '{reason_name}', code: {reason_value}, type: {type(reason_code)}") - + reason_name = ( + reason_code.getName() + if hasattr(reason_code, "getName") + else str(reason_code) + ) + reason_value = getattr(reason_code, "value", reason_code) + logger.error( + f"MQTT disconnected - reason: '{reason_name}', code: {reason_value}, type: {type(reason_code)}" + ) + # Don't attempt reconnection if we're stopping or if it was a clean disconnect if self._stop_reconnect: logger.error("MQTT not reconnecting - stop flag set") return - + if reason_code == 0: logger.error("MQTT not reconnecting - clean disconnect (code 0)") return - + logger.error("MQTT will attempt reconnection...") - + # Start reconnection in a separate thread to avoid blocking if not self._reconnect_thread or not self._reconnect_thread.is_alive(): self._reconnect_thread = threading.Thread( - target=self._reconnect_loop, - name="mqtt-reconnect", - daemon=True + target=self._reconnect_loop, name="mqtt-reconnect", daemon=True ) self._reconnect_thread.start() @@ -321,45 +325,53 @@ class MqttClient(Communicator): attempt = 0 while not self._stop_reconnect and not self.connected: attempt += 1 - - logger.error(f"Will attempt MQTT reconnection in {self._reconnect_delay} seconds (attempt {attempt})") - + + logger.error( + f"Will attempt MQTT reconnection in {self._reconnect_delay} seconds (attempt {attempt})" + ) + # Wait with ability to exit early if stopping for _ in range(self._reconnect_delay): if self._stop_reconnect: logger.error("MQTT reconnection stopped during delay") return time.sleep(1) - + if self._stop_reconnect: logger.error("MQTT reconnection stopped after delay") break - + try: - logger.error(f"Creating fresh MQTT client for reconnection attempt {attempt}...") - + logger.error( + f"Creating fresh MQTT client for reconnection attempt {attempt}..." + ) + # Clean up old client if it exists - if hasattr(self, 'client'): + if hasattr(self, "client"): try: self.client.disconnect() self.client.loop_stop() except Exception: pass # Ignore cleanup errors - + # Create completely fresh client and attempt connection self._start() - + # Give the connection attempt some time to complete for _ in range(5): # Wait up to 5 seconds for connection if self.connected: - logger.error(f"MQTT fresh connection successful on attempt {attempt}!") + logger.error( + f"MQTT fresh connection successful on attempt {attempt}!" + ) return time.sleep(1) - - logger.error(f"MQTT fresh connection attempt {attempt} timed out, will retry") + + logger.error( + f"MQTT fresh connection attempt {attempt} timed out, will retry" + ) # Continue the loop to retry except Exception as e: logger.error(f"MQTT fresh connection attempt {attempt} failed: {e}") # Continue the loop to retry - + logger.error("MQTT reconnection loop finished") From 44c07aac12f52d5ec89854226b35ce43f1e82ffd Mon Sep 17 00:00:00 2001 From: Dan Date: Mon, 1 Sep 2025 18:08:01 -0400 Subject: [PATCH 4/5] fix mypy errors with improved MQTT reliability implementation MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Add _reason_info helper method to extract reason names safely - Update connect/disconnect callbacks to use string comparison instead of numeric - Fix type annotations with proper Optional[Client] typing - Resolve unreachable code warnings with type ignore comments for threading - Improve error handling with robust try/finally blocks in stop method - Maintain fresh client creation approach for reliable reconnection 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude --- frigate/comms/mqtt.py | 90 +++++++++++++++++++++++++++---------------- 1 file changed, 57 insertions(+), 33 deletions(-) diff --git a/frigate/comms/mqtt.py b/frigate/comms/mqtt.py index 84271066a..b08225085 100644 --- a/frigate/comms/mqtt.py +++ b/frigate/comms/mqtt.py @@ -1,7 +1,7 @@ import logging import threading import time -from typing import Any, Callable +from typing import Any, Callable, Optional import paho.mqtt.client as mqtt from paho.mqtt.enums import CallbackAPIVersion @@ -19,18 +19,20 @@ class MqttClient(Communicator): self.config = config self.mqtt_config = config.mqtt self.connected = False - self._reconnect_thread = None + self.client: Optional[mqtt.Client] = None + self._dispatcher: Callable[[str, str], None] = lambda *_: None + self._reconnect_thread: Optional[threading.Thread] = None self._reconnect_delay = 10 # Retry every 10 seconds - self._stop_reconnect = False + self._stop_reconnect: bool = False - def subscribe(self, receiver: Callable) -> None: + def subscribe(self, receiver: Callable[[str, str], None]) -> None: """Wrapper for allowing dispatcher to subscribe.""" self._dispatcher = receiver self._start() def publish(self, topic: str, payload: Any, retain: bool = False) -> None: """Wrapper for publishing when client is in valid state.""" - if not self.connected: + if not self.connected or self.client is None: logger.debug(f"Unable to publish to {topic}: client is not connected") return @@ -43,10 +45,16 @@ class MqttClient(Communicator): def stop(self) -> None: self._stop_reconnect = True - if self._reconnect_thread and self._reconnect_thread.is_alive(): + if self._reconnect_thread is not None and self._reconnect_thread.is_alive(): self._reconnect_thread.join(timeout=5) - if hasattr(self, "client"): - self.client.disconnect() + if self.client is not None: + try: + self.client.disconnect() + finally: + try: + self.client.loop_stop() + except Exception: + pass def _set_initial_topics(self) -> None: """Set initial state topics.""" @@ -150,6 +158,20 @@ class MqttClient(Communicator): self.publish("available", "online", retain=True) + @staticmethod + def _reason_info(reason_code: object) -> str: + """Return human_readable_name for a Paho reason code.""" + # Name string + if hasattr(reason_code, "getName") and callable(getattr(reason_code, "getName")): + try: + name = str(getattr(reason_code, "getName")()) + except Exception: + name = str(reason_code) + else: + name = str(reason_code) + + return name + def on_mqtt_command( self, client: mqtt.Client, userdata: Any, message: mqtt.MQTTMessage ) -> None: @@ -163,27 +185,23 @@ class MqttClient(Communicator): client: mqtt.Client, userdata: Any, flags: Any, - reason_code: mqtt.ReasonCode, # type: ignore[name-defined] + reason_code: object, properties: Any, ) -> None: """Mqtt connection callback.""" threading.current_thread().name = "mqtt" - if reason_code != 0: - if reason_code == "Server unavailable": - logger.error( - "Unable to connect to MQTT server: MQTT Server unavailable" - ) - elif reason_code == "Bad user name or password": - logger.error( - "Unable to connect to MQTT server: MQTT Bad username or password" - ) - elif reason_code == "Not authorized": + reason_name = self._reason_info(reason_code) + + # Check for connection failure by comparing reason name + if reason_name != "Success": + if reason_name == "Server unavailable": + logger.error("Unable to connect to MQTT server: MQTT Server unavailable") + elif reason_name == "Bad user name or password": + logger.error("Unable to connect to MQTT server: MQTT Bad username or password") + elif reason_name == "Not authorized": logger.error("Unable to connect to MQTT server: MQTT Not authorized") else: - logger.error( - "Unable to connect to MQTT server: Connection refused. Error code: " - + reason_code.getName() - ) + logger.error(f"Unable to connect to MQTT server: Connection refused. Error: {reason_name}") # Don't set connected = True on connection failure return @@ -225,7 +243,7 @@ class MqttClient(Communicator): logger.error("MQTT will attempt reconnection...") # Start reconnection in a separate thread to avoid blocking - if not self._reconnect_thread or not self._reconnect_thread.is_alive(): + if self._reconnect_thread is None or not self._reconnect_thread.is_alive(): self._reconnect_thread = threading.Thread( target=self._reconnect_loop, name="mqtt-reconnect", daemon=True ) @@ -323,6 +341,7 @@ class MqttClient(Communicator): """Handle MQTT reconnection using fresh client creation, retrying every 10 seconds indefinitely.""" logger.error("MQTT reconnection loop started") attempt = 0 + while not self._stop_reconnect and not self.connected: attempt += 1 @@ -331,15 +350,18 @@ class MqttClient(Communicator): ) # Wait with ability to exit early if stopping - for _ in range(self._reconnect_delay): + delay_count = 0 + while delay_count < self._reconnect_delay: if self._stop_reconnect: - logger.error("MQTT reconnection stopped during delay") + logger.error("MQTT reconnection stopped during delay") # type: ignore[unreachable] return time.sleep(1) + delay_count += 1 + # Double-check stop flag after delay if self._stop_reconnect: - logger.error("MQTT reconnection stopped after delay") - break + logger.error("MQTT reconnection stopped after delay") # type: ignore[unreachable] + return try: logger.error( @@ -347,7 +369,7 @@ class MqttClient(Communicator): ) # Clean up old client if it exists - if hasattr(self, "client"): + if self.client is not None: try: self.client.disconnect() self.client.loop_stop() @@ -358,20 +380,22 @@ class MqttClient(Communicator): self._start() # Give the connection attempt some time to complete - for _ in range(5): # Wait up to 5 seconds for connection + wait_count = 0 + while wait_count < 5: # Wait up to 5 seconds for connection if self.connected: - logger.error( + logger.error( # type: ignore[unreachable] f"MQTT fresh connection successful on attempt {attempt}!" ) return time.sleep(1) + wait_count += 1 logger.error( f"MQTT fresh connection attempt {attempt} timed out, will retry" ) - # Continue the loop to retry + # Continue the outer while loop to retry except Exception as e: logger.error(f"MQTT fresh connection attempt {attempt} failed: {e}") - # Continue the loop to retry + # Continue the outer while loop to retry logger.error("MQTT reconnection loop finished") From 91d3a7b245fa9e29f2c59cd8915bedc5e5894f4e Mon Sep 17 00:00:00 2001 From: Dan Date: Mon, 1 Sep 2025 18:12:51 -0400 Subject: [PATCH 5/5] format MQTT code with ruff --- frigate/comms/mqtt.py | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/frigate/comms/mqtt.py b/frigate/comms/mqtt.py index b08225085..0334b3eb9 100644 --- a/frigate/comms/mqtt.py +++ b/frigate/comms/mqtt.py @@ -162,7 +162,9 @@ class MqttClient(Communicator): def _reason_info(reason_code: object) -> str: """Return human_readable_name for a Paho reason code.""" # Name string - if hasattr(reason_code, "getName") and callable(getattr(reason_code, "getName")): + if hasattr(reason_code, "getName") and callable( + getattr(reason_code, "getName") + ): try: name = str(getattr(reason_code, "getName")()) except Exception: @@ -195,13 +197,19 @@ class MqttClient(Communicator): # Check for connection failure by comparing reason name if reason_name != "Success": if reason_name == "Server unavailable": - logger.error("Unable to connect to MQTT server: MQTT Server unavailable") + logger.error( + "Unable to connect to MQTT server: MQTT Server unavailable" + ) elif reason_name == "Bad user name or password": - logger.error("Unable to connect to MQTT server: MQTT Bad username or password") + logger.error( + "Unable to connect to MQTT server: MQTT Bad username or password" + ) elif reason_name == "Not authorized": logger.error("Unable to connect to MQTT server: MQTT Not authorized") else: - logger.error(f"Unable to connect to MQTT server: Connection refused. Error: {reason_name}") + logger.error( + f"Unable to connect to MQTT server: Connection refused. Error: {reason_name}" + ) # Don't set connected = True on connection failure return @@ -341,7 +349,7 @@ class MqttClient(Communicator): """Handle MQTT reconnection using fresh client creation, retrying every 10 seconds indefinitely.""" logger.error("MQTT reconnection loop started") attempt = 0 - + while not self._stop_reconnect and not self.connected: attempt += 1