authentik.api.authentication

API Authentication

  1"""API Authentication"""
  2
  3from hmac import compare_digest
  4from pathlib import Path
  5from tempfile import gettempdir
  6from typing import Any
  7
  8from django.conf import settings
  9from django.contrib.auth.models import AnonymousUser
 10from drf_spectacular.extensions import OpenApiAuthenticationExtension
 11from rest_framework.authentication import BaseAuthentication, get_authorization_header
 12from rest_framework.exceptions import AuthenticationFailed
 13from rest_framework.request import Request
 14from structlog.stdlib import get_logger
 15
 16from authentik.common.oauth.constants import SCOPE_AUTHENTIK_API
 17from authentik.core.middleware import CTX_AUTH_VIA
 18from authentik.core.models import Token, TokenIntents, User, UserTypes
 19from authentik.outposts.models import Outpost
 20
# Module-level structlog logger; validate_auth() uses it to report rejected auth schemes.
LOGGER = get_logger()
 22_tmp = Path(gettempdir())
 23try:
 24    with open(_tmp / "authentik-core-ipc.key") as _f:
 25        ipc_key = _f.read()
 26except OSError:
 27    ipc_key = None
 28
 29
 30def validate_auth(header: bytes, format="bearer") -> str | None:
 31    """Validate that the header is in a correct format,
 32    returns type and credentials"""
 33    auth_credentials = header.decode().strip()
 34    if auth_credentials == "" or " " not in auth_credentials:
 35        return None
 36    auth_type, _, auth_credentials = auth_credentials.partition(" ")
 37    if not compare_digest(auth_type.lower(), format):
 38        LOGGER.debug("Unsupported authentication type, denying", type=auth_type.lower())
 39        return None
 40    if auth_credentials == "":  # nosec # noqa
 41        raise AuthenticationFailed("Malformed header")
 42    return auth_credentials
 43
 44
 45class VirtualUser(AnonymousUser):
 46    is_active = True
 47
 48    @property
 49    def type(self):
 50        return UserTypes.INTERNAL_SERVICE_ACCOUNT
 51
 52    @property
 53    def is_anonymous(self):
 54        return False
 55
 56    @property
 57    def is_authenticated(self):
 58        return True
 59
 60    def all_roles(self):
 61        return []
 62
 63
 64class IPCUser(VirtualUser):
 65    """'Virtual' user for IPC communication between authentik core and the authentik router"""
 66
 67    username = "authentik:system"
 68    is_superuser = True
 69
 70    @property
 71    def type(self):
 72        return UserTypes.INTERNAL_SERVICE_ACCOUNT
 73
 74    def has_perm(self, perm, obj=None):
 75        return True
 76
 77    def has_perms(self, perm_list, obj=None):
 78        return True
 79
 80    def has_module_perms(self, module):
 81        return True
 82
 83
 84class TokenAuthentication(BaseAuthentication):
 85    """Token-based authentication using HTTP Bearer authentication"""
 86
 87    def authenticate(self, request: Request) -> tuple[User, Any] | None:
 88        """Token-based authentication using HTTP Bearer authentication"""
 89        auth = get_authorization_header(request)
 90
 91        user_ctx = self.bearer_auth(auth)
 92        # None is only returned when the header isn't set.
 93        if not user_ctx:
 94            return None
 95
 96        return user_ctx
 97
 98    def bearer_auth(self, raw_header: bytes) -> tuple[User, Any] | None:
 99        """raw_header in the Format of `Bearer ....`"""
100        user_ctx = self.auth_user_lookup(raw_header)
101        if not user_ctx:
102            return None
103        user, ctx = user_ctx
104        if not user.is_active:
105            raise AuthenticationFailed("Token invalid/expired")
106        return user, ctx
107
108    def auth_user_lookup(self, raw_header: bytes) -> tuple[User, Any] | None:
109        """raw_header in the Format of `Bearer ....`"""
110        from authentik.providers.oauth2.models import AccessToken
111
112        auth_credentials = validate_auth(raw_header)
113        if not auth_credentials:
114            return None
115        # first, check traditional tokens
116        key_token = Token.objects.filter(
117            key=auth_credentials, intent=TokenIntents.INTENT_API
118        ).first()
119        if key_token:
120            CTX_AUTH_VIA.set("api_token")
121            return key_token.user, key_token
122        # then try to auth via JWT
123        jwt_token = AccessToken.objects.filter(
124            token=auth_credentials, _scope__icontains=SCOPE_AUTHENTIK_API
125        ).first()
126        if jwt_token:
127            # Double-check scopes, since they are saved in a single string
128            # we want to check the parsed version too
129            if SCOPE_AUTHENTIK_API not in jwt_token.scope:
130                raise AuthenticationFailed("Token invalid/expired")
131            CTX_AUTH_VIA.set("jwt")
132            return jwt_token.user, jwt_token
133        # then try to auth via secret key (for embedded outpost/etc)
134        user_outpost = self.token_secret_key(auth_credentials)
135        if user_outpost:
136            CTX_AUTH_VIA.set("secret_key")
137            return user_outpost
138        # then try to auth via secret key (for embedded outpost/etc)
139        user = self.token_ipc(auth_credentials)
140        if user:
141            CTX_AUTH_VIA.set("ipc")
142            return user
143        raise AuthenticationFailed("Token invalid/expired")
144
145    def token_ipc(self, value: str) -> tuple[User, None] | None:
146        """Check if the token is the secret key
147        and return the service account for the managed outpost"""
148        if not ipc_key or not compare_digest(value, ipc_key):
149            return None
150        return IPCUser(), None
151
152    def token_secret_key(self, value: str) -> tuple[User, Outpost] | None:
153        """Check if the token is the secret key
154        and return the service account for the managed outpost"""
155        from authentik.outposts.apps import MANAGED_OUTPOST
156
157        if not compare_digest(value, settings.SECRET_KEY):
158            return None
159        outposts = Outpost.objects.filter(managed=MANAGED_OUTPOST)
160        if not outposts:
161            return None
162        outpost = outposts.first()
163        return outpost.user, outpost
164
165
class TokenSchema(OpenApiAuthenticationExtension):
    """OpenAPI security scheme for ``TokenAuthentication``."""

    name = "authentik"
    target_class = TokenAuthentication

    def get_security_definition(self, auto_schema):
        """Return the security definition: HTTP authentication with the bearer scheme."""
        security_definition = {"type": "http", "scheme": "bearer"}
        return security_definition
LOGGER = <BoundLoggerLazyProxy(logger=None, wrapper_class=None, processors=None, context_class=None, initial_values={}, logger_factory_args=())>
def validate_auth(header: bytes, format='bearer') -> str | None:
31def validate_auth(header: bytes, format="bearer") -> str | None:
32    """Validate that the header is in a correct format,
33    returns type and credentials"""
34    auth_credentials = header.decode().strip()
35    if auth_credentials == "" or " " not in auth_credentials:
36        return None
37    auth_type, _, auth_credentials = auth_credentials.partition(" ")
38    if not compare_digest(auth_type.lower(), format):
39        LOGGER.debug("Unsupported authentication type, denying", type=auth_type.lower())
40        return None
41    if auth_credentials == "":  # nosec # noqa
42        raise AuthenticationFailed("Malformed header")
43    return auth_credentials

Validate that the header is in the expected format and return its credentials

class VirtualUser(django.contrib.auth.models.AnonymousUser):
46class VirtualUser(AnonymousUser):
47    is_active = True
48
49    @property
50    def type(self):
51        return UserTypes.INTERNAL_SERVICE_ACCOUNT
52
53    @property
54    def is_anonymous(self):
55        return False
56
57    @property
58    def is_authenticated(self):
59        return True
60
61    def all_roles(self):
62        return []
is_active = True
type
49    @property
50    def type(self):
51        return UserTypes.INTERNAL_SERVICE_ACCOUNT
is_anonymous
53    @property
54    def is_anonymous(self):
55        return False
is_authenticated
57    @property
58    def is_authenticated(self):
59        return True
def all_roles(self):
61    def all_roles(self):
62        return []
class IPCUser(VirtualUser):
65class IPCUser(VirtualUser):
66    """'Virtual' user for IPC communication between authentik core and the authentik router"""
67
68    username = "authentik:system"
69    is_superuser = True
70
71    @property
72    def type(self):
73        return UserTypes.INTERNAL_SERVICE_ACCOUNT
74
75    def has_perm(self, perm, obj=None):
76        return True
77
78    def has_perms(self, perm_list, obj=None):
79        return True
80
81    def has_module_perms(self, module):
82        return True

'Virtual' user for IPC communication between authentik core and the authentik router

username = 'authentik:system'
is_superuser = True
type
71    @property
72    def type(self):
73        return UserTypes.INTERNAL_SERVICE_ACCOUNT
def has_perm(self, perm, obj=None):
75    def has_perm(self, perm, obj=None):
76        return True
def has_perms(self, perm_list, obj=None):
78    def has_perms(self, perm_list, obj=None):
79        return True
def has_module_perms(self, module):
81    def has_module_perms(self, module):
82        return True
class TokenAuthentication(rest_framework.authentication.BaseAuthentication):
 85class TokenAuthentication(BaseAuthentication):
 86    """Token-based authentication using HTTP Bearer authentication"""
 87
 88    def authenticate(self, request: Request) -> tuple[User, Any] | None:
 89        """Token-based authentication using HTTP Bearer authentication"""
 90        auth = get_authorization_header(request)
 91
 92        user_ctx = self.bearer_auth(auth)
 93        # None is only returned when the header isn't set.
 94        if not user_ctx:
 95            return None
 96
 97        return user_ctx
 98
 99    def bearer_auth(self, raw_header: bytes) -> tuple[User, Any] | None:
100        """raw_header in the Format of `Bearer ....`"""
101        user_ctx = self.auth_user_lookup(raw_header)
102        if not user_ctx:
103            return None
104        user, ctx = user_ctx
105        if not user.is_active:
106            raise AuthenticationFailed("Token invalid/expired")
107        return user, ctx
108
109    def auth_user_lookup(self, raw_header: bytes) -> tuple[User, Any] | None:
110        """raw_header in the Format of `Bearer ....`"""
111        from authentik.providers.oauth2.models import AccessToken
112
113        auth_credentials = validate_auth(raw_header)
114        if not auth_credentials:
115            return None
116        # first, check traditional tokens
117        key_token = Token.objects.filter(
118            key=auth_credentials, intent=TokenIntents.INTENT_API
119        ).first()
120        if key_token:
121            CTX_AUTH_VIA.set("api_token")
122            return key_token.user, key_token
123        # then try to auth via JWT
124        jwt_token = AccessToken.objects.filter(
125            token=auth_credentials, _scope__icontains=SCOPE_AUTHENTIK_API
126        ).first()
127        if jwt_token:
128            # Double-check scopes, since they are saved in a single string
129            # we want to check the parsed version too
130            if SCOPE_AUTHENTIK_API not in jwt_token.scope:
131                raise AuthenticationFailed("Token invalid/expired")
132            CTX_AUTH_VIA.set("jwt")
133            return jwt_token.user, jwt_token
134        # then try to auth via secret key (for embedded outpost/etc)
135        user_outpost = self.token_secret_key(auth_credentials)
136        if user_outpost:
137            CTX_AUTH_VIA.set("secret_key")
138            return user_outpost
139        # then try to auth via secret key (for embedded outpost/etc)
140        user = self.token_ipc(auth_credentials)
141        if user:
142            CTX_AUTH_VIA.set("ipc")
143            return user
144        raise AuthenticationFailed("Token invalid/expired")
145
146    def token_ipc(self, value: str) -> tuple[User, None] | None:
147        """Check if the token is the secret key
148        and return the service account for the managed outpost"""
149        if not ipc_key or not compare_digest(value, ipc_key):
150            return None
151        return IPCUser(), None
152
153    def token_secret_key(self, value: str) -> tuple[User, Outpost] | None:
154        """Check if the token is the secret key
155        and return the service account for the managed outpost"""
156        from authentik.outposts.apps import MANAGED_OUTPOST
157
158        if not compare_digest(value, settings.SECRET_KEY):
159            return None
160        outposts = Outpost.objects.filter(managed=MANAGED_OUTPOST)
161        if not outposts:
162            return None
163        outpost = outposts.first()
164        return outpost.user, outpost

Token-based authentication using HTTP Bearer authentication

def authenticate( self, request: rest_framework.request.Request) -> tuple[authentik.core.models.User, Any] | None:
88    def authenticate(self, request: Request) -> tuple[User, Any] | None:
89        """Token-based authentication using HTTP Bearer authentication"""
90        auth = get_authorization_header(request)
91
92        user_ctx = self.bearer_auth(auth)
93        # None is only returned when the header isn't set.
94        if not user_ctx:
95            return None
96
97        return user_ctx

Token-based authentication using HTTP Bearer authentication

def bearer_auth(self, raw_header: bytes) -> tuple[authentik.core.models.User, Any] | None:
 99    def bearer_auth(self, raw_header: bytes) -> tuple[User, Any] | None:
100        """raw_header in the Format of `Bearer ....`"""
101        user_ctx = self.auth_user_lookup(raw_header)
102        if not user_ctx:
103            return None
104        user, ctx = user_ctx
105        if not user.is_active:
106            raise AuthenticationFailed("Token invalid/expired")
107        return user, ctx

raw_header in the Format of Bearer ....

def auth_user_lookup(self, raw_header: bytes) -> tuple[authentik.core.models.User, Any] | None:
109    def auth_user_lookup(self, raw_header: bytes) -> tuple[User, Any] | None:
110        """raw_header in the Format of `Bearer ....`"""
111        from authentik.providers.oauth2.models import AccessToken
112
113        auth_credentials = validate_auth(raw_header)
114        if not auth_credentials:
115            return None
116        # first, check traditional tokens
117        key_token = Token.objects.filter(
118            key=auth_credentials, intent=TokenIntents.INTENT_API
119        ).first()
120        if key_token:
121            CTX_AUTH_VIA.set("api_token")
122            return key_token.user, key_token
123        # then try to auth via JWT
124        jwt_token = AccessToken.objects.filter(
125            token=auth_credentials, _scope__icontains=SCOPE_AUTHENTIK_API
126        ).first()
127        if jwt_token:
128            # Double-check scopes, since they are saved in a single string
129            # we want to check the parsed version too
130            if SCOPE_AUTHENTIK_API not in jwt_token.scope:
131                raise AuthenticationFailed("Token invalid/expired")
132            CTX_AUTH_VIA.set("jwt")
133            return jwt_token.user, jwt_token
134        # then try to auth via secret key (for embedded outpost/etc)
135        user_outpost = self.token_secret_key(auth_credentials)
136        if user_outpost:
137            CTX_AUTH_VIA.set("secret_key")
138            return user_outpost
139        # then try to auth via secret key (for embedded outpost/etc)
140        user = self.token_ipc(auth_credentials)
141        if user:
142            CTX_AUTH_VIA.set("ipc")
143            return user
144        raise AuthenticationFailed("Token invalid/expired")

raw_header in the Format of Bearer ....

def token_ipc(self, value: str) -> tuple[authentik.core.models.User, None] | None:
146    def token_ipc(self, value: str) -> tuple[User, None] | None:
147        """Check if the token is the secret key
148        and return the service account for the managed outpost"""
149        if not ipc_key or not compare_digest(value, ipc_key):
150            return None
151        return IPCUser(), None

Check if the token is the IPC key and return the virtual IPC user

def token_secret_key( self, value: str) -> tuple[authentik.core.models.User, authentik.outposts.models.Outpost] | None:
153    def token_secret_key(self, value: str) -> tuple[User, Outpost] | None:
154        """Check if the token is the secret key
155        and return the service account for the managed outpost"""
156        from authentik.outposts.apps import MANAGED_OUTPOST
157
158        if not compare_digest(value, settings.SECRET_KEY):
159            return None
160        outposts = Outpost.objects.filter(managed=MANAGED_OUTPOST)
161        if not outposts:
162            return None
163        outpost = outposts.first()
164        return outpost.user, outpost

Check if the token is the secret key and return the service account for the managed outpost

class TokenSchema(drf_spectacular.plumbing.OpenApiGeneratorExtension[ForwardRef('OpenApiAuthenticationExtension')]):
167class TokenSchema(OpenApiAuthenticationExtension):
168    """Auth schema"""
169
170    target_class = TokenAuthentication
171    name = "authentik"
172
173    def get_security_definition(self, auto_schema):
174        """Auth schema"""
175        return {"type": "http", "scheme": "bearer"}

Auth schema

target_class = <class 'TokenAuthentication'>
name = 'authentik'
def get_security_definition(self, auto_schema):
173    def get_security_definition(self, auto_schema):
174        """Auth schema"""
175        return {"type": "http", "scheme": "bearer"}

Auth schema