authentik.providers.oauth2.views.introspection

authentik OAuth2 Token Introspection Views

 1"""authentik OAuth2 Token Introspection Views"""
 2
 3from dataclasses import dataclass, field
 4
 5from django.db.models import Q
 6from django.http import HttpRequest, HttpResponse
 7from django.utils.decorators import method_decorator
 8from django.views import View
 9from django.views.decorators.csrf import csrf_exempt
10from structlog.stdlib import get_logger
11
12from authentik.providers.oauth2.errors import TokenIntrospectionError
13from authentik.providers.oauth2.id_token import IDToken
14from authentik.providers.oauth2.models import AccessToken, ClientType, OAuth2Provider, RefreshToken
15from authentik.providers.oauth2.utils import TokenResponse, authenticate_provider
16
17LOGGER = get_logger()
18
19
20@dataclass(slots=True)
21class TokenIntrospectionParams:
22    """Parameters for Token Introspection"""
23
24    token: RefreshToken | AccessToken
25    provider: OAuth2Provider
26
27    id_token: IDToken = field(init=False)
28
29    def __post_init__(self):
30        if self.token.is_expired:
31            LOGGER.debug("Token is not valid")
32            raise TokenIntrospectionError()
33
34        self.id_token = self.token.id_token
35
36        if not self.token.id_token:
37            LOGGER.debug("token not an authentication token", token=self.token)
38            raise TokenIntrospectionError()
39
40    @staticmethod
41    def from_request(request: HttpRequest) -> TokenIntrospectionParams:
42        """Extract required Parameters from HTTP Request"""
43        raw_token = request.POST.get("token")
44        provider = authenticate_provider(request)
45        if not provider:
46            LOGGER.info("Failed to authenticate introspection request")
47            raise TokenIntrospectionError
48        if provider.client_type != ClientType.CONFIDENTIAL:
49            LOGGER.info("Introspection request from public provider, denying.")
50            raise TokenIntrospectionError
51
52        query = Q(
53            Q(provider=provider) | Q(provider__jwt_federation_providers__in=[provider]),
54            token=raw_token,
55        )
56
57        access_token = AccessToken.objects.filter(query).first()
58        if access_token:
59            return TokenIntrospectionParams(access_token, access_token.provider)
60        refresh_token = RefreshToken.objects.filter(query).first()
61        if refresh_token:
62            return TokenIntrospectionParams(refresh_token, refresh_token.provider)
63        LOGGER.debug("Token does not exist", token=raw_token)
64        raise TokenIntrospectionError()
65
66
67@method_decorator(csrf_exempt, name="dispatch")
68class TokenIntrospectionView(View):
69    """Token Introspection
70    https://datatracker.ietf.org/doc/html/rfc7662"""
71
72    token: RefreshToken | AccessToken
73    params: TokenIntrospectionParams
74    provider: OAuth2Provider
75
76    def post(self, request: HttpRequest) -> HttpResponse:
77        """Introspection handler"""
78        try:
79            self.params = TokenIntrospectionParams.from_request(request)
80            response = {}
81            if self.params.id_token:
82                response.update(self.params.id_token.to_dict())
83            response["active"] = not self.params.token.is_expired and not self.params.token.revoked
84            response["scope"] = " ".join(self.params.token.scope)
85            response["client_id"] = self.params.provider.client_id
86            return TokenResponse(response)
87        except TokenIntrospectionError:
88            return TokenResponse({"active": False})
LOGGER = <BoundLoggerLazyProxy(logger=None, wrapper_class=None, processors=None, context_class=None, initial_values={}, logger_factory_args=())>
@dataclass(slots=True)
class TokenIntrospectionParams:
21@dataclass(slots=True)
22class TokenIntrospectionParams:
23    """Parameters for Token Introspection"""
24
25    token: RefreshToken | AccessToken
26    provider: OAuth2Provider
27
28    id_token: IDToken = field(init=False)
29
30    def __post_init__(self):
31        if self.token.is_expired:
32            LOGGER.debug("Token is not valid")
33            raise TokenIntrospectionError()
34
35        self.id_token = self.token.id_token
36
37        if not self.token.id_token:
38            LOGGER.debug("token not an authentication token", token=self.token)
39            raise TokenIntrospectionError()
40
41    @staticmethod
42    def from_request(request: HttpRequest) -> TokenIntrospectionParams:
43        """Extract required Parameters from HTTP Request"""
44        raw_token = request.POST.get("token")
45        provider = authenticate_provider(request)
46        if not provider:
47            LOGGER.info("Failed to authenticate introspection request")
48            raise TokenIntrospectionError
49        if provider.client_type != ClientType.CONFIDENTIAL:
50            LOGGER.info("Introspection request from public provider, denying.")
51            raise TokenIntrospectionError
52
53        query = Q(
54            Q(provider=provider) | Q(provider__jwt_federation_providers__in=[provider]),
55            token=raw_token,
56        )
57
58        access_token = AccessToken.objects.filter(query).first()
59        if access_token:
60            return TokenIntrospectionParams(access_token, access_token.provider)
61        refresh_token = RefreshToken.objects.filter(query).first()
62        if refresh_token:
63            return TokenIntrospectionParams(refresh_token, refresh_token.provider)
64        LOGGER.debug("Token does not exist", token=raw_token)
65        raise TokenIntrospectionError()

Parameters for Token Introspection

@staticmethod
def from_request( request: django.http.request.HttpRequest) -> TokenIntrospectionParams:
41    @staticmethod
42    def from_request(request: HttpRequest) -> TokenIntrospectionParams:
43        """Extract required Parameters from HTTP Request"""
44        raw_token = request.POST.get("token")
45        provider = authenticate_provider(request)
46        if not provider:
47            LOGGER.info("Failed to authenticate introspection request")
48            raise TokenIntrospectionError
49        if provider.client_type != ClientType.CONFIDENTIAL:
50            LOGGER.info("Introspection request from public provider, denying.")
51            raise TokenIntrospectionError
52
53        query = Q(
54            Q(provider=provider) | Q(provider__jwt_federation_providers__in=[provider]),
55            token=raw_token,
56        )
57
58        access_token = AccessToken.objects.filter(query).first()
59        if access_token:
60            return TokenIntrospectionParams(access_token, access_token.provider)
61        refresh_token = RefreshToken.objects.filter(query).first()
62        if refresh_token:
63            return TokenIntrospectionParams(refresh_token, refresh_token.provider)
64        LOGGER.debug("Token does not exist", token=raw_token)
65        raise TokenIntrospectionError()

Extract required Parameters from HTTP Request

@method_decorator(csrf_exempt, name='dispatch')
class TokenIntrospectionView(django.views.generic.base.View):
68@method_decorator(csrf_exempt, name="dispatch")
69class TokenIntrospectionView(View):
70    """Token Introspection
71    https://datatracker.ietf.org/doc/html/rfc7662"""
72
73    token: RefreshToken | AccessToken
74    params: TokenIntrospectionParams
75    provider: OAuth2Provider
76
77    def post(self, request: HttpRequest) -> HttpResponse:
78        """Introspection handler"""
79        try:
80            self.params = TokenIntrospectionParams.from_request(request)
81            response = {}
82            if self.params.id_token:
83                response.update(self.params.id_token.to_dict())
84            response["active"] = not self.params.token.is_expired and not self.params.token.revoked
85            response["scope"] = " ".join(self.params.token.scope)
86            response["client_id"] = self.params.provider.client_id
87            return TokenResponse(response)
88        except TokenIntrospectionError:
89            return TokenResponse({"active": False})
def post( self, request: django.http.request.HttpRequest) -> django.http.response.HttpResponse:
77    def post(self, request: HttpRequest) -> HttpResponse:
78        """Introspection handler"""
79        try:
80            self.params = TokenIntrospectionParams.from_request(request)
81            response = {}
82            if self.params.id_token:
83                response.update(self.params.id_token.to_dict())
84            response["active"] = not self.params.token.is_expired and not self.params.token.revoked
85            response["scope"] = " ".join(self.params.token.scope)
86            response["client_id"] = self.params.provider.client_id
87            return TokenResponse(response)
88        except TokenIntrospectionError:
89            return TokenResponse({"active": False})

Introspection handler

def dispatch(self, request, *args, **kwargs):
135    def dispatch(self, request, *args, **kwargs):
136        # Try to dispatch to the right method; if a method doesn't exist,
137        # defer to the error handler. Also defer to the error handler if the
138        # request method isn't on the approved list.
139        if request.method.lower() in self.http_method_names:
140            handler = getattr(
141                self, request.method.lower(), self.http_method_not_allowed
142            )
143        else:
144            handler = self.http_method_not_allowed
145        return handler(request, *args, **kwargs)