From 6c0978498db05e27c6cdb0fd2ff21cfb561f2235 Mon Sep 17 00:00:00 2001 From: Nicolas Mowen Date: Wed, 23 Nov 2022 19:03:20 -0700 Subject: [PATCH] Abstract MQTT from communication and make mqtt optional (#4462) * Add option for mqtt config * Setup communication layer * Have a dispatcher which is responsible for handling and sending messages * Move mqtt to communication * Separate ws communications module * Make ws client conform to communicator * Cleanup imports * Migrate to new dispatcher * Clean up * Need to set topic prefix * Remove references to mqtt in dispatcher * Don't start mqtt until dispatcher is subscribed * Cleanup * Shorten package * Formatting * Remove unused * Cleanup * Rename mqtt to ws on web * Fix ws mypy * Fix mypy * Reformat * Cleanup if/else chain * Catch bad set commands --- docs/docs/configuration/index.md | 2 + docs/docs/guides/getting_started.md | 11 +- docs/docs/installation.md | 2 +- frigate/app.py | 30 +- frigate/comms/dispatcher.py | 197 ++++++++ frigate/comms/mqtt.py | 201 ++++++++ frigate/comms/ws.py | 97 ++++ frigate/config.py | 1 + frigate/mqtt.py | 474 ------------------ frigate/mypy.ini | 3 +- frigate/object_processing.py | 26 +- frigate/stats.py | 8 +- web/src/AppBar.jsx | 2 +- web/src/api/__tests__/index.test.jsx | 4 +- .../__tests__/{mqtt.test.jsx => ws.test.jsx} | 36 +- web/src/api/index.jsx | 8 +- web/src/api/{mqtt.jsx => ws.jsx} | 24 +- web/src/context/__tests__/index.test.jsx | 4 +- web/src/routes/Cameras.jsx | 2 +- web/src/routes/System.jsx | 4 +- web/src/routes/__tests__/Camera.test.jsx | 4 +- web/src/routes/__tests__/Cameras.test.jsx | 10 +- web/src/routes/__tests__/Recording.test.jsx | 4 +- 23 files changed, 594 insertions(+), 560 deletions(-) create mode 100644 frigate/comms/dispatcher.py create mode 100644 frigate/comms/mqtt.py create mode 100644 frigate/comms/ws.py delete mode 100644 frigate/mqtt.py rename web/src/api/__tests__/{mqtt.test.jsx => ws.test.jsx} (80%) rename web/src/api/{mqtt.jsx => ws.jsx} (79%) diff --git a/docs/docs/configuration/index.md b/docs/docs/configuration/index.md index eacb8cb1f..26f089695 100644 --- a/docs/docs/configuration/index.md +++ b/docs/docs/configuration/index.md @@ -39,6 +39,8 @@ It is not recommended to copy this full configuration file. Only specify values ```yaml mqtt: + # Optional: Enable mqtt server (default: shown below) + enabled: True # Required: host name host: mqtt.server.com # Optional: port (default: shown below) diff --git a/docs/docs/guides/getting_started.md b/docs/docs/guides/getting_started.md index 5fe51cb00..57936fdcd 100644 --- a/docs/docs/guides/getting_started.md +++ b/docs/docs/guides/getting_started.md @@ -5,15 +5,22 @@ title: Creating a config file This guide walks through the steps to build a configuration file for Frigate. It assumes that you already have an environment setup as described in [Installation](/installation). You should also configure your cameras according to the [camera setup guide](/guides/camera_setup) -### Step 1: Configure the MQTT server +### Step 1: Configure the MQTT server (Optional) -Frigate requires a functioning MQTT server. Start by adding the mqtt section at the top level in your config: +Use of a functioning MQTT server is optional for frigate, but required for the home assistant integration. Start by adding the mqtt section at the top level in your config: +If using mqtt: ```yaml mqtt: host: ``` +If not using mqtt: +```yaml +mqtt: + enabled: False +``` + If using the Mosquitto Addon in Home Assistant, a username and password is required. For example: ```yaml diff --git a/docs/docs/installation.md b/docs/docs/installation.md index 5fe8e59fd..7935f82f8 100644 --- a/docs/docs/installation.md +++ b/docs/docs/installation.md @@ -7,7 +7,7 @@ Frigate is a Docker container that can be run on any Docker host including as a ## Dependencies -**MQTT broker** - Frigate requires an MQTT broker. If using Home Assistant, Frigate and Home Assistant must be connected to the same MQTT broker. +**MQTT broker (optional)** - An MQTT broker is optional with Frigate, but is required for the Home Assistant integration. If using Home Assistant, Frigate and Home Assistant must be connected to the same MQTT broker. ## Preparing your hardware diff --git a/frigate/app.py b/frigate/app.py index d3bae31b6..6d86f07e6 100644 --- a/frigate/app.py +++ b/frigate/app.py @@ -13,14 +13,16 @@ from peewee_migrate import Router from playhouse.sqlite_ext import SqliteExtDatabase from playhouse.sqliteq import SqliteQueueDatabase -from frigate.config import DetectorTypeEnum, FrigateConfig +from frigate.comms.dispatcher import Communicator, Dispatcher +from frigate.comms.mqtt import MqttClient +from frigate.comms.ws import WebSocketClient +from frigate.config import FrigateConfig from frigate.const import CACHE_DIR, CLIPS_DIR, RECORD_DIR from frigate.object_detection import ObjectDetectProcess from frigate.events import EventCleanup, EventProcessor from frigate.http import create_app from frigate.log import log_process, root_configurer from frigate.models import Event, Recordings -from frigate.mqtt import FrigateMqttClient, MqttSocketRelay from frigate.object_processing import TrackedObjectProcessor from frigate.output import output_frames from frigate.plus import PlusApi @@ -168,14 +170,15 @@ class FrigateApp: self.restream = RestreamApi(self.config) self.restream.add_cameras() - def init_mqtt(self) -> None: - self.mqtt_client = FrigateMqttClient(self.config, self.camera_metrics) + def init_dispatcher(self) -> None: + comms: list[Communicator] = [] - def start_mqtt_relay(self) -> None: - self.mqtt_relay = MqttSocketRelay( - self.mqtt_client, self.config.mqtt.topic_prefix - ) - self.mqtt_relay.start() + if self.config.mqtt.enabled: + comms.append(MqttClient(self.config)) + + self.ws_client = WebSocketClient(self.config) + comms.append(self.ws_client) + self.dispatcher = Dispatcher(self.config, self.camera_metrics, comms) def start_detectors(self) -> None: for name in self.config.cameras.keys(): @@ -214,7 +217,7 @@ class FrigateApp: def start_detected_frames_processor(self) -> None: self.detected_frames_processor = TrackedObjectProcessor( self.config, - self.mqtt_client, + self.dispatcher, self.config.mqtt.topic_prefix, self.detected_frames_queue, self.event_queue, @@ -312,7 +315,7 @@ class FrigateApp: self.stats_emitter = StatsEmitter( self.config, self.stats_tracking, - self.mqtt_client, + self.dispatcher, self.config.mqtt.topic_prefix, self.stop_event, ) @@ -350,7 +353,7 @@ class FrigateApp: self.set_log_levels() self.init_queues() self.init_database() - self.init_mqtt() + self.init_dispatcher() except Exception as e: print(e) self.log_process.terminate() @@ -363,7 +366,6 @@ class FrigateApp: self.start_camera_capture_processes() self.init_stats() self.init_web_server() - self.start_mqtt_relay() self.start_event_processor() self.start_event_cleanup() self.start_recording_maintainer() @@ -390,7 +392,7 @@ class FrigateApp: logger.info(f"Stopping...") self.stop_event.set() - self.mqtt_relay.stop() + self.ws_client.stop() self.detected_frames_processor.join() self.event_processor.join() self.event_cleanup.join() diff --git a/frigate/comms/dispatcher.py b/frigate/comms/dispatcher.py new file mode 100644 index 000000000..79954d5bd --- /dev/null +++ b/frigate/comms/dispatcher.py @@ -0,0 +1,197 @@ +"""Handle communication between frigate and other applications.""" + +import logging + +from typing import Any, Callable + +from abc import ABC, abstractmethod + +from frigate.config import FrigateConfig +from frigate.types import CameraMetricsTypes +from frigate.util import restart_frigate + + +logger = logging.getLogger(__name__) + + +class Communicator(ABC): + """pub/sub model via specific protocol.""" + + @abstractmethod + def publish(self, topic: str, payload: Any, retain: bool = False) -> None: + """Send data via specific protocol.""" + pass + + @abstractmethod + def subscribe(self, receiver: Callable) -> None: + """Pass receiver so communicators can pass commands.""" + pass + + +class Dispatcher: + """Handle communication between frigate and communicators.""" + + def __init__( + self, + config: FrigateConfig, + camera_metrics: dict[str, CameraMetricsTypes], + communicators: list[Communicator], + ) -> None: + self.config = config + self.camera_metrics = camera_metrics + self.comms = communicators + + for comm in self.comms: + comm.subscribe(self._receive) + + self._camera_settings_handlers: dict[str, Callable] = { + "detect": self._on_detect_command, + "improve_contrast": self._on_motion_improve_contrast_command, + "motion": self._on_motion_command, + "motion_contour_area": self._on_motion_contour_area_command, + "motion_threshold": self._on_motion_threshold_command, + "recording": self._on_recordings_command, + "snapshots": self._on_snapshots_command, + } + + def _receive(self, topic: str, payload: str) -> None: + """Handle receiving of payload from communicators.""" + if topic.endswith("set"): + try: + camera_name = topic.split("/")[-3] + command = topic.split("/")[-2] + self._camera_settings_handlers[command](camera_name, payload) + except Exception as e: + logger.error(f"Received invalid set command: {topic}") + return + elif topic == "restart": + restart_frigate() + + def publish(self, topic: str, payload: Any, retain: bool = False) -> None: + """Handle publishing to communicators.""" + for comm in self.comms: + comm.publish(topic, payload, retain) + + def _on_detect_command(self, camera_name: str, payload: str) -> None: + """Callback for detect topic.""" + detect_settings = self.config.cameras[camera_name].detect + + if payload == "ON": + if not self.camera_metrics[camera_name]["detection_enabled"].value: + logger.info(f"Turning on detection for {camera_name}") + self.camera_metrics[camera_name]["detection_enabled"].value = True + detect_settings.enabled = True + + if not self.camera_metrics[camera_name]["motion_enabled"].value: + logger.info( + f"Turning on motion for {camera_name} due to detection being enabled." + ) + self.camera_metrics[camera_name]["motion_enabled"].value = True + self.publish(f"{camera_name}/motion/state", payload, retain=True) + elif payload == "OFF": + if self.camera_metrics[camera_name]["detection_enabled"].value: + logger.info(f"Turning off detection for {camera_name}") + self.camera_metrics[camera_name]["detection_enabled"].value = False + detect_settings.enabled = False + + self.publish(f"{camera_name}/detect/state", payload, retain=True) + + def _on_motion_command(self, camera_name: str, payload: str) -> None: + """Callback for motion topic.""" + if payload == "ON": + if not self.camera_metrics[camera_name]["motion_enabled"].value: + logger.info(f"Turning on motion for {camera_name}") + self.camera_metrics[camera_name]["motion_enabled"].value = True + elif payload == "OFF": + if self.camera_metrics[camera_name]["detection_enabled"].value: + logger.error( + f"Turning off motion is not allowed when detection is enabled." + ) + return + + if self.camera_metrics[camera_name]["motion_enabled"].value: + logger.info(f"Turning off motion for {camera_name}") + self.camera_metrics[camera_name]["motion_enabled"].value = False + + self.publish(f"{camera_name}/motion/state", payload, retain=True) + + def _on_motion_improve_contrast_command( + self, camera_name: str, payload: str + ) -> None: + """Callback for improve_contrast topic.""" + motion_settings = self.config.cameras[camera_name].motion + + if payload == "ON": + if not self.camera_metrics[camera_name]["improve_contrast_enabled"].value: + logger.info(f"Turning on improve contrast for {camera_name}") + self.camera_metrics[camera_name][ + "improve_contrast_enabled" + ].value = True + motion_settings.improve_contrast = True # type: ignore[union-attr] + elif payload == "OFF": + if self.camera_metrics[camera_name]["improve_contrast_enabled"].value: + logger.info(f"Turning off improve contrast for {camera_name}") + self.camera_metrics[camera_name][ + "improve_contrast_enabled" + ].value = False + motion_settings.improve_contrast = False # type: ignore[union-attr] + + self.publish(f"{camera_name}/improve_contrast/state", payload, retain=True) + + def _on_motion_contour_area_command(self, camera_name: str, payload: int) -> None: + """Callback for motion contour topic.""" + try: + payload = int(payload) + except ValueError: + f"Received unsupported value for motion contour area: {payload}" + return + + motion_settings = self.config.cameras[camera_name].motion + logger.info(f"Setting motion contour area for {camera_name}: {payload}") + self.camera_metrics[camera_name]["motion_contour_area"].value = payload + motion_settings.contour_area = payload # type: ignore[union-attr] + self.publish(f"{camera_name}/motion_contour_area/state", payload, retain=True) + + def _on_motion_threshold_command(self, camera_name: str, payload: int) -> None: + """Callback for motion threshold topic.""" + try: + payload = int(payload) + except ValueError: + f"Received unsupported value for motion threshold: {payload}" + return + + motion_settings = self.config.cameras[camera_name].motion + logger.info(f"Setting motion threshold for {camera_name}: {payload}") + self.camera_metrics[camera_name]["motion_threshold"].value = payload + motion_settings.threshold = payload # type: ignore[union-attr] + self.publish(f"{camera_name}/motion_threshold/state", payload, retain=True) + + def _on_recordings_command(self, camera_name: str, payload: str) -> None: + """Callback for recordings topic.""" + record_settings = self.config.cameras[camera_name].record + + if payload == "ON": + if not record_settings.enabled: + logger.info(f"Turning on recordings for {camera_name}") + record_settings.enabled = True + elif payload == "OFF": + if record_settings.enabled: + logger.info(f"Turning off recordings for {camera_name}") + record_settings.enabled = False + + self.publish(f"{camera_name}/recordings/state", payload, retain=True) + + def _on_snapshots_command(self, camera_name: str, payload: str) -> None: + """Callback for snapshots topic.""" + snapshots_settings = self.config.cameras[camera_name].snapshots + + if payload == "ON": + if not snapshots_settings.enabled: + logger.info(f"Turning on snapshots for {camera_name}") + snapshots_settings.enabled = True + elif payload == "OFF": + if snapshots_settings.enabled: + logger.info(f"Turning off snapshots for {camera_name}") + snapshots_settings.enabled = False + + self.publish(f"{camera_name}/snapshots/state", payload, retain=True) diff --git a/frigate/comms/mqtt.py b/frigate/comms/mqtt.py new file mode 100644 index 000000000..076b96608 --- /dev/null +++ b/frigate/comms/mqtt.py @@ -0,0 +1,201 @@ +import logging +import threading + +from typing import Any, Callable + +import paho.mqtt.client as mqtt + +from frigate.comms.dispatcher import Communicator +from frigate.config import FrigateConfig + + +logger = logging.getLogger(__name__) + + +class MqttClient(Communicator): # type: ignore[misc] + """Frigate wrapper for mqtt client.""" + + def __init__(self, config: FrigateConfig) -> None: + self.config = config + self.mqtt_config = config.mqtt + self.connected: bool = False + + def subscribe(self, receiver: Callable) -> None: + """Wrapper for allowing dispatcher to subscribe.""" + self._dispatcher = receiver + self._start() + + def publish(self, topic: str, payload: Any, retain: bool = False) -> None: + """Wrapper for publishing when client is in valid state.""" + if not self.connected: + logger.error(f"Unable to publish to {topic}: client is not connected") + return + + self.client.publish( + f"{self.mqtt_config.topic_prefix}/{topic}", payload, retain=retain + ) + + def _set_initial_topics(self) -> None: + """Set initial state topics.""" + for camera_name, camera in self.config.cameras.items(): + self.publish( + f"{camera_name}/recordings/state", + "ON" if camera.record.enabled else "OFF", + retain=True, + ) + self.publish( + f"{camera_name}/snapshots/state", + "ON" if camera.snapshots.enabled else "OFF", + retain=True, + ) + self.publish( + f"{camera_name}/detect/state", + "ON" if camera.detect.enabled else "OFF", + retain=True, + ) + self.publish( + f"{camera_name}/motion/state", + "ON", + retain=True, + ) + self.publish( + f"{camera_name}/improve_contrast/state", + "ON" if camera.motion.improve_contrast else "OFF", # type: ignore[union-attr] + retain=True, + ) + self.publish( + f"{camera_name}/motion_threshold/state", + camera.motion.threshold, # type: ignore[union-attr] + retain=True, + ) + self.publish( + f"{camera_name}/motion_contour_area/state", + camera.motion.contour_area, # type: ignore[union-attr] + retain=True, + ) + self.publish( + f"{camera_name}/motion", + "OFF", + retain=False, + ) + + self.publish("available", "online", retain=True) + + def on_mqtt_command( + self, client: mqtt.Client, userdata: Any, message: mqtt.MQTTMessage + ) -> None: + self._dispatcher( + message.topic.replace(f"{self.mqtt_config.topic_prefix}/", ""), + message.payload.decode(), + ) + + def _on_connect( + self, + client: mqtt.Client, + userdata: Any, + flags: Any, + rc: mqtt.ReasonCodes, + ) -> None: + """Mqtt connection callback.""" + threading.current_thread().name = "mqtt" + if rc != 0: + if rc == 3: + logger.error( + "Unable to connect to MQTT server: MQTT Server unavailable" + ) + elif rc == 4: + logger.error( + "Unable to connect to MQTT server: MQTT Bad username or password" + ) + elif rc == 5: + logger.error("Unable to connect to MQTT server: MQTT Not authorized") + else: + logger.error( + "Unable to connect to MQTT server: Connection refused. Error code: " + + str(rc) + ) + + self.connected = True + logger.debug("MQTT connected") + client.subscribe(f"{self.mqtt_config.topic_prefix}/#") + self._set_initial_topics() + + def _on_disconnect( + self, client: mqtt.Client, userdata: Any, flags: Any, rc: mqtt + ) -> None: + """Mqtt disconnection callback.""" + self.connected = False + logger.error("MQTT disconnected") + + def _start(self) -> None: + """Start mqtt client.""" + self.client = mqtt.Client(client_id=self.mqtt_config.client_id) + self.client.on_connect = self._on_connect + self.client.will_set( + self.mqtt_config.topic_prefix + "/available", + payload="offline", + qos=1, + retain=True, + ) + + # register callbacks + for name in self.config.cameras.keys(): + self.client.message_callback_add( + f"{self.mqtt_config.topic_prefix}/{name}/recordings/set", + self.on_mqtt_command, + ) + self.client.message_callback_add( + f"{self.mqtt_config.topic_prefix}/{name}/snapshots/set", + self.on_mqtt_command, + ) + self.client.message_callback_add( + f"{self.mqtt_config.topic_prefix}/{name}/detect/set", + self.on_mqtt_command, + ) + self.client.message_callback_add( + f"{self.mqtt_config.topic_prefix}/{name}/motion/set", + self.on_mqtt_command, + ) + self.client.message_callback_add( + f"{self.mqtt_config.topic_prefix}/{name}/improve_contrast/set", + self.on_mqtt_command, + ) + self.client.message_callback_add( + f"{self.mqtt_config.topic_prefix}/{name}/motion_threshold/set", + self.on_mqtt_command, + ) + self.client.message_callback_add( + f"{self.mqtt_config.topic_prefix}/{name}/motion_contour_area/set", + self.on_mqtt_command, + ) + + self.client.message_callback_add( + f"{self.mqtt_config.topic_prefix}/restart", self.on_mqtt_command + ) + + if not self.mqtt_config.tls_ca_certs is None: + if ( + not self.mqtt_config.tls_client_cert is None + and not self.mqtt_config.tls_client_key is None + ): + self.client.tls_set( + self.mqtt_config.tls_ca_certs, + self.mqtt_config.tls_client_cert, + self.mqtt_config.tls_client_key, + ) + else: + self.client.tls_set(self.mqtt_config.tls_ca_certs) + if not self.mqtt_config.tls_insecure is None: + self.client.tls_insecure_set(self.mqtt_config.tls_insecure) + if not self.mqtt_config.user is None: + self.client.username_pw_set( + self.mqtt_config.user, password=self.mqtt_config.password + ) + try: + # https://stackoverflow.com/a/55390477 + # with connect_async, retries are handled automatically + self.client.connect_async(self.mqtt_config.host, self.mqtt_config.port, 60) + self.client.loop_start() + except Exception as e: + logger.error(f"Unable to connect to MQTT server: {e}") + return diff --git a/frigate/comms/ws.py b/frigate/comms/ws.py new file mode 100644 index 000000000..0a3aea169 --- /dev/null +++ b/frigate/comms/ws.py @@ -0,0 +1,97 @@ +"""Websocket communicator.""" + +import json +import logging +import threading + +from typing import Callable + +from wsgiref.simple_server import make_server +from ws4py.server.wsgirefserver import ( + WebSocketWSGIHandler, + WebSocketWSGIRequestHandler, + WSGIServer, +) +from ws4py.server.wsgiutils import WebSocketWSGIApplication +from ws4py.websocket import WebSocket + +from frigate.comms.dispatcher import Communicator +from frigate.config import FrigateConfig + + +logger = logging.getLogger(__name__) + + +class WebSocketClient(Communicator): # type: ignore[misc] + """Frigate wrapper for ws client.""" + + def __init__(self, config: FrigateConfig) -> None: + self.config = config + + def subscribe(self, receiver: Callable) -> None: + self._dispatcher = receiver + self.start() + + def start(self) -> None: + """Start the websocket client.""" + + class _WebSocketHandler(WebSocket): # type: ignore[misc] + receiver = self._dispatcher + + def received_message(self, message: WebSocket.received_message) -> None: + try: + json_message = json.loads(message.data.decode("utf-8")) + json_message = { + "topic": json_message.get("topic"), + "payload": json_message.get("payload"), + } + except Exception as e: + logger.warning( + f"Unable to parse websocket message as valid json: {message.data.decode('utf-8')}" + ) + return + + logger.debug( + f"Publishing mqtt message from websockets at {json_message['topic']}." + ) + self.receiver( + json_message["topic"], + json_message["payload"], + ) + + # start a websocket server on 5002 + WebSocketWSGIHandler.http_version = "1.1" + self.websocket_server = make_server( + "127.0.0.1", + 5002, + server_class=WSGIServer, + handler_class=WebSocketWSGIRequestHandler, + app=WebSocketWSGIApplication(handler_cls=_WebSocketHandler), + ) + self.websocket_server.initialize_websockets_manager() + self.websocket_thread = threading.Thread( + target=self.websocket_server.serve_forever + ) + self.websocket_thread.start() + + def publish(self, topic: str, payload: str, _: bool) -> None: + try: + ws_message = json.dumps( + { + "topic": topic, + "payload": payload, + } + ) + except Exception as e: + # if the payload can't be decoded don't relay to clients + logger.debug(f"payload for {topic} wasn't text. Skipping...") + return + + self.websocket_server.manager.broadcast(ws_message) + + def stop(self) -> None: + self.websocket_server.manager.close_all() + self.websocket_server.manager.stop() + self.websocket_server.manager.join() + self.websocket_server.shutdown() + self.websocket_thread.join() diff --git a/frigate/config.py b/frigate/config.py index e52a81d35..a2f7f3e63 100644 --- a/frigate/config.py +++ b/frigate/config.py @@ -60,6 +60,7 @@ class UIConfig(FrigateBaseModel): class MqttConfig(FrigateBaseModel): + enabled: bool = Field(title="Enable MQTT Communication.", default=True) host: str = Field(title="MQTT Host") port: int = Field(default=1883, title="MQTT Port") topic_prefix: str = Field(default="frigate", title="MQTT Topic Prefix") diff --git a/frigate/mqtt.py b/frigate/mqtt.py deleted file mode 100644 index 53a073d5e..000000000 --- a/frigate/mqtt.py +++ /dev/null @@ -1,474 +0,0 @@ -import datetime -import json -import logging -import threading -from wsgiref.simple_server import make_server - -import paho.mqtt.client as mqtt -from ws4py.server.wsgirefserver import ( - WebSocketWSGIHandler, - WebSocketWSGIRequestHandler, - WSGIServer, -) -from ws4py.server.wsgiutils import WebSocketWSGIApplication -from ws4py.websocket import WebSocket - -from frigate.config import FrigateConfig -from frigate.types import CameraMetricsTypes -from frigate.util import restart_frigate - -logger = logging.getLogger(__name__) - - -class FrigateMqttClient: - """Frigate wrapper for mqtt client.""" - - def __init__( - self, config: FrigateConfig, camera_metrics: dict[str, CameraMetricsTypes] - ) -> None: - self.config = config - self.mqtt_config = config.mqtt - self.camera_metrics = camera_metrics - self.connected: bool = False - self._start() - - def _set_initial_topics(self) -> None: - """Set initial state topics.""" - for camera_name, camera in self.config.cameras.items(): - self.publish( - f"{self.mqtt_config.topic_prefix}/{camera_name}/recordings/state", - "ON" if camera.record.enabled else "OFF", - retain=True, - ) - self.publish( - f"{self.mqtt_config.topic_prefix}/{camera_name}/snapshots/state", - "ON" if camera.snapshots.enabled else "OFF", - retain=True, - ) - self.publish( - f"{self.mqtt_config.topic_prefix}/{camera_name}/detect/state", - "ON" if camera.detect.enabled else "OFF", - retain=True, - ) - self.publish( - f"{self.mqtt_config.topic_prefix}/{camera_name}/motion/state", - "ON", - retain=True, - ) - self.publish( - f"{self.mqtt_config.topic_prefix}/{camera_name}/improve_contrast/state", - "ON" if camera.motion.improve_contrast else "OFF", - retain=True, - ) - self.publish( - f"{self.mqtt_config.topic_prefix}/{camera_name}/motion_threshold/state", - camera.motion.threshold, - retain=True, - ) - self.publish( - f"{self.mqtt_config.topic_prefix}/{camera_name}/motion_contour_area/state", - camera.motion.contour_area, - retain=True, - ) - self.publish( - f"{self.mqtt_config.topic_prefix}/{camera_name}/motion", - "OFF", - retain=False, - ) - - self.publish( - self.mqtt_config.topic_prefix + "/available", "online", retain=True - ) - - def on_recordings_command( - self, client: mqtt.Client, userdata, message: mqtt.MQTTMessage - ) -> None: - """Callback for recordings topic.""" - payload = message.payload.decode() - logger.debug(f"on_recordings_toggle: {message.topic} {payload}") - - camera_name = message.topic.split("/")[-3] - - record_settings = self.config.cameras[camera_name].record - - if payload == "ON": - if not record_settings.enabled: - logger.info(f"Turning on recordings for {camera_name} via mqtt") - record_settings.enabled = True - elif payload == "OFF": - if record_settings.enabled: - logger.info(f"Turning off recordings for {camera_name} via mqtt") - record_settings.enabled = False - else: - logger.warning(f"Received unsupported value at {message.topic}: {payload}") - - state_topic = f"{message.topic[:-4]}/state" - self.publish(state_topic, payload, retain=True) - - def on_snapshots_command( - self, client: mqtt.Client, userdata, message: mqtt.MQTTMessage - ) -> None: - """Callback for snapshots topic.""" - payload = message.payload.decode() - logger.debug(f"on_snapshots_toggle: {message.topic} {payload}") - - camera_name = message.topic.split("/")[-3] - - snapshots_settings = self.config.cameras[camera_name].snapshots - - if payload == "ON": - if not snapshots_settings.enabled: - logger.info(f"Turning on snapshots for {camera_name} via mqtt") - snapshots_settings.enabled = True - elif payload == "OFF": - if snapshots_settings.enabled: - logger.info(f"Turning off snapshots for {camera_name} via mqtt") - snapshots_settings.enabled = False - else: - logger.warning(f"Received unsupported value at {message.topic}: {payload}") - - state_topic = f"{message.topic[:-4]}/state" - self.publish(state_topic, payload, retain=True) - - def on_detect_command( - self, client: mqtt.Client, userdata, message: mqtt.MQTTMessage - ) -> None: - """Callback for detect topic.""" - payload = message.payload.decode() - logger.debug(f"on_detect_toggle: {message.topic} {payload}") - - camera_name = message.topic.split("/")[-3] - - detect_settings = self.config.cameras[camera_name].detect - - if payload == "ON": - if not self.camera_metrics[camera_name]["detection_enabled"].value: - logger.info(f"Turning on detection for {camera_name} via mqtt") - self.camera_metrics[camera_name]["detection_enabled"].value = True - detect_settings.enabled = True - - if not self.camera_metrics[camera_name]["motion_enabled"].value: - logger.info( - f"Turning on motion for {camera_name} due to detection being enabled." - ) - self.camera_metrics[camera_name]["motion_enabled"].value = True - state_topic = f"{message.topic[:-11]}/motion/state" - self.publish(state_topic, payload, retain=True) - elif payload == "OFF": - if self.camera_metrics[camera_name]["detection_enabled"].value: - logger.info(f"Turning off detection for {camera_name} via mqtt") - self.camera_metrics[camera_name]["detection_enabled"].value = False - detect_settings.enabled = False - else: - logger.warning(f"Received unsupported value at {message.topic}: {payload}") - - state_topic = f"{message.topic[:-4]}/state" - self.publish(state_topic, payload, retain=True) - - def on_motion_command( - self, client: mqtt.Client, userdata, message: mqtt.MQTTMessage - ) -> None: - """Callback for motion topic.""" - payload = message.payload.decode() - logger.debug(f"on_motion_toggle: {message.topic} {payload}") - - camera_name = message.topic.split("/")[-3] - - if payload == "ON": - if not self.camera_metrics[camera_name]["motion_enabled"].value: - logger.info(f"Turning on motion for {camera_name} via mqtt") - self.camera_metrics[camera_name]["motion_enabled"].value = True - elif payload == "OFF": - if self.camera_metrics[camera_name]["detection_enabled"].value: - logger.error( - f"Turning off motion is not allowed when detection is enabled." - ) - return - - if self.camera_metrics[camera_name]["motion_enabled"].value: - logger.info(f"Turning off motion for {camera_name} via mqtt") - self.camera_metrics[camera_name]["motion_enabled"].value = False - else: - logger.warning(f"Received unsupported value at {message.topic}: {payload}") - - state_topic = f"{message.topic[:-4]}/state" - self.publish(state_topic, payload, retain=True) - - def on_improve_contrast_command( - self, client: mqtt.Client, userdata, message: mqtt.MQTTMessage - ) -> None: - """Callback for improve_contrast topic.""" - payload = message.payload.decode() - logger.debug(f"on_improve_contrast_toggle: {message.topic} {payload}") - - camera_name = message.topic.split("/")[-3] - - motion_settings = self.config.cameras[camera_name].motion - - if payload == "ON": - if not self.camera_metrics[camera_name]["improve_contrast_enabled"].value: - logger.info(f"Turning on improve contrast for {camera_name} via mqtt") - self.camera_metrics[camera_name][ - "improve_contrast_enabled" - ].value = True - motion_settings.improve_contrast = True - elif payload == "OFF": - if self.camera_metrics[camera_name]["improve_contrast_enabled"].value: - logger.info(f"Turning off improve contrast for {camera_name} via mqtt") - self.camera_metrics[camera_name][ - "improve_contrast_enabled" - ].value = False - motion_settings.improve_contrast = False - else: - logger.warning(f"Received unsupported value at {message.topic}: {payload}") - - state_topic = f"{message.topic[:-4]}/state" - self.publish(state_topic, payload, retain=True) - - def on_motion_threshold_command( - self, client: mqtt.Client, userdata, message: mqtt.MQTTMessage - ) -> None: - """Callback for motion threshold topic.""" - try: - payload = int(message.payload.decode()) - except ValueError: - logger.warning( - f"Received unsupported value at {message.topic}: {message.payload.decode()}" - ) - return - - logger.debug(f"on_motion_threshold_toggle: {message.topic} {payload}") - - camera_name = message.topic.split("/")[-3] - - motion_settings = self.config.cameras[camera_name].motion - - logger.info(f"Setting motion threshold for {camera_name} via mqtt: {payload}") - self.camera_metrics[camera_name]["motion_threshold"].value = payload - motion_settings.threshold = payload - - state_topic = f"{message.topic[:-4]}/state" - self.publish(state_topic, payload, retain=True) - - def on_motion_contour_area_command( - self, client: mqtt.Client, userdata, message: mqtt.MQTTMessage - ) -> None: - """Callback for motion contour topic.""" - try: - payload = int(message.payload.decode()) - except ValueError: - logger.warning( - f"Received unsupported value at {message.topic}: {message.payload.decode()}" - ) - return - - logger.debug(f"on_motion_contour_area_toggle: {message.topic} {payload}") - - camera_name = message.topic.split("/")[-3] - - motion_settings = self.config.cameras[camera_name].motion - - logger.info( - f"Setting motion contour area for {camera_name} via mqtt: {payload}" - ) - self.camera_metrics[camera_name]["motion_contour_area"].value = payload - motion_settings.contour_area = payload - - state_topic = f"{message.topic[:-4]}/state" - self.publish(state_topic, payload, retain=True) - - def on_restart_command( - client: mqtt.Client, userdata, message: mqtt.MQTTMessage - ) -> None: - """Callback to restart frigate.""" - restart_frigate() - - def _on_connect(self, client: mqtt.Client, userdata, flags, rc) -> None: - """Mqtt connection callback.""" - threading.current_thread().name = "mqtt" - if rc != 0: - if rc == 3: - logger.error( - "Unable to connect to MQTT server: MQTT Server unavailable" - ) - elif rc == 4: - logger.error( - "Unable to connect to MQTT server: MQTT Bad username or password" - ) - elif rc == 5: - logger.error("Unable to connect to MQTT server: MQTT Not authorized") - else: - logger.error( - "Unable to connect to MQTT server: Connection refused. Error code: " - + str(rc) - ) - - self.connected = True - logger.debug("MQTT connected") - client.subscribe(f"{self.mqtt_config.topic_prefix}/#") - self._set_initial_topics() - - def _on_disconnect(self, client: mqtt.Client, userdata, flags, rc) -> None: - """Mqtt disconnection callback.""" - self.connected = False - logger.error("MQTT disconnected") - - def _start(self) -> None: - """Start mqtt client.""" - self.client = mqtt.Client(client_id=self.mqtt_config.client_id) - self.client.on_connect = self._on_connect - self.client.will_set( - self.mqtt_config.topic_prefix + "/available", - payload="offline", - qos=1, - retain=True, - ) - - # register callbacks - for name in self.config.cameras.keys(): - self.client.message_callback_add( - f"{self.mqtt_config.topic_prefix}/{name}/recordings/set", - self.on_recordings_command, - ) - self.client.message_callback_add( - f"{self.mqtt_config.topic_prefix}/{name}/snapshots/set", - self.on_snapshots_command, - ) - self.client.message_callback_add( - f"{self.mqtt_config.topic_prefix}/{name}/detect/set", - self.on_detect_command, - ) - self.client.message_callback_add( - f"{self.mqtt_config.topic_prefix}/{name}/motion/set", - self.on_motion_command, - ) - self.client.message_callback_add( - f"{self.mqtt_config.topic_prefix}/{name}/improve_contrast/set", - self.on_improve_contrast_command, - ) - self.client.message_callback_add( - f"{self.mqtt_config.topic_prefix}/{name}/motion_threshold/set", - self.on_motion_threshold_command, - ) - self.client.message_callback_add( - f"{self.mqtt_config.topic_prefix}/{name}/motion_contour_area/set", - self.on_motion_contour_area_command, - ) - - self.client.message_callback_add( - f"{self.mqtt_config.topic_prefix}/restart", self.on_restart_command - ) - - if not self.mqtt_config.tls_ca_certs is None: - if ( - not self.mqtt_config.tls_client_cert is None - and not self.mqtt_config.tls_client_key is None - ): - self.client.tls_set( - self.mqtt_config.tls_ca_certs, - self.mqtt_config.tls_client_cert, - self.mqtt_config.tls_client_key, - ) - else: - self.client.tls_set(self.mqtt_config.tls_ca_certs) - if not self.mqtt_config.tls_insecure is None: - self.client.tls_insecure_set(self.mqtt_config.tls_insecure) - if not self.mqtt_config.user is None: - self.client.username_pw_set( - self.mqtt_config.user, password=self.mqtt_config.password - ) - try: - # https://stackoverflow.com/a/55390477 - # with connect_async, retries are handled automatically - self.client.connect_async(self.mqtt_config.host, self.mqtt_config.port, 60) - self.client.loop_start() - except Exception as e: - logger.error(f"Unable to connect to MQTT server: {e}") - return - - def publish(self, topic: str, payload, retain: bool = False) -> None: - """Wrapper for publishing when client is in valid state.""" - if not self.connected: - logger.error(f"Unable to publish to {topic}: client is not connected") - return - - self.client.publish(topic, payload, retain=retain) - - def add_topic_callback(self, topic: str, callback) -> None: - self.client.message_callback_add(topic, callback) - - -class MqttSocketRelay: - def __init__(self, mqtt_client: FrigateMqttClient, topic_prefix: str): - self.mqtt_client = mqtt_client - self.topic_prefix = topic_prefix - - def start(self): - class MqttWebSocket(WebSocket): - topic_prefix = self.topic_prefix - mqtt_client = self.mqtt_client - - def received_message(self, message): - try: - json_message = json.loads(message.data.decode("utf-8")) - json_message = { - "topic": f"{self.topic_prefix}/{json_message['topic']}", - "payload": json_message.get("payload"), - "retain": json_message.get("retain", False), - } - except Exception as e: - logger.warning("Unable to parse websocket message as valid json.") - return - - logger.debug( - f"Publishing mqtt message from websockets at {json_message['topic']}." - ) - self.mqtt_client.publish( - json_message["topic"], - json_message["payload"], - retain=json_message["retain"], - ) - - # start a websocket server on 5002 - WebSocketWSGIHandler.http_version = "1.1" - self.websocket_server = make_server( - "127.0.0.1", - 5002, - server_class=WSGIServer, - handler_class=WebSocketWSGIRequestHandler, - app=WebSocketWSGIApplication(handler_cls=MqttWebSocket), - ) - self.websocket_server.initialize_websockets_manager() - self.websocket_thread = threading.Thread( - target=self.websocket_server.serve_forever - ) - - def send(client, userdata, message): - """Sends mqtt messages to clients.""" - try: - logger.debug(f"Received mqtt message on {message.topic}.") - ws_message = json.dumps( - { - "topic": message.topic.replace(f"{self.topic_prefix}/", ""), - "payload": message.payload.decode(), - } - ) - except Exception as e: - # if the payload can't be decoded don't relay to clients - logger.debug( - f"MQTT payload for {message.topic} wasn't text. Skipping..." - ) - return - - self.websocket_server.manager.broadcast(ws_message) - - self.mqtt_client.add_topic_callback(f"{self.topic_prefix}/#", send) - - self.websocket_thread.start() - - def stop(self): - self.websocket_server.manager.close_all() - self.websocket_server.manager.stop() - self.websocket_server.manager.join() - self.websocket_server.shutdown() - self.websocket_thread.join() diff --git a/frigate/mypy.ini b/frigate/mypy.ini index 4769ab96a..881ae6c82 100644 --- a/frigate/mypy.ini +++ b/frigate/mypy.ini @@ -12,7 +12,8 @@ enable_error_code = ignore-without-code check_untyped_defs = true disallow_incomplete_defs = true disallow_subclassing_any = true -disallow_untyped_calls = true +# https://github.com/python/mypy/issues/10757 +disallow_untyped_calls = false disallow_untyped_decorators = true disallow_untyped_defs = true no_implicit_optional = true diff --git a/frigate/object_processing.py b/frigate/object_processing.py index 575e697e5..f87373569 100644 --- a/frigate/object_processing.py +++ b/frigate/object_processing.py @@ -12,6 +12,7 @@ from typing import Callable import cv2 import numpy as np +from frigate.comms.dispatcher import Dispatcher from frigate.config import ( CameraConfig, MqttConfig, @@ -20,7 +21,6 @@ from frigate.config import ( FrigateConfig, ) from frigate.const import CLIPS_DIR -from frigate.mqtt import FrigateMqttClient from frigate.util import ( SharedMemoryFrameManager, calculate_region, @@ -633,7 +633,7 @@ class TrackedObjectProcessor(threading.Thread): def __init__( self, config: FrigateConfig, - client: FrigateMqttClient, + dispatcher: Dispatcher, topic_prefix, tracked_objects_queue, event_queue, @@ -645,7 +645,7 @@ class TrackedObjectProcessor(threading.Thread): threading.Thread.__init__(self) self.name = "detected_frames_processor" self.config = config - self.client = client + self.dispatcher = dispatcher self.topic_prefix = topic_prefix self.tracked_objects_queue = tracked_objects_queue self.event_queue = event_queue @@ -669,7 +669,7 @@ class TrackedObjectProcessor(threading.Thread): "after": after, "type": "new" if obj.previous["false_positive"] else "update", } - self.client.publish( + self.dispatcher.publish( f"{self.topic_prefix}/events", json.dumps(message), retain=False ) obj.previous = after @@ -724,7 +724,7 @@ class TrackedObjectProcessor(threading.Thread): "after": obj.to_dict(), "type": "end", } - self.client.publish( + self.dispatcher.publish( f"{self.topic_prefix}/events", json.dumps(message), retain=False ) @@ -746,14 +746,14 @@ class TrackedObjectProcessor(threading.Thread): f"Unable to send mqtt snapshot for {obj.obj_data['id']}." ) else: - self.client.publish( + self.dispatcher.publish( f"{self.topic_prefix}/{camera}/{obj.obj_data['label']}/snapshot", jpg_bytes, retain=True, ) def object_status(camera, object_name, status): - self.client.publish( + self.dispatcher.publish( f"{self.topic_prefix}/{camera}/{object_name}", status, retain=False ) @@ -853,7 +853,7 @@ class TrackedObjectProcessor(threading.Thread): if motion_boxes: # only send ON if motion isn't already active if self.last_motion_detected.get(camera, 0) == 0: - self.client.publish( + self.dispatcher.publish( f"{self.topic_prefix}/{camera}/motion", "ON", retain=False, @@ -866,7 +866,7 @@ class TrackedObjectProcessor(threading.Thread): # If no motion, make sure the off_delay has passed if frame_time - self.last_motion_detected.get(camera, 0) >= mqtt_delay: - self.client.publish( + self.dispatcher.publish( f"{self.topic_prefix}/{camera}/motion", "OFF", retain=False, @@ -962,7 +962,7 @@ class TrackedObjectProcessor(threading.Thread): ) new_count = sum(zone_label.values()) if new_count != current_count: - self.client.publish( + self.dispatcher.publish( f"{self.topic_prefix}/{zone}/{label}", new_count, retain=False, @@ -975,7 +975,7 @@ class TrackedObjectProcessor(threading.Thread): else: if label in obj_counter: zone_label[camera] = obj_counter[label] - self.client.publish( + self.dispatcher.publish( f"{self.topic_prefix}/{zone}/{label}", obj_counter[label], retain=False, @@ -992,7 +992,7 @@ class TrackedObjectProcessor(threading.Thread): new_count = sum(zone_label.values()) if new_count != current_count: - self.client.publish( + self.dispatcher.publish( f"{self.topic_prefix}/{zone}/all", new_count, retain=False, @@ -1000,7 +1000,7 @@ class TrackedObjectProcessor(threading.Thread): # if this is a new zone all label for this camera else: zone_label[camera] = total_label_count - self.client.publish( + self.dispatcher.publish( f"{self.topic_prefix}/{zone}/all", total_label_count, retain=False, diff --git a/frigate/stats.py b/frigate/stats.py index e8b278e27..23ac02172 100644 --- a/frigate/stats.py +++ b/frigate/stats.py @@ -9,9 +9,9 @@ import requests from typing import Optional, Any from multiprocessing.synchronize import Event as MpEvent +from frigate.comms.dispatcher import Dispatcher from frigate.config import FrigateConfig from frigate.const import RECORD_DIR, CLIPS_DIR, CACHE_DIR -from frigate.mqtt import FrigateMqttClient from frigate.types import StatsTrackingTypes, CameraMetricsTypes from frigate.version import VERSION from frigate.util import get_cpu_stats @@ -146,7 +146,7 @@ class StatsEmitter(threading.Thread): self, config: FrigateConfig, stats_tracking: StatsTrackingTypes, - mqtt_client: FrigateMqttClient, + dispatcher: Dispatcher, topic_prefix: str, stop_event: MpEvent, ): @@ -154,7 +154,7 @@ class StatsEmitter(threading.Thread): self.name = "frigate_stats_emitter" self.config = config self.stats_tracking = stats_tracking - self.mqtt_client = mqtt_client + self.dispatcher = dispatcher self.topic_prefix = topic_prefix self.stop_event = stop_event @@ -162,7 +162,7 @@ class StatsEmitter(threading.Thread): time.sleep(10) while not self.stop_event.wait(self.config.mqtt.stats_interval): stats = stats_snapshot(self.stats_tracking) - self.mqtt_client.publish( + self.dispatcher.publish( f"{self.topic_prefix}/stats", json.dumps(stats), retain=False ) logger.info(f"Exiting watchdog...") diff --git a/web/src/AppBar.jsx b/web/src/AppBar.jsx index 09fc2e9d8..95a6bfbf2 100644 --- a/web/src/AppBar.jsx +++ b/web/src/AppBar.jsx @@ -9,7 +9,7 @@ import FrigateRestartIcon from './icons/FrigateRestart'; import Prompt from './components/Prompt'; import { useDarkMode } from './context'; import { useCallback, useRef, useState } from 'preact/hooks'; -import { useRestart } from './api/mqtt'; +import { useRestart } from './api/ws'; export default function AppBar() { const [showMoreMenu, setShowMoreMenu] = useState(false); diff --git a/web/src/api/__tests__/index.test.jsx b/web/src/api/__tests__/index.test.jsx index 4c3b18fe2..3e803c3a5 100644 --- a/web/src/api/__tests__/index.test.jsx +++ b/web/src/api/__tests__/index.test.jsx @@ -1,11 +1,11 @@ import { h } from 'preact'; -import * as Mqtt from '../mqtt'; +import * as WS from '../ws'; import { ApiProvider, useApiHost } from '..'; import { render, screen } from 'testing-library'; describe('useApiHost', () => { beforeEach(() => { - vi.spyOn(Mqtt, 'MqttProvider').mockImplementation(({ children }) => children); + vi.spyOn(WS, 'WsProvider').mockImplementation(({ children }) => children); }); test('is set from the baseUrl', async () => { diff --git a/web/src/api/__tests__/mqtt.test.jsx b/web/src/api/__tests__/ws.test.jsx similarity index 80% rename from web/src/api/__tests__/mqtt.test.jsx rename to web/src/api/__tests__/ws.test.jsx index b97631abc..3b0e3420a 100644 --- a/web/src/api/__tests__/mqtt.test.jsx +++ b/web/src/api/__tests__/ws.test.jsx @@ -1,10 +1,10 @@ import { h } from 'preact'; -import { Mqtt, MqttProvider, useMqtt } from '../mqtt'; +import { WS, WsProvider, useWs } from '../ws'; import { useCallback, useContext } from 'preact/hooks'; import { fireEvent, render, screen } from 'testing-library'; function Test() { - const { state } = useContext(Mqtt); + const { state } = useContext(WS); return state.__connected ? (
{Object.keys(state).map((key) => ( @@ -18,7 +18,7 @@ function Test() { const TEST_URL = 'ws://test-foo:1234/ws'; -describe('MqttProvider', () => { +describe('WsProvider', () => { let createWebsocket, wsClient; beforeEach(() => { wsClient = { @@ -45,23 +45,23 @@ describe('MqttProvider', () => { }); }); - test('connects to the mqtt server', async () => { + test('connects to the ws server', async () => { render( - + - + ); await screen.findByTestId('data'); expect(wsClient.args).toEqual([TEST_URL]); expect(screen.getByTestId('__connected')).toHaveTextContent('true'); }); - test('receives data through useMqtt', async () => { + test('receives data through useWs', async () => { function Test() { const { value: { payload, retain }, connected, - } = useMqtt('tacos'); + } = useWs('tacos'); return connected ? (
{JSON.stringify(payload)}
@@ -71,26 +71,26 @@ describe('MqttProvider', () => { } const { rerender } = render( - + - + ); await screen.findByTestId('payload'); wsClient.onmessage({ data: JSON.stringify({ topic: 'tacos', payload: JSON.stringify({ yes: true }), retain: false }), }); rerender( - + - + ); expect(screen.getByTestId('payload')).toHaveTextContent('{"yes":true}'); expect(screen.getByTestId('retain')).toHaveTextContent('false'); }); - test('can send values through useMqtt', async () => { + test('can send values through useWs', async () => { function Test() { - const { send, connected } = useMqtt('tacos'); + const { send, connected } = useWs('tacos'); const handleClick = useCallback(() => { send({ yes: true }); }, [send]); @@ -98,9 +98,9 @@ describe('MqttProvider', () => { } render( - + - + ); await screen.findByRole('button'); fireEvent.click(screen.getByRole('button')); @@ -118,9 +118,9 @@ describe('MqttProvider', () => { }, }; render( - + - + ); await screen.findByTestId('data'); expect(screen.getByTestId('front/detect/state')).toHaveTextContent( diff --git a/web/src/api/index.jsx b/web/src/api/index.jsx index 00c78db48..9f256dbbb 100644 --- a/web/src/api/index.jsx +++ b/web/src/api/index.jsx @@ -1,7 +1,7 @@ import { h } from 'preact'; import { baseUrl } from './baseUrl'; import useSWR, { SWRConfig } from 'swr'; -import { MqttProvider } from './mqtt'; +import { WsProvider } from './ws'; import axios from 'axios'; axios.defaults.baseURL = `${baseUrl}api/`; @@ -14,14 +14,14 @@ export function ApiProvider({ children, options }) { ...options, }} > - {children} + {children} ); } -function MqttWithConfig({ children }) { +function WsWithConfig({ children }) { const { data } = useSWR('config'); - return data ? {children} : children; + return data ? {children} : children; } export function useApiHost() { diff --git a/web/src/api/mqtt.jsx b/web/src/api/ws.jsx similarity index 79% rename from web/src/api/mqtt.jsx rename to web/src/api/ws.jsx index 600cd1fc1..734200215 100644 --- a/web/src/api/mqtt.jsx +++ b/web/src/api/ws.jsx @@ -4,7 +4,7 @@ import produce from 'immer'; import { useCallback, useContext, useEffect, useRef, useReducer } from 'preact/hooks'; const initialState = Object.freeze({ __connected: false }); -export const Mqtt = createContext({ state: initialState, connection: null }); +export const WS = createContext({ state: initialState, connection: null }); const defaultCreateWebsocket = (url) => new WebSocket(url); @@ -30,11 +30,11 @@ function reducer(state, { topic, payload, retain }) { } } -export function MqttProvider({ +export function WsProvider({ config, children, createWebsocket = defaultCreateWebsocket, - mqttUrl = `${baseUrl.replace(/^http/, 'ws')}ws`, + wsUrl = `${baseUrl.replace(/^http/, 'ws')}ws`, }) { const [state, dispatch] = useReducer(reducer, initialState); const wsRef = useRef(); @@ -50,7 +50,7 @@ export function MqttProvider({ useEffect( () => { - const ws = createWebsocket(mqttUrl); + const ws = createWebsocket(wsUrl); ws.onopen = () => { dispatch({ topic: '__CLIENT_CONNECTED' }); }; @@ -66,14 +66,14 @@ export function MqttProvider({ }; }, // Forces reconnecting - [state.__reconnectAttempts, mqttUrl] // eslint-disable-line react-hooks/exhaustive-deps + [state.__reconnectAttempts, wsUrl] // eslint-disable-line react-hooks/exhaustive-deps ); - return {children}; + return {children}; } -export function useMqtt(watchTopic, publishTopic) { - const { state, ws } = useContext(Mqtt); +export function useWs(watchTopic, publishTopic) { + const { state, ws } = useContext(WS); const value = state[watchTopic] || { payload: null }; @@ -98,7 +98,7 @@ export function useDetectState(camera) { value: { payload }, send, connected, - } = useMqtt(`${camera}/detect/state`, `${camera}/detect/set`); + } = useWs(`${camera}/detect/state`, `${camera}/detect/set`); return { payload, send, connected }; } @@ -107,7 +107,7 @@ export function useRecordingsState(camera) { value: { payload }, send, connected, - } = useMqtt(`${camera}/recordings/state`, `${camera}/recordings/set`); + } = useWs(`${camera}/recordings/state`, `${camera}/recordings/set`); return { payload, send, connected }; } @@ -116,7 +116,7 @@ export function useSnapshotsState(camera) { value: { payload }, send, connected, - } = useMqtt(`${camera}/snapshots/state`, `${camera}/snapshots/set`); + } = useWs(`${camera}/snapshots/state`, `${camera}/snapshots/set`); return { payload, send, connected }; } @@ -125,6 +125,6 @@ export function useRestart() { value: { payload }, send, connected, - } = useMqtt('restart', 'restart'); + } = useWs('restart', 'restart'); return { payload, send, connected }; } diff --git a/web/src/context/__tests__/index.test.jsx b/web/src/context/__tests__/index.test.jsx index 844fb6e8e..a02383a16 100644 --- a/web/src/context/__tests__/index.test.jsx +++ b/web/src/context/__tests__/index.test.jsx @@ -3,7 +3,7 @@ import { set as setData } from 'idb-keyval'; import { DarkModeProvider, useDarkMode, usePersistence } from '..'; import { fireEvent, render, screen } from 'testing-library'; import { useCallback } from 'preact/hooks'; -import * as Mqtt from '../../api/mqtt'; +import * as WS from '../../api/ws'; function DarkModeChecker() { const { currentMode } = useDarkMode(); @@ -12,7 +12,7 @@ function DarkModeChecker() { describe('DarkMode', () => { beforeEach(() => { - vi.spyOn(Mqtt, 'MqttProvider').mockImplementation(({ children }) => children); + vi.spyOn(WS, 'WsProvider').mockImplementation(({ children }) => children); }); test('uses media by default', async () => { diff --git a/web/src/routes/Cameras.jsx b/web/src/routes/Cameras.jsx index 7e75d1da7..d195a5bdc 100644 --- a/web/src/routes/Cameras.jsx +++ b/web/src/routes/Cameras.jsx @@ -5,7 +5,7 @@ import CameraImage from '../components/CameraImage'; import ClipIcon from '../icons/Clip'; import MotionIcon from '../icons/Motion'; import SnapshotIcon from '../icons/Snapshot'; -import { useDetectState, useRecordingsState, useSnapshotsState } from '../api/mqtt'; +import { useDetectState, useRecordingsState, useSnapshotsState } from '../api/ws'; import { useMemo } from 'preact/hooks'; import useSWR from 'swr'; diff --git a/web/src/routes/System.jsx b/web/src/routes/System.jsx index 96b3523c2..964e8f47a 100644 --- a/web/src/routes/System.jsx +++ b/web/src/routes/System.jsx @@ -3,7 +3,7 @@ import ActivityIndicator from '../components/ActivityIndicator'; import Button from '../components/Button'; import Heading from '../components/Heading'; import Link from '../components/Link'; -import { useMqtt } from '../api/mqtt'; +import { useWs } from '../api/ws'; import useSWR from 'swr'; import axios from 'axios'; import { Table, Tbody, Thead, Tr, Th, Td } from '../components/Table'; @@ -18,7 +18,7 @@ export default function System() { const { value: { payload: stats }, - } = useMqtt('stats'); + } = useWs('stats'); const { data: initialStats } = useSWR('stats'); const { cpu_usages, detectors, service = {}, detection_fps: _, ...cameras } = stats || initialStats || emptyObject; diff --git a/web/src/routes/__tests__/Camera.test.jsx b/web/src/routes/__tests__/Camera.test.jsx index 3a32735fc..ef679681c 100644 --- a/web/src/routes/__tests__/Camera.test.jsx +++ b/web/src/routes/__tests__/Camera.test.jsx @@ -1,6 +1,6 @@ import { h } from 'preact'; import * as AutoUpdatingCameraImage from '../../components/AutoUpdatingCameraImage'; -import * as Mqtt from '../../api/mqtt'; +import * as WS from '../../api/ws'; import Camera from '../Camera'; import { set as setData } from 'idb-keyval'; import * as JSMpegPlayer from '../../components/JSMpegPlayer'; @@ -14,7 +14,7 @@ describe('Camera Route', () => { vi.spyOn(JSMpegPlayer, 'default').mockImplementation(() => { return
; }); - vi.spyOn(Mqtt, 'MqttProvider').mockImplementation(({ children }) => children); + vi.spyOn(WS, 'WsProvider').mockImplementation(({ children }) => children); }); // eslint-disable-next-line jest/no-disabled-tests diff --git a/web/src/routes/__tests__/Cameras.test.jsx b/web/src/routes/__tests__/Cameras.test.jsx index 28673d12b..7dfaa8d53 100644 --- a/web/src/routes/__tests__/Cameras.test.jsx +++ b/web/src/routes/__tests__/Cameras.test.jsx @@ -1,13 +1,13 @@ import { h } from 'preact'; import * as CameraImage from '../../components/CameraImage'; -import * as Mqtt from '../../api/mqtt'; +import * as WS from '../../api/ws'; import Cameras from '../Cameras'; import { fireEvent, render, screen, waitForElementToBeRemoved } from 'testing-library'; describe('Cameras Route', () => { beforeEach(() => { vi.spyOn(CameraImage, 'default').mockImplementation(() =>
); - vi.spyOn(Mqtt, 'useMqtt').mockImplementation(() => ({ value: { payload: 'OFF' }, send: vi.fn() })); + vi.spyOn(WS, 'useWs').mockImplementation(() => ({ value: { payload: 'OFF' }, send: vi.fn() })); }); test('shows an ActivityIndicator if not yet loaded', async () => { @@ -39,13 +39,13 @@ describe('Cameras Route', () => { const sendDetect = vi.fn(); const sendRecordings = vi.fn(); const sendSnapshots = vi.fn(); - vi.spyOn(Mqtt, 'useDetectState').mockImplementation(() => { + vi.spyOn(WS, 'useDetectState').mockImplementation(() => { return { payload: 'ON', send: sendDetect }; }); - vi.spyOn(Mqtt, 'useRecordingsState').mockImplementation(() => { + vi.spyOn(WS, 'useRecordingsState').mockImplementation(() => { return { payload: 'OFF', send: sendRecordings }; }); - vi.spyOn(Mqtt, 'useSnapshotsState').mockImplementation(() => { + vi.spyOn(WS, 'useSnapshotsState').mockImplementation(() => { return { payload: 'ON', send: sendSnapshots }; }); diff --git a/web/src/routes/__tests__/Recording.test.jsx b/web/src/routes/__tests__/Recording.test.jsx index de71ec625..8dc33fdaf 100644 --- a/web/src/routes/__tests__/Recording.test.jsx +++ b/web/src/routes/__tests__/Recording.test.jsx @@ -1,13 +1,13 @@ import { h } from 'preact'; import * as CameraImage from '../../components/CameraImage'; -import * as Mqtt from '../../api/mqtt'; +import * as WS from '../../api/ws'; import Cameras from '../Cameras'; import { render, screen, waitForElementToBeRemoved } from 'testing-library'; describe('Recording Route', () => { beforeEach(() => { vi.spyOn(CameraImage, 'default').mockImplementation(() =>
); - vi.spyOn(Mqtt, 'useMqtt').mockImplementation(() => ({ value: { payload: 'OFF' }, send: jest.fn() })); + vi.spyOn(WS, 'useWs').mockImplementation(() => ({ value: { payload: 'OFF' }, send: jest.fn() })); }); test('shows an ActivityIndicator if not yet loaded', async () => {