authentik.enterprise.providers.ssf.views.auth
SSF Token auth
1"""SSF Token auth""" 2 3from typing import TYPE_CHECKING, Any 4 5from django.db.models import Q 6from rest_framework.authentication import BaseAuthentication, get_authorization_header 7from rest_framework.request import Request 8 9from authentik.core.models import Token, TokenIntents, User 10from authentik.enterprise.providers.ssf.models import SSFProvider 11from authentik.providers.oauth2.models import AccessToken 12 13if TYPE_CHECKING: 14 from authentik.enterprise.providers.ssf.views.base import SSFView 15 16 17class SSFTokenAuth(BaseAuthentication): 18 """SSF Token auth""" 19 20 view: SSFView 21 22 def __init__(self, view: SSFView) -> None: 23 super().__init__() 24 self.view = view 25 26 def check_token(self, key: str) -> Token | None: 27 """Check that a token exists, is not expired, and is assigned to the correct provider""" 28 token = Token.objects.filter(key=key, intent=TokenIntents.INTENT_API).first() 29 if not token: 30 return None 31 provider: SSFProvider = token.ssfprovider_set.first() 32 if not provider: 33 return None 34 self.view.application = provider.backchannel_application 35 self.view.provider = provider 36 return token 37 38 def check_jwt(self, jwt: str) -> AccessToken | None: 39 """Check JWT-based authentication, this supports tokens issued either by providers 40 configured directly in the provider, and by providers assigned to the application 41 that the SSF provider is a backchannel provider of.""" 42 token = AccessToken.objects.filter(token=jwt, revoked=False).first() 43 if not token: 44 return None 45 ssf_provider = SSFProvider.objects.filter( 46 Q(oidc_auth_providers__in=[token.provider]) 47 | Q(backchannel_application__provider__in=[token.provider]), 48 ).first() 49 if not ssf_provider: 50 return None 51 self.view.application = ssf_provider.backchannel_application 52 self.view.provider = ssf_provider 53 return token 54 55 def authenticate(self, request: Request) -> tuple[User, Any] | None: 56 auth = get_authorization_header(request).decode() 57 auth_type, _, key = auth.partition(" ") 58 if auth_type != "Bearer": 59 return None 60 token = self.check_token(key) 61 if token: 62 return (token.user, token) 63 jwt_token = self.check_jwt(key) 64 if jwt_token: 65 return (jwt_token.user, token) 66 return None 67 68 # Required to correctly propagate a 401 header which the SSF spec requires 69 def authenticate_header(self, request): 70 return "SSF"
class
SSFTokenAuth(rest_framework.authentication.BaseAuthentication):
18class SSFTokenAuth(BaseAuthentication): 19 """SSF Token auth""" 20 21 view: SSFView 22 23 def __init__(self, view: SSFView) -> None: 24 super().__init__() 25 self.view = view 26 27 def check_token(self, key: str) -> Token | None: 28 """Check that a token exists, is not expired, and is assigned to the correct provider""" 29 token = Token.objects.filter(key=key, intent=TokenIntents.INTENT_API).first() 30 if not token: 31 return None 32 provider: SSFProvider = token.ssfprovider_set.first() 33 if not provider: 34 return None 35 self.view.application = provider.backchannel_application 36 self.view.provider = provider 37 return token 38 39 def check_jwt(self, jwt: str) -> AccessToken | None: 40 """Check JWT-based authentication, this supports tokens issued either by providers 41 configured directly in the provider, and by providers assigned to the application 42 that the SSF provider is a backchannel provider of.""" 43 token = AccessToken.objects.filter(token=jwt, revoked=False).first() 44 if not token: 45 return None 46 ssf_provider = SSFProvider.objects.filter( 47 Q(oidc_auth_providers__in=[token.provider]) 48 | Q(backchannel_application__provider__in=[token.provider]), 49 ).first() 50 if not ssf_provider: 51 return None 52 self.view.application = ssf_provider.backchannel_application 53 self.view.provider = ssf_provider 54 return token 55 56 def authenticate(self, request: Request) -> tuple[User, Any] | None: 57 auth = get_authorization_header(request).decode() 58 auth_type, _, key = auth.partition(" ") 59 if auth_type != "Bearer": 60 return None 61 token = self.check_token(key) 62 if token: 63 return (token.user, token) 64 jwt_token = self.check_jwt(key) 65 if jwt_token: 66 return (jwt_token.user, token) 67 return None 68 69 # Required to correctly propagate a 401 header which the SSF spec requires 70 def authenticate_header(self, request): 71 return "SSF"
SSF Token auth
SSFTokenAuth(view: authentik.enterprise.providers.ssf.views.base.SSFView)
27 def check_token(self, key: str) -> Token | None: 28 """Check that a token exists, is not expired, and is assigned to the correct provider""" 29 token = Token.objects.filter(key=key, intent=TokenIntents.INTENT_API).first() 30 if not token: 31 return None 32 provider: SSFProvider = token.ssfprovider_set.first() 33 if not provider: 34 return None 35 self.view.application = provider.backchannel_application 36 self.view.provider = provider 37 return token
Check that a token exists, is not expired, and is assigned to the correct provider
39 def check_jwt(self, jwt: str) -> AccessToken | None: 40 """Check JWT-based authentication, this supports tokens issued either by providers 41 configured directly in the provider, and by providers assigned to the application 42 that the SSF provider is a backchannel provider of.""" 43 token = AccessToken.objects.filter(token=jwt, revoked=False).first() 44 if not token: 45 return None 46 ssf_provider = SSFProvider.objects.filter( 47 Q(oidc_auth_providers__in=[token.provider]) 48 | Q(backchannel_application__provider__in=[token.provider]), 49 ).first() 50 if not ssf_provider: 51 return None 52 self.view.application = ssf_provider.backchannel_application 53 self.view.provider = ssf_provider 54 return token
Check JWT-based authentication, this supports tokens issued either by providers configured directly in the provider, and by providers assigned to the application that the SSF provider is a backchannel provider of.
def
authenticate( self, request: rest_framework.request.Request) -> tuple[authentik.core.models.User, Any] | None:
56 def authenticate(self, request: Request) -> tuple[User, Any] | None: 57 auth = get_authorization_header(request).decode() 58 auth_type, _, key = auth.partition(" ") 59 if auth_type != "Bearer": 60 return None 61 token = self.check_token(key) 62 if token: 63 return (token.user, token) 64 jwt_token = self.check_jwt(key) 65 if jwt_token: 66 return (jwt_token.user, token) 67 return None
Authenticate the request and return a two-tuple of (user, token).