authentik.enterprise.providers.ssf.views.auth

SSF Token auth

 1"""SSF Token auth"""
 2
 3from typing import TYPE_CHECKING, Any
 4
 5from django.db.models import Q
 6from rest_framework.authentication import BaseAuthentication, get_authorization_header
 7from rest_framework.request import Request
 8
 9from authentik.core.models import Token, TokenIntents, User
10from authentik.enterprise.providers.ssf.models import SSFProvider
11from authentik.providers.oauth2.models import AccessToken
12
13if TYPE_CHECKING:
14    from authentik.enterprise.providers.ssf.views.base import SSFView
15
16
class SSFTokenAuth(BaseAuthentication):
    """SSF Token auth

    Authenticates requests carrying an ``Authorization: Bearer <value>`` header.
    The value may be the key of an API-intent ``Token`` or an unrevoked OAuth2
    ``AccessToken``. On success the matching SSF provider and its backchannel
    application are stored on the bound view.
    """

    # View this authenticator is bound to; `application` and `provider` are set on it.
    view: SSFView

    def __init__(self, view: SSFView) -> None:
        """Bind the authenticator to the view it authenticates requests for."""
        super().__init__()
        self.view = view

    def check_token(self, key: str) -> Token | None:
        """Check that an API token with this key exists and is assigned to an SSF provider.

        On success the provider and its backchannel application are stored on the view.
        Returns the token, or ``None`` if no token or no provider matches.
        """
        # NOTE(review): no explicit expiry check is visible here; confirm expired
        # tokens are excluded elsewhere.
        token = Token.objects.filter(key=key, intent=TokenIntents.INTENT_API).first()
        if not token:
            return None
        provider: SSFProvider = token.ssfprovider_set.first()
        if not provider:
            return None
        self.view.application = provider.backchannel_application
        self.view.provider = provider
        return token

    def check_jwt(self, jwt: str) -> AccessToken | None:
        """Check JWT-based authentication.

        This supports tokens issued either by providers configured directly in the
        SSF provider, or by providers assigned to the application that the SSF
        provider is a backchannel provider of.

        On success the SSF provider and its backchannel application are stored on the
        view. Returns the access token, or ``None`` if nothing matches.
        """
        token = AccessToken.objects.filter(token=jwt, revoked=False).first()
        if not token:
            return None
        ssf_provider = SSFProvider.objects.filter(
            Q(oidc_auth_providers__in=[token.provider])
            | Q(backchannel_application__provider__in=[token.provider]),
        ).first()
        if not ssf_provider:
            return None
        self.view.application = ssf_provider.backchannel_application
        self.view.provider = ssf_provider
        return token

    def authenticate(self, request: Request) -> tuple[User, Any] | None:
        """Authenticate the request and return a two-tuple of (user, credential).

        The bearer value is tried as an API token key first and as an OAuth2 access
        token second. ``None`` is returned when the header is missing, uses another
        scheme, cannot be decoded, or matches no credential.
        """
        try:
            auth = get_authorization_header(request).decode()
        except UnicodeDecodeError:
            # The header bytes are client-controlled; undecodable input is treated as
            # "no credentials" instead of surfacing as a server error.
            return None
        auth_type, _, key = auth.partition(" ")
        if auth_type != "Bearer":
            return None
        token = self.check_token(key)
        if token:
            return (token.user, token)
        jwt_token = self.check_jwt(key)
        if jwt_token:
            # Return the access token itself; `token` is always None on this path.
            return (jwt_token.user, jwt_token)
        return None

    # Required to correctly propagate a 401 header which the SSF spec requires
    def authenticate_header(self, request):
        """Return the value used for the ``WWW-Authenticate`` header."""
        return "SSF"
class SSFTokenAuth(rest_framework.authentication.BaseAuthentication):
class SSFTokenAuth(BaseAuthentication):
    """SSF Token auth

    Authenticates requests carrying an ``Authorization: Bearer <value>`` header.
    The value may be the key of an API-intent ``Token`` or an unrevoked OAuth2
    ``AccessToken``. On success the matching SSF provider and its backchannel
    application are stored on the bound view.
    """

    # View this authenticator is bound to; `application` and `provider` are set on it.
    view: SSFView

    def __init__(self, view: SSFView) -> None:
        """Bind the authenticator to the view it authenticates requests for."""
        super().__init__()
        self.view = view

    def check_token(self, key: str) -> Token | None:
        """Check that an API token with this key exists and is assigned to an SSF provider.

        On success the provider and its backchannel application are stored on the view.
        Returns the token, or ``None`` if no token or no provider matches.
        """
        # NOTE(review): no explicit expiry check is visible here; confirm expired
        # tokens are excluded elsewhere.
        token = Token.objects.filter(key=key, intent=TokenIntents.INTENT_API).first()
        if not token:
            return None
        provider: SSFProvider = token.ssfprovider_set.first()
        if not provider:
            return None
        self.view.application = provider.backchannel_application
        self.view.provider = provider
        return token

    def check_jwt(self, jwt: str) -> AccessToken | None:
        """Check JWT-based authentication.

        This supports tokens issued either by providers configured directly in the
        SSF provider, or by providers assigned to the application that the SSF
        provider is a backchannel provider of.

        On success the SSF provider and its backchannel application are stored on the
        view. Returns the access token, or ``None`` if nothing matches.
        """
        token = AccessToken.objects.filter(token=jwt, revoked=False).first()
        if not token:
            return None
        ssf_provider = SSFProvider.objects.filter(
            Q(oidc_auth_providers__in=[token.provider])
            | Q(backchannel_application__provider__in=[token.provider]),
        ).first()
        if not ssf_provider:
            return None
        self.view.application = ssf_provider.backchannel_application
        self.view.provider = ssf_provider
        return token

    def authenticate(self, request: Request) -> tuple[User, Any] | None:
        """Authenticate the request and return a two-tuple of (user, credential).

        The bearer value is tried as an API token key first and as an OAuth2 access
        token second. ``None`` is returned when the header is missing, uses another
        scheme, cannot be decoded, or matches no credential.
        """
        try:
            auth = get_authorization_header(request).decode()
        except UnicodeDecodeError:
            # The header bytes are client-controlled; undecodable input is treated as
            # "no credentials" instead of surfacing as a server error.
            return None
        auth_type, _, key = auth.partition(" ")
        if auth_type != "Bearer":
            return None
        token = self.check_token(key)
        if token:
            return (token.user, token)
        jwt_token = self.check_jwt(key)
        if jwt_token:
            # Return the access token itself; `token` is always None on this path.
            return (jwt_token.user, jwt_token)
        return None

    # Required to correctly propagate a 401 header which the SSF spec requires
    def authenticate_header(self, request):
        """Return the value used for the ``WWW-Authenticate`` header."""
        return "SSF"

SSF Token auth

23    def __init__(self, view: SSFView) -> None:
24        super().__init__()
25        self.view = view
def check_token(self, key: str) -> authentik.core.models.Token | None:
27    def check_token(self, key: str) -> Token | None:
28        """Check that a token exists, is not expired, and is assigned to the correct provider"""
29        token = Token.objects.filter(key=key, intent=TokenIntents.INTENT_API).first()
30        if not token:
31            return None
32        provider: SSFProvider = token.ssfprovider_set.first()
33        if not provider:
34            return None
35        self.view.application = provider.backchannel_application
36        self.view.provider = provider
37        return token

Check that a token exists, is not expired, and is assigned to the correct provider

def check_jwt(self, jwt: str) -> authentik.providers.oauth2.models.AccessToken | None:
39    def check_jwt(self, jwt: str) -> AccessToken | None:
40        """Check JWT-based authentication, this supports tokens issued either by providers
41        configured directly in the provider, and by providers assigned to the application
42        that the SSF provider is a backchannel provider of."""
43        token = AccessToken.objects.filter(token=jwt, revoked=False).first()
44        if not token:
45            return None
46        ssf_provider = SSFProvider.objects.filter(
47            Q(oidc_auth_providers__in=[token.provider])
48            | Q(backchannel_application__provider__in=[token.provider]),
49        ).first()
50        if not ssf_provider:
51            return None
52        self.view.application = ssf_provider.backchannel_application
53        self.view.provider = ssf_provider
54        return token

Check JWT-based authentication. This supports tokens issued either by providers configured directly in the SSF provider, or by providers assigned to the application that the SSF provider is a backchannel provider of.

def authenticate( self, request: rest_framework.request.Request) -> tuple[authentik.core.models.User, Any] | None:
56    def authenticate(self, request: Request) -> tuple[User, Any] | None:
57        auth = get_authorization_header(request).decode()
58        auth_type, _, key = auth.partition(" ")
59        if auth_type != "Bearer":
60            return None
61        token = self.check_token(key)
62        if token:
63            return (token.user, token)
64        jwt_token = self.check_jwt(key)
65        if jwt_token:
66            return (jwt_token.user, token)
67        return None

Authenticate the request and return a two-tuple of (user, token).

def authenticate_header(self, request):
70    def authenticate_header(self, request):
71        return "SSF"

Return a string to be used as the value of the WWW-Authenticate header in a 401 Unauthenticated response, or None if the authentication scheme should return 403 Permission Denied responses.