diff --git a/docker/Dockerfile.base b/docker/Dockerfile.base index 1ff0c5e0b..a8d9eae4c 100644 --- a/docker/Dockerfile.base +++ b/docker/Dockerfile.base @@ -38,9 +38,6 @@ RUN pip3 install \ peewee_migrate \ zeroconf \ voluptuous\ - Flask-Sockets \ - gevent \ - gevent-websocket \ ws4py COPY --from=nginx /usr/local/nginx/ /usr/local/nginx/ diff --git a/docker/Dockerfile.wheels b/docker/Dockerfile.wheels index d81795673..a6fa222ec 100644 --- a/docker/Dockerfile.wheels +++ b/docker/Dockerfile.wheels @@ -34,8 +34,7 @@ RUN pip3 wheel --wheel-dir=/wheels \ matplotlib \ click \ setproctitle \ - peewee \ - gevent + peewee FROM scratch diff --git a/docker/rootfs/usr/local/nginx/conf/nginx.conf b/docker/rootfs/usr/local/nginx/conf/nginx.conf index 3c5ccec3c..7e372d722 100644 --- a/docker/rootfs/usr/local/nginx/conf/nginx.conf +++ b/docker/rootfs/usr/local/nginx/conf/nginx.conf @@ -33,6 +33,11 @@ http { keepalive 1024; } + upstream mqtt_ws { + server localhost:5002; + keepalive 1024; + } + upstream jsmpeg { server localhost:8082; keepalive 1024; @@ -139,7 +144,7 @@ http { } location /ws { - proxy_pass http://frigate_api/ws; + proxy_pass http://mqtt_ws/; proxy_http_version 1.1; proxy_set_header Upgrade $http_upgrade; proxy_set_header Connection "Upgrade"; diff --git a/frigate/app.py b/frigate/app.py index b2eac50db..86aacfec2 100644 --- a/frigate/app.py +++ b/frigate/app.py @@ -2,26 +2,25 @@ import json import logging import multiprocessing as mp import os +import signal +import sys +import threading from logging.handlers import QueueHandler from typing import Dict, List -import sys -import signal import yaml -from gevent import pywsgi -from geventwebsocket.handler import WebSocketHandler from peewee_migrate import Router from playhouse.sqlite_ext import SqliteExtDatabase from playhouse.sqliteq import SqliteQueueDatabase from frigate.config import FrigateConfig -from frigate.const import RECORD_DIR, CLIPS_DIR, CACHE_DIR +from frigate.const import CACHE_DIR, CLIPS_DIR, RECORD_DIR from frigate.edgetpu import EdgeTPUProcess -from frigate.events import EventProcessor, EventCleanup +from frigate.events import EventCleanup, EventProcessor from frigate.http import create_app from frigate.log import log_process, root_configurer from frigate.models import Event, Recordings -from frigate.mqtt import create_mqtt_client +from frigate.mqtt import create_mqtt_client, MqttSocketRelay from frigate.object_processing import TrackedObjectProcessor from frigate.output import output_frames from frigate.record import RecordingMaintainer @@ -121,8 +120,8 @@ class FrigateApp: for log, level in self.config.logger.logs.items(): logging.getLogger(log).setLevel(level) - if not "geventwebsocket.handler" in self.config.logger.logs: - logging.getLogger("geventwebsocket.handler").setLevel("ERROR") + if not "werkzeug" in self.config.logger.logs: + logging.getLogger("werkzeug").setLevel("ERROR") def init_queues(self): # Queues for clip processing @@ -166,12 +165,18 @@ class FrigateApp: self.db, self.stats_tracking, self.detected_frames_processor, - self.mqtt_client, + # self.mqtt_client, ) def init_mqtt(self): self.mqtt_client = create_mqtt_client(self.config, self.camera_metrics) + def start_mqtt_relay(self): + self.mqtt_relay = MqttSocketRelay( + self.mqtt_client, self.config.mqtt.topic_prefix + ) + self.mqtt_relay.start() + def start_detectors(self): model_shape = (self.config.model.height, self.config.model.width) for name in self.config.cameras.keys(): @@ -267,10 +272,6 @@ class FrigateApp: capture_process.start() logger.info(f"Capture process started for {name}: {capture_process.pid}") - def start_birdseye_outputter(self): - self.birdseye_outputter = BirdsEyeFrameOutputter(self.stop_event) - self.birdseye_outputter.start() - def start_event_processor(self): self.event_processor = EventProcessor( self.config, @@ -330,6 +331,7 @@ class FrigateApp: self.start_camera_capture_processes() self.init_stats() self.init_web_server() + self.start_mqtt_relay() self.start_event_processor() self.start_event_cleanup() self.start_recording_maintainer() @@ -343,12 +345,8 @@ class FrigateApp: signal.signal(signal.SIGTERM, receiveSignal) - server = pywsgi.WSGIServer( - ("127.0.0.1", 5001), self.flask_app, handler_class=WebSocketHandler - ) - try: - server.serve_forever() + self.flask_app.run(host="127.0.0.1", port=5001, debug=False) except KeyboardInterrupt: pass @@ -358,6 +356,7 @@ class FrigateApp: logger.info(f"Stopping...") self.stop_event.set() + self.mqtt_relay.stop() self.detected_frames_processor.join() self.event_processor.join() self.event_cleanup.join() diff --git a/frigate/http.py b/frigate/http.py index 9575ab707..3b962f16f 100644 --- a/frigate/http.py +++ b/frigate/http.py @@ -11,7 +11,7 @@ from functools import reduce from pathlib import Path import cv2 -import gevent + import numpy as np from flask import ( Blueprint, @@ -22,7 +22,7 @@ from flask import ( make_response, request, ) -from flask_sockets import Sockets + from peewee import SqliteDatabase, operator, fn, DoesNotExist, Value from playhouse.shortcuts import model_to_dict @@ -35,74 +35,6 @@ from frigate.version import VERSION logger = logging.getLogger(__name__) bp = Blueprint("frigate", __name__) -ws = Blueprint("ws", __name__) - - -class MqttBackend: - """Interface for registering and updating WebSocket clients.""" - - def __init__(self, mqtt_client, topic_prefix): - self.clients = list() - self.mqtt_client = mqtt_client - self.topic_prefix = topic_prefix - - def register(self, client): - """Register a WebSocket connection for Mqtt updates.""" - self.clients.append(client) - - def publish(self, message): - try: - json_message = json.loads(message) - json_message = { - "topic": f"{self.topic_prefix}/{json_message['topic']}", - "payload": json_message["payload"], - "retain": json_message.get("retain", False), - } - except: - logger.warning("Unable to parse websocket message as valid json.") - return - - logger.debug( - f"Publishing mqtt message from websockets at {json_message['topic']}." - ) - self.mqtt_client.publish( - json_message["topic"], - json_message["payload"], - retain=json_message["retain"], - ) - - def run(self): - def send(client, userdata, message): - """Sends mqtt messages to clients.""" - try: - logger.debug(f"Received mqtt message on {message.topic}.") - ws_message = json.dumps( - { - "topic": message.topic.replace(f"{self.topic_prefix}/", ""), - "payload": message.payload.decode(), - } - ) - except: - # if the payload can't be decoded don't relay to clients - logger.debug( - f"MQTT payload for {message.topic} wasn't text. Skipping..." - ) - return - - for client in self.clients: - try: - client.send(ws_message) - except: - logger.debug( - "Removing websocket client due to a closed connection." - ) - self.clients.remove(client) - - self.mqtt_client.message_callback_add(f"{self.topic_prefix}/#", send) - - def start(self): - """Maintains mqtt subscription in the background.""" - gevent.spawn(self.run) def create_app( @@ -110,10 +42,8 @@ def create_app( database: SqliteDatabase, stats_tracking, detected_frames_processor, - mqtt_client, ): app = Flask(__name__) - sockets = Sockets(app) @app.before_request def _db_connect(): @@ -129,10 +59,6 @@ def create_app( app.detected_frames_processor = detected_frames_processor app.register_blueprint(bp) - sockets.register_blueprint(ws) - - app.mqtt_backend = MqttBackend(mqtt_client, frigate_config.mqtt.topic_prefix) - app.mqtt_backend.start() return app @@ -613,7 +539,7 @@ def vod(year_month, day, hour, camera): def imagestream(detected_frames_processor, camera_name, fps, height, draw_options): while True: # max out at specified FPS - gevent.sleep(1 / fps) + time.sleep(1 / fps) frame = detected_frames_processor.get_current_frame(camera_name, draw_options) if frame is None: frame = np.zeros((height, int(height * 16 / 9), 3), np.uint8) @@ -626,16 +552,3 @@ def imagestream(detected_frames_processor, camera_name, fps, height, draw_option b"--frame\r\n" b"Content-Type: image/jpeg\r\n\r\n" + jpg.tobytes() + b"\r\n\r\n" ) - - -@ws.route("/ws") -def echo_socket(socket): - current_app.mqtt_backend.register(socket) - - while not socket.closed: - # Sleep to prevent *constant* context-switches. - gevent.sleep(0.1) - - message = socket.receive() - if message: - current_app.mqtt_backend.publish(message) diff --git a/frigate/mqtt.py b/frigate/mqtt.py index 9eb63e016..bc10e95c8 100644 --- a/frigate/mqtt.py +++ b/frigate/mqtt.py @@ -1,7 +1,16 @@ +import json import logging import threading +from wsgiref.simple_server import make_server import paho.mqtt.client as mqtt +from ws4py.server.wsgirefserver import ( + WebSocketWSGIHandler, + WebSocketWSGIRequestHandler, + WSGIServer, +) +from ws4py.server.wsgiutils import WebSocketWSGIApplication +from ws4py.websocket import WebSocket from frigate.config import FrigateConfig @@ -117,8 +126,15 @@ def create_mqtt_client(config: FrigateConfig, camera_metrics): ) if not mqtt_config.tls_ca_certs is None: - if not mqtt_config.tls_client_cert is None and not mqtt_config.tls_client_key is None: - client.tls_set(mqtt_config.tls_ca_certs, mqtt_config.tls_client_cert, mqtt_config.tls_client_key) + if ( + not mqtt_config.tls_client_cert is None + and not mqtt_config.tls_client_key is None + ): + client.tls_set( + mqtt_config.tls_ca_certs, + mqtt_config.tls_client_cert, + mqtt_config.tls_client_key, + ) else: client.tls_set(mqtt_config.tls_ca_certs) if not mqtt_config.tls_insecure is None: @@ -151,3 +167,79 @@ def create_mqtt_client(config: FrigateConfig, camera_metrics): ) return client + + +class MqttSocketRelay: + def __init__(self, mqtt_client, topic_prefix): + self.mqtt_client = mqtt_client + self.topic_prefix = topic_prefix + + def start(self): + class MqttWebSocket(WebSocket): + topic_prefix = self.topic_prefix + mqtt_client = self.mqtt_client + + def received_message(self, message): + try: + json_message = json.loads(message.data.decode("utf-8")) + json_message = { + "topic": f"{self.topic_prefix}/{json_message['topic']}", + "payload": json_message["payload"], + "retain": json_message.get("retain", False), + } + except Exception as e: + logger.warning("Unable to parse websocket message as valid json.") + return + + logger.debug( + f"Publishing mqtt message from websockets at {json_message['topic']}." + ) + self.mqtt_client.publish( + json_message["topic"], + json_message["payload"], + retain=json_message["retain"], + ) + + # start a websocket server on 5002 + WebSocketWSGIHandler.http_version = "1.1" + self.websocket_server = make_server( + "127.0.0.1", + 5002, + server_class=WSGIServer, + handler_class=WebSocketWSGIRequestHandler, + app=WebSocketWSGIApplication(handler_cls=MqttWebSocket), + ) + self.websocket_server.initialize_websockets_manager() + self.websocket_thread = threading.Thread( + target=self.websocket_server.serve_forever + ) + + def send(client, userdata, message): + """Sends mqtt messages to clients.""" + try: + logger.debug(f"Received mqtt message on {message.topic}.") + ws_message = json.dumps( + { + "topic": message.topic.replace(f"{self.topic_prefix}/", ""), + "payload": message.payload.decode(), + } + ) + except Exception as e: + # if the payload can't be decoded don't relay to clients + logger.debug( + f"MQTT payload for {message.topic} wasn't text. Skipping..." + ) + return + + self.websocket_server.manager.broadcast(ws_message) + + self.mqtt_client.message_callback_add(f"{self.topic_prefix}/#", send) + + self.websocket_thread.start() + + def stop(self): + self.websocket_server.manager.close_all() + self.websocket_server.manager.stop() + self.websocket_server.manager.join() + self.websocket_server.shutdown() + self.websocket_thread.join()