From 44c07aac12f52d5ec89854226b35ce43f1e82ffd Mon Sep 17 00:00:00 2001 From: Dan Date: Mon, 1 Sep 2025 18:08:01 -0400 Subject: [PATCH] fix mypy errors with improved MQTT reliability implementation MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Add _reason_info helper method to extract reason names safely - Update connect/disconnect callbacks to use string comparison instead of numeric - Fix type annotations with proper Optional[Client] typing - Resolve unreachable code warnings with type ignore comments for threading - Improve error handling with robust try/finally blocks in stop method - Maintain fresh client creation approach for reliable reconnection 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude --- frigate/comms/mqtt.py | 90 +++++++++++++++++++++++++++---------------- 1 file changed, 57 insertions(+), 33 deletions(-) diff --git a/frigate/comms/mqtt.py b/frigate/comms/mqtt.py index 84271066a..b08225085 100644 --- a/frigate/comms/mqtt.py +++ b/frigate/comms/mqtt.py @@ -1,7 +1,7 @@ import logging import threading import time -from typing import Any, Callable +from typing import Any, Callable, Optional import paho.mqtt.client as mqtt from paho.mqtt.enums import CallbackAPIVersion @@ -19,18 +19,20 @@ class MqttClient(Communicator): self.config = config self.mqtt_config = config.mqtt self.connected = False - self._reconnect_thread = None + self.client: Optional[mqtt.Client] = None + self._dispatcher: Callable[[str, str], None] = lambda *_: None + self._reconnect_thread: Optional[threading.Thread] = None self._reconnect_delay = 10 # Retry every 10 seconds - self._stop_reconnect = False + self._stop_reconnect: bool = False - def subscribe(self, receiver: Callable) -> None: + def subscribe(self, receiver: Callable[[str, str], None]) -> None: """Wrapper for allowing dispatcher to subscribe.""" self._dispatcher = receiver self._start() def publish(self, topic: str, payload: Any, retain: bool = False) -> None: """Wrapper for publishing when client is in valid state.""" - if not self.connected: + if not self.connected or self.client is None: logger.debug(f"Unable to publish to {topic}: client is not connected") return @@ -43,10 +45,16 @@ class MqttClient(Communicator): def stop(self) -> None: self._stop_reconnect = True - if self._reconnect_thread and self._reconnect_thread.is_alive(): + if self._reconnect_thread is not None and self._reconnect_thread.is_alive(): self._reconnect_thread.join(timeout=5) - if hasattr(self, "client"): - self.client.disconnect() + if self.client is not None: + try: + self.client.disconnect() + finally: + try: + self.client.loop_stop() + except Exception: + pass def _set_initial_topics(self) -> None: """Set initial state topics.""" @@ -150,6 +158,20 @@ class MqttClient(Communicator): self.publish("available", "online", retain=True) + @staticmethod + def _reason_info(reason_code: object) -> str: + """Return human_readable_name for a Paho reason code.""" + # Name string + if hasattr(reason_code, "getName") and callable(getattr(reason_code, "getName")): + try: + name = str(getattr(reason_code, "getName")()) + except Exception: + name = str(reason_code) + else: + name = str(reason_code) + + return name + def on_mqtt_command( self, client: mqtt.Client, userdata: Any, message: mqtt.MQTTMessage ) -> None: @@ -163,27 +185,23 @@ class MqttClient(Communicator): client: mqtt.Client, userdata: Any, flags: Any, - reason_code: mqtt.ReasonCode, # type: ignore[name-defined] + reason_code: object, properties: Any, ) -> None: """Mqtt connection callback.""" threading.current_thread().name = "mqtt" - if reason_code != 0: - if reason_code == "Server unavailable": - logger.error( - "Unable to connect to MQTT server: MQTT Server unavailable" - ) - elif reason_code == "Bad user name or password": - logger.error( - "Unable to connect to MQTT server: MQTT Bad username or password" - ) - elif reason_code == "Not authorized": + reason_name = self._reason_info(reason_code) + + # Check for connection failure by comparing reason name + if reason_name != "Success": + if reason_name == "Server unavailable": + logger.error("Unable to connect to MQTT server: MQTT Server unavailable") + elif reason_name == "Bad user name or password": + logger.error("Unable to connect to MQTT server: MQTT Bad username or password") + elif reason_name == "Not authorized": logger.error("Unable to connect to MQTT server: MQTT Not authorized") else: - logger.error( - "Unable to connect to MQTT server: Connection refused. Error code: " - + reason_code.getName() - ) + logger.error(f"Unable to connect to MQTT server: Connection refused. Error: {reason_name}") # Don't set connected = True on connection failure return @@ -225,7 +243,7 @@ class MqttClient(Communicator): logger.error("MQTT will attempt reconnection...") # Start reconnection in a separate thread to avoid blocking - if not self._reconnect_thread or not self._reconnect_thread.is_alive(): + if self._reconnect_thread is None or not self._reconnect_thread.is_alive(): self._reconnect_thread = threading.Thread( target=self._reconnect_loop, name="mqtt-reconnect", daemon=True ) @@ -323,6 +341,7 @@ class MqttClient(Communicator): """Handle MQTT reconnection using fresh client creation, retrying every 10 seconds indefinitely.""" logger.error("MQTT reconnection loop started") attempt = 0 + while not self._stop_reconnect and not self.connected: attempt += 1 @@ -331,15 +350,18 @@ class MqttClient(Communicator): ) # Wait with ability to exit early if stopping - for _ in range(self._reconnect_delay): + delay_count = 0 + while delay_count < self._reconnect_delay: if self._stop_reconnect: - logger.error("MQTT reconnection stopped during delay") + logger.error("MQTT reconnection stopped during delay") # type: ignore[unreachable] return time.sleep(1) + delay_count += 1 + # Double-check stop flag after delay if self._stop_reconnect: - logger.error("MQTT reconnection stopped after delay") - break + logger.error("MQTT reconnection stopped after delay") # type: ignore[unreachable] + return try: logger.error( @@ -347,7 +369,7 @@ class MqttClient(Communicator): ) # Clean up old client if it exists - if hasattr(self, "client"): + if self.client is not None: try: self.client.disconnect() self.client.loop_stop() @@ -358,20 +380,22 @@ class MqttClient(Communicator): self._start() # Give the connection attempt some time to complete - for _ in range(5): # Wait up to 5 seconds for connection + wait_count = 0 + while wait_count < 5: # Wait up to 5 seconds for connection if self.connected: - logger.error( + logger.error( # type: ignore[unreachable] f"MQTT fresh connection successful on attempt {attempt}!" ) return time.sleep(1) + wait_count += 1 logger.error( f"MQTT fresh connection attempt {attempt} timed out, will retry" ) - # Continue the loop to retry + # Continue the outer while loop to retry except Exception as e: logger.error(f"MQTT fresh connection attempt {attempt} failed: {e}") - # Continue the loop to retry + # Continue the outer while loop to retry logger.error("MQTT reconnection loop finished")