mirror of
				https://github.com/blakeblackshear/frigate.git
				synced 2025-10-27 10:52:11 +01:00 
			
		
		
		
	* Replace logging.warn with logging.warning * Install config global state early * Split config.py into more manageable pieces
		
			
				
	
	
		
			384 lines
		
	
	
		
			13 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			384 lines
		
	
	
		
			13 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
"""Auth apis."""
 | 
						|
 | 
						|
import base64
 | 
						|
import hashlib
 | 
						|
import ipaddress
 | 
						|
import json
 | 
						|
import logging
 | 
						|
import os
 | 
						|
import re
 | 
						|
import secrets
 | 
						|
import time
 | 
						|
from datetime import datetime
 | 
						|
from pathlib import Path
 | 
						|
 | 
						|
from fastapi import APIRouter, Request, Response
 | 
						|
from fastapi.responses import JSONResponse, RedirectResponse
 | 
						|
from joserfc import jwt
 | 
						|
from peewee import DoesNotExist
 | 
						|
from slowapi import Limiter
 | 
						|
 | 
						|
from frigate.api.defs.app_body import (
 | 
						|
    AppPostLoginBody,
 | 
						|
    AppPostUsersBody,
 | 
						|
    AppPutPasswordBody,
 | 
						|
)
 | 
						|
from frigate.api.defs.tags import Tags
 | 
						|
from frigate.config import AuthConfig, ProxyConfig
 | 
						|
from frigate.const import CONFIG_DIR, JWT_SECRET_ENV_VAR, PASSWORD_HASH_ALGORITHM
 | 
						|
from frigate.models import User
 | 
						|
 | 
						|
logger = logging.getLogger(__name__)
 | 
						|
 | 
						|
router = APIRouter(tags=[Tags.auth])
 | 
						|
 | 
						|
 | 
						|
class RateLimiter:
 | 
						|
    _limit = ""
 | 
						|
 | 
						|
    def set_limit(self, limit: str):
 | 
						|
        self._limit = limit
 | 
						|
 | 
						|
    def get_limit(self) -> str:
 | 
						|
        return self._limit
 | 
						|
 | 
						|
 | 
						|
rateLimiter = RateLimiter()
 | 
						|
 | 
						|
 | 
						|
def get_remote_addr(request: Request):
 | 
						|
    route = list(reversed(request.headers.get("x-forwarded-for").split(",")))
 | 
						|
    logger.debug(f"IP Route: {[r for r in route]}")
 | 
						|
    trusted_proxies = []
 | 
						|
    for proxy in request.app.frigate_config.auth.trusted_proxies:
 | 
						|
        try:
 | 
						|
            network = ipaddress.ip_network(proxy)
 | 
						|
        except ValueError:
 | 
						|
            logger.warning(f"Unable to parse trusted network: {proxy}")
 | 
						|
        trusted_proxies.append(network)
 | 
						|
 | 
						|
    # return the first remote address that is not trusted
 | 
						|
    for addr in route:
 | 
						|
        ip = ipaddress.ip_address(addr.strip())
 | 
						|
        logger.debug(f"Checking {ip} (v{ip.version})")
 | 
						|
        trusted = False
 | 
						|
        for trusted_proxy in trusted_proxies:
 | 
						|
            logger.debug(
 | 
						|
                f"Checking against trusted proxy: {trusted_proxy} (v{trusted_proxy.version})"
 | 
						|
            )
 | 
						|
            if trusted_proxy.version == 4:
 | 
						|
                ipv4 = ip.ipv4_mapped if ip.version == 6 else ip
 | 
						|
                if ipv4 in trusted_proxy:
 | 
						|
                    trusted = True
 | 
						|
                    logger.debug(f"Trusted: {str(ip)} by {str(trusted_proxy)}")
 | 
						|
                    break
 | 
						|
            elif trusted_proxy.version == 6 and ip.version == 6:
 | 
						|
                if ip in trusted_proxy:
 | 
						|
                    trusted = True
 | 
						|
                    logger.debug(f"Trusted: {str(ip)} by {str(trusted_proxy)}")
 | 
						|
                    break
 | 
						|
        if trusted:
 | 
						|
            logger.debug(f"{ip} is trusted")
 | 
						|
            continue
 | 
						|
        else:
 | 
						|
            logger.debug(f"First untrusted IP: {str(ip)}")
 | 
						|
            return str(ip)
 | 
						|
 | 
						|
    # if there wasn't anything in the route, just return the default
 | 
						|
    return request.remote_addr or "127.0.0.1"
 | 
						|
 | 
						|
 | 
						|
def get_jwt_secret() -> str:
 | 
						|
    jwt_secret = None
 | 
						|
    # check env var
 | 
						|
    if JWT_SECRET_ENV_VAR in os.environ:
 | 
						|
        logger.debug(
 | 
						|
            f"Using jwt secret from {JWT_SECRET_ENV_VAR} environment variable."
 | 
						|
        )
 | 
						|
        jwt_secret = os.environ.get(JWT_SECRET_ENV_VAR)
 | 
						|
    # check docker secrets
 | 
						|
    elif os.path.isfile(os.path.join("/run/secrets", JWT_SECRET_ENV_VAR)):
 | 
						|
        logger.debug(f"Using jwt secret from {JWT_SECRET_ENV_VAR} docker secret file.")
 | 
						|
        jwt_secret = (
 | 
						|
            Path(os.path.join("/run/secrets", JWT_SECRET_ENV_VAR)).read_text().strip()
 | 
						|
        )
 | 
						|
    # check for the addon options file
 | 
						|
    elif os.path.isfile("/data/options.json"):
 | 
						|
        with open("/data/options.json") as f:
 | 
						|
            raw_options = f.read()
 | 
						|
        logger.debug("Using jwt secret from Home Assistant addon options file.")
 | 
						|
        options = json.loads(raw_options)
 | 
						|
        jwt_secret = options.get("jwt_secret")
 | 
						|
 | 
						|
    if jwt_secret is None:
 | 
						|
        jwt_secret_file = os.path.join(CONFIG_DIR, ".jwt_secret")
 | 
						|
        # check .jwt_secrets file
 | 
						|
        if not os.path.isfile(jwt_secret_file):
 | 
						|
            logger.debug(
 | 
						|
                "No jwt secret found. Generating one and storing in .jwt_secret file in config directory."
 | 
						|
            )
 | 
						|
            jwt_secret = secrets.token_hex(64)
 | 
						|
            try:
 | 
						|
                with open(jwt_secret_file, "w") as f:
 | 
						|
                    f.write(str(jwt_secret))
 | 
						|
            except Exception:
 | 
						|
                logger.warning(
 | 
						|
                    "Unable to write jwt token file to config directory. A new jwt token will be created at each startup."
 | 
						|
                )
 | 
						|
        else:
 | 
						|
            logger.debug("Using jwt secret from .jwt_secret file in config directory.")
 | 
						|
            with open(jwt_secret_file) as f:
 | 
						|
                try:
 | 
						|
                    jwt_secret = f.readline()
 | 
						|
                except Exception:
 | 
						|
                    logger.warning(
 | 
						|
                        "Unable to read jwt token from .jwt_secret file in config directory. A new jwt token will be created at each startup."
 | 
						|
                    )
 | 
						|
                    jwt_secret = secrets.token_hex(64)
 | 
						|
 | 
						|
    if len(jwt_secret) < 64:
 | 
						|
        logger.warning("JWT Secret is recommended to be 64 characters or more")
 | 
						|
 | 
						|
    return jwt_secret
 | 
						|
 | 
						|
 | 
						|
def hash_password(password: str, salt=None, iterations=600000):
 | 
						|
    if salt is None:
 | 
						|
        salt = secrets.token_hex(16)
 | 
						|
    assert salt and isinstance(salt, str) and "$" not in salt
 | 
						|
    assert isinstance(password, str)
 | 
						|
    pw_hash = hashlib.pbkdf2_hmac(
 | 
						|
        "sha256", password.encode("utf-8"), salt.encode("utf-8"), iterations
 | 
						|
    )
 | 
						|
    b64_hash = base64.b64encode(pw_hash).decode("ascii").strip()
 | 
						|
    return "{}${}${}${}".format(PASSWORD_HASH_ALGORITHM, iterations, salt, b64_hash)
 | 
						|
 | 
						|
 | 
						|
def verify_password(password, password_hash):
 | 
						|
    if (password_hash or "").count("$") != 3:
 | 
						|
        return False
 | 
						|
    algorithm, iterations, salt, b64_hash = password_hash.split("$", 3)
 | 
						|
    iterations = int(iterations)
 | 
						|
    assert algorithm == PASSWORD_HASH_ALGORITHM
 | 
						|
    compare_hash = hash_password(password, salt, iterations)
 | 
						|
    return secrets.compare_digest(password_hash, compare_hash)
 | 
						|
 | 
						|
 | 
						|
def create_encoded_jwt(user, expiration, secret):
 | 
						|
    return jwt.encode({"alg": "HS256"}, {"sub": user, "exp": expiration}, secret)
 | 
						|
 | 
						|
 | 
						|
def set_jwt_cookie(response: Response, cookie_name, encoded_jwt, expiration, secure):
 | 
						|
    # TODO: ideally this would set secure as well, but that requires TLS
 | 
						|
    response.set_cookie(
 | 
						|
        key=cookie_name,
 | 
						|
        value=encoded_jwt,
 | 
						|
        httponly=True,
 | 
						|
        expires=expiration,
 | 
						|
        secure=secure,
 | 
						|
    )
 | 
						|
 | 
						|
 | 
						|
# Endpoint for use with nginx auth_request
 | 
						|
@router.get("/auth")
 | 
						|
def auth(request: Request):
 | 
						|
    auth_config: AuthConfig = request.app.frigate_config.auth
 | 
						|
    proxy_config: ProxyConfig = request.app.frigate_config.proxy
 | 
						|
 | 
						|
    success_response = Response("", status_code=202)
 | 
						|
 | 
						|
    # dont require auth if the request is on the internal port
 | 
						|
    # this header is set by Frigate's nginx proxy, so it cant be spoofed
 | 
						|
    if int(request.headers.get("x-server-port", default=0)) == 5000:
 | 
						|
        return success_response
 | 
						|
 | 
						|
    fail_response = Response("", status_code=401)
 | 
						|
 | 
						|
    # ensure the proxy secret matches if configured
 | 
						|
    if (
 | 
						|
        proxy_config.auth_secret is not None
 | 
						|
        and request.headers.get("x-proxy-secret", "") != proxy_config.auth_secret
 | 
						|
    ):
 | 
						|
        logger.debug("X-Proxy-Secret header does not match configured secret value")
 | 
						|
        return fail_response
 | 
						|
 | 
						|
    # if auth is disabled, just apply the proxy header map and return success
 | 
						|
    if not auth_config.enabled:
 | 
						|
        # pass the user header value from the upstream proxy if a mapping is specified
 | 
						|
        # or use anonymous if none are specified
 | 
						|
        if proxy_config.header_map.user is not None:
 | 
						|
            upstream_user_header_value = request.headers.get(
 | 
						|
                proxy_config.header_map.user,
 | 
						|
                default="anonymous",
 | 
						|
            )
 | 
						|
            success_response.headers["remote-user"] = upstream_user_header_value
 | 
						|
        else:
 | 
						|
            success_response.headers["remote-user"] = "anonymous"
 | 
						|
        return success_response
 | 
						|
 | 
						|
    # now apply authentication
 | 
						|
    fail_response.headers["location"] = "/login"
 | 
						|
 | 
						|
    JWT_COOKIE_NAME = request.app.frigate_config.auth.cookie_name
 | 
						|
    JWT_COOKIE_SECURE = request.app.frigate_config.auth.cookie_secure
 | 
						|
    JWT_REFRESH = request.app.frigate_config.auth.refresh_time
 | 
						|
    JWT_SESSION_LENGTH = request.app.frigate_config.auth.session_length
 | 
						|
 | 
						|
    jwt_source = None
 | 
						|
    encoded_token = None
 | 
						|
    if "authorization" in request.headers and request.headers[
 | 
						|
        "authorization"
 | 
						|
    ].startswith("Bearer "):
 | 
						|
        jwt_source = "authorization"
 | 
						|
        logger.debug("Found authorization header")
 | 
						|
        encoded_token = request.headers["authorization"].replace("Bearer ", "")
 | 
						|
    elif JWT_COOKIE_NAME in request.cookies:
 | 
						|
        jwt_source = "cookie"
 | 
						|
        logger.debug("Found jwt cookie")
 | 
						|
        encoded_token = request.cookies[JWT_COOKIE_NAME]
 | 
						|
 | 
						|
    if encoded_token is None:
 | 
						|
        logger.debug("No jwt token found")
 | 
						|
        return fail_response
 | 
						|
 | 
						|
    try:
 | 
						|
        token = jwt.decode(encoded_token, request.app.jwt_token)
 | 
						|
        if "sub" not in token.claims:
 | 
						|
            logger.debug("user not set in jwt token")
 | 
						|
            return fail_response
 | 
						|
        if "exp" not in token.claims:
 | 
						|
            logger.debug("exp not set in jwt token")
 | 
						|
            return fail_response
 | 
						|
 | 
						|
        user = token.claims.get("sub")
 | 
						|
        current_time = int(time.time())
 | 
						|
 | 
						|
        # if the jwt is expired
 | 
						|
        expiration = int(token.claims.get("exp"))
 | 
						|
        logger.debug(
 | 
						|
            f"current time:   {datetime.fromtimestamp(current_time).strftime('%c')}"
 | 
						|
        )
 | 
						|
        logger.debug(
 | 
						|
            f"jwt expires at: {datetime.fromtimestamp(expiration).strftime('%c')}"
 | 
						|
        )
 | 
						|
        logger.debug(
 | 
						|
            f"jwt refresh at: {datetime.fromtimestamp(expiration - JWT_REFRESH).strftime('%c')}"
 | 
						|
        )
 | 
						|
        if expiration <= current_time:
 | 
						|
            logger.debug("jwt token expired")
 | 
						|
            return fail_response
 | 
						|
 | 
						|
        # if the jwt cookie is expiring soon
 | 
						|
        elif jwt_source == "cookie" and expiration - JWT_REFRESH <= current_time:
 | 
						|
            logger.debug("jwt token expiring soon, refreshing cookie")
 | 
						|
            # ensure the user hasn't been deleted
 | 
						|
            try:
 | 
						|
                User.get_by_id(user)
 | 
						|
            except DoesNotExist:
 | 
						|
                return fail_response
 | 
						|
            new_expiration = current_time + JWT_SESSION_LENGTH
 | 
						|
            new_encoded_jwt = create_encoded_jwt(
 | 
						|
                user, new_expiration, request.app.jwt_token
 | 
						|
            )
 | 
						|
            set_jwt_cookie(
 | 
						|
                success_response,
 | 
						|
                JWT_COOKIE_NAME,
 | 
						|
                new_encoded_jwt,
 | 
						|
                new_expiration,
 | 
						|
                JWT_COOKIE_SECURE,
 | 
						|
            )
 | 
						|
 | 
						|
        success_response.headers["remote-user"] = user
 | 
						|
        return success_response
 | 
						|
    except Exception as e:
 | 
						|
        logger.error(f"Error parsing jwt: {e}")
 | 
						|
        return fail_response
 | 
						|
 | 
						|
 | 
						|
@router.get("/profile")
 | 
						|
def profile(request: Request):
 | 
						|
    username = request.headers.get("remote-user")
 | 
						|
    return JSONResponse(content={"username": username})
 | 
						|
 | 
						|
 | 
						|
@router.get("/logout")
 | 
						|
def logout(request: Request):
 | 
						|
    auth_config: AuthConfig = request.app.frigate_config.auth
 | 
						|
    response = RedirectResponse("/login", status_code=303)
 | 
						|
    response.delete_cookie(auth_config.cookie_name)
 | 
						|
    return response
 | 
						|
 | 
						|
 | 
						|
limiter = Limiter(key_func=get_remote_addr)
 | 
						|
 | 
						|
 | 
						|
@router.post("/login")
 | 
						|
@limiter.limit(limit_value=rateLimiter.get_limit)
 | 
						|
def login(request: Request, body: AppPostLoginBody):
 | 
						|
    JWT_COOKIE_NAME = request.app.frigate_config.auth.cookie_name
 | 
						|
    JWT_COOKIE_SECURE = request.app.frigate_config.auth.cookie_secure
 | 
						|
    JWT_SESSION_LENGTH = request.app.frigate_config.auth.session_length
 | 
						|
    user = body.user
 | 
						|
    password = body.password
 | 
						|
 | 
						|
    try:
 | 
						|
        db_user: User = User.get_by_id(user)
 | 
						|
    except DoesNotExist:
 | 
						|
        return JSONResponse(content={"message": "Login failed"}, status_code=400)
 | 
						|
 | 
						|
    password_hash = db_user.password_hash
 | 
						|
    if verify_password(password, password_hash):
 | 
						|
        expiration = int(time.time()) + JWT_SESSION_LENGTH
 | 
						|
        encoded_jwt = create_encoded_jwt(user, expiration, request.app.jwt_token)
 | 
						|
        response = Response("", 200)
 | 
						|
        set_jwt_cookie(
 | 
						|
            response, JWT_COOKIE_NAME, encoded_jwt, expiration, JWT_COOKIE_SECURE
 | 
						|
        )
 | 
						|
        return response
 | 
						|
    return JSONResponse(content={"message": "Login failed"}, status_code=400)
 | 
						|
 | 
						|
 | 
						|
@router.get("/users")
 | 
						|
def get_users():
 | 
						|
    exports = User.select(User.username).order_by(User.username).dicts().iterator()
 | 
						|
    return JSONResponse([e for e in exports])
 | 
						|
 | 
						|
 | 
						|
@router.post("/users")
 | 
						|
def create_user(request: Request, body: AppPostUsersBody):
 | 
						|
    HASH_ITERATIONS = request.app.frigate_config.auth.hash_iterations
 | 
						|
 | 
						|
    if not re.match("^[A-Za-z0-9._]+$", body.username):
 | 
						|
        JSONResponse(content={"message": "Invalid username"}, status_code=400)
 | 
						|
 | 
						|
    password_hash = hash_password(body.password, iterations=HASH_ITERATIONS)
 | 
						|
 | 
						|
    User.insert(
 | 
						|
        {
 | 
						|
            User.username: body.username,
 | 
						|
            User.password_hash: password_hash,
 | 
						|
        }
 | 
						|
    ).execute()
 | 
						|
    return JSONResponse(content={"username": body.username})
 | 
						|
 | 
						|
 | 
						|
@router.delete("/users/{username}")
 | 
						|
def delete_user(username: str):
 | 
						|
    User.delete_by_id(username)
 | 
						|
    return JSONResponse(content={"success": True})
 | 
						|
 | 
						|
 | 
						|
@router.put("/users/{username}/password")
 | 
						|
def update_password(request: Request, username: str, body: AppPutPasswordBody):
 | 
						|
    HASH_ITERATIONS = request.app.frigate_config.auth.hash_iterations
 | 
						|
 | 
						|
    password_hash = hash_password(body.password, iterations=HASH_ITERATIONS)
 | 
						|
 | 
						|
    User.set_by_id(
 | 
						|
        username,
 | 
						|
        {
 | 
						|
            User.password_hash: password_hash,
 | 
						|
        },
 | 
						|
    )
 | 
						|
    return JSONResponse(content={"success": True})
 |