authentik.api.authentication
API Authentication
1"""API Authentication""" 2 3from hmac import compare_digest 4from pathlib import Path 5from tempfile import gettempdir 6from typing import Any 7 8from django.conf import settings 9from django.contrib.auth.models import AnonymousUser 10from drf_spectacular.extensions import OpenApiAuthenticationExtension 11from rest_framework.authentication import BaseAuthentication, get_authorization_header 12from rest_framework.exceptions import AuthenticationFailed 13from rest_framework.request import Request 14from structlog.stdlib import get_logger 15 16from authentik.common.oauth.constants import SCOPE_AUTHENTIK_API 17from authentik.core.middleware import CTX_AUTH_VIA 18from authentik.core.models import Token, TokenIntents, User, UserTypes 19from authentik.outposts.models import Outpost 20 21LOGGER = get_logger() 22_tmp = Path(gettempdir()) 23try: 24 with open(_tmp / "authentik-core-ipc.key") as _f: 25 ipc_key = _f.read() 26except OSError: 27 ipc_key = None 28 29 30def validate_auth(header: bytes, format="bearer") -> str | None: 31 """Validate that the header is in a correct format, 32 returns type and credentials""" 33 auth_credentials = header.decode().strip() 34 if auth_credentials == "" or " " not in auth_credentials: 35 return None 36 auth_type, _, auth_credentials = auth_credentials.partition(" ") 37 if not compare_digest(auth_type.lower(), format): 38 LOGGER.debug("Unsupported authentication type, denying", type=auth_type.lower()) 39 return None 40 if auth_credentials == "": # nosec # noqa 41 raise AuthenticationFailed("Malformed header") 42 return auth_credentials 43 44 45class IPCUser(AnonymousUser): 46 """'Virtual' user for IPC communication between authentik core and the authentik router""" 47 48 username = "authentik:system" 49 is_active = True 50 is_superuser = True 51 52 @property 53 def type(self): 54 return UserTypes.INTERNAL_SERVICE_ACCOUNT 55 56 def has_perm(self, perm, obj=None): 57 return True 58 59 def has_perms(self, perm_list, obj=None): 60 return True 61 62 def has_module_perms(self, module): 63 return True 64 65 @property 66 def is_anonymous(self): 67 return False 68 69 @property 70 def is_authenticated(self): 71 return True 72 73 def all_roles(self): 74 return [] 75 76 77class TokenAuthentication(BaseAuthentication): 78 """Token-based authentication using HTTP Bearer authentication""" 79 80 def authenticate(self, request: Request) -> tuple[User, Any] | None: 81 """Token-based authentication using HTTP Bearer authentication""" 82 auth = get_authorization_header(request) 83 84 user_ctx = self.bearer_auth(auth) 85 # None is only returned when the header isn't set. 86 if not user_ctx: 87 return None 88 89 return user_ctx 90 91 def bearer_auth(self, raw_header: bytes) -> tuple[User, Any] | None: 92 """raw_header in the Format of `Bearer ....`""" 93 user_ctx = self.auth_user_lookup(raw_header) 94 if not user_ctx: 95 return None 96 user, ctx = user_ctx 97 if not user.is_active: 98 raise AuthenticationFailed("Token invalid/expired") 99 return user, ctx 100 101 def auth_user_lookup(self, raw_header: bytes) -> tuple[User, Any] | None: 102 """raw_header in the Format of `Bearer ....`""" 103 from authentik.providers.oauth2.models import AccessToken 104 105 auth_credentials = validate_auth(raw_header) 106 if not auth_credentials: 107 return None 108 # first, check traditional tokens 109 key_token = Token.objects.filter( 110 key=auth_credentials, intent=TokenIntents.INTENT_API 111 ).first() 112 if key_token: 113 CTX_AUTH_VIA.set("api_token") 114 return key_token.user, key_token 115 # then try to auth via JWT 116 jwt_token = AccessToken.objects.filter( 117 token=auth_credentials, _scope__icontains=SCOPE_AUTHENTIK_API 118 ).first() 119 if jwt_token: 120 # Double-check scopes, since they are saved in a single string 121 # we want to check the parsed version too 122 if SCOPE_AUTHENTIK_API not in jwt_token.scope: 123 raise AuthenticationFailed("Token invalid/expired") 124 CTX_AUTH_VIA.set("jwt") 125 return jwt_token.user, jwt_token 126 # then try to auth via secret key (for embedded outpost/etc) 127 user_outpost = self.token_secret_key(auth_credentials) 128 if user_outpost: 129 CTX_AUTH_VIA.set("secret_key") 130 return user_outpost 131 # then try to auth via secret key (for embedded outpost/etc) 132 user = self.token_ipc(auth_credentials) 133 if user: 134 CTX_AUTH_VIA.set("ipc") 135 return user 136 raise AuthenticationFailed("Token invalid/expired") 137 138 def token_ipc(self, value: str) -> tuple[User, None] | None: 139 """Check if the token is the secret key 140 and return the service account for the managed outpost""" 141 if not ipc_key or not compare_digest(value, ipc_key): 142 return None 143 return IPCUser(), None 144 145 def token_secret_key(self, value: str) -> tuple[User, Outpost] | None: 146 """Check if the token is the secret key 147 and return the service account for the managed outpost""" 148 from authentik.outposts.apps import MANAGED_OUTPOST 149 150 if not compare_digest(value, settings.SECRET_KEY): 151 return None 152 outposts = Outpost.objects.filter(managed=MANAGED_OUTPOST) 153 if not outposts: 154 return None 155 outpost = outposts.first() 156 return outpost.user, outpost 157 158 159class TokenSchema(OpenApiAuthenticationExtension): 160 """Auth schema""" 161 162 target_class = TokenAuthentication 163 name = "authentik" 164 165 def get_security_definition(self, auto_schema): 166 """Auth schema""" 167 return {"type": "http", "scheme": "bearer"}
LOGGER =
<BoundLoggerLazyProxy(logger=None, wrapper_class=None, processors=None, context_class=None, initial_values={}, logger_factory_args=())>
def
validate_auth(header: bytes, format='bearer') -> str | None:
31def validate_auth(header: bytes, format="bearer") -> str | None: 32 """Validate that the header is in a correct format, 33 returns type and credentials""" 34 auth_credentials = header.decode().strip() 35 if auth_credentials == "" or " " not in auth_credentials: 36 return None 37 auth_type, _, auth_credentials = auth_credentials.partition(" ") 38 if not compare_digest(auth_type.lower(), format): 39 LOGGER.debug("Unsupported authentication type, denying", type=auth_type.lower()) 40 return None 41 if auth_credentials == "": # nosec # noqa 42 raise AuthenticationFailed("Malformed header") 43 return auth_credentials
Validate that the header is in a correct format, returns type and credentials
class
IPCUser(django.contrib.auth.models.AnonymousUser):
46class IPCUser(AnonymousUser): 47 """'Virtual' user for IPC communication between authentik core and the authentik router""" 48 49 username = "authentik:system" 50 is_active = True 51 is_superuser = True 52 53 @property 54 def type(self): 55 return UserTypes.INTERNAL_SERVICE_ACCOUNT 56 57 def has_perm(self, perm, obj=None): 58 return True 59 60 def has_perms(self, perm_list, obj=None): 61 return True 62 63 def has_module_perms(self, module): 64 return True 65 66 @property 67 def is_anonymous(self): 68 return False 69 70 @property 71 def is_authenticated(self): 72 return True 73 74 def all_roles(self): 75 return []
'Virtual' user for IPC communication between authentik core and the authentik router
class
TokenAuthentication(rest_framework.authentication.BaseAuthentication):
78class TokenAuthentication(BaseAuthentication): 79 """Token-based authentication using HTTP Bearer authentication""" 80 81 def authenticate(self, request: Request) -> tuple[User, Any] | None: 82 """Token-based authentication using HTTP Bearer authentication""" 83 auth = get_authorization_header(request) 84 85 user_ctx = self.bearer_auth(auth) 86 # None is only returned when the header isn't set. 87 if not user_ctx: 88 return None 89 90 return user_ctx 91 92 def bearer_auth(self, raw_header: bytes) -> tuple[User, Any] | None: 93 """raw_header in the Format of `Bearer ....`""" 94 user_ctx = self.auth_user_lookup(raw_header) 95 if not user_ctx: 96 return None 97 user, ctx = user_ctx 98 if not user.is_active: 99 raise AuthenticationFailed("Token invalid/expired") 100 return user, ctx 101 102 def auth_user_lookup(self, raw_header: bytes) -> tuple[User, Any] | None: 103 """raw_header in the Format of `Bearer ....`""" 104 from authentik.providers.oauth2.models import AccessToken 105 106 auth_credentials = validate_auth(raw_header) 107 if not auth_credentials: 108 return None 109 # first, check traditional tokens 110 key_token = Token.objects.filter( 111 key=auth_credentials, intent=TokenIntents.INTENT_API 112 ).first() 113 if key_token: 114 CTX_AUTH_VIA.set("api_token") 115 return key_token.user, key_token 116 # then try to auth via JWT 117 jwt_token = AccessToken.objects.filter( 118 token=auth_credentials, _scope__icontains=SCOPE_AUTHENTIK_API 119 ).first() 120 if jwt_token: 121 # Double-check scopes, since they are saved in a single string 122 # we want to check the parsed version too 123 if SCOPE_AUTHENTIK_API not in jwt_token.scope: 124 raise AuthenticationFailed("Token invalid/expired") 125 CTX_AUTH_VIA.set("jwt") 126 return jwt_token.user, jwt_token 127 # then try to auth via secret key (for embedded outpost/etc) 128 user_outpost = self.token_secret_key(auth_credentials) 129 if user_outpost: 130 CTX_AUTH_VIA.set("secret_key") 131 return user_outpost 132 # then try to auth via secret key (for embedded outpost/etc) 133 user = self.token_ipc(auth_credentials) 134 if user: 135 CTX_AUTH_VIA.set("ipc") 136 return user 137 raise AuthenticationFailed("Token invalid/expired") 138 139 def token_ipc(self, value: str) -> tuple[User, None] | None: 140 """Check if the token is the secret key 141 and return the service account for the managed outpost""" 142 if not ipc_key or not compare_digest(value, ipc_key): 143 return None 144 return IPCUser(), None 145 146 def token_secret_key(self, value: str) -> tuple[User, Outpost] | None: 147 """Check if the token is the secret key 148 and return the service account for the managed outpost""" 149 from authentik.outposts.apps import MANAGED_OUTPOST 150 151 if not compare_digest(value, settings.SECRET_KEY): 152 return None 153 outposts = Outpost.objects.filter(managed=MANAGED_OUTPOST) 154 if not outposts: 155 return None 156 outpost = outposts.first() 157 return outpost.user, outpost
Token-based authentication using HTTP Bearer authentication
def
authenticate( self, request: rest_framework.request.Request) -> tuple[authentik.core.models.User, Any] | None:
81 def authenticate(self, request: Request) -> tuple[User, Any] | None: 82 """Token-based authentication using HTTP Bearer authentication""" 83 auth = get_authorization_header(request) 84 85 user_ctx = self.bearer_auth(auth) 86 # None is only returned when the header isn't set. 87 if not user_ctx: 88 return None 89 90 return user_ctx
Token-based authentication using HTTP Bearer authentication
92 def bearer_auth(self, raw_header: bytes) -> tuple[User, Any] | None: 93 """raw_header in the Format of `Bearer ....`""" 94 user_ctx = self.auth_user_lookup(raw_header) 95 if not user_ctx: 96 return None 97 user, ctx = user_ctx 98 if not user.is_active: 99 raise AuthenticationFailed("Token invalid/expired") 100 return user, ctx
raw_header in the Format of Bearer ....
102 def auth_user_lookup(self, raw_header: bytes) -> tuple[User, Any] | None: 103 """raw_header in the Format of `Bearer ....`""" 104 from authentik.providers.oauth2.models import AccessToken 105 106 auth_credentials = validate_auth(raw_header) 107 if not auth_credentials: 108 return None 109 # first, check traditional tokens 110 key_token = Token.objects.filter( 111 key=auth_credentials, intent=TokenIntents.INTENT_API 112 ).first() 113 if key_token: 114 CTX_AUTH_VIA.set("api_token") 115 return key_token.user, key_token 116 # then try to auth via JWT 117 jwt_token = AccessToken.objects.filter( 118 token=auth_credentials, _scope__icontains=SCOPE_AUTHENTIK_API 119 ).first() 120 if jwt_token: 121 # Double-check scopes, since they are saved in a single string 122 # we want to check the parsed version too 123 if SCOPE_AUTHENTIK_API not in jwt_token.scope: 124 raise AuthenticationFailed("Token invalid/expired") 125 CTX_AUTH_VIA.set("jwt") 126 return jwt_token.user, jwt_token 127 # then try to auth via secret key (for embedded outpost/etc) 128 user_outpost = self.token_secret_key(auth_credentials) 129 if user_outpost: 130 CTX_AUTH_VIA.set("secret_key") 131 return user_outpost 132 # then try to auth via secret key (for embedded outpost/etc) 133 user = self.token_ipc(auth_credentials) 134 if user: 135 CTX_AUTH_VIA.set("ipc") 136 return user 137 raise AuthenticationFailed("Token invalid/expired")
raw_header in the Format of Bearer ....
139 def token_ipc(self, value: str) -> tuple[User, None] | None: 140 """Check if the token is the secret key 141 and return the service account for the managed outpost""" 142 if not ipc_key or not compare_digest(value, ipc_key): 143 return None 144 return IPCUser(), None
Check if the token is the secret key and return the service account for the managed outpost
def
token_secret_key( self, value: str) -> tuple[authentik.core.models.User, authentik.outposts.models.Outpost] | None:
146 def token_secret_key(self, value: str) -> tuple[User, Outpost] | None: 147 """Check if the token is the secret key 148 and return the service account for the managed outpost""" 149 from authentik.outposts.apps import MANAGED_OUTPOST 150 151 if not compare_digest(value, settings.SECRET_KEY): 152 return None 153 outposts = Outpost.objects.filter(managed=MANAGED_OUTPOST) 154 if not outposts: 155 return None 156 outpost = outposts.first() 157 return outpost.user, outpost
Check if the token is the secret key and return the service account for the managed outpost
class
TokenSchema(drf_spectacular.plumbing.OpenApiGeneratorExtension[ForwardRef('OpenApiAuthenticationExtension')]):
160class TokenSchema(OpenApiAuthenticationExtension): 161 """Auth schema""" 162 163 target_class = TokenAuthentication 164 name = "authentik" 165 166 def get_security_definition(self, auto_schema): 167 """Auth schema""" 168 return {"type": "http", "scheme": "bearer"}
Auth schema
target_class =
<class 'TokenAuthentication'>