authentik.api.authentication

API Authentication

  1"""API Authentication"""
  2
  3from hmac import compare_digest
  4from pathlib import Path
  5from tempfile import gettempdir
  6from typing import Any
  7
  8from django.conf import settings
  9from django.contrib.auth.models import AnonymousUser
 10from drf_spectacular.extensions import OpenApiAuthenticationExtension
 11from rest_framework.authentication import BaseAuthentication, get_authorization_header
 12from rest_framework.exceptions import AuthenticationFailed
 13from rest_framework.request import Request
 14from structlog.stdlib import get_logger
 15
 16from authentik.common.oauth.constants import SCOPE_AUTHENTIK_API
 17from authentik.core.middleware import CTX_AUTH_VIA
 18from authentik.core.models import Token, TokenIntents, User, UserTypes
 19from authentik.outposts.models import Outpost
 20
 21LOGGER = get_logger()
 22_tmp = Path(gettempdir())
 23try:
 24    with open(_tmp / "authentik-core-ipc.key") as _f:
 25        ipc_key = _f.read()
 26except OSError:
 27    ipc_key = None
 28
 29
 30def validate_auth(header: bytes, format="bearer") -> str | None:
 31    """Validate that the header is in a correct format,
 32    returns type and credentials"""
 33    auth_credentials = header.decode().strip()
 34    if auth_credentials == "" or " " not in auth_credentials:
 35        return None
 36    auth_type, _, auth_credentials = auth_credentials.partition(" ")
 37    if not compare_digest(auth_type.lower(), format):
 38        LOGGER.debug("Unsupported authentication type, denying", type=auth_type.lower())
 39        return None
 40    if auth_credentials == "":  # nosec # noqa
 41        raise AuthenticationFailed("Malformed header")
 42    return auth_credentials
 43
 44
 45class IPCUser(AnonymousUser):
 46    """'Virtual' user for IPC communication between authentik core and the authentik router"""
 47
 48    username = "authentik:system"
 49    is_active = True
 50    is_superuser = True
 51
 52    @property
 53    def type(self):
 54        return UserTypes.INTERNAL_SERVICE_ACCOUNT
 55
 56    def has_perm(self, perm, obj=None):
 57        return True
 58
 59    def has_perms(self, perm_list, obj=None):
 60        return True
 61
 62    def has_module_perms(self, module):
 63        return True
 64
 65    @property
 66    def is_anonymous(self):
 67        return False
 68
 69    @property
 70    def is_authenticated(self):
 71        return True
 72
 73    def all_roles(self):
 74        return []
 75
 76
 77class TokenAuthentication(BaseAuthentication):
 78    """Token-based authentication using HTTP Bearer authentication"""
 79
 80    def authenticate(self, request: Request) -> tuple[User, Any] | None:
 81        """Token-based authentication using HTTP Bearer authentication"""
 82        auth = get_authorization_header(request)
 83
 84        user_ctx = self.bearer_auth(auth)
 85        # None is only returned when the header isn't set.
 86        if not user_ctx:
 87            return None
 88
 89        return user_ctx
 90
 91    def bearer_auth(self, raw_header: bytes) -> tuple[User, Any] | None:
 92        """raw_header in the Format of `Bearer ....`"""
 93        user_ctx = self.auth_user_lookup(raw_header)
 94        if not user_ctx:
 95            return None
 96        user, ctx = user_ctx
 97        if not user.is_active:
 98            raise AuthenticationFailed("Token invalid/expired")
 99        return user, ctx
100
101    def auth_user_lookup(self, raw_header: bytes) -> tuple[User, Any] | None:
102        """raw_header in the Format of `Bearer ....`"""
103        from authentik.providers.oauth2.models import AccessToken
104
105        auth_credentials = validate_auth(raw_header)
106        if not auth_credentials:
107            return None
108        # first, check traditional tokens
109        key_token = Token.objects.filter(
110            key=auth_credentials, intent=TokenIntents.INTENT_API
111        ).first()
112        if key_token:
113            CTX_AUTH_VIA.set("api_token")
114            return key_token.user, key_token
115        # then try to auth via JWT
116        jwt_token = AccessToken.objects.filter(
117            token=auth_credentials, _scope__icontains=SCOPE_AUTHENTIK_API
118        ).first()
119        if jwt_token:
120            # Double-check scopes, since they are saved in a single string
121            # we want to check the parsed version too
122            if SCOPE_AUTHENTIK_API not in jwt_token.scope:
123                raise AuthenticationFailed("Token invalid/expired")
124            CTX_AUTH_VIA.set("jwt")
125            return jwt_token.user, jwt_token
126        # then try to auth via secret key (for embedded outpost/etc)
127        user_outpost = self.token_secret_key(auth_credentials)
128        if user_outpost:
129            CTX_AUTH_VIA.set("secret_key")
130            return user_outpost
131        # then try to auth via secret key (for embedded outpost/etc)
132        user = self.token_ipc(auth_credentials)
133        if user:
134            CTX_AUTH_VIA.set("ipc")
135            return user
136        raise AuthenticationFailed("Token invalid/expired")
137
138    def token_ipc(self, value: str) -> tuple[User, None] | None:
139        """Check if the token is the secret key
140        and return the service account for the managed outpost"""
141        if not ipc_key or not compare_digest(value, ipc_key):
142            return None
143        return IPCUser(), None
144
145    def token_secret_key(self, value: str) -> tuple[User, Outpost] | None:
146        """Check if the token is the secret key
147        and return the service account for the managed outpost"""
148        from authentik.outposts.apps import MANAGED_OUTPOST
149
150        if not compare_digest(value, settings.SECRET_KEY):
151            return None
152        outposts = Outpost.objects.filter(managed=MANAGED_OUTPOST)
153        if not outposts:
154            return None
155        outpost = outposts.first()
156        return outpost.user, outpost
157
158
159class TokenSchema(OpenApiAuthenticationExtension):
160    """Auth schema"""
161
162    target_class = TokenAuthentication
163    name = "authentik"
164
165    def get_security_definition(self, auto_schema):
166        """Auth schema"""
167        return {"type": "http", "scheme": "bearer"}
LOGGER = <BoundLoggerLazyProxy(logger=None, wrapper_class=None, processors=None, context_class=None, initial_values={}, logger_factory_args=())>
def validate_auth(header: bytes, format='bearer') -> str | None:
31def validate_auth(header: bytes, format="bearer") -> str | None:
32    """Validate that the header is in a correct format,
33    returns type and credentials"""
34    auth_credentials = header.decode().strip()
35    if auth_credentials == "" or " " not in auth_credentials:
36        return None
37    auth_type, _, auth_credentials = auth_credentials.partition(" ")
38    if not compare_digest(auth_type.lower(), format):
39        LOGGER.debug("Unsupported authentication type, denying", type=auth_type.lower())
40        return None
41    if auth_credentials == "":  # nosec # noqa
42        raise AuthenticationFailed("Malformed header")
43    return auth_credentials

Validate that the header is in a correct format, returns type and credentials

class IPCUser(django.contrib.auth.models.AnonymousUser):
46class IPCUser(AnonymousUser):
47    """'Virtual' user for IPC communication between authentik core and the authentik router"""
48
49    username = "authentik:system"
50    is_active = True
51    is_superuser = True
52
53    @property
54    def type(self):
55        return UserTypes.INTERNAL_SERVICE_ACCOUNT
56
57    def has_perm(self, perm, obj=None):
58        return True
59
60    def has_perms(self, perm_list, obj=None):
61        return True
62
63    def has_module_perms(self, module):
64        return True
65
66    @property
67    def is_anonymous(self):
68        return False
69
70    @property
71    def is_authenticated(self):
72        return True
73
74    def all_roles(self):
75        return []

'Virtual' user for IPC communication between authentik core and the authentik router

username = 'authentik:system'
is_active = True
is_superuser = True
type
53    @property
54    def type(self):
55        return UserTypes.INTERNAL_SERVICE_ACCOUNT
def has_perm(self, perm, obj=None):
57    def has_perm(self, perm, obj=None):
58        return True
def has_perms(self, perm_list, obj=None):
60    def has_perms(self, perm_list, obj=None):
61        return True
def has_module_perms(self, module):
63    def has_module_perms(self, module):
64        return True
is_anonymous
66    @property
67    def is_anonymous(self):
68        return False
is_authenticated
70    @property
71    def is_authenticated(self):
72        return True
def all_roles(self):
74    def all_roles(self):
75        return []
class TokenAuthentication(rest_framework.authentication.BaseAuthentication):
 78class TokenAuthentication(BaseAuthentication):
 79    """Token-based authentication using HTTP Bearer authentication"""
 80
 81    def authenticate(self, request: Request) -> tuple[User, Any] | None:
 82        """Token-based authentication using HTTP Bearer authentication"""
 83        auth = get_authorization_header(request)
 84
 85        user_ctx = self.bearer_auth(auth)
 86        # None is only returned when the header isn't set.
 87        if not user_ctx:
 88            return None
 89
 90        return user_ctx
 91
 92    def bearer_auth(self, raw_header: bytes) -> tuple[User, Any] | None:
 93        """raw_header in the Format of `Bearer ....`"""
 94        user_ctx = self.auth_user_lookup(raw_header)
 95        if not user_ctx:
 96            return None
 97        user, ctx = user_ctx
 98        if not user.is_active:
 99            raise AuthenticationFailed("Token invalid/expired")
100        return user, ctx
101
102    def auth_user_lookup(self, raw_header: bytes) -> tuple[User, Any] | None:
103        """raw_header in the Format of `Bearer ....`"""
104        from authentik.providers.oauth2.models import AccessToken
105
106        auth_credentials = validate_auth(raw_header)
107        if not auth_credentials:
108            return None
109        # first, check traditional tokens
110        key_token = Token.objects.filter(
111            key=auth_credentials, intent=TokenIntents.INTENT_API
112        ).first()
113        if key_token:
114            CTX_AUTH_VIA.set("api_token")
115            return key_token.user, key_token
116        # then try to auth via JWT
117        jwt_token = AccessToken.objects.filter(
118            token=auth_credentials, _scope__icontains=SCOPE_AUTHENTIK_API
119        ).first()
120        if jwt_token:
121            # Double-check scopes, since they are saved in a single string
122            # we want to check the parsed version too
123            if SCOPE_AUTHENTIK_API not in jwt_token.scope:
124                raise AuthenticationFailed("Token invalid/expired")
125            CTX_AUTH_VIA.set("jwt")
126            return jwt_token.user, jwt_token
127        # then try to auth via secret key (for embedded outpost/etc)
128        user_outpost = self.token_secret_key(auth_credentials)
129        if user_outpost:
130            CTX_AUTH_VIA.set("secret_key")
131            return user_outpost
132        # then try to auth via secret key (for embedded outpost/etc)
133        user = self.token_ipc(auth_credentials)
134        if user:
135            CTX_AUTH_VIA.set("ipc")
136            return user
137        raise AuthenticationFailed("Token invalid/expired")
138
139    def token_ipc(self, value: str) -> tuple[User, None] | None:
140        """Check if the token is the secret key
141        and return the service account for the managed outpost"""
142        if not ipc_key or not compare_digest(value, ipc_key):
143            return None
144        return IPCUser(), None
145
146    def token_secret_key(self, value: str) -> tuple[User, Outpost] | None:
147        """Check if the token is the secret key
148        and return the service account for the managed outpost"""
149        from authentik.outposts.apps import MANAGED_OUTPOST
150
151        if not compare_digest(value, settings.SECRET_KEY):
152            return None
153        outposts = Outpost.objects.filter(managed=MANAGED_OUTPOST)
154        if not outposts:
155            return None
156        outpost = outposts.first()
157        return outpost.user, outpost

Token-based authentication using HTTP Bearer authentication

def authenticate( self, request: rest_framework.request.Request) -> tuple[authentik.core.models.User, Any] | None:
81    def authenticate(self, request: Request) -> tuple[User, Any] | None:
82        """Token-based authentication using HTTP Bearer authentication"""
83        auth = get_authorization_header(request)
84
85        user_ctx = self.bearer_auth(auth)
86        # None is only returned when the header isn't set.
87        if not user_ctx:
88            return None
89
90        return user_ctx

Token-based authentication using HTTP Bearer authentication

def bearer_auth(self, raw_header: bytes) -> tuple[authentik.core.models.User, Any] | None:
 92    def bearer_auth(self, raw_header: bytes) -> tuple[User, Any] | None:
 93        """raw_header in the Format of `Bearer ....`"""
 94        user_ctx = self.auth_user_lookup(raw_header)
 95        if not user_ctx:
 96            return None
 97        user, ctx = user_ctx
 98        if not user.is_active:
 99            raise AuthenticationFailed("Token invalid/expired")
100        return user, ctx

raw_header in the Format of Bearer ....

def auth_user_lookup(self, raw_header: bytes) -> tuple[authentik.core.models.User, Any] | None:
102    def auth_user_lookup(self, raw_header: bytes) -> tuple[User, Any] | None:
103        """raw_header in the Format of `Bearer ....`"""
104        from authentik.providers.oauth2.models import AccessToken
105
106        auth_credentials = validate_auth(raw_header)
107        if not auth_credentials:
108            return None
109        # first, check traditional tokens
110        key_token = Token.objects.filter(
111            key=auth_credentials, intent=TokenIntents.INTENT_API
112        ).first()
113        if key_token:
114            CTX_AUTH_VIA.set("api_token")
115            return key_token.user, key_token
116        # then try to auth via JWT
117        jwt_token = AccessToken.objects.filter(
118            token=auth_credentials, _scope__icontains=SCOPE_AUTHENTIK_API
119        ).first()
120        if jwt_token:
121            # Double-check scopes, since they are saved in a single string
122            # we want to check the parsed version too
123            if SCOPE_AUTHENTIK_API not in jwt_token.scope:
124                raise AuthenticationFailed("Token invalid/expired")
125            CTX_AUTH_VIA.set("jwt")
126            return jwt_token.user, jwt_token
127        # then try to auth via secret key (for embedded outpost/etc)
128        user_outpost = self.token_secret_key(auth_credentials)
129        if user_outpost:
130            CTX_AUTH_VIA.set("secret_key")
131            return user_outpost
132        # then try to auth via secret key (for embedded outpost/etc)
133        user = self.token_ipc(auth_credentials)
134        if user:
135            CTX_AUTH_VIA.set("ipc")
136            return user
137        raise AuthenticationFailed("Token invalid/expired")

raw_header in the Format of Bearer ....

def token_ipc(self, value: str) -> tuple[authentik.core.models.User, None] | None:
139    def token_ipc(self, value: str) -> tuple[User, None] | None:
140        """Check if the token is the secret key
141        and return the service account for the managed outpost"""
142        if not ipc_key or not compare_digest(value, ipc_key):
143            return None
144        return IPCUser(), None

Check if the token is the secret key and return the service account for the managed outpost

def token_secret_key( self, value: str) -> tuple[authentik.core.models.User, authentik.outposts.models.Outpost] | None:
146    def token_secret_key(self, value: str) -> tuple[User, Outpost] | None:
147        """Check if the token is the secret key
148        and return the service account for the managed outpost"""
149        from authentik.outposts.apps import MANAGED_OUTPOST
150
151        if not compare_digest(value, settings.SECRET_KEY):
152            return None
153        outposts = Outpost.objects.filter(managed=MANAGED_OUTPOST)
154        if not outposts:
155            return None
156        outpost = outposts.first()
157        return outpost.user, outpost

Check if the token is the secret key and return the service account for the managed outpost

class TokenSchema(drf_spectacular.plumbing.OpenApiGeneratorExtension[ForwardRef('OpenApiAuthenticationExtension')]):
160class TokenSchema(OpenApiAuthenticationExtension):
161    """Auth schema"""
162
163    target_class = TokenAuthentication
164    name = "authentik"
165
166    def get_security_definition(self, auto_schema):
167        """Auth schema"""
168        return {"type": "http", "scheme": "bearer"}

Auth schema

target_class = <class 'TokenAuthentication'>
name = $POSTGRES_DB
def get_security_definition(self, auto_schema):
166    def get_security_definition(self, auto_schema):
167        """Auth schema"""
168        return {"type": "http", "scheme": "bearer"}

Auth schema