diff --git a/frigate/comms/mqtt.py b/frigate/comms/mqtt.py index 0af56e259..0334b3eb9 100644 --- a/frigate/comms/mqtt.py +++ b/frigate/comms/mqtt.py @@ -1,6 +1,7 @@ import logging import threading -from typing import Any, Callable +import time +from typing import Any, Callable, Optional import paho.mqtt.client as mqtt from paho.mqtt.enums import CallbackAPIVersion @@ -18,15 +19,20 @@ class MqttClient(Communicator): self.config = config self.mqtt_config = config.mqtt self.connected = False + self.client: Optional[mqtt.Client] = None + self._dispatcher: Callable[[str, str], None] = lambda *_: None + self._reconnect_thread: Optional[threading.Thread] = None + self._reconnect_delay = 10 # Retry every 10 seconds + self._stop_reconnect: bool = False - def subscribe(self, receiver: Callable) -> None: + def subscribe(self, receiver: Callable[[str, str], None]) -> None: """Wrapper for allowing dispatcher to subscribe.""" self._dispatcher = receiver self._start() def publish(self, topic: str, payload: Any, retain: bool = False) -> None: """Wrapper for publishing when client is in valid state.""" - if not self.connected: + if not self.connected or self.client is None: logger.debug(f"Unable to publish to {topic}: client is not connected") return @@ -38,7 +44,17 @@ class MqttClient(Communicator): ) def stop(self) -> None: - self.client.disconnect() + self._stop_reconnect = True + if self._reconnect_thread is not None and self._reconnect_thread.is_alive(): + self._reconnect_thread.join(timeout=5) + if self.client is not None: + try: + self.client.disconnect() + finally: + try: + self.client.loop_stop() + except Exception: + pass def _set_initial_topics(self) -> None: """Set initial state topics.""" @@ -142,6 +158,22 @@ class MqttClient(Communicator): self.publish("available", "online", retain=True) + @staticmethod + def _reason_info(reason_code: object) -> str: + """Return human_readable_name for a Paho reason code.""" + # Name string + if hasattr(reason_code, "getName") and callable( + getattr(reason_code, "getName") + ): + try: + name = str(getattr(reason_code, "getName")()) + except Exception: + name = str(reason_code) + else: + name = str(reason_code) + + return name + def on_mqtt_command( self, client: mqtt.Client, userdata: Any, message: mqtt.MQTTMessage ) -> None: @@ -155,27 +187,31 @@ class MqttClient(Communicator): client: mqtt.Client, userdata: Any, flags: Any, - reason_code: mqtt.ReasonCode, # type: ignore[name-defined] + reason_code: object, properties: Any, ) -> None: """Mqtt connection callback.""" threading.current_thread().name = "mqtt" - if reason_code != 0: - if reason_code == "Server unavailable": + reason_name = self._reason_info(reason_code) + + # Check for connection failure by comparing reason name + if reason_name != "Success": + if reason_name == "Server unavailable": logger.error( "Unable to connect to MQTT server: MQTT Server unavailable" ) - elif reason_code == "Bad user name or password": + elif reason_name == "Bad user name or password": logger.error( "Unable to connect to MQTT server: MQTT Bad username or password" ) - elif reason_code == "Not authorized": + elif reason_name == "Not authorized": logger.error("Unable to connect to MQTT server: MQTT Not authorized") else: logger.error( - "Unable to connect to MQTT server: Connection refused. Error code: " - + reason_code.getName() + f"Unable to connect to MQTT server: Connection refused. Error: {reason_name}" ) + # Don't set connected = True on connection failure + return self.connected = True logger.debug("MQTT connected") @@ -192,7 +228,34 @@ class MqttClient(Communicator): ) -> None: """Mqtt disconnection callback.""" self.connected = False - logger.error("MQTT disconnected") + # Debug reason code thoroughly + reason_name = ( + reason_code.getName() + if hasattr(reason_code, "getName") + else str(reason_code) + ) + reason_value = getattr(reason_code, "value", reason_code) + logger.error( + f"MQTT disconnected - reason: '{reason_name}', code: {reason_value}, type: {type(reason_code)}" + ) + + # Don't attempt reconnection if we're stopping or if it was a clean disconnect + if self._stop_reconnect: + logger.error("MQTT not reconnecting - stop flag set") + return + + if reason_code == 0: + logger.error("MQTT not reconnecting - clean disconnect (code 0)") + return + + logger.error("MQTT will attempt reconnection...") + + # Start reconnection in a separate thread to avoid blocking + if self._reconnect_thread is None or not self._reconnect_thread.is_alive(): + self._reconnect_thread = threading.Thread( + target=self._reconnect_loop, name="mqtt-reconnect", daemon=True + ) + self._reconnect_thread.start() def _start(self) -> None: """Start mqtt client.""" @@ -281,3 +344,66 @@ class MqttClient(Communicator): except Exception as e: logger.error(f"Unable to connect to MQTT server: {e}") return + + def _reconnect_loop(self) -> None: + """Handle MQTT reconnection using fresh client creation, retrying every 10 seconds indefinitely.""" + logger.error("MQTT reconnection loop started") + attempt = 0 + + while not self._stop_reconnect and not self.connected: + attempt += 1 + + logger.error( + f"Will attempt MQTT reconnection in {self._reconnect_delay} seconds (attempt {attempt})" + ) + + # Wait with ability to exit early if stopping + delay_count = 0 + while delay_count < self._reconnect_delay: + if self._stop_reconnect: + logger.error("MQTT reconnection stopped during delay") # type: ignore[unreachable] + return + time.sleep(1) + delay_count += 1 + + # Double-check stop flag after delay + if self._stop_reconnect: + logger.error("MQTT reconnection stopped after delay") # type: ignore[unreachable] + return + + try: + logger.error( + f"Creating fresh MQTT client for reconnection attempt {attempt}..." + ) + + # Clean up old client if it exists + if self.client is not None: + try: + self.client.disconnect() + self.client.loop_stop() + except Exception: + pass # Ignore cleanup errors + + # Create completely fresh client and attempt connection + self._start() + + # Give the connection attempt some time to complete + wait_count = 0 + while wait_count < 5: # Wait up to 5 seconds for connection + if self.connected: + logger.error( # type: ignore[unreachable] + f"MQTT fresh connection successful on attempt {attempt}!" + ) + return + time.sleep(1) + wait_count += 1 + + logger.error( + f"MQTT fresh connection attempt {attempt} timed out, will retry" + ) + # Continue the outer while loop to retry + except Exception as e: + logger.error(f"MQTT fresh connection attempt {attempt} failed: {e}") + # Continue the outer while loop to retry + + logger.error("MQTT reconnection loop finished")