blakeblackshear.frigate/frigate/api/notification.py
Martin Weinelt 4d4d54d030
Fix various typing issues (#18187)
* Fix the `Any` typing hint treewide

There has been confusion between the Any type[1] and the any function[2]
in typing hints.

[1] https://docs.python.org/3/library/typing.html#typing.Any
[2] https://docs.python.org/3/library/functions.html#any

* Fix typing for various frame_shape members

Frame shapes are most likely defined by height and width, so a single int
cannot express that.

* Wrap gpu stats functions in Optional[]

These can return `None`, so they need to be `Type | None`, which is what
`Optional` expresses very nicely.

* Fix return type in get_latest_segment_datetime

Returns a datetime object, not an integer.

* Make the return type of FrameManager.write optional

This is necessary since the SharedMemoryFrameManager.write function can
return None.

* Fix total_seconds() return type in get_tz_modifiers

The function returns a float, not an int.

https://docs.python.org/3/library/datetime.html#datetime.timedelta.total_seconds

* Account for floating point results in to_relative_box

Because the function uses division the return types may either be int or
float.

* Resolve ruff deprecation warning

The config has been split into formatter and linter, and the global
options are deprecated.
2025-05-13 08:27:20 -06:00

67 lines
2.0 KiB
Python

"""Notification apis."""
import logging
import os
from typing import Any
from cryptography.hazmat.primitives import serialization
from fastapi import APIRouter, Request
from fastapi.responses import JSONResponse
from peewee import DoesNotExist
from py_vapid import Vapid01, utils
from frigate.api.defs.tags import Tags
from frigate.const import CONFIG_DIR
from frigate.models import User
logger = logging.getLogger(__name__)
router = APIRouter(tags=[Tags.notifications])
@router.get("/notifications/pubkey")
def get_vapid_pub_key(request: Request):
if not request.app.frigate_config.notifications.enabled:
return JSONResponse(
content=({"success": False, "message": "Notifications are not enabled."}),
status_code=400,
)
key = Vapid01.from_file(os.path.join(CONFIG_DIR, "notifications.pem"))
raw_pub = key.public_key.public_bytes(
serialization.Encoding.X962, serialization.PublicFormat.UncompressedPoint
)
return JSONResponse(content=utils.b64urlencode(raw_pub), status_code=200)
@router.post("/notifications/register")
def register_notifications(request: Request, body: dict = None):
if request.app.frigate_config.auth.enabled:
# FIXME: For FastAPI the remote-user is not being populated
username = request.headers.get("remote-user") or "admin"
else:
username = "admin"
json: dict[str, Any] = body or {}
sub = json.get("sub")
if not sub:
return JSONResponse(
content={"success": False, "message": "Subscription must be provided."},
status_code=400,
)
try:
User.update(notification_tokens=User.notification_tokens.append(sub)).where(
User.username == username
).execute()
return JSONResponse(
content=({"success": True, "message": "Successfully saved token."}),
status_code=200,
)
except DoesNotExist:
return JSONResponse(
content=({"success": False, "message": "Could not find user."}),
status_code=404,
)