authentik.providers.oauth2.views.introspection

authentik OAuth2 Token Introspection Views

 1"""authentik OAuth2 Token Introspection Views"""
 2
 3from dataclasses import dataclass, field
 4
 5from django.http import HttpRequest, HttpResponse
 6from django.utils.decorators import method_decorator
 7from django.views import View
 8from django.views.decorators.csrf import csrf_exempt
 9from structlog.stdlib import get_logger
10
11from authentik.providers.oauth2.errors import TokenIntrospectionError
12from authentik.providers.oauth2.id_token import IDToken
13from authentik.providers.oauth2.models import AccessToken, OAuth2Provider, RefreshToken
14from authentik.providers.oauth2.utils import TokenResponse, authenticate_provider
15
16LOGGER = get_logger()
17
18
@dataclass(slots=True)
class TokenIntrospectionParams:
    """Validated inputs for a token introspection request.

    Creating an instance validates the token in ``__post_init__``: an expired
    token, or a token without an ID token, raises ``TokenIntrospectionError``.
    """

    # Access or refresh token that is being introspected
    token: RefreshToken | AccessToken
    # Provider returned by ``authenticate_provider`` for the calling request
    provider: OAuth2Provider

    # Not a constructor argument; copied from ``token.id_token`` in ``__post_init__``
    id_token: IDToken = field(init=False)

    def __post_init__(self):
        """Reject expired tokens and tokens that carry no ID token.

        Raises:
            TokenIntrospectionError: if the token is expired or has no ID token.
        """
        if self.token.is_expired:
            LOGGER.debug("Token is not valid")
            raise TokenIntrospectionError()

        # Read the attribute once and reuse the stored value for the check below.
        self.id_token = self.token.id_token

        if not self.id_token:
            LOGGER.debug(
                "token not an authentication token",
                token=self.token,
            )
            raise TokenIntrospectionError()

    # The return annotation is quoted because the class name is not bound yet while
    # the class body executes; unquoted, it only works with deferred annotations.
    @staticmethod
    def from_request(request: HttpRequest) -> "TokenIntrospectionParams":
        """Build the introspection parameters from an HTTP request.

        The provider is resolved with ``authenticate_provider(request)``. The POSTed
        ``token`` value is then looked up among that provider's access tokens first
        and its refresh tokens second.

        Raises:
            TokenIntrospectionError: if no provider is returned, if no token was
                submitted, if no matching token exists, or if the matching token
                fails the validation done in ``__post_init__``.
        """
        raw_token = request.POST.get("token")
        provider = authenticate_provider(request)
        if not provider:
            raise TokenIntrospectionError()

        # A missing or empty token cannot match anything, so skip the lookups.
        if raw_token:
            for token_model in (AccessToken, RefreshToken):
                match = token_model.objects.filter(token=raw_token, provider=provider).first()
                if match:
                    return TokenIntrospectionParams(match, provider)

        # NOTE(review): the submitted token value is written to the debug log verbatim;
        # consider whether it should be redacted.
        LOGGER.debug("Token does not exist", token=raw_token)
        raise TokenIntrospectionError()
58
59
@method_decorator(csrf_exempt, name="dispatch")
class TokenIntrospectionView(View):
    """OAuth2 Token Introspection endpoint.

    https://datatracker.ietf.org/doc/html/rfc7662
    """

    # Declared for type checkers only; ``post`` below assigns just ``params``.
    token: RefreshToken | AccessToken
    params: TokenIntrospectionParams
    provider: OAuth2Provider

    def post(self, request: HttpRequest) -> HttpResponse:
        """Handle an introspection request.

        The response starts from the ID token's ``to_dict()`` output and then sets
        ``active``, ``scope`` and ``client_id``. If ``TokenIntrospectionError`` is
        raised anywhere along the way, the response is ``{"active": False}``.
        """
        try:
            self.params = TokenIntrospectionParams.from_request(request)
            params = self.params
            payload = {}
            if params.id_token:
                payload.update(params.id_token.to_dict())
            # Active only when the token is neither expired nor revoked.
            payload["active"] = not (params.token.is_expired or params.token.revoked)
            payload["scope"] = " ".join(params.token.scope)
            payload["client_id"] = params.provider.client_id
            return TokenResponse(payload)
        except TokenIntrospectionError:
            return TokenResponse({"active": False})
LOGGER = <BoundLoggerLazyProxy(logger=None, wrapper_class=None, processors=None, context_class=None, initial_values={}, logger_factory_args=())>
@dataclass(slots=True)
class TokenIntrospectionParams:
@dataclass(slots=True)
class TokenIntrospectionParams:
    """Validated inputs for a token introspection request.

    Creating an instance validates the token in ``__post_init__``: an expired
    token, or a token without an ID token, raises ``TokenIntrospectionError``.
    """

    # Access or refresh token that is being introspected
    token: RefreshToken | AccessToken
    # Provider returned by ``authenticate_provider`` for the calling request
    provider: OAuth2Provider

    # Not a constructor argument; copied from ``token.id_token`` in ``__post_init__``
    id_token: IDToken = field(init=False)

    def __post_init__(self):
        """Reject expired tokens and tokens that carry no ID token.

        Raises:
            TokenIntrospectionError: if the token is expired or has no ID token.
        """
        if self.token.is_expired:
            LOGGER.debug("Token is not valid")
            raise TokenIntrospectionError()

        # Read the attribute once and reuse the stored value for the check below.
        self.id_token = self.token.id_token

        if not self.id_token:
            LOGGER.debug(
                "token not an authentication token",
                token=self.token,
            )
            raise TokenIntrospectionError()

    # The return annotation is quoted because the class name is not bound yet while
    # the class body executes; unquoted, it only works with deferred annotations.
    @staticmethod
    def from_request(request: HttpRequest) -> "TokenIntrospectionParams":
        """Build the introspection parameters from an HTTP request.

        The provider is resolved with ``authenticate_provider(request)``. The POSTed
        ``token`` value is then looked up among that provider's access tokens first
        and its refresh tokens second.

        Raises:
            TokenIntrospectionError: if no provider is returned, if no token was
                submitted, if no matching token exists, or if the matching token
                fails the validation done in ``__post_init__``.
        """
        raw_token = request.POST.get("token")
        provider = authenticate_provider(request)
        if not provider:
            raise TokenIntrospectionError()

        # A missing or empty token cannot match anything, so skip the lookups.
        if raw_token:
            for token_model in (AccessToken, RefreshToken):
                match = token_model.objects.filter(token=raw_token, provider=provider).first()
                if match:
                    return TokenIntrospectionParams(match, provider)

        # NOTE(review): the submitted token value is written to the debug log verbatim;
        # consider whether it should be redacted.
        LOGGER.debug("Token does not exist", token=raw_token)
        raise TokenIntrospectionError()

Parameters for Token Introspection

@staticmethod
def from_request( request: django.http.request.HttpRequest) -> TokenIntrospectionParams:
43    @staticmethod
44    def from_request(request: HttpRequest) -> TokenIntrospectionParams:
45        """Extract required Parameters from HTTP Request"""
46        raw_token = request.POST.get("token")
47        provider = authenticate_provider(request)
48        if not provider:
49            raise TokenIntrospectionError
50
51        access_token = AccessToken.objects.filter(token=raw_token, provider=provider).first()
52        if access_token:
53            return TokenIntrospectionParams(access_token, provider)
54        refresh_token = RefreshToken.objects.filter(token=raw_token, provider=provider).first()
55        if refresh_token:
56            return TokenIntrospectionParams(refresh_token, provider)
57        LOGGER.debug("Token does not exist", token=raw_token)
58        raise TokenIntrospectionError()

Extract required Parameters from HTTP Request

@method_decorator(csrf_exempt, name='dispatch')
class TokenIntrospectionView(django.views.generic.base.View):
@method_decorator(csrf_exempt, name="dispatch")
class TokenIntrospectionView(View):
    """OAuth2 Token Introspection endpoint.

    https://datatracker.ietf.org/doc/html/rfc7662
    """

    # Declared for type checkers only; ``post`` below assigns just ``params``.
    token: RefreshToken | AccessToken
    params: TokenIntrospectionParams
    provider: OAuth2Provider

    def post(self, request: HttpRequest) -> HttpResponse:
        """Handle an introspection request.

        The response starts from the ID token's ``to_dict()`` output and then sets
        ``active``, ``scope`` and ``client_id``. If ``TokenIntrospectionError`` is
        raised anywhere along the way, the response is ``{"active": False}``.
        """
        try:
            self.params = TokenIntrospectionParams.from_request(request)
            params = self.params
            payload = {}
            if params.id_token:
                payload.update(params.id_token.to_dict())
            # Active only when the token is neither expired nor revoked.
            payload["active"] = not (params.token.is_expired or params.token.revoked)
            payload["scope"] = " ".join(params.token.scope)
            payload["client_id"] = params.provider.client_id
            return TokenResponse(payload)
        except TokenIntrospectionError:
            return TokenResponse({"active": False})
def post( self, request: django.http.request.HttpRequest) -> django.http.response.HttpResponse:
70    def post(self, request: HttpRequest) -> HttpResponse:
71        """Introspection handler"""
72        try:
73            self.params = TokenIntrospectionParams.from_request(request)
74            response = {}
75            if self.params.id_token:
76                response.update(self.params.id_token.to_dict())
77            response["active"] = not self.params.token.is_expired and not self.params.token.revoked
78            response["scope"] = " ".join(self.params.token.scope)
79            response["client_id"] = self.params.provider.client_id
80            return TokenResponse(response)
81        except TokenIntrospectionError:
82            return TokenResponse({"active": False})

Introspection handler

def dispatch(self, request, *args, **kwargs):
135    def dispatch(self, request, *args, **kwargs):
136        # Try to dispatch to the right method; if a method doesn't exist,
137        # defer to the error handler. Also defer to the error handler if the
138        # request method isn't on the approved list.
139        if request.method.lower() in self.http_method_names:
140            handler = getattr(
141                self, request.method.lower(), self.http_method_not_allowed
142            )
143        else:
144            handler = self.http_method_not_allowed
145        return handler(request, *args, **kwargs)