mirror of
				https://github.com/blakeblackshear/frigate.git
				synced 2025-10-27 10:52:11 +01:00 
			
		
		
		
	* Remove torch install * notification fixes the pubkey was not being returned if notifications was not enabled at the global level * Put back * single condition check for fetching and disabling button --------- Co-authored-by: Josh Hawkins <32435876+hawkeye217@users.noreply.github.com>
		
			
				
	
	
		
			72 lines
		
	
	
		
			2.3 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			72 lines
		
	
	
		
			2.3 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
"""Notification apis."""
 | 
						|
 | 
						|
import logging
 | 
						|
import os
 | 
						|
from typing import Any
 | 
						|
 | 
						|
from cryptography.hazmat.primitives import serialization
 | 
						|
from fastapi import APIRouter, Request
 | 
						|
from fastapi.responses import JSONResponse
 | 
						|
from peewee import DoesNotExist
 | 
						|
from py_vapid import Vapid01, utils
 | 
						|
 | 
						|
from frigate.api.defs.tags import Tags
 | 
						|
from frigate.const import CONFIG_DIR
 | 
						|
from frigate.models import User
 | 
						|
 | 
						|
logger = logging.getLogger(__name__)
 | 
						|
 | 
						|
router = APIRouter(tags=[Tags.notifications])
 | 
						|
 | 
						|
 | 
						|
@router.get("/notifications/pubkey")
 | 
						|
def get_vapid_pub_key(request: Request):
 | 
						|
    config = request.app.frigate_config
 | 
						|
    notifications_enabled = config.notifications.enabled
 | 
						|
    camera_notifications_enabled = [
 | 
						|
        c for c in config.cameras.values() if c.enabled and c.notifications.enabled
 | 
						|
    ]
 | 
						|
    if not (notifications_enabled or camera_notifications_enabled):
 | 
						|
        return JSONResponse(
 | 
						|
            content=({"success": False, "message": "Notifications are not enabled."}),
 | 
						|
            status_code=400,
 | 
						|
        )
 | 
						|
 | 
						|
    key = Vapid01.from_file(os.path.join(CONFIG_DIR, "notifications.pem"))
 | 
						|
    raw_pub = key.public_key.public_bytes(
 | 
						|
        serialization.Encoding.X962, serialization.PublicFormat.UncompressedPoint
 | 
						|
    )
 | 
						|
    return JSONResponse(content=utils.b64urlencode(raw_pub), status_code=200)
 | 
						|
 | 
						|
 | 
						|
@router.post("/notifications/register")
 | 
						|
def register_notifications(request: Request, body: dict = None):
 | 
						|
    if request.app.frigate_config.auth.enabled:
 | 
						|
        # FIXME: For FastAPI the remote-user is not being populated
 | 
						|
        username = request.headers.get("remote-user") or "admin"
 | 
						|
    else:
 | 
						|
        username = "admin"
 | 
						|
 | 
						|
    json: dict[str, Any] = body or {}
 | 
						|
    sub = json.get("sub")
 | 
						|
 | 
						|
    if not sub:
 | 
						|
        return JSONResponse(
 | 
						|
            content={"success": False, "message": "Subscription must be provided."},
 | 
						|
            status_code=400,
 | 
						|
        )
 | 
						|
 | 
						|
    try:
 | 
						|
        User.update(notification_tokens=User.notification_tokens.append(sub)).where(
 | 
						|
            User.username == username
 | 
						|
        ).execute()
 | 
						|
        return JSONResponse(
 | 
						|
            content=({"success": True, "message": "Successfully saved token."}),
 | 
						|
            status_code=200,
 | 
						|
        )
 | 
						|
    except DoesNotExist:
 | 
						|
        return JSONResponse(
 | 
						|
            content=({"success": False, "message": "Could not find user."}),
 | 
						|
            status_code=404,
 | 
						|
        )
 |