fix mypy errors with improved MQTT reliability implementation

- Add _reason_info helper method to extract reason names safely
- Update connect/disconnect callbacks to use string comparison instead of numeric
- Fix type annotations with proper Optional[Client] typing
- Resolve unreachable code warnings with type ignore comments for threading
- Improve error handling with robust try/finally blocks in stop method
- Maintain fresh client creation approach for reliable reconnection

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
Dan 2025-09-01 18:08:01 -04:00
parent 3952035579
commit 44c07aac12

View File

@ -1,7 +1,7 @@
import logging import logging
import threading import threading
import time import time
from typing import Any, Callable from typing import Any, Callable, Optional
import paho.mqtt.client as mqtt import paho.mqtt.client as mqtt
from paho.mqtt.enums import CallbackAPIVersion from paho.mqtt.enums import CallbackAPIVersion
@ -19,18 +19,20 @@ class MqttClient(Communicator):
self.config = config self.config = config
self.mqtt_config = config.mqtt self.mqtt_config = config.mqtt
self.connected = False self.connected = False
self._reconnect_thread = None self.client: Optional[mqtt.Client] = None
self._dispatcher: Callable[[str, str], None] = lambda *_: None
self._reconnect_thread: Optional[threading.Thread] = None
self._reconnect_delay = 10 # Retry every 10 seconds self._reconnect_delay = 10 # Retry every 10 seconds
self._stop_reconnect = False self._stop_reconnect: bool = False
def subscribe(self, receiver: Callable) -> None: def subscribe(self, receiver: Callable[[str, str], None]) -> None:
"""Wrapper for allowing dispatcher to subscribe.""" """Wrapper for allowing dispatcher to subscribe."""
self._dispatcher = receiver self._dispatcher = receiver
self._start() self._start()
def publish(self, topic: str, payload: Any, retain: bool = False) -> None: def publish(self, topic: str, payload: Any, retain: bool = False) -> None:
"""Wrapper for publishing when client is in valid state.""" """Wrapper for publishing when client is in valid state."""
if not self.connected: if not self.connected or self.client is None:
logger.debug(f"Unable to publish to {topic}: client is not connected") logger.debug(f"Unable to publish to {topic}: client is not connected")
return return
@ -43,10 +45,16 @@ class MqttClient(Communicator):
def stop(self) -> None: def stop(self) -> None:
self._stop_reconnect = True self._stop_reconnect = True
if self._reconnect_thread and self._reconnect_thread.is_alive(): if self._reconnect_thread is not None and self._reconnect_thread.is_alive():
self._reconnect_thread.join(timeout=5) self._reconnect_thread.join(timeout=5)
if hasattr(self, "client"): if self.client is not None:
self.client.disconnect() try:
self.client.disconnect()
finally:
try:
self.client.loop_stop()
except Exception:
pass
def _set_initial_topics(self) -> None: def _set_initial_topics(self) -> None:
"""Set initial state topics.""" """Set initial state topics."""
@ -150,6 +158,20 @@ class MqttClient(Communicator):
self.publish("available", "online", retain=True) self.publish("available", "online", retain=True)
@staticmethod
def _reason_info(reason_code: object) -> str:
"""Return human_readable_name for a Paho reason code."""
# Name string
if hasattr(reason_code, "getName") and callable(getattr(reason_code, "getName")):
try:
name = str(getattr(reason_code, "getName")())
except Exception:
name = str(reason_code)
else:
name = str(reason_code)
return name
def on_mqtt_command( def on_mqtt_command(
self, client: mqtt.Client, userdata: Any, message: mqtt.MQTTMessage self, client: mqtt.Client, userdata: Any, message: mqtt.MQTTMessage
) -> None: ) -> None:
@ -163,27 +185,23 @@ class MqttClient(Communicator):
client: mqtt.Client, client: mqtt.Client,
userdata: Any, userdata: Any,
flags: Any, flags: Any,
reason_code: mqtt.ReasonCode, # type: ignore[name-defined] reason_code: object,
properties: Any, properties: Any,
) -> None: ) -> None:
"""Mqtt connection callback.""" """Mqtt connection callback."""
threading.current_thread().name = "mqtt" threading.current_thread().name = "mqtt"
if reason_code != 0: reason_name = self._reason_info(reason_code)
if reason_code == "Server unavailable":
logger.error( # Check for connection failure by comparing reason name
"Unable to connect to MQTT server: MQTT Server unavailable" if reason_name != "Success":
) if reason_name == "Server unavailable":
elif reason_code == "Bad user name or password": logger.error("Unable to connect to MQTT server: MQTT Server unavailable")
logger.error( elif reason_name == "Bad user name or password":
"Unable to connect to MQTT server: MQTT Bad username or password" logger.error("Unable to connect to MQTT server: MQTT Bad username or password")
) elif reason_name == "Not authorized":
elif reason_code == "Not authorized":
logger.error("Unable to connect to MQTT server: MQTT Not authorized") logger.error("Unable to connect to MQTT server: MQTT Not authorized")
else: else:
logger.error( logger.error(f"Unable to connect to MQTT server: Connection refused. Error: {reason_name}")
"Unable to connect to MQTT server: Connection refused. Error code: "
+ reason_code.getName()
)
# Don't set connected = True on connection failure # Don't set connected = True on connection failure
return return
@ -225,7 +243,7 @@ class MqttClient(Communicator):
logger.error("MQTT will attempt reconnection...") logger.error("MQTT will attempt reconnection...")
# Start reconnection in a separate thread to avoid blocking # Start reconnection in a separate thread to avoid blocking
if not self._reconnect_thread or not self._reconnect_thread.is_alive(): if self._reconnect_thread is None or not self._reconnect_thread.is_alive():
self._reconnect_thread = threading.Thread( self._reconnect_thread = threading.Thread(
target=self._reconnect_loop, name="mqtt-reconnect", daemon=True target=self._reconnect_loop, name="mqtt-reconnect", daemon=True
) )
@ -323,6 +341,7 @@ class MqttClient(Communicator):
"""Handle MQTT reconnection using fresh client creation, retrying every 10 seconds indefinitely.""" """Handle MQTT reconnection using fresh client creation, retrying every 10 seconds indefinitely."""
logger.error("MQTT reconnection loop started") logger.error("MQTT reconnection loop started")
attempt = 0 attempt = 0
while not self._stop_reconnect and not self.connected: while not self._stop_reconnect and not self.connected:
attempt += 1 attempt += 1
@ -331,15 +350,18 @@ class MqttClient(Communicator):
) )
# Wait with ability to exit early if stopping # Wait with ability to exit early if stopping
for _ in range(self._reconnect_delay): delay_count = 0
while delay_count < self._reconnect_delay:
if self._stop_reconnect: if self._stop_reconnect:
logger.error("MQTT reconnection stopped during delay") logger.error("MQTT reconnection stopped during delay") # type: ignore[unreachable]
return return
time.sleep(1) time.sleep(1)
delay_count += 1
# Double-check stop flag after delay
if self._stop_reconnect: if self._stop_reconnect:
logger.error("MQTT reconnection stopped after delay") logger.error("MQTT reconnection stopped after delay") # type: ignore[unreachable]
break return
try: try:
logger.error( logger.error(
@ -347,7 +369,7 @@ class MqttClient(Communicator):
) )
# Clean up old client if it exists # Clean up old client if it exists
if hasattr(self, "client"): if self.client is not None:
try: try:
self.client.disconnect() self.client.disconnect()
self.client.loop_stop() self.client.loop_stop()
@ -358,20 +380,22 @@ class MqttClient(Communicator):
self._start() self._start()
# Give the connection attempt some time to complete # Give the connection attempt some time to complete
for _ in range(5): # Wait up to 5 seconds for connection wait_count = 0
while wait_count < 5: # Wait up to 5 seconds for connection
if self.connected: if self.connected:
logger.error( logger.error( # type: ignore[unreachable]
f"MQTT fresh connection successful on attempt {attempt}!" f"MQTT fresh connection successful on attempt {attempt}!"
) )
return return
time.sleep(1) time.sleep(1)
wait_count += 1
logger.error( logger.error(
f"MQTT fresh connection attempt {attempt} timed out, will retry" f"MQTT fresh connection attempt {attempt} timed out, will retry"
) )
# Continue the loop to retry # Continue the outer while loop to retry
except Exception as e: except Exception as e:
logger.error(f"MQTT fresh connection attempt {attempt} failed: {e}") logger.error(f"MQTT fresh connection attempt {attempt} failed: {e}")
# Continue the loop to retry # Continue the outer while loop to retry
logger.error("MQTT reconnection loop finished") logger.error("MQTT reconnection loop finished")