mirror of
https://github.com/blakeblackshear/frigate.git
synced 2025-09-10 17:51:45 +02:00
Ensure proxy group claim uses the configured separator character (#19869)
* Ensure group claim uses the configured separator character * refactor to helper function * tests * clean up
This commit is contained in:
parent
55160f9235
commit
bd255362d6
@ -238,6 +238,89 @@ def require_role(required_roles: List[str]):
|
|||||||
return role_checker
|
return role_checker
|
||||||
|
|
||||||
|
|
||||||
|
def resolve_role(headers: dict, proxy_config: ProxyConfig) -> str:
|
||||||
|
"""
|
||||||
|
Determine the effective role for a request based on proxy headers and configuration.
|
||||||
|
|
||||||
|
Order of resolution:
|
||||||
|
1. If a role header is defined in proxy_config.header_map.role:
|
||||||
|
- If a role_map is configured, treat the header as group claims
|
||||||
|
(split by proxy_config.separator) and map to roles.
|
||||||
|
- If no role_map is configured, treat the header as role names directly.
|
||||||
|
2. If no valid role is found, return proxy_config.default_role.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
headers (dict): Incoming request headers (case-insensitive).
|
||||||
|
proxy_config (ProxyConfig): Proxy configuration.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
str: Resolved role (always one of VALID_ROLES).
|
||||||
|
"""
|
||||||
|
role = proxy_config.default_role
|
||||||
|
role_header = proxy_config.header_map.role
|
||||||
|
|
||||||
|
if not role_header:
|
||||||
|
logger.debug(
|
||||||
|
"No role header configured in proxy_config.header_map. Returning default role '%s'.",
|
||||||
|
role,
|
||||||
|
)
|
||||||
|
return role
|
||||||
|
|
||||||
|
raw_value = headers.get(role_header, "")
|
||||||
|
logger.debug("Raw role header value from '%s': %r", role_header, raw_value)
|
||||||
|
|
||||||
|
if not raw_value:
|
||||||
|
logger.debug("Role header missing or empty. Returning default role '%s'.", role)
|
||||||
|
return role
|
||||||
|
|
||||||
|
# role_map configured, treat header as group claims
|
||||||
|
if proxy_config.header_map.role_map:
|
||||||
|
groups = [
|
||||||
|
g.strip() for g in raw_value.split(proxy_config.separator) if g.strip()
|
||||||
|
]
|
||||||
|
logger.debug("Parsed groups from role header: %s", groups)
|
||||||
|
|
||||||
|
matched_roles = {
|
||||||
|
role_name
|
||||||
|
for role_name, required_groups in proxy_config.header_map.role_map.items()
|
||||||
|
if any(group in groups for group in required_groups)
|
||||||
|
}
|
||||||
|
logger.debug("Matched roles from role_map: %s", matched_roles)
|
||||||
|
|
||||||
|
if matched_roles:
|
||||||
|
resolved = next((r for r in VALID_ROLES if r in matched_roles), role)
|
||||||
|
logger.debug("Resolved role (with role_map) to '%s'.", resolved)
|
||||||
|
return resolved
|
||||||
|
|
||||||
|
logger.debug(
|
||||||
|
"No role_map match for groups '%s'. Using default role '%s'.",
|
||||||
|
raw_value,
|
||||||
|
proxy_config.default_role,
|
||||||
|
)
|
||||||
|
return role
|
||||||
|
|
||||||
|
# no role_map, treat as role names directly
|
||||||
|
roles_from_header = [
|
||||||
|
r.strip().lower() for r in raw_value.split(proxy_config.separator) if r.strip()
|
||||||
|
]
|
||||||
|
logger.debug("Parsed roles directly from header: %s", roles_from_header)
|
||||||
|
|
||||||
|
resolved = next(
|
||||||
|
(r for r in VALID_ROLES if r in roles_from_header),
|
||||||
|
proxy_config.default_role,
|
||||||
|
)
|
||||||
|
if resolved == proxy_config.default_role and roles_from_header:
|
||||||
|
logger.debug(
|
||||||
|
"Provided proxy role header values '%s' did not contain a valid role. Using default role '%s'.",
|
||||||
|
raw_value,
|
||||||
|
proxy_config.default_role,
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
logger.debug("Resolved role (direct header) to '%s'.", resolved)
|
||||||
|
|
||||||
|
return resolved
|
||||||
|
|
||||||
|
|
||||||
# Endpoints
|
# Endpoints
|
||||||
@router.get("/auth")
|
@router.get("/auth")
|
||||||
def auth(request: Request):
|
def auth(request: Request):
|
||||||
@ -274,36 +357,8 @@ def auth(request: Request):
|
|||||||
else "anonymous"
|
else "anonymous"
|
||||||
)
|
)
|
||||||
|
|
||||||
# start with default_role
|
# parse header and resolve a valid role
|
||||||
role = proxy_config.default_role
|
role = resolve_role(request.headers, proxy_config)
|
||||||
|
|
||||||
# first try: explicit role header
|
|
||||||
role_header = proxy_config.header_map.role
|
|
||||||
if role_header:
|
|
||||||
raw_value = request.headers.get(role_header, "")
|
|
||||||
if proxy_config.header_map.role_map and raw_value:
|
|
||||||
# treat as group claim
|
|
||||||
groups = [
|
|
||||||
g.strip()
|
|
||||||
for g in raw_value.replace(" ", ",").split(",")
|
|
||||||
if g.strip()
|
|
||||||
]
|
|
||||||
for (
|
|
||||||
candidate_role,
|
|
||||||
required_groups,
|
|
||||||
) in proxy_config.header_map.role_map.items():
|
|
||||||
if any(group in groups for group in required_groups):
|
|
||||||
role = candidate_role
|
|
||||||
break
|
|
||||||
elif raw_value:
|
|
||||||
normalized_role = raw_value.strip().lower()
|
|
||||||
if normalized_role in VALID_ROLES:
|
|
||||||
role = normalized_role
|
|
||||||
else:
|
|
||||||
logger.warning(
|
|
||||||
f"Provided proxy role header contains invalid value '{raw_value}'. Using default role '{proxy_config.default_role}'."
|
|
||||||
)
|
|
||||||
role = proxy_config.default_role
|
|
||||||
|
|
||||||
success_response.headers["remote-role"] = role
|
success_response.headers["remote-role"] = role
|
||||||
return success_response
|
return success_response
|
||||||
|
79
frigate/test/test_proxy_auth.py
Normal file
79
frigate/test/test_proxy_auth.py
Normal file
@ -0,0 +1,79 @@
|
|||||||
|
import unittest
|
||||||
|
|
||||||
|
from frigate.api.auth import resolve_role
|
||||||
|
from frigate.config import HeaderMappingConfig, ProxyConfig
|
||||||
|
|
||||||
|
|
||||||
|
class TestProxyRoleResolution(unittest.TestCase):
|
||||||
|
def setUp(self):
|
||||||
|
self.proxy_config = ProxyConfig(
|
||||||
|
auth_secret=None,
|
||||||
|
default_role="viewer",
|
||||||
|
separator="|",
|
||||||
|
header_map=HeaderMappingConfig(
|
||||||
|
user="x-remote-user",
|
||||||
|
role="x-remote-role",
|
||||||
|
role_map={
|
||||||
|
"admin": ["group_admin"],
|
||||||
|
"viewer": ["group_viewer"],
|
||||||
|
},
|
||||||
|
),
|
||||||
|
)
|
||||||
|
|
||||||
|
def test_role_map_single_group_match(self):
|
||||||
|
headers = {"x-remote-role": "group_admin"}
|
||||||
|
role = resolve_role(headers, self.proxy_config)
|
||||||
|
self.assertEqual(role, "admin")
|
||||||
|
|
||||||
|
def test_role_map_multiple_groups(self):
|
||||||
|
headers = {"x-remote-role": "group_viewer|group_admin"}
|
||||||
|
role = resolve_role(headers, self.proxy_config)
|
||||||
|
# admin should win since VALID_ROLES priority puts it before viewer
|
||||||
|
self.assertEqual(role, "admin")
|
||||||
|
|
||||||
|
def test_direct_role_header_with_separator(self):
|
||||||
|
config = self.proxy_config
|
||||||
|
config.header_map.role_map = None # disable role_map
|
||||||
|
headers = {"x-remote-role": "viewer|admin"}
|
||||||
|
role = resolve_role(headers, config)
|
||||||
|
# admin should be chosen since it appears in VALID_ROLES
|
||||||
|
self.assertEqual(role, "admin")
|
||||||
|
|
||||||
|
def test_invalid_role_header(self):
|
||||||
|
config = self.proxy_config
|
||||||
|
config.header_map.role_map = None
|
||||||
|
headers = {"x-remote-role": "notarole"}
|
||||||
|
role = resolve_role(headers, config)
|
||||||
|
self.assertEqual(role, config.default_role)
|
||||||
|
|
||||||
|
def test_missing_role_header(self):
|
||||||
|
headers = {}
|
||||||
|
role = resolve_role(headers, self.proxy_config)
|
||||||
|
self.assertEqual(role, self.proxy_config.default_role)
|
||||||
|
|
||||||
|
def test_empty_role_header(self):
|
||||||
|
headers = {"x-remote-role": ""}
|
||||||
|
role = resolve_role(headers, self.proxy_config)
|
||||||
|
self.assertEqual(role, self.proxy_config.default_role)
|
||||||
|
|
||||||
|
def test_whitespace_groups(self):
|
||||||
|
headers = {"x-remote-role": " | group_admin | "}
|
||||||
|
role = resolve_role(headers, self.proxy_config)
|
||||||
|
self.assertEqual(role, "admin")
|
||||||
|
|
||||||
|
def test_mixed_valid_and_invalid_groups(self):
|
||||||
|
headers = {"x-remote-role": "bogus|group_viewer"}
|
||||||
|
role = resolve_role(headers, self.proxy_config)
|
||||||
|
self.assertEqual(role, "viewer")
|
||||||
|
|
||||||
|
def test_case_insensitive_role_direct(self):
|
||||||
|
config = self.proxy_config
|
||||||
|
config.header_map.role_map = None
|
||||||
|
headers = {"x-remote-role": "AdMiN"}
|
||||||
|
role = resolve_role(headers, config)
|
||||||
|
self.assertEqual(role, "admin")
|
||||||
|
|
||||||
|
def test_role_map_no_match_falls_back(self):
|
||||||
|
headers = {"x-remote-role": "group_unknown"}
|
||||||
|
role = resolve_role(headers, self.proxy_config)
|
||||||
|
self.assertEqual(role, self.proxy_config.default_role)
|
Loading…
Reference in New Issue
Block a user