mirror of
				https://github.com/blakeblackshear/frigate.git
				synced 2025-10-27 10:52:11 +01:00 
			
		
		
		
	
		
			
				
	
	
		
			204 lines
		
	
	
		
			7.4 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			204 lines
		
	
	
		
			7.4 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
"""Handle sending notifications for Frigate via Firebase."""
 | 
						|
 | 
						|
import datetime
 | 
						|
import json
 | 
						|
import logging
 | 
						|
import os
 | 
						|
from typing import Any, Callable
 | 
						|
 | 
						|
from py_vapid import Vapid01
 | 
						|
from pywebpush import WebPusher
 | 
						|
 | 
						|
from frigate.comms.config_updater import ConfigSubscriber
 | 
						|
from frigate.comms.dispatcher import Communicator
 | 
						|
from frigate.config import FrigateConfig
 | 
						|
from frigate.const import CONFIG_DIR
 | 
						|
from frigate.models import User
 | 
						|
 | 
						|
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
 | 
						|
 | 
						|
 | 
						|
class WebPushClient(Communicator):  # type: ignore[misc]
    """Frigate wrapper for webpush client.

    Holds one ``WebPusher`` per stored subscription (grouped by username),
    signs VAPID claims per push-service origin, and sends a push message for
    review alerts published on the ``reviews`` topic.
    """

    def __init__(self, config: FrigateConfig) -> None:
        """Load the VAPID key and build pushers for every stored subscription.

        Args:
            config: Frigate configuration. ``config.notifications`` is read
                here and replaced in ``publish`` when an update is received.
        """
        self.config = config
        # signed VAPID headers, keyed by push-service origin
        self.claim_headers: dict[str, dict[str, str]] = {}
        # epoch seconds used as the ``exp`` of the current claims
        self.refresh: int = 0
        # username -> pushers built from that user's notification tokens
        self.web_pushers: dict[str, list[WebPusher]] = {}
        # username -> endpoints that answered 404/410 and await removal
        self.expired_subs: dict[str, list[str]] = {}

        if not self.config.notifications.email:
            logger.warning("Email must be provided for push notifications to be sent.")

        # Pull keys from PEM or generate if they do not exist
        self.vapid = Vapid01.from_file(os.path.join(CONFIG_DIR, "notifications.pem"))

        # .dicts() yields plain dict rows, so rows are indexed by column name
        rows = User.select(User.username, User.notification_tokens).dicts().iterator()
        for row in rows:
            # ``or []`` tolerates a user whose token column is empty/None
            self.web_pushers[row["username"]] = [
                WebPusher(sub) for sub in row["notification_tokens"] or []
            ]

        # notification config updater
        self.config_subscriber = ConfigSubscriber("config/notifications")

    @staticmethod
    def _endpoint_origin(endpoint: str) -> str:
        """Return the ``scheme://host[:port]`` part of a push endpoint URL."""
        # function-scope import: the module header does not import urllib
        from urllib.parse import urlsplit

        parts = urlsplit(endpoint)
        return f"{parts.scheme}://{parts.netloc}"

    def _sign_claim(self, origin: str) -> dict[str, str]:
        """Sign a VAPID claim for ``origin`` expiring at ``self.refresh``."""
        claim = {
            "sub": f"mailto:{self.config.notifications.email}",
            "aud": origin,
            "exp": self.refresh,
        }
        return self.vapid.sign(claim)

    def subscribe(self, receiver: Callable) -> None:
        """Wrapper for allowing dispatcher to subscribe (no-op for web push)."""
        pass

    def check_registrations(self) -> None:
        """Create or refresh the signed claim headers when needed.

        Claims are regenerated when none exist yet or when ``self.refresh``
        has passed; the new expiry is one hour from now.
        """
        now = datetime.datetime.now()
        if self.claim_headers and self.refresh >= now.timestamp():
            return

        self.refresh = int((now + datetime.timedelta(hours=1)).timestamp())

        # get a unique set of push origins so each is signed only once
        origins = {
            self._endpoint_origin(push.subscription_info["endpoint"])
            for pushers in self.web_pushers.values()
            for push in pushers
        }

        # create new claims
        for origin in origins:
            self.claim_headers[origin] = self._sign_claim(origin)

    def cleanup_registrations(self) -> None:
        """Delete subscriptions recorded in ``self.expired_subs``.

        For each affected user the stored tokens are re-read, the expired
        endpoints are dropped, the database row is overwritten and the user's
        pushers are rebuilt from the remaining tokens.
        """
        for user, expired in self.expired_subs.items():
            # get all subscriptions, removing ones that are expired
            stored_user: User = User.get_by_id(user)
            user_subs = [
                token
                for token in stored_user.notification_tokens
                if token["endpoint"] not in expired
            ]

            # overwrite the database and reset web pushers
            User.update(notification_tokens=user_subs).where(
                User.username == user
            ).execute()

            self.web_pushers[user] = [WebPusher(sub) for sub in user_subs]

            logger.info(
                f"Cleaned up {len(expired)} notification subscriptions for {user}"
            )

        self.expired_subs = {}

    def publish(self, topic: str, payload: Any, retain: bool = False) -> None:
        """Wrapper for publishing when client is in valid state.

        Applies any pending notification config update, then forwards
        ``reviews`` messages to ``send_alert`` if notifications are enabled.

        Args:
            topic: Topic name; only ``"reviews"`` is acted upon.
            payload: JSON-encoded message body for the topic.
            retain: Unused here.
        """
        # check for updated notification config
        _, updated_notification_config = self.config_subscriber.check_for_update()

        if updated_notification_config:
            self.config.notifications = updated_notification_config

        if not self.config.notifications.enabled:
            return

        if topic == "reviews":
            self.send_alert(json.loads(payload))

    def send_alert(self, payload: dict[str, Any]) -> None:
        """Send a push notification for a review alert to every subscriber.

        Nothing is sent when no email is configured, when the review is not
        an alert, or when an ``update`` message has the same number of
        objects and zones as before.

        Args:
            payload: Decoded review message. Reads ``type``, ``after`` and,
                for ``update`` messages, ``before``.
        """
        if not self.config.notifications.email:
            return

        self.check_registrations()

        after = payload["after"]

        # Only notify for alerts
        if after["severity"] != "alert":
            return

        state = payload["type"]
        data = after["data"]

        # Don't notify if message is an update and important fields don't have an update
        if (
            state == "update"
            and len(payload["before"]["data"]["objects"]) == len(data["objects"])
            and len(payload["before"]["data"]["zones"]) == len(data["zones"])
        ):
            return

        review_id = after["id"]

        labels: set[str] = {obj for obj in data["objects"] if "-verified" not in obj}
        labels.update(data["sub_labels"])

        camera: str = after["camera"]
        # sort so the title is stable regardless of set iteration order
        label_text = ", ".join(sorted(labels)).replace("_", " ").title()
        zone_text = ", ".join(data["zones"]).replace("_", " ").title()
        title = f"{label_text}{' was' if state == 'end' else ''} detected"
        if zone_text:
            # only mention zones when there are any, avoiding a dangling "in"
            title += f" in {zone_text}"
        message = f"Detected on {camera.replace('_', ' ').title()}"
        image = f"{after['thumb_path'].replace('/media/frigate', '')}"

        # if event is ongoing open to live view otherwise open to recordings view
        direct_url = f"/review?id={review_id}" if state == "end" else f"/#{camera}"

        # body and ttl are identical for every subscriber, so build them once
        body = json.dumps(
            {
                "title": title,
                "message": message,
                "direct_url": direct_url,
                "image": image,
                "id": review_id,
                "type": "alert",
            }
        )
        ttl = 3600 if state == "end" else 0

        for user, pushers in self.web_pushers.items():
            for pusher in pushers:
                endpoint = pusher.subscription_info["endpoint"]
                origin = self._endpoint_origin(endpoint)

                # pushers rebuilt by cleanup_registrations may point at an
                # origin that has no claim yet; sign one on demand
                if origin not in self.claim_headers:
                    self.claim_headers[origin] = self._sign_claim(origin)

                # set headers for notification behavior
                headers = self.claim_headers[origin].copy()
                headers["urgency"] = "high"

                # send message
                # NOTE(review): an exception raised by send() aborts delivery
                # to the remaining subscribers - confirm how callers handle it
                resp = pusher.send(headers=headers, ttl=ttl, data=body)

                if resp.status_code == 201:
                    # treated as successful delivery
                    continue

                if resp.status_code in (404, 410):
                    # subscription is not found or has been unsubscribed,
                    # it no longer exists and should be removed
                    self.expired_subs.setdefault(user, []).append(endpoint)
                else:
                    logger.warning(
                        f"Failed to send notification to {user} :: {resp.headers}"
                    )

        self.cleanup_registrations()

    def stop(self) -> None:
        """Nothing to release for the web push client."""
        pass
 |