blakeblackshear.frigate/frigate/api/notification.py

66 lines
1.9 KiB
Python
Raw Normal View History

"""Notification apis."""
import logging
import os
from cryptography.hazmat.primitives import serialization
from flask import (
Blueprint,
current_app,
jsonify,
make_response,
request,
)
from peewee import DoesNotExist
from py_vapid import Vapid01, utils
from frigate.const import CONFIG_DIR
from frigate.models import User
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# Flask blueprint on which the notification routes in this module are registered.
NotificationBp = Blueprint("notifications", __name__)
@NotificationBp.route("/notifications/pubkey", methods=["GET"])
def get_vapid_pub_key():
    """Serve the VAPID public key used for push subscriptions.

    When notifications are disabled in the Frigate config, a JSON error body
    is returned with status 400. Otherwise the key is loaded from
    ``notifications.pem`` under ``CONFIG_DIR`` and its public part is returned
    as a JSON string holding the base64url-encoded X9.62 uncompressed point,
    with status 200.
    """
    notifications_cfg = current_app.frigate_config.notifications

    if not notifications_cfg.enabled:
        error_body = jsonify(
            {"success": False, "message": "Notifications are not enabled."}
        )
        return make_response(error_body, 400)

    pem_path = os.path.join(CONFIG_DIR, "notifications.pem")
    vapid = Vapid01.from_file(pem_path)

    # Serialize the public key as a raw uncompressed point before encoding it.
    public_point = vapid.public_key.public_bytes(
        serialization.Encoding.X962,
        serialization.PublicFormat.UncompressedPoint,
    )
    encoded_key = utils.b64urlencode(public_point)

    return jsonify(encoded_key), 200
@NotificationBp.route("/notifications/register", methods=["POST"])
def register_notifications():
    """Store a push-notification subscription for the requesting user.

    The username comes from the ``remote-user`` request header when auth is
    enabled (falling back to ``"admin"`` if the header is missing or empty)
    and is always ``"admin"`` when auth is disabled. The request body must be
    a JSON object with a non-empty ``sub`` entry, which is appended to that
    user's ``notification_tokens``.

    Returns:
        A JSON response with ``success`` and ``message`` keys and status
        400 when no subscription was supplied, 404 when no user row matched
        the username, or 200 when the token was saved.
    """
    if current_app.frigate_config.auth.enabled:
        username = request.headers.get("remote-user", type=str) or "admin"
    else:
        username = "admin"

    # silent=True yields None for a missing or malformed body, and a valid
    # JSON body may still be a list or scalar, so only read "sub" from a dict.
    payload = request.get_json(silent=True)
    sub = payload.get("sub") if isinstance(payload, dict) else None

    if not sub:
        return make_response(
            jsonify({"success": False, "message": "Subscription must be provided."}),
            400,
        )

    try:
        # An UPDATE query reports how many rows it modified; it does not raise
        # when the WHERE clause matches nothing, so the count must be checked.
        rows_updated = (
            User.update(notification_tokens=User.notification_tokens.append(sub))
            .where(User.username == username)
            .execute()
        )
    except DoesNotExist:
        # Defensive fallback: treat it the same as "no matching user".
        rows_updated = 0

    if not rows_updated:
        return make_response(
            jsonify({"success": False, "message": "Could not find user."}), 404
        )

    return make_response(
        jsonify({"success": True, "message": "Successfully saved token."}), 200
    )