mirror of
https://github.com/blakeblackshear/frigate.git
synced 2025-09-05 17:51:36 +02:00
implement automatic MQTT reconnection with 10s retry interval
- Fix connection state management: only set connected=True on successful connection - Add automatic reconnection loop that retries every 10 seconds indefinitely - Proper cleanup of reconnection thread on stop - Enhanced disconnect logging with reason codes - Thread-safe reconnection handling to avoid blocking main MQTT thread 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
parent
b86e6e484f
commit
f95a49b81d
@ -1,5 +1,6 @@
|
|||||||
import logging
|
import logging
|
||||||
import threading
|
import threading
|
||||||
|
import time
|
||||||
from typing import Any, Callable
|
from typing import Any, Callable
|
||||||
|
|
||||||
import paho.mqtt.client as mqtt
|
import paho.mqtt.client as mqtt
|
||||||
@ -18,6 +19,9 @@ class MqttClient(Communicator):
|
|||||||
self.config = config
|
self.config = config
|
||||||
self.mqtt_config = config.mqtt
|
self.mqtt_config = config.mqtt
|
||||||
self.connected = False
|
self.connected = False
|
||||||
|
self._reconnect_thread = None
|
||||||
|
self._reconnect_delay = 10 # Retry every 10 seconds
|
||||||
|
self._stop_reconnect = False
|
||||||
|
|
||||||
def subscribe(self, receiver: Callable) -> None:
|
def subscribe(self, receiver: Callable) -> None:
|
||||||
"""Wrapper for allowing dispatcher to subscribe."""
|
"""Wrapper for allowing dispatcher to subscribe."""
|
||||||
@ -38,7 +42,11 @@ class MqttClient(Communicator):
|
|||||||
)
|
)
|
||||||
|
|
||||||
def stop(self) -> None:
|
def stop(self) -> None:
|
||||||
self.client.disconnect()
|
self._stop_reconnect = True
|
||||||
|
if self._reconnect_thread and self._reconnect_thread.is_alive():
|
||||||
|
self._reconnect_thread.join(timeout=5)
|
||||||
|
if hasattr(self, 'client'):
|
||||||
|
self.client.disconnect()
|
||||||
|
|
||||||
def _set_initial_topics(self) -> None:
|
def _set_initial_topics(self) -> None:
|
||||||
"""Set initial state topics."""
|
"""Set initial state topics."""
|
||||||
@ -176,6 +184,8 @@ class MqttClient(Communicator):
|
|||||||
"Unable to connect to MQTT server: Connection refused. Error code: "
|
"Unable to connect to MQTT server: Connection refused. Error code: "
|
||||||
+ reason_code.getName()
|
+ reason_code.getName()
|
||||||
)
|
)
|
||||||
|
# Don't set connected = True on connection failure
|
||||||
|
return
|
||||||
|
|
||||||
self.connected = True
|
self.connected = True
|
||||||
logger.debug("MQTT connected")
|
logger.debug("MQTT connected")
|
||||||
@ -192,7 +202,20 @@ class MqttClient(Communicator):
|
|||||||
) -> None:
|
) -> None:
|
||||||
"""Mqtt disconnection callback."""
|
"""Mqtt disconnection callback."""
|
||||||
self.connected = False
|
self.connected = False
|
||||||
logger.error("MQTT disconnected")
|
logger.error(f"MQTT disconnected (reason: {reason_code.getName()})")
|
||||||
|
|
||||||
|
# Don't attempt reconnection if we're stopping or if it was a clean disconnect
|
||||||
|
if self._stop_reconnect or reason_code == 0:
|
||||||
|
return
|
||||||
|
|
||||||
|
# Start reconnection in a separate thread to avoid blocking
|
||||||
|
if not self._reconnect_thread or not self._reconnect_thread.is_alive():
|
||||||
|
self._reconnect_thread = threading.Thread(
|
||||||
|
target=self._reconnect_loop,
|
||||||
|
name="mqtt-reconnect",
|
||||||
|
daemon=True
|
||||||
|
)
|
||||||
|
self._reconnect_thread.start()
|
||||||
|
|
||||||
def _start(self) -> None:
|
def _start(self) -> None:
|
||||||
"""Start mqtt client."""
|
"""Start mqtt client."""
|
||||||
@ -281,3 +304,28 @@ class MqttClient(Communicator):
|
|||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"Unable to connect to MQTT server: {e}")
|
logger.error(f"Unable to connect to MQTT server: {e}")
|
||||||
return
|
return
|
||||||
|
|
||||||
|
def _reconnect_loop(self) -> None:
|
||||||
|
"""Handle MQTT reconnection, retrying every 10 seconds indefinitely."""
|
||||||
|
attempt = 0
|
||||||
|
while not self._stop_reconnect and not self.connected:
|
||||||
|
attempt += 1
|
||||||
|
|
||||||
|
logger.debug(f"Will attempt MQTT reconnection in {self._reconnect_delay} seconds (attempt {attempt})")
|
||||||
|
|
||||||
|
# Wait with ability to exit early if stopping
|
||||||
|
for _ in range(self._reconnect_delay):
|
||||||
|
if self._stop_reconnect:
|
||||||
|
return
|
||||||
|
time.sleep(1)
|
||||||
|
|
||||||
|
if self._stop_reconnect:
|
||||||
|
break
|
||||||
|
|
||||||
|
try:
|
||||||
|
logger.debug(f"Attempting MQTT reconnection (attempt {attempt})...")
|
||||||
|
self.client.reconnect()
|
||||||
|
break # Let the on_connect callback handle success
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"MQTT reconnection attempt {attempt} failed: {e}")
|
||||||
|
# Continue the loop to retry
|
||||||
|
Loading…
Reference in New Issue
Block a user