authentik.sources.scim.views.v2.auth

SCIM Token auth

 1"""SCIM Token auth"""
 2
 3from base64 import b64decode
 4from typing import Any
 5
 6from django.conf import settings
 7from rest_framework.authentication import BaseAuthentication, get_authorization_header
 8from rest_framework.request import Request
 9from rest_framework.views import APIView
10
11from authentik.core.middleware import CTX_AUTH_VIA
12from authentik.core.models import Token, TokenIntents, User
13from authentik.sources.scim.models import SCIMSource
14
15
class SCIMTokenAuth(BaseAuthentication):
    """SCIM Token auth

    Authenticates a request with an API-intent token that is linked to the SCIM
    source named by the ``source_slug`` URL keyword argument. When a token
    matches, the linked source is stored on the view as ``view.source``.
    """

    def __init__(self, view: APIView) -> None:
        """Keep a reference to the view so a matched source can be attached to it."""
        super().__init__()
        self.view = view

    def legacy(
        self, key: str, source_slug: str
    ) -> tuple[User, Token] | None:  # pragma: no cover
        """Legacy HTTP-Basic auth for testing

        Only active when ``settings.TEST`` or ``settings.DEBUG`` is truthy.
        ``key`` is the base64 ``username:password`` payload; the username is
        ignored and the password is looked up as the token key.

        Returns ``(user, token)`` on success, ``None`` otherwise.
        """
        if not settings.TEST and not settings.DEBUG:
            return None
        try:
            decoded = b64decode(key.encode()).decode()
        except ValueError:
            # binascii.Error (bad base64) and UnicodeError (bad UTF-8) are both
            # ValueError subclasses; treat malformed credentials as "no match"
            return None
        _username, _, password = decoded.partition(":")
        token = self.check_token(password, source_slug)
        if token:
            CTX_AUTH_VIA.set("scim_basic")
            return (token.user, token)
        return None

    def check_token(self, key: str, source_slug: str) -> Token | None:
        """Check that a token exists, is not expired, and is assigned to the correct source

        Returns the token when its first linked SCIM source has the slug
        ``source_slug`` (and stores that source on ``self.view.source``),
        otherwise ``None``.
        """
        # NOTE(review): this query filters on key and intent only; expiry is
        # presumably enforced elsewhere - confirm against the Token model.
        token = Token.objects.filter(key=key, intent=TokenIntents.INTENT_API).first()
        if not token:
            return None
        source: SCIMSource | None = token.scimsource_set.first()
        if not source:
            return None
        if source.slug != source_slug:
            return None
        self.view.source = source
        return token

    def authenticate(self, request: Request) -> tuple[User, Any] | None:
        """Authenticate the request and return a two-tuple of (user, token).

        ``Bearer <key>`` credentials are checked with ``check_token``; any other
        scheme is handed to ``legacy``. Returns ``None`` when no token matches.
        """
        kwargs = request._request.resolver_match.kwargs
        source_slug = kwargs.get("source_slug", None)
        try:
            auth = get_authorization_header(request).decode()
        except UnicodeDecodeError:
            # Header bytes that are not valid UTF-8 cannot carry a usable credential
            return None
        auth_type, _, key = auth.partition(" ")
        # Authentication scheme names are case-insensitive (RFC 9110, section 11.1)
        if auth_type.lower() != "bearer":
            return self.legacy(key, source_slug)
        token = self.check_token(key, source_slug)
        if not token:
            return None
        CTX_AUTH_VIA.set("scim_token")
        return (token.user, token)
class SCIMTokenAuth(rest_framework.authentication.BaseAuthentication):
class SCIMTokenAuth(BaseAuthentication):
    """SCIM Token auth

    Authenticates a request with an API-intent token that is linked to the SCIM
    source named by the ``source_slug`` URL keyword argument. When a token
    matches, the linked source is stored on the view as ``view.source``.
    """

    def __init__(self, view: APIView) -> None:
        """Keep a reference to the view so a matched source can be attached to it."""
        super().__init__()
        self.view = view

    def legacy(
        self, key: str, source_slug: str
    ) -> tuple[User, Token] | None:  # pragma: no cover
        """Legacy HTTP-Basic auth for testing

        Only active when ``settings.TEST`` or ``settings.DEBUG`` is truthy.
        ``key`` is the base64 ``username:password`` payload; the username is
        ignored and the password is looked up as the token key.

        Returns ``(user, token)`` on success, ``None`` otherwise.
        """
        if not settings.TEST and not settings.DEBUG:
            return None
        try:
            decoded = b64decode(key.encode()).decode()
        except ValueError:
            # binascii.Error (bad base64) and UnicodeError (bad UTF-8) are both
            # ValueError subclasses; treat malformed credentials as "no match"
            return None
        _username, _, password = decoded.partition(":")
        token = self.check_token(password, source_slug)
        if token:
            CTX_AUTH_VIA.set("scim_basic")
            return (token.user, token)
        return None

    def check_token(self, key: str, source_slug: str) -> Token | None:
        """Check that a token exists, is not expired, and is assigned to the correct source

        Returns the token when its first linked SCIM source has the slug
        ``source_slug`` (and stores that source on ``self.view.source``),
        otherwise ``None``.
        """
        # NOTE(review): this query filters on key and intent only; expiry is
        # presumably enforced elsewhere - confirm against the Token model.
        token = Token.objects.filter(key=key, intent=TokenIntents.INTENT_API).first()
        if not token:
            return None
        source: SCIMSource | None = token.scimsource_set.first()
        if not source:
            return None
        if source.slug != source_slug:
            return None
        self.view.source = source
        return token

    def authenticate(self, request: Request) -> tuple[User, Any] | None:
        """Authenticate the request and return a two-tuple of (user, token).

        ``Bearer <key>`` credentials are checked with ``check_token``; any other
        scheme is handed to ``legacy``. Returns ``None`` when no token matches.
        """
        kwargs = request._request.resolver_match.kwargs
        source_slug = kwargs.get("source_slug", None)
        try:
            auth = get_authorization_header(request).decode()
        except UnicodeDecodeError:
            # Header bytes that are not valid UTF-8 cannot carry a usable credential
            return None
        auth_type, _, key = auth.partition(" ")
        # Authentication scheme names are case-insensitive (RFC 9110, section 11.1)
        if auth_type.lower() != "bearer":
            return self.legacy(key, source_slug)
        token = self.check_token(key, source_slug)
        if not token:
            return None
        CTX_AUTH_VIA.set("scim_token")
        return (token.user, token)

SCIM Token auth

SCIMTokenAuth(view: rest_framework.views.APIView)
20    def __init__(self, view: APIView) -> None:
21        super().__init__()
22        self.view = view
view
def legacy(self, key: str, source_slug: str) -> authentik.core.models.Token | None:
24    def legacy(self, key: str, source_slug: str) -> Token | None:  # pragma: no cover
25        """Legacy HTTP-Basic auth for testing"""
26        if not settings.TEST and not settings.DEBUG:
27            return None
28        _username, _, password = b64decode(key.encode()).decode().partition(":")
29        token = self.check_token(password, source_slug)
30        if token:
31            CTX_AUTH_VIA.set("scim_basic")
32            return (token.user, token)
33        return None

Legacy HTTP-Basic auth for testing

def check_token(self, key: str, source_slug: str) -> authentik.core.models.Token | None:
35    def check_token(self, key: str, source_slug: str) -> Token | None:
36        """Check that a token exists, is not expired, and is assigned to the correct source"""
37        token = Token.objects.filter(key=key, intent=TokenIntents.INTENT_API).first()
38        if not token:
39            return None
40        source: SCIMSource = token.scimsource_set.first()
41        if not source:
42            return None
43        if source.slug != source_slug:
44            return None
45        self.view.source = source
46        return token

Check that a token exists, is not expired, and is assigned to the correct source

def authenticate( self, request: rest_framework.request.Request) -> tuple[authentik.core.models.User, Any] | None:
48    def authenticate(self, request: Request) -> tuple[User, Any] | None:
49        kwargs = request._request.resolver_match.kwargs
50        source_slug = kwargs.get("source_slug", None)
51        auth = get_authorization_header(request).decode()
52        auth_type, _, key = auth.partition(" ")
53        if auth_type != "Bearer":
54            return self.legacy(key, source_slug)
55        token = self.check_token(key, source_slug)
56        if not token:
57            return None
58        CTX_AUTH_VIA.set("scim_token")
59        return (token.user, token)

Authenticate the request and return a two-tuple of (user, token).