mirror of
https://github.com/blakeblackshear/frigate.git
synced 2025-09-05 17:51:36 +02:00
Merge 91d3a7b245
into fd6e7afea9
This commit is contained in:
commit
01056cf07b
@ -1,6 +1,7 @@
|
|||||||
import logging
|
import logging
|
||||||
import threading
|
import threading
|
||||||
from typing import Any, Callable
|
import time
|
||||||
|
from typing import Any, Callable, Optional
|
||||||
|
|
||||||
import paho.mqtt.client as mqtt
|
import paho.mqtt.client as mqtt
|
||||||
from paho.mqtt.enums import CallbackAPIVersion
|
from paho.mqtt.enums import CallbackAPIVersion
|
||||||
@ -18,15 +19,20 @@ class MqttClient(Communicator):
|
|||||||
self.config = config
|
self.config = config
|
||||||
self.mqtt_config = config.mqtt
|
self.mqtt_config = config.mqtt
|
||||||
self.connected = False
|
self.connected = False
|
||||||
|
self.client: Optional[mqtt.Client] = None
|
||||||
|
self._dispatcher: Callable[[str, str], None] = lambda *_: None
|
||||||
|
self._reconnect_thread: Optional[threading.Thread] = None
|
||||||
|
self._reconnect_delay = 10 # Retry every 10 seconds
|
||||||
|
self._stop_reconnect: bool = False
|
||||||
|
|
||||||
def subscribe(self, receiver: Callable) -> None:
|
def subscribe(self, receiver: Callable[[str, str], None]) -> None:
|
||||||
"""Wrapper for allowing dispatcher to subscribe."""
|
"""Wrapper for allowing dispatcher to subscribe."""
|
||||||
self._dispatcher = receiver
|
self._dispatcher = receiver
|
||||||
self._start()
|
self._start()
|
||||||
|
|
||||||
def publish(self, topic: str, payload: Any, retain: bool = False) -> None:
|
def publish(self, topic: str, payload: Any, retain: bool = False) -> None:
|
||||||
"""Wrapper for publishing when client is in valid state."""
|
"""Wrapper for publishing when client is in valid state."""
|
||||||
if not self.connected:
|
if not self.connected or self.client is None:
|
||||||
logger.debug(f"Unable to publish to {topic}: client is not connected")
|
logger.debug(f"Unable to publish to {topic}: client is not connected")
|
||||||
return
|
return
|
||||||
|
|
||||||
@ -38,7 +44,17 @@ class MqttClient(Communicator):
|
|||||||
)
|
)
|
||||||
|
|
||||||
def stop(self) -> None:
|
def stop(self) -> None:
|
||||||
self.client.disconnect()
|
self._stop_reconnect = True
|
||||||
|
if self._reconnect_thread is not None and self._reconnect_thread.is_alive():
|
||||||
|
self._reconnect_thread.join(timeout=5)
|
||||||
|
if self.client is not None:
|
||||||
|
try:
|
||||||
|
self.client.disconnect()
|
||||||
|
finally:
|
||||||
|
try:
|
||||||
|
self.client.loop_stop()
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
|
||||||
def _set_initial_topics(self) -> None:
|
def _set_initial_topics(self) -> None:
|
||||||
"""Set initial state topics."""
|
"""Set initial state topics."""
|
||||||
@ -142,6 +158,22 @@ class MqttClient(Communicator):
|
|||||||
|
|
||||||
self.publish("available", "online", retain=True)
|
self.publish("available", "online", retain=True)
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _reason_info(reason_code: object) -> str:
|
||||||
|
"""Return human_readable_name for a Paho reason code."""
|
||||||
|
# Name string
|
||||||
|
if hasattr(reason_code, "getName") and callable(
|
||||||
|
getattr(reason_code, "getName")
|
||||||
|
):
|
||||||
|
try:
|
||||||
|
name = str(getattr(reason_code, "getName")())
|
||||||
|
except Exception:
|
||||||
|
name = str(reason_code)
|
||||||
|
else:
|
||||||
|
name = str(reason_code)
|
||||||
|
|
||||||
|
return name
|
||||||
|
|
||||||
def on_mqtt_command(
|
def on_mqtt_command(
|
||||||
self, client: mqtt.Client, userdata: Any, message: mqtt.MQTTMessage
|
self, client: mqtt.Client, userdata: Any, message: mqtt.MQTTMessage
|
||||||
) -> None:
|
) -> None:
|
||||||
@ -155,27 +187,31 @@ class MqttClient(Communicator):
|
|||||||
client: mqtt.Client,
|
client: mqtt.Client,
|
||||||
userdata: Any,
|
userdata: Any,
|
||||||
flags: Any,
|
flags: Any,
|
||||||
reason_code: mqtt.ReasonCode, # type: ignore[name-defined]
|
reason_code: object,
|
||||||
properties: Any,
|
properties: Any,
|
||||||
) -> None:
|
) -> None:
|
||||||
"""Mqtt connection callback."""
|
"""Mqtt connection callback."""
|
||||||
threading.current_thread().name = "mqtt"
|
threading.current_thread().name = "mqtt"
|
||||||
if reason_code != 0:
|
reason_name = self._reason_info(reason_code)
|
||||||
if reason_code == "Server unavailable":
|
|
||||||
|
# Check for connection failure by comparing reason name
|
||||||
|
if reason_name != "Success":
|
||||||
|
if reason_name == "Server unavailable":
|
||||||
logger.error(
|
logger.error(
|
||||||
"Unable to connect to MQTT server: MQTT Server unavailable"
|
"Unable to connect to MQTT server: MQTT Server unavailable"
|
||||||
)
|
)
|
||||||
elif reason_code == "Bad user name or password":
|
elif reason_name == "Bad user name or password":
|
||||||
logger.error(
|
logger.error(
|
||||||
"Unable to connect to MQTT server: MQTT Bad username or password"
|
"Unable to connect to MQTT server: MQTT Bad username or password"
|
||||||
)
|
)
|
||||||
elif reason_code == "Not authorized":
|
elif reason_name == "Not authorized":
|
||||||
logger.error("Unable to connect to MQTT server: MQTT Not authorized")
|
logger.error("Unable to connect to MQTT server: MQTT Not authorized")
|
||||||
else:
|
else:
|
||||||
logger.error(
|
logger.error(
|
||||||
"Unable to connect to MQTT server: Connection refused. Error code: "
|
f"Unable to connect to MQTT server: Connection refused. Error: {reason_name}"
|
||||||
+ reason_code.getName()
|
|
||||||
)
|
)
|
||||||
|
# Don't set connected = True on connection failure
|
||||||
|
return
|
||||||
|
|
||||||
self.connected = True
|
self.connected = True
|
||||||
logger.debug("MQTT connected")
|
logger.debug("MQTT connected")
|
||||||
@ -192,7 +228,34 @@ class MqttClient(Communicator):
|
|||||||
) -> None:
|
) -> None:
|
||||||
"""Mqtt disconnection callback."""
|
"""Mqtt disconnection callback."""
|
||||||
self.connected = False
|
self.connected = False
|
||||||
logger.error("MQTT disconnected")
|
# Debug reason code thoroughly
|
||||||
|
reason_name = (
|
||||||
|
reason_code.getName()
|
||||||
|
if hasattr(reason_code, "getName")
|
||||||
|
else str(reason_code)
|
||||||
|
)
|
||||||
|
reason_value = getattr(reason_code, "value", reason_code)
|
||||||
|
logger.error(
|
||||||
|
f"MQTT disconnected - reason: '{reason_name}', code: {reason_value}, type: {type(reason_code)}"
|
||||||
|
)
|
||||||
|
|
||||||
|
# Don't attempt reconnection if we're stopping or if it was a clean disconnect
|
||||||
|
if self._stop_reconnect:
|
||||||
|
logger.error("MQTT not reconnecting - stop flag set")
|
||||||
|
return
|
||||||
|
|
||||||
|
if reason_code == 0:
|
||||||
|
logger.error("MQTT not reconnecting - clean disconnect (code 0)")
|
||||||
|
return
|
||||||
|
|
||||||
|
logger.error("MQTT will attempt reconnection...")
|
||||||
|
|
||||||
|
# Start reconnection in a separate thread to avoid blocking
|
||||||
|
if self._reconnect_thread is None or not self._reconnect_thread.is_alive():
|
||||||
|
self._reconnect_thread = threading.Thread(
|
||||||
|
target=self._reconnect_loop, name="mqtt-reconnect", daemon=True
|
||||||
|
)
|
||||||
|
self._reconnect_thread.start()
|
||||||
|
|
||||||
def _start(self) -> None:
|
def _start(self) -> None:
|
||||||
"""Start mqtt client."""
|
"""Start mqtt client."""
|
||||||
@ -281,3 +344,66 @@ class MqttClient(Communicator):
|
|||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"Unable to connect to MQTT server: {e}")
|
logger.error(f"Unable to connect to MQTT server: {e}")
|
||||||
return
|
return
|
||||||
|
|
||||||
|
def _reconnect_loop(self) -> None:
|
||||||
|
"""Handle MQTT reconnection using fresh client creation, retrying every 10 seconds indefinitely."""
|
||||||
|
logger.error("MQTT reconnection loop started")
|
||||||
|
attempt = 0
|
||||||
|
|
||||||
|
while not self._stop_reconnect and not self.connected:
|
||||||
|
attempt += 1
|
||||||
|
|
||||||
|
logger.error(
|
||||||
|
f"Will attempt MQTT reconnection in {self._reconnect_delay} seconds (attempt {attempt})"
|
||||||
|
)
|
||||||
|
|
||||||
|
# Wait with ability to exit early if stopping
|
||||||
|
delay_count = 0
|
||||||
|
while delay_count < self._reconnect_delay:
|
||||||
|
if self._stop_reconnect:
|
||||||
|
logger.error("MQTT reconnection stopped during delay") # type: ignore[unreachable]
|
||||||
|
return
|
||||||
|
time.sleep(1)
|
||||||
|
delay_count += 1
|
||||||
|
|
||||||
|
# Double-check stop flag after delay
|
||||||
|
if self._stop_reconnect:
|
||||||
|
logger.error("MQTT reconnection stopped after delay") # type: ignore[unreachable]
|
||||||
|
return
|
||||||
|
|
||||||
|
try:
|
||||||
|
logger.error(
|
||||||
|
f"Creating fresh MQTT client for reconnection attempt {attempt}..."
|
||||||
|
)
|
||||||
|
|
||||||
|
# Clean up old client if it exists
|
||||||
|
if self.client is not None:
|
||||||
|
try:
|
||||||
|
self.client.disconnect()
|
||||||
|
self.client.loop_stop()
|
||||||
|
except Exception:
|
||||||
|
pass # Ignore cleanup errors
|
||||||
|
|
||||||
|
# Create completely fresh client and attempt connection
|
||||||
|
self._start()
|
||||||
|
|
||||||
|
# Give the connection attempt some time to complete
|
||||||
|
wait_count = 0
|
||||||
|
while wait_count < 5: # Wait up to 5 seconds for connection
|
||||||
|
if self.connected:
|
||||||
|
logger.error( # type: ignore[unreachable]
|
||||||
|
f"MQTT fresh connection successful on attempt {attempt}!"
|
||||||
|
)
|
||||||
|
return
|
||||||
|
time.sleep(1)
|
||||||
|
wait_count += 1
|
||||||
|
|
||||||
|
logger.error(
|
||||||
|
f"MQTT fresh connection attempt {attempt} timed out, will retry"
|
||||||
|
)
|
||||||
|
# Continue the outer while loop to retry
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"MQTT fresh connection attempt {attempt} failed: {e}")
|
||||||
|
# Continue the outer while loop to retry
|
||||||
|
|
||||||
|
logger.error("MQTT reconnection loop finished")
|
||||||
|
Loading…
Reference in New Issue
Block a user