diff --git a/frigate/http.py b/frigate/http.py index 752279a96..12e73f102 100644 --- a/frigate/http.py +++ b/frigate/http.py @@ -43,7 +43,7 @@ class MqttBackend(): json_message = json.loads(message) json_message = { 'topic': f"{self.topic_prefix}/{json_message['topic']}", - 'payload': json_message.get['payload'], + 'payload': json_message['payload'], 'retain': json_message.get('retain', False) } except: @@ -73,7 +73,7 @@ class MqttBackend(): except: logger.debug("Removing websocket client due to a closed connection.") self.clients.remove(client) - + self.mqtt_client.message_callback_add(f"{self.topic_prefix}/#", send) def start(self): diff --git a/web/src/api/__tests__/mqtt.test.jsx b/web/src/api/__tests__/mqtt.test.jsx new file mode 100644 index 000000000..23328ca24 --- /dev/null +++ b/web/src/api/__tests__/mqtt.test.jsx @@ -0,0 +1,109 @@ +import { h } from 'preact'; +import { Mqtt, MqttProvider, useMqtt } from '../mqtt'; +import { useCallback, useContext } from 'preact/hooks'; +import { fireEvent, render, screen } from '@testing-library/preact'; + +function Test() { + const { state } = useContext(Mqtt); + return state.__connected ? ( +
+ {Object.keys(state).map((key) => ( +
{JSON.stringify(state[key])}
+ ))} +
+ ) : null; +} + +const TEST_URL = 'ws://test-foo:1234/ws'; + +describe('MqttProvider', () => { + let createWebsocket, wsClient; + beforeEach(() => { + wsClient = { + close: jest.fn(), + send: jest.fn(), + }; + createWebsocket = jest.fn((url) => { + wsClient.args = [url]; + return new Proxy( + {}, + { + get(target, prop, receiver) { + return wsClient[prop]; + }, + set(target, prop, value) { + wsClient[prop] = typeof value === 'function' ? jest.fn(value) : value; + if (prop === 'onopen') { + wsClient[prop](); + } + return true; + }, + } + ); + }); + }); + + test('connects to the mqtt server', async () => { + render( + + + + ); + await screen.findByTestId('data'); + expect(wsClient.args).toEqual([TEST_URL]); + expect(screen.getByTestId('__connected')).toHaveTextContent('true'); + }); + + test('receives data through useMqtt', async () => { + function Test() { + const { + value: { payload, retain }, + connected, + } = useMqtt('tacos'); + return connected ? ( +
+
{JSON.stringify(payload)}
+
{JSON.stringify(retain)}
+
+ ) : null; + } + + const { rerender } = render( + + + + ); + await screen.findByTestId('payload'); + wsClient.onmessage({ + data: JSON.stringify({ topic: 'tacos', payload: JSON.stringify({ yes: true }), retain: false }), + }); + rerender( + + + + ); + expect(screen.getByTestId('payload')).toHaveTextContent('{"yes":true}'); + expect(screen.getByTestId('retain')).toHaveTextContent('false'); + }); + + test('can send values through useMqtt', async () => { + function Test() { + const { send, connected } = useMqtt('tacos'); + const handleClick = useCallback(() => { + send({ yes: true }); + }, [send]); + return connected ? : null; + } + + render( + + + + ); + await screen.findByRole('button'); + fireEvent.click(screen.getByRole('button')); + await expect(wsClient.send).toHaveBeenCalledWith( + JSON.stringify({ topic: 'tacos', payload: JSON.stringify({ yes: true }) }) + ); + }); +}); diff --git a/web/src/api/baseUrl.js b/web/src/api/baseUrl.js index 5746acd32..dc2303d87 100644 --- a/web/src/api/baseUrl.js +++ b/web/src/api/baseUrl.js @@ -1,2 +1,2 @@ import { API_HOST } from '../env'; -export const baseUrl = API_HOST || window.baseUrl || ''; +export const baseUrl = API_HOST || window.baseUrl || `${window.location.protocol}//${window.location.host}`; diff --git a/web/src/api/index.jsx b/web/src/api/index.jsx index 53261df73..6084aca11 100644 --- a/web/src/api/index.jsx +++ b/web/src/api/index.jsx @@ -1,5 +1,6 @@ import { baseUrl } from './baseUrl'; import { h, createContext } from 'preact'; +import { MqttProvider } from './mqtt'; import produce from 'immer'; import { useContext, useEffect, useReducer } from 'preact/hooks'; @@ -41,7 +42,11 @@ function reducer(state, { type, payload, meta }) { export const ApiProvider = ({ children }) => { const [state, dispatch] = useReducer(reducer, initialState); - return {children}; + return ( + + {children} + + ); }; function shouldFetch(state, url, fetchId = null) { diff --git a/web/src/api/mqtt.jsx b/web/src/api/mqtt.jsx new file mode 100644 index 000000000..c582d201a --- /dev/null +++ b/web/src/api/mqtt.jsx @@ -0,0 +1,78 @@ +import { h, createContext } from 'preact'; +import { baseUrl } from './baseUrl'; +import produce from 'immer'; +import { useCallback, useContext, useEffect, useRef, useReducer } from 'preact/hooks'; + +const initialState = Object.freeze({ __connected: false }); +export const Mqtt = createContext({ state: initialState, connection: null }); + +const defaultCreateWebsocket = (url) => new WebSocket(url); + +function reducer(state, { topic, payload, retain }) { + switch (topic) { + case '__CLIENT_CONNECTED': + return produce(state, (draftState) => { + draftState.__connected = true; + }); + + default: + return produce(state, (draftState) => { + let parsedPayload = payload; + try { + parsedPayload = payload && JSON.parse(payload); + } catch (e) {} + draftState[topic] = { + lastUpdate: Date.now(), + payload: parsedPayload, + retain, + }; + }); + } +} + +export function MqttProvider({ + children, + createWebsocket = defaultCreateWebsocket, + mqttUrl = `${baseUrl.replace(/^https?:/, 'ws:')}/ws`, +}) { + const [state, dispatch] = useReducer(reducer, initialState); + const wsRef = useRef(); + + useEffect( + () => { + const ws = createWebsocket(mqttUrl); + ws.onopen = () => { + dispatch({ topic: '__CLIENT_CONNECTED' }); + }; + + ws.onmessage = (event) => { + dispatch(JSON.parse(event.data)); + }; + + wsRef.current = ws; + + return () => { + ws.close(3000, 'Provider destroyed'); + }; + }, + // Forces reconnecting + [state.__reconnectAttempts, mqttUrl] // eslint-disable-line react-hooks/exhaustive-deps + ); + + return {children}; +} + +export function useMqtt(topic) { + const { state, ws } = useContext(Mqtt); + + const value = state[topic] || { payload: null }; + + const send = useCallback( + (payload) => { + ws.send(JSON.stringify({ topic, payload: typeof payload !== 'string' ? JSON.stringify(payload) : payload })); + }, + [ws, topic] + ); + + return { value, send, connected: state.__connected }; +} diff --git a/web/src/routes/Debug.jsx b/web/src/routes/Debug.jsx index 81ce75b42..f77ddf56d 100644 --- a/web/src/routes/Debug.jsx +++ b/web/src/routes/Debug.jsx @@ -1,36 +1,24 @@ -import { h } from 'preact'; +import { h, Fragment } from 'preact'; import ActivityIndicator from '../components/ActivityIndicator'; import Button from '../components/Button'; import Heading from '../components/Heading'; import Link from '../components/Link'; +import { useMqtt } from '../api/mqtt'; import { useConfig, useStats } from '../api'; import { Table, Tbody, Thead, Tr, Th, Td } from '../components/Table'; -import { useCallback, useEffect, useState } from 'preact/hooks'; +import { useCallback } from 'preact/hooks'; const emptyObject = Object.freeze({}); export default function Debug() { - const config = useConfig(); + const { data: config } = useConfig(); - const [timeoutId, setTimeoutId] = useState(null); - const { data: stats } = useStats(null, timeoutId); + const { + value: { stats }, + } = useMqtt('stats'); + const { data: initialStats } = useStats(); - const forceUpdate = useCallback(() => { - const timeoutId = setTimeout(forceUpdate, 1000); - setTimeoutId(timeoutId); - }, []); - - useEffect(() => { - forceUpdate(); - }, [forceUpdate]); - - useEffect(() => { - return () => { - clearTimeout(timeoutId); - }; - }, [timeoutId]); - - const { detectors, service, detection_fps, ...cameras } = stats || emptyObject; + const { detectors, service = {}, detection_fps, ...cameras } = stats || initialStats || emptyObject; const detectorNames = Object.keys(detectors || emptyObject); const detectorDataKeys = Object.keys(detectors ? detectors[detectorNames[0]] : emptyObject); @@ -44,61 +32,69 @@ export default function Debug() { copy(); }, [config]); - return stats === null ? ( - - ) : ( + return (
Debug {service.version} -
- - - - - {detectorDataKeys.map((name) => ( - - ))} - - - - {detectorNames.map((detector, i) => ( - - - {detectorDataKeys.map((name) => ( - + {!detectors ? ( +
+ +
+ ) : ( + +
+
detector{name.replace('_', ' ')}
{detector}{detectors[detector][name]}
+ + + + {detectorDataKeys.map((name) => ( + + ))} + + + + {detectorNames.map((detector, i) => ( + + + {detectorDataKeys.map((name) => ( + + ))} + ))} - - ))} - -
detector{name.replace('_', ' ')}
{detector}{detectors[detector][name]}
-
+ + +
-
- - - - - {cameraDataKeys.map((name) => ( - - ))} - - - - {cameraNames.map((camera, i) => ( - - - {cameraDataKeys.map((name) => ( - +
+
camera{name.replace('_', ' ')}
- {camera} - {cameras[camera][name]}
+ + + + {cameraDataKeys.map((name) => ( + + ))} + + + + {cameraNames.map((camera, i) => ( + + + {cameraDataKeys.map((name) => ( + + ))} + ))} - - ))} - -
camera{name.replace('_', ' ')}
+ {camera} + {cameras[camera][name]}
-
+ + + + +

Debug stats update automatically every {config.mqtt.stats_interval} seconds.

+ + )}
Config