authentik.providers.oauth2.views.token_revoke

Token revocation endpoint

 1"""Token revocation endpoint"""
 2
 3from dataclasses import dataclass
 4
 5from django.db.models import Q
 6from django.http import Http404, HttpRequest, HttpResponse
 7from django.utils.decorators import method_decorator
 8from django.views import View
 9from django.views.decorators.csrf import csrf_exempt
10from structlog.stdlib import get_logger
11
12from authentik.providers.oauth2.errors import TokenRevocationError
13from authentik.providers.oauth2.models import AccessToken, ClientType, OAuth2Provider, RefreshToken
14from authentik.providers.oauth2.utils import (
15    TokenResponse,
16    authenticate_provider,
17    provider_from_request,
18)
19
20LOGGER = get_logger()
21
22
23@dataclass(slots=True)
24class TokenRevocationParams:
25    """Parameters for Token Revocation"""
26
27    token: RefreshToken | AccessToken
28    provider: OAuth2Provider
29
30    @staticmethod
31    def from_request(request: HttpRequest) -> TokenRevocationParams:
32        """Extract required Parameters from HTTP Request"""
33        raw_token = request.POST.get("token")
34
35        provider, _, _ = provider_from_request(request)
36        if provider and provider.client_type == ClientType.CONFIDENTIAL:
37            provider = authenticate_provider(request)
38        if not provider:
39            raise TokenRevocationError("invalid_client")
40        # By default clients can only revoke their own tokens
41        query = Q(provider=provider, token=raw_token)
42        if provider.client_type == ClientType.CONFIDENTIAL:
43            provider = authenticate_provider(request)
44            if not provider:
45                raise TokenRevocationError("invalid_client")
46            # If the request is authenticated by a confidential provider, it can also
47            # revoke federated tokens
48            query = Q(
49                Q(provider=provider) | Q(provider__jwt_federation_providers__in=[provider]),
50                token=raw_token,
51            )
52
53        access_token = AccessToken.objects.filter(query).first()
54        if access_token:
55            return TokenRevocationParams(access_token, provider)
56        refresh_token = RefreshToken.objects.filter(query).first()
57        if refresh_token:
58            return TokenRevocationParams(refresh_token, provider)
59        LOGGER.debug("Token does not exist", token=raw_token)
60        raise Http404
61
62
63@method_decorator(csrf_exempt, name="dispatch")
64class TokenRevokeView(View):
65    """Token revoke endpoint
66    https://datatracker.ietf.org/doc/html/rfc7009"""
67
68    token: RefreshToken
69    params: TokenRevocationParams
70    provider: OAuth2Provider
71
72    def post(self, request: HttpRequest) -> HttpResponse:
73        """Revocation handler"""
74        try:
75            self.params = TokenRevocationParams.from_request(request)
76
77            self.params.token.delete()
78
79            return TokenResponse(data={}, status=200)
80        except TokenRevocationError as exc:
81            return TokenResponse(exc.create_dict(request), status=401)
82        except Http404:
83            # Token not found should return a HTTP 200
84            # https://datatracker.ietf.org/doc/html/rfc7009#section-2.2
85            return TokenResponse(data={}, status=200)
LOGGER = <BoundLoggerLazyProxy(logger=None, wrapper_class=None, processors=None, context_class=None, initial_values={}, logger_factory_args=())>
@dataclass(slots=True)
class TokenRevocationParams:
24@dataclass(slots=True)
25class TokenRevocationParams:
26    """Parameters for Token Revocation"""
27
28    token: RefreshToken | AccessToken
29    provider: OAuth2Provider
30
31    @staticmethod
32    def from_request(request: HttpRequest) -> TokenRevocationParams:
33        """Extract required Parameters from HTTP Request"""
34        raw_token = request.POST.get("token")
35
36        provider, _, _ = provider_from_request(request)
37        if provider and provider.client_type == ClientType.CONFIDENTIAL:
38            provider = authenticate_provider(request)
39        if not provider:
40            raise TokenRevocationError("invalid_client")
41        # By default clients can only revoke their own tokens
42        query = Q(provider=provider, token=raw_token)
43        if provider.client_type == ClientType.CONFIDENTIAL:
44            provider = authenticate_provider(request)
45            if not provider:
46                raise TokenRevocationError("invalid_client")
47            # If the request is authenticated by a confidential provider, it can also
48            # revoke federated tokens
49            query = Q(
50                Q(provider=provider) | Q(provider__jwt_federation_providers__in=[provider]),
51                token=raw_token,
52            )
53
54        access_token = AccessToken.objects.filter(query).first()
55        if access_token:
56            return TokenRevocationParams(access_token, provider)
57        refresh_token = RefreshToken.objects.filter(query).first()
58        if refresh_token:
59            return TokenRevocationParams(refresh_token, provider)
60        LOGGER.debug("Token does not exist", token=raw_token)
61        raise Http404

Parameters for Token Revocation

@staticmethod
def from_request( request: django.http.request.HttpRequest) -> TokenRevocationParams:
31    @staticmethod
32    def from_request(request: HttpRequest) -> TokenRevocationParams:
33        """Extract required Parameters from HTTP Request"""
34        raw_token = request.POST.get("token")
35
36        provider, _, _ = provider_from_request(request)
37        if provider and provider.client_type == ClientType.CONFIDENTIAL:
38            provider = authenticate_provider(request)
39        if not provider:
40            raise TokenRevocationError("invalid_client")
41        # By default clients can only revoke their own tokens
42        query = Q(provider=provider, token=raw_token)
43        if provider.client_type == ClientType.CONFIDENTIAL:
44            provider = authenticate_provider(request)
45            if not provider:
46                raise TokenRevocationError("invalid_client")
47            # If the request is authenticated by a confidential provider, it can also
48            # revoke federated tokens
49            query = Q(
50                Q(provider=provider) | Q(provider__jwt_federation_providers__in=[provider]),
51                token=raw_token,
52            )
53
54        access_token = AccessToken.objects.filter(query).first()
55        if access_token:
56            return TokenRevocationParams(access_token, provider)
57        refresh_token = RefreshToken.objects.filter(query).first()
58        if refresh_token:
59            return TokenRevocationParams(refresh_token, provider)
60        LOGGER.debug("Token does not exist", token=raw_token)
61        raise Http404

Extract required Parameters from HTTP Request

@method_decorator(csrf_exempt, name='dispatch')
class TokenRevokeView(django.views.generic.base.View):
64@method_decorator(csrf_exempt, name="dispatch")
65class TokenRevokeView(View):
66    """Token revoke endpoint
67    https://datatracker.ietf.org/doc/html/rfc7009"""
68
69    token: RefreshToken
70    params: TokenRevocationParams
71    provider: OAuth2Provider
72
73    def post(self, request: HttpRequest) -> HttpResponse:
74        """Revocation handler"""
75        try:
76            self.params = TokenRevocationParams.from_request(request)
77
78            self.params.token.delete()
79
80            return TokenResponse(data={}, status=200)
81        except TokenRevocationError as exc:
82            return TokenResponse(exc.create_dict(request), status=401)
83        except Http404:
84            # Token not found should return a HTTP 200
85            # https://datatracker.ietf.org/doc/html/rfc7009#section-2.2
86            return TokenResponse(data={}, status=200)
def post( self, request: django.http.request.HttpRequest) -> django.http.response.HttpResponse:
73    def post(self, request: HttpRequest) -> HttpResponse:
74        """Revocation handler"""
75        try:
76            self.params = TokenRevocationParams.from_request(request)
77
78            self.params.token.delete()
79
80            return TokenResponse(data={}, status=200)
81        except TokenRevocationError as exc:
82            return TokenResponse(exc.create_dict(request), status=401)
83        except Http404:
84            # Token not found should return a HTTP 200
85            # https://datatracker.ietf.org/doc/html/rfc7009#section-2.2
86            return TokenResponse(data={}, status=200)

Revocation handler

def dispatch(self, request, *args, **kwargs):
135    def dispatch(self, request, *args, **kwargs):
136        # Try to dispatch to the right method; if a method doesn't exist,
137        # defer to the error handler. Also defer to the error handler if the
138        # request method isn't on the approved list.
139        if request.method.lower() in self.http_method_names:
140            handler = getattr(
141                self, request.method.lower(), self.http_method_not_allowed
142            )
143        else:
144            handler = self.http_method_not_allowed
145        return handler(request, *args, **kwargs)