blakeblackshear.frigate/frigate/api/notification.py
Nicolas Mowen 4869f46ab6
Fixes (#19420)
* Remove torch install

* notification fixes

The pubkey was not being returned if notifications were not enabled at the global level.

* Put back

* single condition check for fetching and disabling button

---------

Co-authored-by: Josh Hawkins <32435876+hawkeye217@users.noreply.github.com>
2025-08-07 15:34:25 -06:00

72 lines
2.3 KiB
Python

"""Notification apis."""
import logging
import os
from typing import Any
from cryptography.hazmat.primitives import serialization
from fastapi import APIRouter, Request
from fastapi.responses import JSONResponse
from peewee import DoesNotExist
from py_vapid import Vapid01, utils
from frigate.api.defs.tags import Tags
from frigate.const import CONFIG_DIR
from frigate.models import User
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# Router for the notification endpoints in this module; routes are tagged
# with Tags.notifications.
router = APIRouter(tags=[Tags.notifications])
@router.get("/notifications/pubkey")
def get_vapid_pub_key(request: Request):
    """Return the VAPID public key for notification subscriptions.

    Responds with HTTP 400 when notifications are enabled neither globally
    nor on at least one enabled camera. Otherwise the key is loaded from
    ``notifications.pem`` inside ``CONFIG_DIR`` and its public part is
    returned as a base64url-encoded, uncompressed X9.62 point.
    """
    frigate_config = request.app.frigate_config
    globally_enabled = frigate_config.notifications.enabled

    # A single enabled camera with notifications turned on is sufficient,
    # even when the global switch is off.
    enabled_on_some_camera = any(
        camera.enabled and camera.notifications.enabled
        for camera in frigate_config.cameras.values()
    )

    if not globally_enabled and not enabled_on_some_camera:
        return JSONResponse(
            content={"success": False, "message": "Notifications are not enabled."},
            status_code=400,
        )

    pem_path = os.path.join(CONFIG_DIR, "notifications.pem")
    vapid = Vapid01.from_file(pem_path)
    public_point = vapid.public_key.public_bytes(
        serialization.Encoding.X962,
        serialization.PublicFormat.UncompressedPoint,
    )
    return JSONResponse(content=utils.b64urlencode(public_point), status_code=200)
@router.post("/notifications/register")
def register_notifications(request: Request, body: dict = None):
    """Store a notification subscription on the requesting user.

    The subscription is read from the ``sub`` key of the JSON body and
    appended to the matching user's ``notification_tokens``.

    Args:
        request: Incoming request; provides the app config and headers.
        body: Optional JSON body expected to contain a ``sub`` entry.

    Returns:
        JSONResponse: 200 when the token was saved, 400 when no subscription
        was supplied, 404 when no user matched the resolved username.
    """
    if request.app.frigate_config.auth.enabled:
        # FIXME: For FastAPI the remote-user is not being populated
        username = request.headers.get("remote-user") or "admin"
    else:
        username = "admin"

    payload: dict[str, Any] = body or {}
    sub = payload.get("sub")

    if not sub:
        return JSONResponse(
            content={"success": False, "message": "Subscription must be provided."},
            status_code=400,
        )

    try:
        # An UPDATE query reports the number of affected rows instead of
        # raising when nothing matches, so the count must be inspected to
        # detect a missing user.
        updated_rows = (
            User.update(notification_tokens=User.notification_tokens.append(sub))
            .where(User.username == username)
            .execute()
        )
    except DoesNotExist:
        # Not expected from an UPDATE; kept as a defensive fallback.
        updated_rows = 0

    if not updated_rows:
        return JSONResponse(
            content={"success": False, "message": "Could not find user."},
            status_code=404,
        )

    return JSONResponse(
        content={"success": True, "message": "Successfully saved token."},
        status_code=200,
    )