blakeblackshear.frigate/frigate/api/auth.py
Josh Hawkins ed1e3a7c9a
Enhance user roles to limit camera access (#20024)
* update config for roles and add validator

* ensure admin and viewer are never overridden

* add class method to user to retrieve all allowed cameras

* enforce config roles in auth api endpoints

* add camera access api dependency functions

* protect review endpoints

* protect preview endpoints

* rename param name for better fastapi injection matching

* remove unneeded

* protect export endpoints

* protect event endpoints

* protect media endpoints

* update auth hook for allowed cameras

* update default app view

* ensure anonymous user always returns all cameras

* limit cameras in explore

* cameras is already a list

* limit cameras in review/history

* limit cameras in live view

* limit cameras in camera groups

* only show face library and classification in sidebar for admin

* remove check in delete reviews

since admin role is required, no need to check camera access. fixes failing test

* pass request with camera access for tests

* more async

* camera access tests

* fix proxy auth tests

* allowed cameras for review tests

* combine event tests and refactor for camera access

* fix post validation for roles

* don't limit roles in create user dialog

* fix triggers endpoints

no need to run require camera access dep since the required role is admin

* fix type

* create and edit role dialogs

* delete role dialog

* fix role change dialog

* update settings view for roles

* i18n changes

* minor spacing tweaks

* docs

* use badges and camera name label component

* clarify docs

* display all cameras badge for admin and viewer

* i18n fix

* use validator to prevent reserved and empty roles from being assigned

* split users and roles into separate tabs in settings

* tweak docs

* clarify docs

* change icon

* don't memoize roles

always recalculate on component render
2025-09-12 05:19:29 -06:00

666 lines
23 KiB
Python

"""Auth apis."""
import base64
import hashlib
import ipaddress
import json
import logging
import os
import re
import secrets
import time
from datetime import datetime
from pathlib import Path
from typing import List, Optional
from fastapi import APIRouter, Depends, HTTPException, Request, Response
from fastapi.responses import JSONResponse, RedirectResponse
from joserfc import jwt
from peewee import DoesNotExist
from slowapi import Limiter
from frigate.api.defs.request.app_body import (
AppPostLoginBody,
AppPostUsersBody,
AppPutPasswordBody,
AppPutRoleBody,
)
from frigate.api.defs.tags import Tags
from frigate.config import AuthConfig, ProxyConfig
from frigate.const import CONFIG_DIR, JWT_SECRET_ENV_VAR, PASSWORD_HASH_ALGORITHM
from frigate.models import User
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# Router that collects the endpoints declared in this module under the auth tag.
router = APIRouter(tags=[Tags.auth])
class RateLimiter:
    """Mutable holder for a rate-limit string.

    The bound ``get_limit`` method is passed to the limiter as a callable on
    the login route, so the value stored through ``set_limit`` is looked up
    whenever that callable is invoked.
    """

    # Class-level default; stays empty until ``set_limit`` stores a value
    # on the instance.
    _limit = ""

    def set_limit(self, limit: str) -> None:
        """Remember ``limit`` on this instance."""
        self._limit = limit

    def get_limit(self) -> str:
        """Return the stored limit, or ``""`` when none has been set."""
        return self._limit


# Shared instance referenced by the login route's rate-limit decorator.
rateLimiter = RateLimiter()
def get_remote_addr(request: Request):
    """Return the client address used as the rate-limit key.

    The ``x-forwarded-for`` header is walked from its last entry to its first
    and the first address that is not inside one of the configured
    ``auth.trusted_proxies`` networks is returned.

    Args:
        request: Incoming request. Its headers and
            ``request.app.frigate_config.auth.trusted_proxies`` are read.

    Returns:
        The first untrusted address as a string. When the header is missing,
        empty, or lists only trusted proxies: ``request.remote_addr`` if the
        request object has a truthy one, otherwise ``"127.0.0.1"``.

    Raises:
        ValueError: If a non-empty header entry is not a valid IP address.
    """
    # A missing header previously crashed on ``None.split``; treat it as an
    # empty route. Blank segments are dropped for the same reason.
    forwarded_for = request.headers.get("x-forwarded-for") or ""
    route = [
        hop.strip() for hop in reversed(forwarded_for.split(",")) if hop.strip()
    ]
    logger.debug(f"IP Route: {route}")

    trusted_proxies = []
    for proxy in request.app.frigate_config.auth.trusted_proxies:
        try:
            network = ipaddress.ip_network(proxy)
        except ValueError:
            logger.warning(f"Unable to parse trusted network: {proxy}")
            # Skip the entry: appending here would either re-add the previous
            # network or hit an unbound name when the first entry is invalid.
            continue
        trusted_proxies.append(network)

    # return the first remote address that is not trusted
    for addr in route:
        ip = ipaddress.ip_address(addr)
        logger.debug(f"Checking {ip} (v{ip.version})")
        trusted = False
        for trusted_proxy in trusted_proxies:
            logger.debug(
                f"Checking against trusted proxy: {trusted_proxy} (v{trusted_proxy.version})"
            )
            if trusted_proxy.version == 4:
                # An IPv6 address can only match an IPv4 network through its
                # IPv4-mapped form (None when it has no such form).
                ipv4 = ip.ipv4_mapped if ip.version == 6 else ip
                if ipv4 is not None and ipv4 in trusted_proxy:
                    trusted = True
                    logger.debug(f"Trusted: {str(ip)} by {str(trusted_proxy)}")
                    break
            elif trusted_proxy.version == 6 and ip.version == 6:
                if ip in trusted_proxy:
                    trusted = True
                    logger.debug(f"Trusted: {str(ip)} by {str(trusted_proxy)}")
                    break
        if trusted:
            logger.debug(f"{ip} is trusted")
            continue
        else:
            logger.debug(f"First untrusted IP: {str(ip)}")
            return str(ip)

    # if there wasn't anything in the route, just return the default
    remote_addr = None
    if hasattr(request, "remote_addr"):
        remote_addr = request.remote_addr
    return remote_addr or "127.0.0.1"
def get_jwt_secret() -> str:
    """Locate the secret used to sign JWTs, generating one if none is found.

    Sources are checked in this order and the first one present wins:

    1. The ``JWT_SECRET_ENV_VAR`` environment variable.
    2. A docker secret file of the same name under ``/run/secrets``.
    3. The ``jwt_secret`` key of ``/data/options.json`` (Home Assistant
       Add-on options file).
    4. The ``.jwt_secret`` file in ``CONFIG_DIR``. When that file does not
       exist a new secret is generated and written to it; when it cannot be
       read or is empty a new secret is generated for this run only.

    Returns:
        The secret string. A warning is logged when it is shorter than 64
        characters.
    """
    jwt_secret = None
    docker_secret_path = os.path.join("/run/secrets", JWT_SECRET_ENV_VAR)

    # check env var
    if JWT_SECRET_ENV_VAR in os.environ:
        logger.debug(
            f"Using jwt secret from {JWT_SECRET_ENV_VAR} environment variable."
        )
        jwt_secret = os.environ.get(JWT_SECRET_ENV_VAR)
    # check docker secrets
    elif os.path.isfile(docker_secret_path):
        logger.debug(f"Using jwt secret from {JWT_SECRET_ENV_VAR} docker secret file.")
        jwt_secret = Path(docker_secret_path).read_text().strip()
    # check for the add-on options file
    elif os.path.isfile("/data/options.json"):
        with open("/data/options.json") as f:
            raw_options = f.read()
        logger.debug("Using jwt secret from Home Assistant Add-on options file.")
        options = json.loads(raw_options)
        # May be None when the key is absent; the config-dir file is used then.
        jwt_secret = options.get("jwt_secret")

    if jwt_secret is None:
        jwt_secret_file = os.path.join(CONFIG_DIR, ".jwt_secret")
        # check .jwt_secrets file
        if not os.path.isfile(jwt_secret_file):
            logger.debug(
                "No jwt secret found. Generating one and storing in .jwt_secret file in config directory."
            )
            jwt_secret = secrets.token_hex(64)
            try:
                with open(jwt_secret_file, "w") as f:
                    f.write(str(jwt_secret))
            except Exception:
                logger.warning(
                    "Unable to write jwt token file to config directory. A new jwt token will be created at each startup."
                )
        else:
            logger.debug("Using jwt secret from .jwt_secret file in config directory.")
            # The open() call is inside the try so that an unreadable file
            # takes the same fallback as a failed read instead of raising.
            try:
                with open(jwt_secret_file) as f:
                    jwt_secret = f.readline().strip()
            except Exception:
                jwt_secret = None
            # An empty file must not become an empty signing secret.
            if not jwt_secret:
                logger.warning(
                    "Unable to read jwt token from .jwt_secret file in config directory. A new jwt token will be created at each startup."
                )
                jwt_secret = secrets.token_hex(64)

    if len(jwt_secret) < 64:
        logger.warning("JWT Secret is recommended to be 64 characters or more")

    return jwt_secret
def hash_password(password: str, salt=None, iterations=600000):
    """Derive a PBKDF2-HMAC-SHA256 hash and serialize it with its parameters.

    Args:
        password: Plain-text password.
        salt: Salt string that must not contain ``$`` (the field separator of
            the serialized form). A random one is generated when ``None``.
        iterations: PBKDF2 iteration count.

    Returns:
        A string of the form
        ``<PASSWORD_HASH_ALGORITHM>$<iterations>$<salt>$<base64 digest>``.

    Raises:
        TypeError: If ``password`` or ``salt`` is not a string.
        ValueError: If ``salt`` is empty or contains ``$``.
    """
    if salt is None:
        salt = secrets.token_hex(16)

    # Explicit checks rather than ``assert`` so that validation is still
    # performed when Python runs with optimizations (-O strips asserts).
    if not isinstance(password, str):
        raise TypeError("password must be a string")
    if not isinstance(salt, str):
        raise TypeError("salt must be a string")
    if not salt or "$" in salt:
        raise ValueError("salt must be a non-empty string that does not contain '$'")

    pw_hash = hashlib.pbkdf2_hmac(
        "sha256", password.encode("utf-8"), salt.encode("utf-8"), iterations
    )
    b64_hash = base64.b64encode(pw_hash).decode("ascii").strip()
    return "{}${}${}${}".format(PASSWORD_HASH_ALGORITHM, iterations, salt, b64_hash)
def verify_password(password, password_hash):
    """Check a plain-text password against a stored hash string.

    Args:
        password: Candidate plain-text password.
        password_hash: Stored value in the
            ``<algorithm>$<iterations>$<salt>$<digest>`` layout produced by
            ``hash_password``. May be ``None`` or empty.

    Returns:
        ``True`` only when re-hashing ``password`` with the stored salt and
        iteration count reproduces ``password_hash``. A stored value that is
        missing or malformed (wrong number of fields, non-integer or
        non-positive iteration count, empty salt, unexpected algorithm)
        yields ``False`` instead of raising.
    """
    if (password_hash or "").count("$") != 3:
        return False

    algorithm, iterations, salt, _ = password_hash.split("$", 3)

    # A corrupt stored hash is a failed verification, not a server error.
    try:
        iterations = int(iterations)
    except ValueError:
        return False
    if iterations < 1 or not salt:
        return False
    if algorithm != PASSWORD_HASH_ALGORITHM:
        return False

    compare_hash = hash_password(password, salt, iterations)
    # Compare as bytes: compare_digest rejects str containing non-ASCII
    # characters, which a corrupt stored hash could contain.
    return secrets.compare_digest(
        password_hash.encode("utf-8"), compare_hash.encode("utf-8")
    )
def create_encoded_jwt(user, role, expiration, secret):
    """Encode an HS256 JWT whose claims are the user, role and expiry.

    Args:
        user: Stored as the ``sub`` claim.
        role: Stored as the ``role`` claim.
        expiration: Stored as the ``exp`` claim.
        secret: Key passed to ``jwt.encode``.

    Returns:
        Whatever ``jwt.encode`` returns for the header, claims and key.
    """
    header = {"alg": "HS256"}
    claims = {"sub": user, "role": role, "exp": expiration}
    return jwt.encode(header, claims, secret)
def set_jwt_cookie(response: Response, cookie_name, encoded_jwt, expiration, secure):
    """Attach the encoded JWT to ``response`` as an HTTP-only cookie.

    Args:
        response: Response that receives the cookie.
        cookie_name: Cookie key.
        encoded_jwt: Cookie value.
        expiration: Passed through as the cookie's ``expires``.
        secure: Passed through as the cookie's ``secure`` flag.
    """
    # TODO: ideally this would set secure as well, but that requires TLS
    cookie_options = {
        "key": cookie_name,
        "value": encoded_jwt,
        "httponly": True,
        "expires": expiration,
        "secure": secure,
    }
    response.set_cookie(**cookie_options)
async def get_current_user(request: Request):
    """Read the caller's identity from the ``remote-user``/``remote-role`` headers.

    Returns:
        ``{"username": ..., "role": ...}`` when both headers are present and
        non-empty; otherwise a 401 ``JSONResponse`` whose message is
        "No authorization headers.". Callers distinguish the two with
        ``isinstance``.
    """
    headers = request.headers
    username = headers.get("remote-user")
    role = headers.get("remote-role")

    if username and role:
        return {"username": username, "role": role}

    return JSONResponse(
        content={"message": "No authorization headers."}, status_code=401
    )
def require_role(required_roles: List[str]):
    """Build a dependency that admits only callers holding a required role.

    Args:
        required_roles: Role names, any one of which grants access.

    Returns:
        An async callable taking the request. It returns the first role from
        the ``remote-role`` header that is both configured and required, and
        raises ``HTTPException`` (403) when the header is missing or empty,
        names no configured role, or names no required role.
    """

    async def role_checker(request: Request):
        proxy_config: ProxyConfig = request.app.frigate_config.proxy
        config_roles = list(request.app.frigate_config.auth.roles.keys())

        # The header may carry several roles joined by the proxy separator.
        role_header = request.headers.get("remote-role")
        roles = []
        if role_header:
            roles = [part.strip() for part in role_header.split(proxy_config.separator)]

        if not roles:
            raise HTTPException(status_code=403, detail="Role not provided")

        # Only roles that exist in the configuration count.
        valid_roles = [candidate for candidate in roles if candidate in config_roles]
        if not valid_roles:
            raise HTTPException(
                status_code=403,
                detail=f"No valid roles found in {roles}. Required: {', '.join(required_roles)}. Available: {', '.join(config_roles)}",
            )

        authorized = [
            candidate for candidate in valid_roles if candidate in required_roles
        ]
        if not authorized:
            raise HTTPException(
                status_code=403,
                detail=f"Role {', '.join(valid_roles)} not authorized. Required: {', '.join(required_roles)}",
            )

        return authorized[0]

    return role_checker
def resolve_role(
    headers: dict, proxy_config: ProxyConfig, config_roles: set[str]
) -> str:
    """
    Determine the effective role for a request based on proxy headers and configuration.

    Order of resolution:
    1. If a role header is defined in proxy_config.header_map.role:
       - If a role_map is configured, treat the header as group claims
         (split by proxy_config.separator) and map to roles.
       - If no role_map is configured, treat the header as role names directly
         (lower-cased before matching).
    2. If no valid role is found, return proxy_config.default_role if it's valid
       in config_roles, else 'viewer'.

    When several configured roles match, the choice is deterministic: 'admin'
    is preferred, then the remaining roles in alphabetical order.

    Args:
        headers (dict): Incoming request headers (case-insensitive).
        proxy_config (ProxyConfig): Proxy configuration.
        config_roles (set[str]): Set of valid roles from config.

    Returns:
        str: Resolved role (one of config_roles or validated default).
    """
    default_role = proxy_config.default_role
    role_header = proxy_config.header_map.role

    # Validate default_role against config; fallback to 'viewer' if invalid.
    # This also covers an empty config_roles, where nothing can be valid.
    validated_default = default_role if default_role in config_roles else "viewer"

    if not role_header:
        logger.debug(
            "No role header configured in proxy_config.header_map. Returning validated default role '%s'.",
            validated_default,
        )
        return validated_default

    raw_value = headers.get(role_header, "")
    logger.debug("Raw role header value from '%s': %r", role_header, raw_value)

    if not raw_value:
        logger.debug(
            "Role header missing or empty. Returning validated default role '%s'.",
            validated_default,
        )
        return validated_default

    # Iterating a set of strings has no stable order between interpreter
    # runs, so picking "the first match" straight from config_roles could
    # give a different role after a restart. Use a fixed order instead.
    role_priority = sorted(config_roles, key=lambda name: (name != "admin", name))

    # role_map configured, treat header as group claims
    if proxy_config.header_map.role_map:
        groups = [
            g.strip() for g in raw_value.split(proxy_config.separator) if g.strip()
        ]
        logger.debug("Parsed groups from role header: %s", groups)

        matched_roles = {
            role_name
            for role_name, required_groups in proxy_config.header_map.role_map.items()
            if any(group in groups for group in required_groups)
        }
        logger.debug("Matched roles from role_map: %s", matched_roles)

        if matched_roles:
            # A mapped role that is not configured falls back to the default.
            resolved = next(
                (r for r in role_priority if r in matched_roles), validated_default
            )
            logger.debug("Resolved role (with role_map) to '%s'.", resolved)
            return resolved

        logger.debug(
            "No role_map match for groups '%s'. Using validated default role '%s'.",
            raw_value,
            validated_default,
        )
        return validated_default

    # no role_map, treat as role names directly
    roles_from_header = [
        r.strip().lower() for r in raw_value.split(proxy_config.separator) if r.strip()
    ]
    logger.debug("Parsed roles directly from header: %s", roles_from_header)

    # None marks "no match", so a header that explicitly names the default
    # role is not logged as having contained no valid role.
    resolved = next((r for r in role_priority if r in roles_from_header), None)
    if resolved is None:
        logger.debug(
            "Provided proxy role header values '%s' did not contain a valid role. Using validated default role '%s'.",
            raw_value,
            validated_default,
        )
        return validated_default

    logger.debug("Resolved role (direct header) to '%s'.", resolved)
    return resolved
# Endpoints
@router.get("/auth")
def auth(request: Request):
    """Authenticate a request and report the caller's identity in headers.

    Decision order:

    1. ``x-server-port`` equal to 5000: accepted as user ``anonymous`` with
       role ``admin``.
    2. A configured proxy secret that does not match ``x-proxy-secret``:
       rejected.
    3. Auth disabled: the user comes from the mapped proxy header (or
       ``anonymous``) and the role from ``resolve_role``.
    4. Otherwise a JWT is read from an ``Authorization: Bearer`` header or
       the auth cookie. It must decode, carry ``sub``, ``role`` and ``exp``,
       and not be expired. A cookie token close to expiry is re-issued after
       confirming the user still exists.

    Returns:
        A 202 response with ``remote-user`` and ``remote-role`` headers on
        success; a 401 response otherwise (with ``location: /login`` once the
        JWT stage has been reached).
    """
    auth_config: AuthConfig = request.app.frigate_config.auth
    proxy_config: ProxyConfig = request.app.frigate_config.proxy

    success_response = Response("", status_code=202)

    # dont require auth if the request is on the internal port
    # this header is set by Frigate's nginx proxy, so it cant be spoofed
    if int(request.headers.get("x-server-port", default=0)) == 5000:
        success_response.headers["remote-user"] = "anonymous"
        success_response.headers["remote-role"] = "admin"
        return success_response

    fail_response = Response("", status_code=401)

    # ensure the proxy secret matches if configured
    if proxy_config.auth_secret is not None:
        provided_secret = request.headers.get("x-proxy-secret", "")
        # Constant-time comparison so response timing does not reveal how
        # much of the secret matched. Compared as bytes because
        # compare_digest rejects str containing non-ASCII characters.
        if not secrets.compare_digest(
            provided_secret.encode("utf-8"),
            str(proxy_config.auth_secret).encode("utf-8"),
        ):
            logger.debug("X-Proxy-Secret header does not match configured secret value")
            return fail_response

    # if auth is disabled, just apply the proxy header map and return success
    if not auth_config.enabled:
        # pass the user header value from the upstream proxy if a mapping is specified
        # or use anonymous if none are specified
        user_header = proxy_config.header_map.user
        success_response.headers["remote-user"] = (
            request.headers.get(user_header, default="anonymous")
            if user_header
            else "anonymous"
        )

        # parse header and resolve a valid role
        config_roles_set = set(auth_config.roles.keys())
        role = resolve_role(request.headers, proxy_config, config_roles_set)

        success_response.headers["remote-role"] = role
        return success_response

    # now apply authentication
    fail_response.headers["location"] = "/login"

    JWT_COOKIE_NAME = request.app.frigate_config.auth.cookie_name
    JWT_COOKIE_SECURE = request.app.frigate_config.auth.cookie_secure
    JWT_REFRESH = request.app.frigate_config.auth.refresh_time
    JWT_SESSION_LENGTH = request.app.frigate_config.auth.session_length

    jwt_source = None
    encoded_token = None
    if "authorization" in request.headers and request.headers[
        "authorization"
    ].startswith("Bearer "):
        jwt_source = "authorization"
        logger.debug("Found authorization header")
        # Drop only the leading scheme; replace() would also remove any
        # later occurrence of "Bearer " inside the value.
        encoded_token = request.headers["authorization"][len("Bearer ") :]
    elif JWT_COOKIE_NAME in request.cookies:
        jwt_source = "cookie"
        logger.debug("Found jwt cookie")
        encoded_token = request.cookies[JWT_COOKIE_NAME]

    if encoded_token is None:
        logger.debug("No jwt token found")
        return fail_response

    try:
        token = jwt.decode(encoded_token, request.app.jwt_token)
        if "sub" not in token.claims:
            logger.debug("user not set in jwt token")
            return fail_response
        if "role" not in token.claims:
            logger.debug("role not set in jwt token")
            return fail_response
        if "exp" not in token.claims:
            logger.debug("exp not set in jwt token")
            return fail_response

        user = token.claims.get("sub")
        role = token.claims.get("role")
        current_time = int(time.time())

        # if the jwt is expired
        expiration = int(token.claims.get("exp"))
        logger.debug(
            f"current time:   {datetime.fromtimestamp(current_time).strftime('%c')}"
        )
        logger.debug(
            f"jwt expires at: {datetime.fromtimestamp(expiration).strftime('%c')}"
        )
        logger.debug(
            f"jwt refresh at: {datetime.fromtimestamp(expiration - JWT_REFRESH).strftime('%c')}"
        )
        if expiration <= current_time:
            logger.debug("jwt token expired")
            return fail_response

        # if the jwt cookie is expiring soon
        elif jwt_source == "cookie" and expiration - JWT_REFRESH <= current_time:
            logger.debug("jwt token expiring soon, refreshing cookie")
            # ensure the user hasn't been deleted
            try:
                User.get_by_id(user)
            except DoesNotExist:
                return fail_response
            new_expiration = current_time + JWT_SESSION_LENGTH
            new_encoded_jwt = create_encoded_jwt(
                user, role, new_expiration, request.app.jwt_token
            )
            set_jwt_cookie(
                success_response,
                JWT_COOKIE_NAME,
                new_encoded_jwt,
                new_expiration,
                JWT_COOKIE_SECURE,
            )

        success_response.headers["remote-user"] = user
        success_response.headers["remote-role"] = role
        return success_response
    except Exception as e:
        # Any decode or claim-handling failure is treated as unauthenticated.
        logger.error(f"Error parsing jwt: {e}")
        return fail_response
@router.get("/profile")
def profile(request: Request):
    """Report the caller's username, role and allowed cameras.

    The username and role come from the ``remote-user`` and ``remote-role``
    headers, defaulting to ``anonymous`` and ``viewer`` when absent. The
    allowed cameras are whatever ``User.get_allowed_cameras`` returns for
    that role.
    """
    headers = request.headers
    username = headers.get("remote-user", "anonymous")
    role = headers.get("remote-role", "viewer")

    frigate_config = request.app.frigate_config
    all_camera_names = set(frigate_config.cameras.keys())
    roles_dict = frigate_config.auth.roles
    allowed_cameras = User.get_allowed_cameras(role, roles_dict, all_camera_names)

    payload = {
        "username": username,
        "role": role,
        "allowed_cameras": allowed_cameras,
    }
    return JSONResponse(content=payload)
@router.get("/logout")
def logout(request: Request):
    """Delete the auth cookie and redirect to ``/login`` with a 303."""
    auth_config: AuthConfig = request.app.frigate_config.auth
    redirect = RedirectResponse("/login", status_code=303)
    # Deleting the cookie named in the auth config ends the session.
    redirect.delete_cookie(auth_config.cookie_name)
    return redirect
# Rate limiter keyed on the address returned by get_remote_addr.
limiter = Limiter(key_func=get_remote_addr)
@router.post("/login")
@limiter.limit(limit_value=rateLimiter.get_limit)
def login(request: Request, body: AppPostLoginBody):
    """Verify credentials and issue a JWT session cookie.

    Returns:
        An empty 200 response carrying the JWT cookie on success. A 401
        ``JSONResponse`` with message "Login failed" when the user does not
        exist or the password does not verify.
    """
    JWT_COOKIE_NAME = request.app.frigate_config.auth.cookie_name
    JWT_COOKIE_SECURE = request.app.frigate_config.auth.cookie_secure
    JWT_SESSION_LENGTH = request.app.frigate_config.auth.session_length
    user = body.user
    password = body.password

    try:
        db_user: User = User.get_by_id(user)
    except DoesNotExist:
        return JSONResponse(content={"message": "Login failed"}, status_code=401)

    if not verify_password(password, db_user.password_hash):
        return JSONResponse(content={"message": "Login failed"}, status_code=401)

    # A stored role that is no longer configured is downgraded to viewer.
    role = getattr(db_user, "role", "viewer")
    config_roles_set = set(request.app.frigate_config.auth.roles.keys())
    if role not in config_roles_set:
        logger.warning(
            f"User {db_user.username} has an invalid role {role}, falling back to 'viewer'."
        )
        role = "viewer"

    expiration = int(time.time()) + JWT_SESSION_LENGTH
    encoded_jwt = create_encoded_jwt(user, role, expiration, request.app.jwt_token)

    response = Response("", 200)
    set_jwt_cookie(
        response, JWT_COOKIE_NAME, encoded_jwt, expiration, JWT_COOKIE_SECURE
    )
    return response
@router.get("/users", dependencies=[Depends(require_role(["admin"]))])
def get_users():
    """Return every user's username and role, ordered by username (admin only)."""
    query = User.select(User.username, User.role).order_by(User.username)
    rows = query.dicts().iterator()
    return JSONResponse(list(rows))
@router.post("/users", dependencies=[Depends(require_role(["admin"]))])
def create_user(
    request: Request,
    body: AppPostUsersBody,
):
    """Create a user with a hashed password (admin only).

    Returns:
        ``{"username": ...}`` on success. A 400 ``JSONResponse`` when the
        username contains anything other than ASCII letters, digits, ``.``
        or ``_``, or when the requested role is not a configured role.
    """
    HASH_ITERATIONS = request.app.frigate_config.auth.hash_iterations
    config_roles = list(request.app.frigate_config.auth.roles.keys())

    # fullmatch, not match with "$": "$" also matches just before a trailing
    # newline, which would let a name such as "admin\n" through.
    if not re.fullmatch(r"[A-Za-z0-9._]+", body.username):
        return JSONResponse(content={"message": "Invalid username"}, status_code=400)

    if body.role not in config_roles:
        return JSONResponse(
            content={"message": f"Role must be one of: {', '.join(config_roles)}"},
            status_code=400,
        )

    role = body.role or "viewer"
    password_hash = hash_password(body.password, iterations=HASH_ITERATIONS)
    User.insert(
        {
            User.username: body.username,
            User.password_hash: password_hash,
            User.role: role,
            User.notification_tokens: [],
        }
    ).execute()
    return JSONResponse(content={"username": body.username})
# Deleting users is restricted to admins, matching the other user-management
# routes in this module; without the dependency any caller could delete
# accounts.
@router.delete("/users/{username}", dependencies=[Depends(require_role(["admin"]))])
def delete_user(username: str):
    """Delete the user identified by ``username`` (admin only).

    Returns:
        ``{"success": true}`` once the delete has been issued.
    """
    User.delete_by_id(username)
    return JSONResponse(content={"success": True})
@router.put("/users/{username}/password")
async def update_password(
    request: Request,
    username: str,
    body: AppPutPasswordBody,
):
    """Set a new password for ``username``.

    Admins may change any user's password. Every other role, including
    custom roles from the configuration, may only change its own.

    Returns:
        ``{"success": true}`` on success, or the 401 response from
        ``get_current_user`` when the identity headers are missing.

    Raises:
        HTTPException: 403 when a non-admin targets another user.
    """
    current_user = await get_current_user(request)
    if isinstance(current_user, JSONResponse):
        # auth failed
        return current_user

    current_username = current_user.get("username")
    current_role = current_user.get("role")

    # Checking for "not admin" rather than "is viewer" keeps custom roles
    # from changing other users' passwords.
    if current_role != "admin" and current_username != username:
        raise HTTPException(
            status_code=403, detail="Viewers can only update their own password"
        )

    HASH_ITERATIONS = request.app.frigate_config.auth.hash_iterations
    password_hash = hash_password(body.password, iterations=HASH_ITERATIONS)
    User.set_by_id(username, {User.password_hash: password_hash})
    return JSONResponse(content={"success": True})
@router.put(
    "/users/{username}/role",
    dependencies=[Depends(require_role(["admin"]))],
)
async def update_role(
    request: Request,
    username: str,
    body: AppPutRoleBody,
):
    """Assign a configured role to ``username`` (admin only).

    Returns:
        ``{"success": true}`` on success; the 401 response from
        ``get_current_user`` when identity headers are missing; a 403
        ``JSONResponse`` when the target is the ``admin`` user; a 400
        ``JSONResponse`` when the role is not configured.

    Raises:
        HTTPException: 403 when the caller's role is ``viewer``.
    """
    caller = await get_current_user(request)
    if isinstance(caller, JSONResponse):
        # auth failed
        return caller

    # viewers can't change anyone's role
    if caller.get("role") == "viewer":
        raise HTTPException(
            status_code=403, detail="Admin role is required to change user roles"
        )

    # The built-in admin account keeps its role.
    if username == "admin":
        return JSONResponse(
            content={"message": "Cannot modify admin user's role"}, status_code=403
        )

    config_roles = list(request.app.frigate_config.auth.roles.keys())
    if body.role in config_roles:
        User.set_by_id(username, {User.role: body.role})
        return JSONResponse(content={"success": True})

    return JSONResponse(
        content={"message": f"Role must be one of: {', '.join(config_roles)}"},
        status_code=400,
    )
async def require_camera_access(
    camera_name: Optional[str] = None,
    request: Request = None,
):
    """Dependency to enforce camera access based on user role.

    Args:
        camera_name: Camera being accessed. ``None`` skips the check so that
            list endpoints can filter their results afterwards.
        request: Incoming request carrying the identity headers.

    Returns:
        ``None`` when access is allowed.

    Raises:
        HTTPException: 401 when the identity headers are missing; 403 when
            the role's allowed cameras do not include ``camera_name``.
    """
    if camera_name is None:
        return  # For lists, filter later

    current_user = await get_current_user(request)
    if isinstance(current_user, JSONResponse):
        # A value returned from a dependency is injected, not sent to the
        # client, so returning the 401 response here would let the request
        # through. Raise so the request is rejected.
        raise HTTPException(status_code=401, detail="No authorization headers.")

    role = current_user["role"]
    all_camera_names = set(request.app.frigate_config.cameras.keys())
    roles_dict = request.app.frigate_config.auth.roles
    allowed_cameras = User.get_allowed_cameras(role, roles_dict, all_camera_names)

    # Admin or full access bypasses: a role with no camera entry in the
    # config (missing or empty) is not restricted.
    if role == "admin" or not roles_dict.get(role):
        return

    if camera_name not in allowed_cameras:
        raise HTTPException(
            status_code=403,
            detail=f"Access denied to camera '{camera_name}'. Allowed: {allowed_cameras}",
        )
async def get_allowed_cameras_for_filter(request: Request):
    """Dependency returning the cameras the caller may see, for filtering lists.

    Returns:
        An empty list when the identity headers are missing; otherwise the
        result of ``User.get_allowed_cameras`` for the caller's role.
    """
    caller = await get_current_user(request)
    if not isinstance(caller, JSONResponse):
        frigate_config = request.app.frigate_config
        role = caller["role"]
        all_camera_names = set(frigate_config.cameras.keys())
        roles_dict = frigate_config.auth.roles
        return User.get_allowed_cameras(role, roles_dict, all_camera_names)

    return []  # Unauthorized: no cameras