Refactor mqtt to handle reconnects and not cause frigate to stop. (#4440)

* Refactor mqtt client

* Protect callback method

* Use async to handle reconnects

* Set types and clenup

* Don't set connected until rc code is checked
This commit is contained in:
Nicolas Mowen 2022-11-20 06:36:01 -07:00 committed by GitHub
parent 65825040a3
commit ebdf36e0b9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 230 additions and 150 deletions

View File

@ -20,7 +20,7 @@ from frigate.events import EventCleanup, EventProcessor
from frigate.http import create_app from frigate.http import create_app
from frigate.log import log_process, root_configurer from frigate.log import log_process, root_configurer
from frigate.models import Event, Recordings from frigate.models import Event, Recordings
from frigate.mqtt import MqttSocketRelay, create_mqtt_client from frigate.mqtt import FrigateMqttClient, MqttSocketRelay
from frigate.object_processing import TrackedObjectProcessor from frigate.object_processing import TrackedObjectProcessor
from frigate.output import output_frames from frigate.output import output_frames
from frigate.plus import PlusApi from frigate.plus import PlusApi
@ -169,7 +169,7 @@ class FrigateApp:
self.restream.add_cameras() self.restream.add_cameras()
def init_mqtt(self) -> None: def init_mqtt(self) -> None:
self.mqtt_client = create_mqtt_client(self.config, self.camera_metrics) self.mqtt_client = FrigateMqttClient(self.config, self.camera_metrics)
def start_mqtt_relay(self) -> None: def start_mqtt_relay(self) -> None:
self.mqtt_relay = MqttSocketRelay( self.mqtt_relay = MqttSocketRelay(

View File

@ -1,3 +1,4 @@
import datetime
import json import json
import logging import logging
import threading import threading
@ -13,21 +14,82 @@ from ws4py.server.wsgiutils import WebSocketWSGIApplication
from ws4py.websocket import WebSocket from ws4py.websocket import WebSocket
from frigate.config import FrigateConfig from frigate.config import FrigateConfig
from frigate.types import CameraMetricsTypes
from frigate.util import restart_frigate from frigate.util import restart_frigate
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
def create_mqtt_client(config: FrigateConfig, camera_metrics): class FrigateMqttClient:
mqtt_config = config.mqtt """Frigate wrapper for mqtt client."""
def on_recordings_command(client, userdata, message): def __init__(
self, config: FrigateConfig, camera_metrics: dict[str, CameraMetricsTypes]
) -> None:
self.config = config
self.mqtt_config = config.mqtt
self.camera_metrics = camera_metrics
self.connected: bool = False
self._start()
def _set_initial_topics(self) -> None:
"""Set initial state topics."""
for camera_name, camera in self.config.cameras.items():
self.publish(
f"{self.mqtt_config.topic_prefix}/{camera_name}/recordings/state",
"ON" if camera.record.enabled else "OFF",
retain=True,
)
self.publish(
f"{self.mqtt_config.topic_prefix}/{camera_name}/snapshots/state",
"ON" if camera.snapshots.enabled else "OFF",
retain=True,
)
self.publish(
f"{self.mqtt_config.topic_prefix}/{camera_name}/detect/state",
"ON" if camera.detect.enabled else "OFF",
retain=True,
)
self.publish(
f"{self.mqtt_config.topic_prefix}/{camera_name}/motion/state",
"ON",
retain=True,
)
self.publish(
f"{self.mqtt_config.topic_prefix}/{camera_name}/improve_contrast/state",
"ON" if camera.motion.improve_contrast else "OFF",
retain=True,
)
self.publish(
f"{self.mqtt_config.topic_prefix}/{camera_name}/motion_threshold/state",
camera.motion.threshold,
retain=True,
)
self.publish(
f"{self.mqtt_config.topic_prefix}/{camera_name}/motion_contour_area/state",
camera.motion.contour_area,
retain=True,
)
self.publish(
f"{self.mqtt_config.topic_prefix}/{camera_name}/motion",
"OFF",
retain=False,
)
self.publish(
self.mqtt_config.topic_prefix + "/available", "online", retain=True
)
def on_recordings_command(
self, client: mqtt.Client, userdata, message: mqtt.MQTTMessage
) -> None:
"""Callback for recordings topic."""
payload = message.payload.decode() payload = message.payload.decode()
logger.debug(f"on_recordings_toggle: {message.topic} {payload}") logger.debug(f"on_recordings_toggle: {message.topic} {payload}")
camera_name = message.topic.split("/")[-3] camera_name = message.topic.split("/")[-3]
record_settings = config.cameras[camera_name].record record_settings = self.config.cameras[camera_name].record
if payload == "ON": if payload == "ON":
if not record_settings.enabled: if not record_settings.enabled:
@ -41,15 +103,18 @@ def create_mqtt_client(config: FrigateConfig, camera_metrics):
logger.warning(f"Received unsupported value at {message.topic}: {payload}") logger.warning(f"Received unsupported value at {message.topic}: {payload}")
state_topic = f"{message.topic[:-4]}/state" state_topic = f"{message.topic[:-4]}/state"
client.publish(state_topic, payload, retain=True) self.publish(state_topic, payload, retain=True)
def on_snapshots_command(client, userdata, message): def on_snapshots_command(
self, client: mqtt.Client, userdata, message: mqtt.MQTTMessage
) -> None:
"""Callback for snapshots topic."""
payload = message.payload.decode() payload = message.payload.decode()
logger.debug(f"on_snapshots_toggle: {message.topic} {payload}") logger.debug(f"on_snapshots_toggle: {message.topic} {payload}")
camera_name = message.topic.split("/")[-3] camera_name = message.topic.split("/")[-3]
snapshots_settings = config.cameras[camera_name].snapshots snapshots_settings = self.config.cameras[camera_name].snapshots
if payload == "ON": if payload == "ON":
if not snapshots_settings.enabled: if not snapshots_settings.enabled:
@ -63,91 +128,107 @@ def create_mqtt_client(config: FrigateConfig, camera_metrics):
logger.warning(f"Received unsupported value at {message.topic}: {payload}") logger.warning(f"Received unsupported value at {message.topic}: {payload}")
state_topic = f"{message.topic[:-4]}/state" state_topic = f"{message.topic[:-4]}/state"
client.publish(state_topic, payload, retain=True) self.publish(state_topic, payload, retain=True)
def on_detect_command(client, userdata, message): def on_detect_command(
self, client: mqtt.Client, userdata, message: mqtt.MQTTMessage
) -> None:
"""Callback for detect topic."""
payload = message.payload.decode() payload = message.payload.decode()
logger.debug(f"on_detect_toggle: {message.topic} {payload}") logger.debug(f"on_detect_toggle: {message.topic} {payload}")
camera_name = message.topic.split("/")[-3] camera_name = message.topic.split("/")[-3]
detect_settings = config.cameras[camera_name].detect detect_settings = self.config.cameras[camera_name].detect
if payload == "ON": if payload == "ON":
if not camera_metrics[camera_name]["detection_enabled"].value: if not self.camera_metrics[camera_name]["detection_enabled"].value:
logger.info(f"Turning on detection for {camera_name} via mqtt") logger.info(f"Turning on detection for {camera_name} via mqtt")
camera_metrics[camera_name]["detection_enabled"].value = True self.camera_metrics[camera_name]["detection_enabled"].value = True
detect_settings.enabled = True detect_settings.enabled = True
if not camera_metrics[camera_name]["motion_enabled"].value: if not self.camera_metrics[camera_name]["motion_enabled"].value:
logger.info( logger.info(
f"Turning on motion for {camera_name} due to detection being enabled." f"Turning on motion for {camera_name} due to detection being enabled."
) )
camera_metrics[camera_name]["motion_enabled"].value = True self.camera_metrics[camera_name]["motion_enabled"].value = True
state_topic = f"{message.topic[:-11]}/motion/state" state_topic = f"{message.topic[:-11]}/motion/state"
client.publish(state_topic, payload, retain=True) self.publish(state_topic, payload, retain=True)
elif payload == "OFF": elif payload == "OFF":
if camera_metrics[camera_name]["detection_enabled"].value: if self.camera_metrics[camera_name]["detection_enabled"].value:
logger.info(f"Turning off detection for {camera_name} via mqtt") logger.info(f"Turning off detection for {camera_name} via mqtt")
camera_metrics[camera_name]["detection_enabled"].value = False self.camera_metrics[camera_name]["detection_enabled"].value = False
detect_settings.enabled = False detect_settings.enabled = False
else: else:
logger.warning(f"Received unsupported value at {message.topic}: {payload}") logger.warning(f"Received unsupported value at {message.topic}: {payload}")
state_topic = f"{message.topic[:-4]}/state" state_topic = f"{message.topic[:-4]}/state"
client.publish(state_topic, payload, retain=True) self.publish(state_topic, payload, retain=True)
def on_motion_command(client, userdata, message): def on_motion_command(
self, client: mqtt.Client, userdata, message: mqtt.MQTTMessage
) -> None:
"""Callback for motion topic."""
payload = message.payload.decode() payload = message.payload.decode()
logger.debug(f"on_motion_toggle: {message.topic} {payload}") logger.debug(f"on_motion_toggle: {message.topic} {payload}")
camera_name = message.topic.split("/")[-3] camera_name = message.topic.split("/")[-3]
if payload == "ON": if payload == "ON":
if not camera_metrics[camera_name]["motion_enabled"].value: if not self.camera_metrics[camera_name]["motion_enabled"].value:
logger.info(f"Turning on motion for {camera_name} via mqtt") logger.info(f"Turning on motion for {camera_name} via mqtt")
camera_metrics[camera_name]["motion_enabled"].value = True self.camera_metrics[camera_name]["motion_enabled"].value = True
elif payload == "OFF": elif payload == "OFF":
if camera_metrics[camera_name]["detection_enabled"].value: if self.camera_metrics[camera_name]["detection_enabled"].value:
logger.error( logger.error(
f"Turning off motion is not allowed when detection is enabled." f"Turning off motion is not allowed when detection is enabled."
) )
return return
if camera_metrics[camera_name]["motion_enabled"].value: if self.camera_metrics[camera_name]["motion_enabled"].value:
logger.info(f"Turning off motion for {camera_name} via mqtt") logger.info(f"Turning off motion for {camera_name} via mqtt")
camera_metrics[camera_name]["motion_enabled"].value = False self.camera_metrics[camera_name]["motion_enabled"].value = False
else: else:
logger.warning(f"Received unsupported value at {message.topic}: {payload}") logger.warning(f"Received unsupported value at {message.topic}: {payload}")
state_topic = f"{message.topic[:-4]}/state" state_topic = f"{message.topic[:-4]}/state"
client.publish(state_topic, payload, retain=True) self.publish(state_topic, payload, retain=True)
def on_improve_contrast_command(client, userdata, message): def on_improve_contrast_command(
self, client: mqtt.Client, userdata, message: mqtt.MQTTMessage
) -> None:
"""Callback for improve_contrast topic."""
payload = message.payload.decode() payload = message.payload.decode()
logger.debug(f"on_improve_contrast_toggle: {message.topic} {payload}") logger.debug(f"on_improve_contrast_toggle: {message.topic} {payload}")
camera_name = message.topic.split("/")[-3] camera_name = message.topic.split("/")[-3]
motion_settings = config.cameras[camera_name].motion motion_settings = self.config.cameras[camera_name].motion
if payload == "ON": if payload == "ON":
if not camera_metrics[camera_name]["improve_contrast_enabled"].value: if not self.camera_metrics[camera_name]["improve_contrast_enabled"].value:
logger.info(f"Turning on improve contrast for {camera_name} via mqtt") logger.info(f"Turning on improve contrast for {camera_name} via mqtt")
camera_metrics[camera_name]["improve_contrast_enabled"].value = True self.camera_metrics[camera_name][
"improve_contrast_enabled"
].value = True
motion_settings.improve_contrast = True motion_settings.improve_contrast = True
elif payload == "OFF": elif payload == "OFF":
if camera_metrics[camera_name]["improve_contrast_enabled"].value: if self.camera_metrics[camera_name]["improve_contrast_enabled"].value:
logger.info(f"Turning off improve contrast for {camera_name} via mqtt") logger.info(f"Turning off improve contrast for {camera_name} via mqtt")
camera_metrics[camera_name]["improve_contrast_enabled"].value = False self.camera_metrics[camera_name][
"improve_contrast_enabled"
].value = False
motion_settings.improve_contrast = False motion_settings.improve_contrast = False
else: else:
logger.warning(f"Received unsupported value at {message.topic}: {payload}") logger.warning(f"Received unsupported value at {message.topic}: {payload}")
state_topic = f"{message.topic[:-4]}/state" state_topic = f"{message.topic[:-4]}/state"
client.publish(state_topic, payload, retain=True) self.publish(state_topic, payload, retain=True)
def on_motion_threshold_command(client, userdata, message): def on_motion_threshold_command(
self, client: mqtt.Client, userdata, message: mqtt.MQTTMessage
) -> None:
"""Callback for motion threshold topic."""
try: try:
payload = int(message.payload.decode()) payload = int(message.payload.decode())
except ValueError: except ValueError:
@ -160,16 +241,19 @@ def create_mqtt_client(config: FrigateConfig, camera_metrics):
camera_name = message.topic.split("/")[-3] camera_name = message.topic.split("/")[-3]
motion_settings = config.cameras[camera_name].motion motion_settings = self.config.cameras[camera_name].motion
logger.info(f"Setting motion threshold for {camera_name} via mqtt: {payload}") logger.info(f"Setting motion threshold for {camera_name} via mqtt: {payload}")
camera_metrics[camera_name]["motion_threshold"].value = payload self.camera_metrics[camera_name]["motion_threshold"].value = payload
motion_settings.threshold = payload motion_settings.threshold = payload
state_topic = f"{message.topic[:-4]}/state" state_topic = f"{message.topic[:-4]}/state"
client.publish(state_topic, payload, retain=True) self.publish(state_topic, payload, retain=True)
def on_motion_contour_area_command(client, userdata, message): def on_motion_contour_area_command(
self, client: mqtt.Client, userdata, message: mqtt.MQTTMessage
) -> None:
"""Callback for motion contour topic."""
try: try:
payload = int(message.payload.decode()) payload = int(message.payload.decode())
except ValueError: except ValueError:
@ -182,21 +266,25 @@ def create_mqtt_client(config: FrigateConfig, camera_metrics):
camera_name = message.topic.split("/")[-3] camera_name = message.topic.split("/")[-3]
motion_settings = config.cameras[camera_name].motion motion_settings = self.config.cameras[camera_name].motion
logger.info( logger.info(
f"Setting motion contour area for {camera_name} via mqtt: {payload}" f"Setting motion contour area for {camera_name} via mqtt: {payload}"
) )
camera_metrics[camera_name]["motion_contour_area"].value = payload self.camera_metrics[camera_name]["motion_contour_area"].value = payload
motion_settings.contour_area = payload motion_settings.contour_area = payload
state_topic = f"{message.topic[:-4]}/state" state_topic = f"{message.topic[:-4]}/state"
client.publish(state_topic, payload, retain=True) self.publish(state_topic, payload, retain=True)
def on_restart_command(client, userdata, message): def on_restart_command(
client: mqtt.Client, userdata, message: mqtt.MQTTMessage
) -> None:
"""Callback to restart frigate."""
restart_frigate() restart_frigate()
def on_connect(client, userdata, flags, rc): def _on_connect(self, client: mqtt.Client, userdata, flags, rc) -> None:
"""Mqtt connection callback."""
threading.current_thread().name = "mqtt" threading.current_thread().name = "mqtt"
if rc != 0: if rc != 0:
if rc == 3: if rc == 3:
@ -215,118 +303,103 @@ def create_mqtt_client(config: FrigateConfig, camera_metrics):
+ str(rc) + str(rc)
) )
self.connected = True
logger.debug("MQTT connected") logger.debug("MQTT connected")
client.subscribe(f"{mqtt_config.topic_prefix}/#") client.subscribe(f"{self.mqtt_config.topic_prefix}/#")
client.publish(mqtt_config.topic_prefix + "/available", "online", retain=True) self._set_initial_topics()
client = mqtt.Client(client_id=mqtt_config.client_id) def _on_disconnect(self, client: mqtt.Client, userdata, flags, rc) -> None:
client.on_connect = on_connect """Mqtt disconnection callback."""
client.will_set( self.connected = False
mqtt_config.topic_prefix + "/available", payload="offline", qos=1, retain=True logger.error("MQTT disconnected")
def _start(self) -> None:
"""Start mqtt client."""
self.client = mqtt.Client(client_id=self.mqtt_config.client_id)
self.client.on_connect = self._on_connect
self.client.will_set(
self.mqtt_config.topic_prefix + "/available",
payload="offline",
qos=1,
retain=True,
) )
# register callbacks # register callbacks
for name in config.cameras.keys(): for name in self.config.cameras.keys():
client.message_callback_add( self.client.message_callback_add(
f"{mqtt_config.topic_prefix}/{name}/recordings/set", on_recordings_command f"{self.mqtt_config.topic_prefix}/{name}/recordings/set",
self.on_recordings_command,
) )
client.message_callback_add( self.client.message_callback_add(
f"{mqtt_config.topic_prefix}/{name}/snapshots/set", on_snapshots_command f"{self.mqtt_config.topic_prefix}/{name}/snapshots/set",
self.on_snapshots_command,
) )
client.message_callback_add( self.client.message_callback_add(
f"{mqtt_config.topic_prefix}/{name}/detect/set", on_detect_command f"{self.mqtt_config.topic_prefix}/{name}/detect/set",
self.on_detect_command,
) )
client.message_callback_add( self.client.message_callback_add(
f"{mqtt_config.topic_prefix}/{name}/motion/set", on_motion_command f"{self.mqtt_config.topic_prefix}/{name}/motion/set",
self.on_motion_command,
) )
client.message_callback_add( self.client.message_callback_add(
f"{mqtt_config.topic_prefix}/{name}/improve_contrast/set", f"{self.mqtt_config.topic_prefix}/{name}/improve_contrast/set",
on_improve_contrast_command, self.on_improve_contrast_command,
) )
client.message_callback_add( self.client.message_callback_add(
f"{mqtt_config.topic_prefix}/{name}/motion_threshold/set", f"{self.mqtt_config.topic_prefix}/{name}/motion_threshold/set",
on_motion_threshold_command, self.on_motion_threshold_command,
) )
client.message_callback_add( self.client.message_callback_add(
f"{mqtt_config.topic_prefix}/{name}/motion_contour_area/set", f"{self.mqtt_config.topic_prefix}/{name}/motion_contour_area/set",
on_motion_contour_area_command, self.on_motion_contour_area_command,
) )
client.message_callback_add( self.client.message_callback_add(
f"{mqtt_config.topic_prefix}/restart", on_restart_command f"{self.mqtt_config.topic_prefix}/restart", self.on_restart_command
) )
if not mqtt_config.tls_ca_certs is None: if not self.mqtt_config.tls_ca_certs is None:
if ( if (
not mqtt_config.tls_client_cert is None not self.mqtt_config.tls_client_cert is None
and not mqtt_config.tls_client_key is None and not self.mqtt_config.tls_client_key is None
): ):
client.tls_set( self.client.tls_set(
mqtt_config.tls_ca_certs, self.mqtt_config.tls_ca_certs,
mqtt_config.tls_client_cert, self.mqtt_config.tls_client_cert,
mqtt_config.tls_client_key, self.mqtt_config.tls_client_key,
) )
else: else:
client.tls_set(mqtt_config.tls_ca_certs) self.client.tls_set(self.mqtt_config.tls_ca_certs)
if not mqtt_config.tls_insecure is None: if not self.mqtt_config.tls_insecure is None:
client.tls_insecure_set(mqtt_config.tls_insecure) self.client.tls_insecure_set(self.mqtt_config.tls_insecure)
if not mqtt_config.user is None: if not self.mqtt_config.user is None:
client.username_pw_set(mqtt_config.user, password=mqtt_config.password) self.client.username_pw_set(
self.mqtt_config.user, password=self.mqtt_config.password
)
try: try:
client.connect(mqtt_config.host, mqtt_config.port, 60) # https://stackoverflow.com/a/55390477
# with connect_async, retries are handled automatically
self.client.connect_async(self.mqtt_config.host, self.mqtt_config.port, 60)
self.client.loop_start()
except Exception as e: except Exception as e:
logger.error(f"Unable to connect to MQTT server: {e}") logger.error(f"Unable to connect to MQTT server: {e}")
raise return
client.loop_start() def publish(self, topic: str, payload, retain: bool = False) -> None:
"""Wrapper for publishing when client is in valid state."""
if not self.connected:
logger.error(f"Unable to publish to {topic}: client is not connected")
return
for name in config.cameras.keys(): self.client.publish(topic, payload, retain=retain)
client.publish(
f"{mqtt_config.topic_prefix}/{name}/recordings/state",
"ON" if config.cameras[name].record.enabled else "OFF",
retain=True,
)
client.publish(
f"{mqtt_config.topic_prefix}/{name}/snapshots/state",
"ON" if config.cameras[name].snapshots.enabled else "OFF",
retain=True,
)
client.publish(
f"{mqtt_config.topic_prefix}/{name}/detect/state",
"ON" if config.cameras[name].detect.enabled else "OFF",
retain=True,
)
client.publish(
f"{mqtt_config.topic_prefix}/{name}/motion/state",
"ON",
retain=True,
)
client.publish(
f"{mqtt_config.topic_prefix}/{name}/improve_contrast/state",
"ON" if config.cameras[name].motion.improve_contrast else "OFF",
retain=True,
)
client.publish(
f"{mqtt_config.topic_prefix}/{name}/motion_threshold/state",
config.cameras[name].motion.threshold,
retain=True,
)
client.publish(
f"{mqtt_config.topic_prefix}/{name}/motion_contour_area/state",
config.cameras[name].motion.contour_area,
retain=True,
)
client.publish(
f"{mqtt_config.topic_prefix}/{name}/motion",
"OFF",
retain=False,
)
return client def add_topic_callback(self, topic: str, callback) -> None:
self.client.message_callback_add(topic, callback)
class MqttSocketRelay: class MqttSocketRelay:
def __init__(self, mqtt_client, topic_prefix): def __init__(self, mqtt_client: FrigateMqttClient, topic_prefix: str):
self.mqtt_client = mqtt_client self.mqtt_client = mqtt_client
self.topic_prefix = topic_prefix self.topic_prefix = topic_prefix
@ -389,7 +462,7 @@ class MqttSocketRelay:
self.websocket_server.manager.broadcast(ws_message) self.websocket_server.manager.broadcast(ws_message)
self.mqtt_client.message_callback_add(f"{self.topic_prefix}/#", send) self.mqtt_client.add_topic_callback(f"{self.topic_prefix}/#", send)
self.websocket_thread.start() self.websocket_thread.start()

View File

@ -12,8 +12,15 @@ from typing import Callable
import cv2 import cv2
import numpy as np import numpy as np
from frigate.config import CameraConfig, SnapshotsConfig, RecordConfig, FrigateConfig from frigate.config import (
CameraConfig,
MqttConfig,
SnapshotsConfig,
RecordConfig,
FrigateConfig,
)
from frigate.const import CLIPS_DIR from frigate.const import CLIPS_DIR
from frigate.mqtt import FrigateMqttClient
from frigate.util import ( from frigate.util import (
SharedMemoryFrameManager, SharedMemoryFrameManager,
calculate_region, calculate_region,
@ -626,7 +633,7 @@ class TrackedObjectProcessor(threading.Thread):
def __init__( def __init__(
self, self,
config: FrigateConfig, config: FrigateConfig,
client, client: FrigateMqttClient,
topic_prefix, topic_prefix,
tracked_objects_queue, tracked_objects_queue,
event_queue, event_queue,
@ -724,7 +731,7 @@ class TrackedObjectProcessor(threading.Thread):
self.event_queue.put(("end", camera, obj.to_dict(include_thumbnail=True))) self.event_queue.put(("end", camera, obj.to_dict(include_thumbnail=True)))
def snapshot(camera, obj: TrackedObject, current_frame_time): def snapshot(camera, obj: TrackedObject, current_frame_time):
mqtt_config = self.config.cameras[camera].mqtt mqtt_config: MqttConfig = self.config.cameras[camera].mqtt
if mqtt_config.enabled and self.should_mqtt_snapshot(camera, obj): if mqtt_config.enabled and self.should_mqtt_snapshot(camera, obj):
jpg_bytes = obj.get_jpg_bytes( jpg_bytes = obj.get_jpg_bytes(
timestamp=mqtt_config.timestamp, timestamp=mqtt_config.timestamp,

View File

@ -7,11 +7,11 @@ import shutil
import os import os
import requests import requests
from typing import Optional, Any from typing import Optional, Any
from paho.mqtt.client import Client
from multiprocessing.synchronize import Event as MpEvent from multiprocessing.synchronize import Event as MpEvent
from frigate.config import FrigateConfig from frigate.config import FrigateConfig
from frigate.const import RECORD_DIR, CLIPS_DIR, CACHE_DIR from frigate.const import RECORD_DIR, CLIPS_DIR, CACHE_DIR
from frigate.mqtt import FrigateMqttClient
from frigate.types import StatsTrackingTypes, CameraMetricsTypes from frigate.types import StatsTrackingTypes, CameraMetricsTypes
from frigate.version import VERSION from frigate.version import VERSION
from frigate.util import get_cpu_stats from frigate.util import get_cpu_stats
@ -146,7 +146,7 @@ class StatsEmitter(threading.Thread):
self, self,
config: FrigateConfig, config: FrigateConfig,
stats_tracking: StatsTrackingTypes, stats_tracking: StatsTrackingTypes,
mqtt_client: Client, mqtt_client: FrigateMqttClient,
topic_prefix: str, topic_prefix: str,
stop_event: MpEvent, stop_event: MpEvent,
): ):