mirror of
synced 2025-03-09 00:16:54 +01:00
Use config dataclasses to eliminate some of the boilerplate associated with setting up the configuration. In particular, dataclasses remove much of the boilerplate around assigning properties to the object, and freezing them makes the objects easily immutable. For simple, non-nested dataclasses, this also provides the more convenient `asdict` helper. To set this up, where the objects were previously parsed from the config in the `__init__` method, each class now has a `build` classmethod that does the parsing and then calls the dataclass initializer. Some of the objects are mutated at runtime: some zones are mutated to set their color (this could probably be refactored out), and some camera functionality can be enabled or disabled. Not every config with an `enabled` property has an MQTT hook to toggle it (clips, snapshots, and detect can be toggled, but the rtmp and record configs cannot), yet none of these configs are frozen, in case there is other functionality I am missing. There are also a couple of minor fixes here: one for a bug I introduced recently where `max_seconds` was not defined, and another to properly `get()` the message payload when publishing MQTT messages sent via websocket.
147 lines
5.4 KiB
147 lines
5.4 KiB
import logging
import threading
import paho.mqtt.client as mqtt
from frigate.config import FrigateConfig
logger = logging.getLogger(__name__)
def create_mqtt_client(config: FrigateConfig, camera_metrics):
    """Build a paho MQTT client wired up with per-camera toggle handlers.

    For every camera in ``config.cameras`` this registers handlers on the
    ``<topic_prefix>/<camera>/{clips,snapshots,detect}/set`` topics, sets a
    retained "offline" last-will on ``<topic_prefix>/available``, connects to
    the broker and publishes the current ON/OFF state of each toggle.

    Args:
        config: Frigate configuration. ``config.mqtt`` supplies the broker
            settings; ``config.cameras`` supplies the per-camera settings
            objects whose ``enabled`` flags the handlers mutate.
        camera_metrics: Mapping of camera name to a mapping whose
            ``"detection_enabled"`` entry exposes a writable ``.value``.

    Returns:
        The configured ``mqtt.Client``. A failure to connect is logged, not
        raised, so the client is returned either way.
    """
    mqtt_config = config.mqtt

    def on_clips_command(client, userdata, message):
        """Handle an ON/OFF payload on a camera's ``clips/set`` topic."""
        payload = message.payload.decode()
        logger.debug(f"on_clips_toggle: {message.topic} {payload}")

        # Topic layout is <prefix>/<camera>/clips/set, so the camera name is
        # the third segment from the end.
        camera_name = message.topic.split("/")[-3]
        clips_settings = config.cameras[camera_name].clips

        if payload == "ON":
            if not clips_settings.enabled:
                logger.info(f"Turning on clips for {camera_name} via mqtt")
                clips_settings.enabled = True
        elif payload == "OFF":
            if clips_settings.enabled:
                logger.info(f"Turning off clips for {camera_name} via mqtt")
                clips_settings.enabled = False
        else:
            logger.warning(f"Received unsupported value at {message.topic}: {payload}")

        # Replace the trailing "/set" with "/state" and echo the payload back.
        state_topic = f"{message.topic[:-4]}/state"
        client.publish(state_topic, payload, retain=True)

    def on_snapshots_command(client, userdata, message):
        """Handle an ON/OFF payload on a camera's ``snapshots/set`` topic."""
        payload = message.payload.decode()
        logger.debug(f"on_snapshots_toggle: {message.topic} {payload}")

        camera_name = message.topic.split("/")[-3]
        snapshots_settings = config.cameras[camera_name].snapshots

        if payload == "ON":
            if not snapshots_settings.enabled:
                logger.info(f"Turning on snapshots for {camera_name} via mqtt")
                snapshots_settings.enabled = True
        elif payload == "OFF":
            if snapshots_settings.enabled:
                logger.info(f"Turning off snapshots for {camera_name} via mqtt")
                snapshots_settings.enabled = False
        else:
            logger.warning(f"Received unsupported value at {message.topic}: {payload}")

        state_topic = f"{message.topic[:-4]}/state"
        client.publish(state_topic, payload, retain=True)

    def on_detect_command(client, userdata, message):
        """Handle an ON/OFF payload on a camera's ``detect/set`` topic.

        Besides the config flag, this also flips the camera's
        ``detection_enabled`` value in ``camera_metrics``.
        """
        payload = message.payload.decode()
        logger.debug(f"on_detect_toggle: {message.topic} {payload}")

        camera_name = message.topic.split("/")[-3]
        detect_settings = config.cameras[camera_name].detect
        detection_enabled = camera_metrics[camera_name]["detection_enabled"]

        if payload == "ON":
            if not detection_enabled.value:
                logger.info(f"Turning on detection for {camera_name} via mqtt")
                detection_enabled.value = True
                detect_settings.enabled = True
        elif payload == "OFF":
            if detection_enabled.value:
                logger.info(f"Turning off detection for {camera_name} via mqtt")
                detection_enabled.value = False
                detect_settings.enabled = False
        else:
            logger.warning(f"Received unsupported value at {message.topic}: {payload}")

        state_topic = f"{message.topic[:-4]}/state"
        client.publish(state_topic, payload, retain=True)

    def on_connect(client, userdata, flags, rc):
        """Log the connection result and announce availability on success."""
        threading.current_thread().name = "mqtt"
        if rc != 0:
            if rc == 3:
                logger.error("MQTT Server unavailable")
            elif rc == 4:
                logger.error("MQTT Bad username or password")
            elif rc == 5:
                logger.error("MQTT Not authorized")
            else:
                logger.error(
                    "Unable to connect to MQTT: Connection refused. Error code: "
                    + str(rc)
                )
            # A non-zero rc means the broker refused the connection, so do
            # not report "connected" or publish the online status.
            return

        logger.info("MQTT connected")
        client.publish(mqtt_config.topic_prefix + "/available", "online", retain=True)

    client = mqtt.Client(client_id=mqtt_config.client_id)
    client.on_connect = on_connect
    # Last will: the broker publishes "offline" if this client drops.
    client.will_set(
        mqtt_config.topic_prefix + "/available", payload="offline", qos=1, retain=True
    )

    # register callbacks
    for name in config.cameras:
        client.message_callback_add(
            f"{mqtt_config.topic_prefix}/{name}/clips/set", on_clips_command
        )
        client.message_callback_add(
            f"{mqtt_config.topic_prefix}/{name}/snapshots/set", on_snapshots_command
        )
        client.message_callback_add(
            f"{mqtt_config.topic_prefix}/{name}/detect/set", on_detect_command
        )

    if mqtt_config.user is not None:
        client.username_pw_set(mqtt_config.user, password=mqtt_config.password)
    try:
        client.connect(mqtt_config.host, mqtt_config.port, 60)
    except Exception as e:
        # Logged rather than raised: the caller still receives the client.
        logger.error(f"Unable to connect to MQTT server: {e}")

    # NOTE(review): nothing visible here starts the network loop or subscribes
    # to the "/set" topics; confirm that happens elsewhere, otherwise the
    # callbacks registered above will never fire.

    # Publish the initial state of every toggle on the same "<...>/state"
    # topics the command handlers report to.
    for name, camera in config.cameras.items():
        client.publish(
            f"{mqtt_config.topic_prefix}/{name}/clips/state",
            "ON" if camera.clips.enabled else "OFF",
            retain=True,
        )
        client.publish(
            f"{mqtt_config.topic_prefix}/{name}/snapshots/state",
            "ON" if camera.snapshots.enabled else "OFF",
            retain=True,
        )
        client.publish(
            f"{mqtt_config.topic_prefix}/{name}/detect/state",
            "ON" if camera.detect.enabled else "OFF",
            retain=True,
        )

    return client