authentik.api.authentication
API Authentication
"""API Authentication"""

from hmac import compare_digest
from pathlib import Path
from tempfile import gettempdir
from typing import Any

from django.conf import settings
from django.contrib.auth.models import AnonymousUser
from drf_spectacular.extensions import OpenApiAuthenticationExtension
from rest_framework.authentication import BaseAuthentication, get_authorization_header
from rest_framework.exceptions import AuthenticationFailed
from rest_framework.request import Request
from structlog.stdlib import get_logger

from authentik.common.oauth.constants import SCOPE_AUTHENTIK_API
from authentik.core.middleware import CTX_AUTH_VIA
from authentik.core.models import Token, TokenIntents, User, UserTypes
from authentik.outposts.models import Outpost

LOGGER = get_logger()
# The IPC key is read once at import time from the system temp directory;
# when the file cannot be opened, IPC authentication is disabled (ipc_key is None).
_tmp = Path(gettempdir())
try:
    with open(_tmp / "authentik-core-ipc.key") as _f:
        ipc_key = _f.read()
except OSError:
    ipc_key = None


def validate_auth(header: bytes, format="bearer") -> str | None:
    """Validate the raw ``Authorization`` header and extract its credentials.

    Args:
        header: Raw header value as bytes.
        format: Expected authentication scheme; compared against the
            lower-cased scheme found in the header.

    Returns:
        The credentials following the scheme, or ``None`` when the header is
        empty, has no space-separated credentials part, or uses another scheme.

    Raises:
        AuthenticationFailed: If the header is not valid UTF-8, or the scheme
            matches but the credentials are empty.
    """
    try:
        auth_credentials = header.decode().strip()
    except UnicodeDecodeError as exc:
        # Undecodable bytes previously escaped as an unhandled exception.
        raise AuthenticationFailed("Malformed header") from exc
    if auth_credentials == "" or " " not in auth_credentials:
        return None
    auth_type, _, auth_credentials = auth_credentials.partition(" ")
    # compare_digest() raises TypeError for str arguments that contain
    # non-ASCII characters, so compare the encoded bytes instead.
    if not compare_digest(auth_type.lower().encode(), format.encode()):
        LOGGER.debug("Unsupported authentication type, denying", type=auth_type.lower())
        return None
    # Defensive check: the value was stripped above, so the part after the
    # first space cannot normally be empty.
    if auth_credentials == "":  # nosec # noqa
        raise AuthenticationFailed("Malformed header")
    return auth_credentials


class VirtualUser(AnonymousUser):
    """AnonymousUser subclass that reports itself as an active, authenticated
    internal service account without any roles"""

    is_active = True

    @property
    def type(self):
        return UserTypes.INTERNAL_SERVICE_ACCOUNT

    @property
    def is_anonymous(self):
        return False

    @property
    def is_authenticated(self):
        return True

    def all_roles(self):
        return []


class IPCUser(VirtualUser):
    """'Virtual' user for IPC communication between authentik core and the authentik router"""

    username = "authentik:system"
    is_superuser = True

    @property
    def type(self):
        return UserTypes.INTERNAL_SERVICE_ACCOUNT

    def has_perm(self, perm, obj=None):
        return True

    def has_perms(self, perm_list, obj=None):
        return True

    def has_module_perms(self, module):
        return True


class TokenAuthentication(BaseAuthentication):
    """Token-based authentication using HTTP Bearer authentication"""

    def authenticate(self, request: Request) -> tuple[User, Any] | None:
        """Token-based authentication using HTTP Bearer authentication"""
        auth = get_authorization_header(request)

        user_ctx = self.bearer_auth(auth)
        # None means the header carried nothing this authenticator handles
        # (empty header, no credentials part, or a different scheme).
        if not user_ctx:
            return None

        return user_ctx

    def bearer_auth(self, raw_header: bytes) -> tuple[User, Any] | None:
        """Look up the user for a raw `Bearer ....` header and reject inactive users"""
        user_ctx = self.auth_user_lookup(raw_header)
        if not user_ctx:
            return None
        user, ctx = user_ctx
        if not user.is_active:
            raise AuthenticationFailed("Token invalid/expired")
        return user, ctx

    def auth_user_lookup(self, raw_header: bytes) -> tuple[User, Any] | None:
        """Resolve a raw `Bearer ....` header to a (user, context) tuple.

        Tries, in order: API tokens, OAuth2 access tokens carrying the
        authentik API scope, the secret key and the IPC key. Raises
        AuthenticationFailed when credentials were given but nothing matched."""
        from authentik.providers.oauth2.models import AccessToken

        auth_credentials = validate_auth(raw_header)
        if not auth_credentials:
            return None
        # first, check traditional tokens
        key_token = Token.objects.filter(
            key=auth_credentials, intent=TokenIntents.INTENT_API
        ).first()
        if key_token:
            CTX_AUTH_VIA.set("api_token")
            return key_token.user, key_token
        # then try to auth via JWT
        jwt_token = AccessToken.objects.filter(
            token=auth_credentials, _scope__icontains=SCOPE_AUTHENTIK_API
        ).first()
        if jwt_token:
            # Double-check scopes, since they are saved in a single string
            # we want to check the parsed version too
            if SCOPE_AUTHENTIK_API not in jwt_token.scope:
                raise AuthenticationFailed("Token invalid/expired")
            CTX_AUTH_VIA.set("jwt")
            return jwt_token.user, jwt_token
        # then try to auth via secret key (for embedded outpost/etc)
        user_outpost = self.token_secret_key(auth_credentials)
        if user_outpost:
            CTX_AUTH_VIA.set("secret_key")
            return user_outpost
        # finally try to auth via the IPC key
        user = self.token_ipc(auth_credentials)
        if user:
            CTX_AUTH_VIA.set("ipc")
            return user
        raise AuthenticationFailed("Token invalid/expired")

    def token_ipc(self, value: str) -> tuple[User, None] | None:
        """Check if the token is the IPC key and return the virtual IPC user"""
        # Compare as bytes: compare_digest() raises TypeError for str
        # arguments containing non-ASCII characters.
        if not ipc_key or not compare_digest(value.encode(), ipc_key.encode()):
            return None
        return IPCUser(), None

    def token_secret_key(self, value: str) -> tuple[User, Outpost] | None:
        """Check if the token is the secret key
        and return the service account for the managed outpost"""
        from authentik.outposts.apps import MANAGED_OUTPOST

        # Compare as bytes: compare_digest() raises TypeError for str
        # arguments containing non-ASCII characters.
        if not compare_digest(value.encode(), settings.SECRET_KEY.encode()):
            return None
        # A single .first() replaces evaluating the queryset for truthiness
        # and then querying again for the first row.
        outpost = Outpost.objects.filter(managed=MANAGED_OUTPOST).first()
        if outpost is None:
            return None
        return outpost.user, outpost


class TokenSchema(OpenApiAuthenticationExtension):
    """Auth schema"""

    target_class = TokenAuthentication
    name = "authentik"

    def get_security_definition(self, auto_schema):
        """Auth schema"""
        return {"type": "http", "scheme": "bearer"}
LOGGER =
<BoundLoggerLazyProxy(logger=None, wrapper_class=None, processors=None, context_class=None, initial_values={}, logger_factory_args=())>
def
validate_auth(header: bytes, format='bearer') -> str | None:
31def validate_auth(header: bytes, format="bearer") -> str | None: 32 """Validate that the header is in a correct format, 33 returns type and credentials""" 34 auth_credentials = header.decode().strip() 35 if auth_credentials == "" or " " not in auth_credentials: 36 return None 37 auth_type, _, auth_credentials = auth_credentials.partition(" ") 38 if not compare_digest(auth_type.lower(), format): 39 LOGGER.debug("Unsupported authentication type, denying", type=auth_type.lower()) 40 return None 41 if auth_credentials == "": # nosec # noqa 42 raise AuthenticationFailed("Malformed header") 43 return auth_credentials
Validate that the header is in the correct format and return its credentials
class
VirtualUser(django.contrib.auth.models.AnonymousUser):
class IPCUser(VirtualUser):
    """'Virtual' user for IPC communication between authentik core and the authentik router"""

    # Fixed identity of the virtual user; it is flagged as a superuser.
    username = "authentik:system"
    is_superuser = True

    @property
    def type(self):
        """User type reported for this virtual user"""
        return UserTypes.INTERNAL_SERVICE_ACCOUNT

    # Every permission check below is answered with True unconditionally.

    def has_perm(self, perm, obj=None):
        """Always grant the single permission"""
        return True

    def has_perms(self, perm_list, obj=None):
        """Always grant the whole list of permissions"""
        return True

    def has_module_perms(self, module):
        """Always grant module-level permissions"""
        return True
'Virtual' user for IPC communication between authentik core and the authentik router
Inherited Members
class
TokenAuthentication(rest_framework.authentication.BaseAuthentication):
class TokenAuthentication(BaseAuthentication):
    """Token-based authentication using HTTP Bearer authentication"""

    def authenticate(self, request: Request) -> tuple[User, Any] | None:
        """Token-based authentication using HTTP Bearer authentication"""
        auth = get_authorization_header(request)

        user_ctx = self.bearer_auth(auth)
        # None means the header carried nothing this authenticator handles
        # (empty header, no credentials part, or a different scheme).
        if not user_ctx:
            return None

        return user_ctx

    def bearer_auth(self, raw_header: bytes) -> tuple[User, Any] | None:
        """Look up the user for a raw `Bearer ....` header and reject inactive users"""
        user_ctx = self.auth_user_lookup(raw_header)
        if not user_ctx:
            return None
        user, ctx = user_ctx
        if not user.is_active:
            raise AuthenticationFailed("Token invalid/expired")
        return user, ctx

    def auth_user_lookup(self, raw_header: bytes) -> tuple[User, Any] | None:
        """Resolve a raw `Bearer ....` header to a (user, context) tuple.

        Tries, in order: API tokens, OAuth2 access tokens carrying the
        authentik API scope, the secret key and the IPC key. Raises
        AuthenticationFailed when credentials were given but nothing matched."""
        from authentik.providers.oauth2.models import AccessToken

        auth_credentials = validate_auth(raw_header)
        if not auth_credentials:
            return None
        # first, check traditional tokens
        key_token = Token.objects.filter(
            key=auth_credentials, intent=TokenIntents.INTENT_API
        ).first()
        if key_token:
            CTX_AUTH_VIA.set("api_token")
            return key_token.user, key_token
        # then try to auth via JWT
        jwt_token = AccessToken.objects.filter(
            token=auth_credentials, _scope__icontains=SCOPE_AUTHENTIK_API
        ).first()
        if jwt_token:
            # Double-check scopes, since they are saved in a single string
            # we want to check the parsed version too
            if SCOPE_AUTHENTIK_API not in jwt_token.scope:
                raise AuthenticationFailed("Token invalid/expired")
            CTX_AUTH_VIA.set("jwt")
            return jwt_token.user, jwt_token
        # then try to auth via secret key (for embedded outpost/etc)
        user_outpost = self.token_secret_key(auth_credentials)
        if user_outpost:
            CTX_AUTH_VIA.set("secret_key")
            return user_outpost
        # finally try to auth via the IPC key
        user = self.token_ipc(auth_credentials)
        if user:
            CTX_AUTH_VIA.set("ipc")
            return user
        raise AuthenticationFailed("Token invalid/expired")

    def token_ipc(self, value: str) -> tuple[User, None] | None:
        """Check if the token is the IPC key and return the virtual IPC user"""
        # Compare as bytes: compare_digest() raises TypeError for str
        # arguments containing non-ASCII characters.
        if not ipc_key or not compare_digest(value.encode(), ipc_key.encode()):
            return None
        return IPCUser(), None

    def token_secret_key(self, value: str) -> tuple[User, Outpost] | None:
        """Check if the token is the secret key
        and return the service account for the managed outpost"""
        from authentik.outposts.apps import MANAGED_OUTPOST

        # Compare as bytes: compare_digest() raises TypeError for str
        # arguments containing non-ASCII characters.
        if not compare_digest(value.encode(), settings.SECRET_KEY.encode()):
            return None
        # A single .first() replaces evaluating the queryset for truthiness
        # and then querying again for the first row.
        outpost = Outpost.objects.filter(managed=MANAGED_OUTPOST).first()
        if outpost is None:
            return None
        return outpost.user, outpost
Token-based authentication using HTTP Bearer authentication
def
authenticate( self, request: rest_framework.request.Request) -> tuple[authentik.core.models.User, Any] | None:
def authenticate(self, request: Request) -> tuple[User, Any] | None:
    """Authenticate a request from its HTTP ``Authorization`` header.

    Returns the ``(user, context)`` tuple produced by ``bearer_auth``, or
    ``None`` when ``bearer_auth`` yields nothing for the header."""
    header = get_authorization_header(request)
    # Any falsy result is normalised to None, as before.
    return self.bearer_auth(header) or None
Token-based authentication using HTTP Bearer authentication
def bearer_auth(self, raw_header: bytes) -> tuple[User, Any] | None:
    """Resolve the user behind a raw `Bearer ....` header.

    Returns ``None`` when the lookup yields nothing, the ``(user, context)``
    tuple for an active user, and raises AuthenticationFailed when the
    resolved user is not active."""
    resolved = self.auth_user_lookup(raw_header)
    if resolved:
        account, context = resolved
        if account.is_active:
            return account, context
        raise AuthenticationFailed("Token invalid/expired")
    return None
raw_header in the Format of Bearer ....
def auth_user_lookup(self, raw_header: bytes) -> tuple[User, Any] | None:
    """Resolve a raw `Bearer ....` header to a (user, context) tuple.

    Credential sources are tried in a fixed order: API tokens, OAuth2 access
    tokens carrying the authentik API scope, the secret key, then the IPC key.
    Returns ``None`` when ``validate_auth`` yields no credentials and raises
    AuthenticationFailed when credentials were given but nothing matched."""
    from authentik.providers.oauth2.models import AccessToken

    credentials = validate_auth(raw_header)
    if not credentials:
        return None

    # 1. traditional API tokens
    api_tokens = Token.objects.filter(key=credentials, intent=TokenIntents.INTENT_API)
    if api_token := api_tokens.first():
        CTX_AUTH_VIA.set("api_token")
        return api_token.user, api_token

    # 2. JWT access tokens
    access_tokens = AccessToken.objects.filter(
        token=credentials, _scope__icontains=SCOPE_AUTHENTIK_API
    )
    if access_token := access_tokens.first():
        # The scopes are stored as one string, so the substring match above
        # is re-checked against the parsed scope value.
        if SCOPE_AUTHENTIK_API not in access_token.scope:
            raise AuthenticationFailed("Token invalid/expired")
        CTX_AUTH_VIA.set("jwt")
        return access_token.user, access_token

    # 3. secret key (for embedded outpost/etc)
    if via_secret_key := self.token_secret_key(credentials):
        CTX_AUTH_VIA.set("secret_key")
        return via_secret_key

    # 4. IPC key
    if via_ipc := self.token_ipc(credentials):
        CTX_AUTH_VIA.set("ipc")
        return via_ipc

    raise AuthenticationFailed("Token invalid/expired")
raw_header in the Format of Bearer ....
def token_ipc(self, value: str) -> tuple[User, None] | None:
    """Check if the token is the IPC key and return the virtual IPC user.

    Returns ``None`` when no IPC key was loaded or the value does not match;
    otherwise a new ``IPCUser`` paired with ``None`` as context."""
    # Compare as bytes: compare_digest() raises TypeError for str arguments
    # containing non-ASCII characters, which would surface as a server error
    # instead of a failed authentication.
    if not ipc_key or not compare_digest(value.encode(), ipc_key.encode()):
        return None
    return IPCUser(), None
Check if the token is the IPC key and return the virtual IPC user
def
token_secret_key( self, value: str) -> tuple[authentik.core.models.User, authentik.outposts.models.Outpost] | None:
def token_secret_key(self, value: str) -> tuple[User, Outpost] | None:
    """Check if the token is the secret key
    and return the service account for the managed outpost.

    Returns ``None`` when the value does not match ``settings.SECRET_KEY`` or
    no managed outpost exists."""
    from authentik.outposts.apps import MANAGED_OUTPOST

    # Compare as bytes: compare_digest() raises TypeError for str arguments
    # containing non-ASCII characters.
    if not compare_digest(value.encode(), settings.SECRET_KEY.encode()):
        return None
    # A single .first() replaces evaluating the queryset for truthiness and
    # then querying again for the first row.
    outpost = Outpost.objects.filter(managed=MANAGED_OUTPOST).first()
    if outpost is None:
        return None
    return outpost.user, outpost
Check if the token is the secret key and return the service account for the managed outpost
class
TokenSchema(drf_spectacular.plumbing.OpenApiGeneratorExtension[ForwardRef('OpenApiAuthenticationExtension')]):
class TokenSchema(OpenApiAuthenticationExtension):
    """OpenAPI authentication schema for TokenAuthentication"""

    target_class = TokenAuthentication
    name = "authentik"

    def get_security_definition(self, auto_schema):
        """Describe the scheme as HTTP bearer authentication"""
        definition = {"type": "http", "scheme": "bearer"}
        return definition
Auth schema
target_class =
<class 'TokenAuthentication'>