blakeblackshear.frigate/frigate/comms/ws.py
Martin Weinelt ab50d0b006
Add isort and ruff linter (#6575)
* Add isort and ruff linter

Both linters are pretty common among modern Python codebases.

The isort tool provides stable sorting and grouping, as well as pruning
of unused imports.

Ruff is a modern linter that is very fast due to being written in Rust.
It can detect many common issues in a Python codebase.

Removes the pylint dev requirement, since ruff replaces it.

* treewide: fix issues detected by ruff

* treewide: fix bare except clauses

* .devcontainer: Set up isort

* treewide: optimize imports

* treewide: apply black

* treewide: make regex patterns raw strings

This is necessary for escape sequences to be properly recognized.
2023-05-29 05:31:17 -05:00

97 lines
3.1 KiB
Python

"""Websocket communicator."""
import json
import logging
import threading
from typing import Callable
from wsgiref.simple_server import make_server
from ws4py.server.wsgirefserver import (
WebSocketWSGIHandler,
WebSocketWSGIRequestHandler,
WSGIServer,
)
from ws4py.server.wsgiutils import WebSocketWSGIApplication
from ws4py.websocket import WebSocket
from frigate.comms.dispatcher import Communicator
from frigate.config import FrigateConfig
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
class WebSocketClient(Communicator):  # type: ignore[misc]
    """Frigate wrapper for ws client.

    Runs a ws4py websocket server on 127.0.0.1:5002 in a background thread.
    Messages received from websocket peers that parse as a JSON object are
    handed to the subscribed receiver as ``(topic, payload)``. ``publish``
    serialises ``{"topic": ..., "payload": ...}`` to JSON and broadcasts it
    through the server's websocket manager.
    """

    def __init__(self, config: FrigateConfig) -> None:
        """Store the Frigate configuration; the server is not started here."""
        self.config = config

    def subscribe(self, receiver: Callable) -> None:
        """Register ``receiver`` for incoming messages and start the server.

        ``receiver`` is invoked as ``receiver(topic, payload)`` for every
        websocket message that could be parsed.
        """
        self._dispatcher = receiver
        self.start()

    def start(self) -> None:
        """Start the websocket client."""

        class _WebSocketHandler(WebSocket):  # type: ignore[misc]
            # staticmethod() prevents a plain function from being turned into
            # a method bound to the handler instance on attribute access
            # (which would pass the handler as an unexpected first argument).
            # Callables that are already bound methods behave as before.
            receiver = staticmethod(self._dispatcher)

            # NOTE(review): the annotation names a method rather than a
            # message type; the code only relies on `message.data` being
            # bytes - confirm the intended ws4py message class.
            def received_message(self, message: WebSocket.received_message) -> None:
                """Parse an incoming frame as JSON and forward topic/payload."""
                raw = message.data
                try:
                    json_message = json.loads(raw.decode("utf-8"))
                    # Non-object JSON (list, string, ...) has no .get() and
                    # is rejected by the except branch below as well.
                    json_message = {
                        "topic": json_message.get("topic"),
                        "payload": json_message.get("payload"),
                    }
                except Exception:
                    # Decode leniently for the log line: the failure may have
                    # been invalid UTF-8, and a strict decode here would
                    # raise again from inside the handler.
                    logger.warning(
                        "Unable to parse websocket message as valid json: %s",
                        raw.decode("utf-8", errors="replace"),
                    )
                    return

                logger.debug(
                    "Publishing mqtt message from websockets at %s.",
                    json_message["topic"],
                )
                self.receiver(
                    json_message["topic"],
                    json_message["payload"],
                )

        # start a websocket server on 5002
        # Note: this sets the attribute on the handler class itself, so it
        # applies to every WebSocketWSGIHandler in the process.
        WebSocketWSGIHandler.http_version = "1.1"
        self.websocket_server = make_server(
            "127.0.0.1",
            5002,
            server_class=WSGIServer,
            handler_class=WebSocketWSGIRequestHandler,
            app=WebSocketWSGIApplication(handler_cls=_WebSocketHandler),
        )
        self.websocket_server.initialize_websockets_manager()

        # serve_forever() blocks, so it runs on its own thread; stop() shuts
        # the server down and joins this thread.
        self.websocket_thread = threading.Thread(
            target=self.websocket_server.serve_forever
        )
        self.websocket_thread.start()

    def publish(self, topic: str, payload: str, _: bool) -> None:
        """Broadcast ``topic``/``payload`` as a JSON message.

        The third positional argument is ignored by this communicator
        (presumably the retain flag of the Communicator interface - confirm).
        Payloads that ``json.dumps`` cannot serialise are skipped.
        """
        try:
            ws_message = json.dumps(
                {
                    "topic": topic,
                    "payload": payload,
                }
            )
        except Exception:
            # if the payload can't be serialised to JSON don't relay to clients
            logger.debug("payload for %s wasn't text. Skipping...", topic)
            return

        self.websocket_server.manager.broadcast(ws_message)

    def stop(self) -> None:
        """Shut down the websocket manager, the server and its thread."""
        # Manager first (close connections, stop and join it), then stop
        # serve_forever() and wait for the serving thread to finish.
        self.websocket_server.manager.close_all()
        self.websocket_server.manager.stop()
        self.websocket_server.manager.join()
        self.websocket_server.shutdown()
        self.websocket_thread.join()
        logger.info("Exiting websocket client...")