From 698b657af763812216a99028e39cfac746f992bf Mon Sep 17 00:00:00 2001 From: Josh Hawkins <32435876+hawkeye217@users.noreply.github.com> Date: Mon, 1 Sep 2025 16:15:49 -0500 Subject: [PATCH] refactor to helper function --- frigate/api/auth.py | 132 ++++++++++++++++++++++++++++---------------- 1 file changed, 85 insertions(+), 47 deletions(-) diff --git a/frigate/api/auth.py b/frigate/api/auth.py index ddd84045d..8f52f6194 100644 --- a/frigate/api/auth.py +++ b/frigate/api/auth.py @@ -238,6 +238,89 @@ def require_role(required_roles: List[str]): return role_checker +def resolve_role(headers: dict, proxy_config: ProxyConfig) -> str: + """ + Determine the effective role for a request based on proxy headers and configuration. + + Order of resolution: + 1. If a role header is defined in proxy_config.header_map.role: + - If a role_map is configured, treat the header as group claims + (split by proxy_config.separator) and map to roles. + - If no role_map is configured, treat the header as role names directly. + 2. If no valid role is found, return proxy_config.default_role. + + Args: + headers (dict): Incoming request headers (case-insensitive). + proxy_config (ProxyConfig): Proxy configuration. + + Returns: + str: Resolved role (always one of VALID_ROLES). + """ + role = proxy_config.default_role + role_header = proxy_config.header_map.role + + if not role_header: + logger.debug( + "No role header configured in proxy_config.header_map. Returning default role '%s'.", + role, + ) + return role + + raw_value = headers.get(role_header, "") + logger.debug("Raw role header value from '%s': %r", role_header, raw_value) + + if not raw_value: + logger.debug("Role header missing or empty. Returning default role '%s'.", role) + return role + + # role_map configured, treat header as group claims + if proxy_config.header_map.role_map: + groups = [ + g.strip() for g in raw_value.split(proxy_config.separator) if g.strip() + ] + logger.debug("Parsed groups from role header: %s", groups) + + matched_roles = { + role_name + for role_name, required_groups in proxy_config.header_map.role_map.items() + if any(group in groups for group in required_groups) + } + logger.debug("Matched roles from role_map: %s", matched_roles) + + if matched_roles: + resolved = next((r for r in VALID_ROLES if r in matched_roles), role) + logger.debug("Resolved role (with role_map) to '%s'.", resolved) + return resolved + + logger.debug( + "No role_map match for groups '%s'. Using default role '%s'.", + raw_value, + proxy_config.default_role, + ) + return role + + # no role_map, treat as role names directly + roles_from_header = [ + r.strip().lower() for r in raw_value.split(proxy_config.separator) if r.strip() + ] + logger.debug("Parsed roles directly from header: %s", roles_from_header) + + resolved = next( + (r for r in VALID_ROLES if r in roles_from_header), + proxy_config.default_role, + ) + if resolved == proxy_config.default_role and roles_from_header: + logger.debug( + "Provided proxy role header values '%s' did not contain a valid role. Using default role '%s'.", + raw_value, + proxy_config.default_role, + ) + else: + logger.debug("Resolved role (direct header) to '%s'.", resolved) + + return resolved + + # Endpoints @router.get("/auth") def auth(request: Request): @@ -274,53 +357,8 @@ def auth(request: Request): else "anonymous" ) - # start with default_role - role = proxy_config.default_role - - # first try: explicit role header - role_header = proxy_config.header_map.role - if role_header: - raw_value = request.headers.get(role_header, "") - if proxy_config.header_map.role_map and raw_value: - # treat as group claim split by the configured separator - groups = [ - g.strip() - for g in raw_value.split(proxy_config.separator) - if g.strip() - ] - - # collect all roles whose mapped groups intersect the provided groups - matched_roles = { - role_name - for role_name, required_groups in proxy_config.header_map.role_map.items() - if any(group in groups for group in required_groups) - } - - if matched_roles: - # choose by VALID_ROLES priority (eg, 'admin' before 'viewer') - role = next( - (r for r in VALID_ROLES if r in matched_roles), - proxy_config.default_role, - ) - else: - logger.info( - f"No role_map match for groups '{raw_value}'. Using default role '{proxy_config.default_role}'." - ) - elif raw_value: - # no role map specified, so header may contain role name(s) directly - roles_from_header = [ - r.strip().lower() - for r in raw_value.split(proxy_config.separator) - if r.strip() - ] - role = next( - (r for r in VALID_ROLES if r in roles_from_header), - proxy_config.default_role, - ) - if role == proxy_config.default_role and roles_from_header: - logger.warning( - f"Provided proxy role header values '{raw_value}' did not contain a valid role. Using default role '{proxy_config.default_role}'." - ) + # parse header and + role = resolve_role(request.headers, proxy_config) success_response.headers["remote-role"] = role return success_response