authentik.sources.scim.views.v2.auth
SCIM Token auth
"""SCIM Token auth"""

from base64 import b64decode
from typing import Any

from django.conf import settings
from rest_framework.authentication import BaseAuthentication, get_authorization_header
from rest_framework.request import Request
from rest_framework.views import APIView

from authentik.core.middleware import CTX_AUTH_VIA
from authentik.core.models import Token, TokenIntents, User
from authentik.sources.scim.models import SCIMSource


class SCIMTokenAuth(BaseAuthentication):
    """SCIM Token auth

    Authenticates SCIM requests with a bearer token that has to be assigned to
    the SCIM source named by the ``source_slug`` URL keyword argument. When a
    token is accepted, the matching source is stored on the view as
    ``view.source``.
    """

    def __init__(self, view: APIView) -> None:
        super().__init__()
        # Kept so check_token() can publish the matched source on the view.
        self.view = view

    def legacy(self, key: str, source_slug: str) -> tuple[User, Token] | None:  # pragma: no cover
        """Legacy HTTP-Basic auth for testing

        Only active when ``settings.TEST`` or ``settings.DEBUG`` is truthy.
        ``key`` is the base64-encoded ``username:password`` payload; the
        username is ignored and the password is checked as a token through
        ``check_token``.

        Returns ``(user, token)`` on success and ``None`` otherwise.
        """
        if not settings.TEST and not settings.DEBUG:
            return None
        try:
            decoded = b64decode(key.encode()).decode()
        except ValueError:
            # binascii.Error (bad base64) and UnicodeError are both ValueError
            # subclasses: malformed credentials are a failed login, not a crash.
            return None
        _username, _, password = decoded.partition(":")
        token = self.check_token(password, source_slug)
        if token:
            CTX_AUTH_VIA.set("scim_basic")
            return (token.user, token)
        return None

    def check_token(self, key: str, source_slug: str) -> Token | None:
        """Resolve ``key`` to an API token assigned to the source ``source_slug``.

        Looks up a token with the given key and the ``INTENT_API`` intent,
        takes the first SCIM source linked to it and compares that source's
        slug with ``source_slug``. On a match the source is stored as
        ``self.view.source`` and the token is returned; in every other case
        ``None`` is returned.
        """
        # NOTE(review): no explicit expiry check is visible here - presumably
        # expired tokens are excluded by the Token manager; confirm.
        token = Token.objects.filter(key=key, intent=TokenIntents.INTENT_API).first()
        if not token:
            return None
        linked_source: SCIMSource | None = token.scimsource_set.first()
        if not linked_source or linked_source.slug != source_slug:
            return None
        self.view.source = linked_source
        return token

    def authenticate(self, request: Request) -> tuple[User, Any] | None:
        """Authenticate the request and return a two-tuple of (user, token).

        Reads ``source_slug`` from the resolved URL keyword arguments and the
        credentials from the ``Authorization`` header. ``Bearer`` credentials
        are validated with ``check_token``; any other scheme is handed to
        ``legacy``. Returns ``None`` when the request cannot be authenticated.
        """
        kwargs = request._request.resolver_match.kwargs
        source_slug = kwargs.get("source_slug", None)
        try:
            auth = get_authorization_header(request).decode()
        except UnicodeDecodeError:
            # A header that is not valid UTF-8 cannot carry a usable token.
            return None
        auth_type, _, key = auth.partition(" ")
        # Authentication scheme names are case-insensitive (RFC 9110, 11.1).
        if auth_type.lower() != "bearer":
            return self.legacy(key, source_slug)
        token = self.check_token(key, source_slug)
        if not token:
            return None
        CTX_AUTH_VIA.set("scim_token")
        return (token.user, token)
class
SCIMTokenAuth(rest_framework.authentication.BaseAuthentication):
class SCIMTokenAuth(BaseAuthentication):
    """SCIM Token auth

    Authenticates SCIM requests with a bearer token that has to be assigned to
    the SCIM source named by the ``source_slug`` URL keyword argument. When a
    token is accepted, the matching source is stored on the view as
    ``view.source``.
    """

    def __init__(self, view: APIView) -> None:
        super().__init__()
        # Kept so check_token() can publish the matched source on the view.
        self.view = view

    def legacy(self, key: str, source_slug: str) -> tuple[User, Token] | None:  # pragma: no cover
        """Legacy HTTP-Basic auth for testing

        Only active when ``settings.TEST`` or ``settings.DEBUG`` is truthy.
        ``key`` is the base64-encoded ``username:password`` payload; the
        username is ignored and the password is checked as a token through
        ``check_token``.

        Returns ``(user, token)`` on success and ``None`` otherwise.
        """
        if not settings.TEST and not settings.DEBUG:
            return None
        try:
            decoded = b64decode(key.encode()).decode()
        except ValueError:
            # binascii.Error (bad base64) and UnicodeError are both ValueError
            # subclasses: malformed credentials are a failed login, not a crash.
            return None
        _username, _, password = decoded.partition(":")
        token = self.check_token(password, source_slug)
        if token:
            CTX_AUTH_VIA.set("scim_basic")
            return (token.user, token)
        return None

    def check_token(self, key: str, source_slug: str) -> Token | None:
        """Resolve ``key`` to an API token assigned to the source ``source_slug``.

        Looks up a token with the given key and the ``INTENT_API`` intent,
        takes the first SCIM source linked to it and compares that source's
        slug with ``source_slug``. On a match the source is stored as
        ``self.view.source`` and the token is returned; in every other case
        ``None`` is returned.
        """
        # NOTE(review): no explicit expiry check is visible here - presumably
        # expired tokens are excluded by the Token manager; confirm.
        token = Token.objects.filter(key=key, intent=TokenIntents.INTENT_API).first()
        if not token:
            return None
        linked_source: SCIMSource | None = token.scimsource_set.first()
        if not linked_source or linked_source.slug != source_slug:
            return None
        self.view.source = linked_source
        return token

    def authenticate(self, request: Request) -> tuple[User, Any] | None:
        """Authenticate the request and return a two-tuple of (user, token).

        Reads ``source_slug`` from the resolved URL keyword arguments and the
        credentials from the ``Authorization`` header. ``Bearer`` credentials
        are validated with ``check_token``; any other scheme is handed to
        ``legacy``. Returns ``None`` when the request cannot be authenticated.
        """
        kwargs = request._request.resolver_match.kwargs
        source_slug = kwargs.get("source_slug", None)
        try:
            auth = get_authorization_header(request).decode()
        except UnicodeDecodeError:
            # A header that is not valid UTF-8 cannot carry a usable token.
            return None
        auth_type, _, key = auth.partition(" ")
        # Authentication scheme names are case-insensitive (RFC 9110, 11.1).
        if auth_type.lower() != "bearer":
            return self.legacy(key, source_slug)
        token = self.check_token(key, source_slug)
        if not token:
            return None
        CTX_AUTH_VIA.set("scim_token")
        return (token.user, token)
SCIM Token auth
def legacy(self, key: str, source_slug: str) -> tuple[User, Token] | None:  # pragma: no cover
    """Legacy HTTP-Basic auth for testing

    Only active when ``settings.TEST`` or ``settings.DEBUG`` is truthy.
    ``key`` is the base64-encoded ``username:password`` payload; the username
    is ignored and the password is checked as a token through ``check_token``.

    Returns ``(user, token)`` on success and ``None`` otherwise.
    """
    if not settings.TEST and not settings.DEBUG:
        return None
    try:
        decoded = b64decode(key.encode()).decode()
    except ValueError:
        # binascii.Error (bad base64) and UnicodeError are both ValueError
        # subclasses: malformed credentials are a failed login, not a crash.
        return None
    _username, _, password = decoded.partition(":")
    token = self.check_token(password, source_slug)
    if token:
        CTX_AUTH_VIA.set("scim_basic")
        return (token.user, token)
    return None
Legacy HTTP-Basic auth for testing
def check_token(self, key: str, source_slug: str) -> Token | None:
    """Resolve ``key`` to an API token assigned to the source ``source_slug``.

    Looks up a token with the given key and the ``INTENT_API`` intent, takes
    the first SCIM source linked to it and compares that source's slug with
    ``source_slug``. On a match the source is stored as ``self.view.source``
    and the token is returned; in every other case ``None`` is returned.
    """
    # NOTE(review): no explicit expiry check is visible here - presumably
    # expired tokens are excluded by the Token manager; confirm.
    candidates = Token.objects.filter(key=key, intent=TokenIntents.INTENT_API)
    matched = candidates.first()
    if not matched:
        return None
    linked_source: SCIMSource | None = matched.scimsource_set.first()
    # Reject tokens without a source as well as tokens bound to another source.
    if not linked_source or linked_source.slug != source_slug:
        return None
    self.view.source = linked_source
    return matched
Check that a token exists, is not expired, and is assigned to the correct source
def
authenticate( self, request: rest_framework.request.Request) -> tuple[authentik.core.models.User, Any] | None:
def authenticate(self, request: Request) -> tuple[User, Any] | None:
    """Authenticate the request and return a two-tuple of (user, token).

    Reads ``source_slug`` from the resolved URL keyword arguments and the
    credentials from the ``Authorization`` header. ``Bearer`` credentials are
    validated with ``check_token``; any other scheme is handed to ``legacy``.
    Returns ``None`` when the request cannot be authenticated.
    """
    kwargs = request._request.resolver_match.kwargs
    source_slug = kwargs.get("source_slug", None)
    try:
        auth = get_authorization_header(request).decode()
    except UnicodeDecodeError:
        # A header that is not valid UTF-8 cannot carry a usable token.
        return None
    auth_type, _, key = auth.partition(" ")
    # Authentication scheme names are case-insensitive (RFC 9110, 11.1).
    if auth_type.lower() != "bearer":
        return self.legacy(key, source_slug)
    token = self.check_token(key, source_slug)
    if not token:
        return None
    CTX_AUTH_VIA.set("scim_token")
    return (token.user, token)
Authenticate the request and return a two-tuple of (user, token).