authentik.providers.oauth2.views.token_revoke
Token revocation endpoint
"""Token revocation endpoint"""

from dataclasses import dataclass

from django.db.models import Q
from django.http import Http404, HttpRequest, HttpResponse
from django.utils.decorators import method_decorator
from django.views import View
from django.views.decorators.csrf import csrf_exempt
from structlog.stdlib import get_logger

from authentik.providers.oauth2.errors import TokenRevocationError
from authentik.providers.oauth2.models import AccessToken, ClientType, OAuth2Provider, RefreshToken
from authentik.providers.oauth2.utils import (
    TokenResponse,
    authenticate_provider,
    provider_from_request,
)

LOGGER = get_logger()


@dataclass(slots=True)
class TokenRevocationParams:
    """Parameters for Token Revocation"""

    # Token that the request asks to revoke
    token: RefreshToken | AccessToken
    # Provider (client) that issued the revocation request
    provider: OAuth2Provider

    @staticmethod
    def from_request(request: HttpRequest) -> TokenRevocationParams:
        """Extract required Parameters from HTTP Request

        Reads the ``token`` POST field, resolves the requesting provider and,
        for confidential clients, authenticates it before looking the token up.
        Access tokens are checked before refresh tokens.

        Raises:
            TokenRevocationError: ``invalid_client`` when no provider can be
                resolved from the request, or when a confidential client fails
                authentication.
            Http404: when neither an access nor a refresh token matches.
        """
        raw_token = request.POST.get("token")

        provider, _, _ = provider_from_request(request)
        # Reject unknown clients up front so the attribute access below can
        # never run against a missing provider.
        if not provider:
            raise TokenRevocationError("invalid_client")

        # By default clients can only revoke their own tokens
        query = Q(provider=provider, token=raw_token)
        if provider.client_type == ClientType.CONFIDENTIAL:
            # Confidential clients have to authenticate; a single call suffices.
            # NOTE(review): assumes authenticate_provider returns the same
            # provider that provider_from_request resolved - confirm.
            provider = authenticate_provider(request)
            if not provider:
                raise TokenRevocationError("invalid_client")
            # If the request is authenticated by a confidential provider, it can also
            # revoke federated tokens
            query = Q(
                Q(provider=provider) | Q(provider__jwt_federation_providers__in=[provider]),
                token=raw_token,
            )

        for model in (AccessToken, RefreshToken):
            token = model.objects.filter(query).first()
            if token:
                return TokenRevocationParams(token, provider)
        LOGGER.debug("Token does not exist", token=raw_token)
        raise Http404


@method_decorator(csrf_exempt, name="dispatch")
class TokenRevokeView(View):
    """Token revoke endpoint
    https://datatracker.ietf.org/doc/html/rfc7009"""

    token: RefreshToken
    params: TokenRevocationParams
    provider: OAuth2Provider

    def post(self, request: HttpRequest) -> HttpResponse:
        """Revocation handler

        Deletes the token named in the request. Responds with an empty body and
        status 200 on success and when the token is unknown, and with the error
        payload and status 401 when the client is rejected.
        """
        try:
            self.params = TokenRevocationParams.from_request(request)

            self.params.token.delete()

            return TokenResponse(data={}, status=200)
        except TokenRevocationError as exc:
            return TokenResponse(exc.create_dict(request), status=401)
        except Http404:
            # Token not found should return a HTTP 200
            # https://datatracker.ietf.org/doc/html/rfc7009#section-2.2
            return TokenResponse(data={}, status=200)
LOGGER =
<BoundLoggerLazyProxy(logger=None, wrapper_class=None, processors=None, context_class=None, initial_values={}, logger_factory_args=())>
@dataclass(slots=True)
class
TokenRevocationParams:
@dataclass(slots=True)
class TokenRevocationParams:
    """Parameters for Token Revocation"""

    # Token that the request asks to revoke
    token: RefreshToken | AccessToken
    # Provider (client) that issued the revocation request
    provider: OAuth2Provider

    @staticmethod
    def from_request(request: HttpRequest) -> TokenRevocationParams:
        """Extract required Parameters from HTTP Request

        Reads the ``token`` POST field, resolves the requesting provider and,
        for confidential clients, authenticates it before looking the token up.
        Access tokens are checked before refresh tokens.

        Raises:
            TokenRevocationError: ``invalid_client`` when no provider can be
                resolved from the request, or when a confidential client fails
                authentication.
            Http404: when neither an access nor a refresh token matches.
        """
        raw_token = request.POST.get("token")

        provider, _, _ = provider_from_request(request)
        # Reject unknown clients up front so the attribute access below can
        # never run against a missing provider.
        if not provider:
            raise TokenRevocationError("invalid_client")

        # By default clients can only revoke their own tokens
        query = Q(provider=provider, token=raw_token)
        if provider.client_type == ClientType.CONFIDENTIAL:
            # Confidential clients have to authenticate; a single call suffices.
            # NOTE(review): assumes authenticate_provider returns the same
            # provider that provider_from_request resolved - confirm.
            provider = authenticate_provider(request)
            if not provider:
                raise TokenRevocationError("invalid_client")
            # If the request is authenticated by a confidential provider, it can also
            # revoke federated tokens
            query = Q(
                Q(provider=provider) | Q(provider__jwt_federation_providers__in=[provider]),
                token=raw_token,
            )

        for model in (AccessToken, RefreshToken):
            token = model.objects.filter(query).first()
            if token:
                return TokenRevocationParams(token, provider)
        LOGGER.debug("Token does not exist", token=raw_token)
        raise Http404
Parameters for Token Revocation
TokenRevocationParams( token: authentik.providers.oauth2.models.RefreshToken | authentik.providers.oauth2.models.AccessToken, provider: authentik.providers.oauth2.models.OAuth2Provider)
token: authentik.providers.oauth2.models.RefreshToken | authentik.providers.oauth2.models.AccessToken
@staticmethod
def from_request(request: HttpRequest) -> TokenRevocationParams:
    """Extract required Parameters from HTTP Request

    Reads the ``token`` POST field, resolves the requesting provider and, for
    confidential clients, authenticates it before looking the token up.
    Access tokens are checked before refresh tokens.

    Raises:
        TokenRevocationError: ``invalid_client`` when no provider can be
            resolved from the request, or when a confidential client fails
            authentication.
        Http404: when neither an access nor a refresh token matches.
    """
    raw_token = request.POST.get("token")

    provider, _, _ = provider_from_request(request)
    # Reject unknown clients up front so the attribute access below can never
    # run against a missing provider.
    if not provider:
        raise TokenRevocationError("invalid_client")

    # By default clients can only revoke their own tokens
    query = Q(provider=provider, token=raw_token)
    if provider.client_type == ClientType.CONFIDENTIAL:
        # Confidential clients have to authenticate; a single call suffices.
        # NOTE(review): assumes authenticate_provider returns the same
        # provider that provider_from_request resolved - confirm.
        provider = authenticate_provider(request)
        if not provider:
            raise TokenRevocationError("invalid_client")
        # If the request is authenticated by a confidential provider, it can also
        # revoke federated tokens
        query = Q(
            Q(provider=provider) | Q(provider__jwt_federation_providers__in=[provider]),
            token=raw_token,
        )

    for model in (AccessToken, RefreshToken):
        token = model.objects.filter(query).first()
        if token:
            return TokenRevocationParams(token, provider)
    LOGGER.debug("Token does not exist", token=raw_token)
    raise Http404
Extract required Parameters from HTTP Request
@method_decorator(csrf_exempt, name='dispatch')
class
@method_decorator(csrf_exempt, name="dispatch")
class TokenRevokeView(View):
    """OAuth 2.0 token revocation endpoint.

    See https://datatracker.ietf.org/doc/html/rfc7009
    """

    token: RefreshToken
    params: TokenRevocationParams
    provider: OAuth2Provider

    def post(self, request: HttpRequest) -> HttpResponse:
        """Delete the token named in the request and report the outcome.

        Answers with an empty body and status 200 both on success and when the
        token is unknown; a rejected client gets the error payload and 401.
        """
        try:
            resolved = TokenRevocationParams.from_request(request)
            self.params = resolved
            resolved.token.delete()
        except TokenRevocationError as exc:
            error_body = exc.create_dict(request)
            return TokenResponse(error_body, status=401)
        except Http404:
            # An unknown token is still answered with HTTP 200, see
            # https://datatracker.ietf.org/doc/html/rfc7009#section-2.2
            return TokenResponse(data={}, status=200)
        return TokenResponse(data={}, status=200)
Token revoke endpoint https://datatracker.ietf.org/doc/html/rfc7009
params: TokenRevocationParams
def
post( self, request: django.http.request.HttpRequest) -> django.http.response.HttpResponse:
def post(self, request: HttpRequest) -> HttpResponse:
    """Delete the token named in the request and report the outcome.

    Answers with an empty body and status 200 both on success and when the
    token is unknown; a rejected client gets the error payload and 401.
    """
    try:
        resolved = TokenRevocationParams.from_request(request)
        self.params = resolved
        resolved.token.delete()
    except TokenRevocationError as exc:
        error_body = exc.create_dict(request)
        return TokenResponse(error_body, status=401)
    except Http404:
        # An unknown token is still answered with HTTP 200, see
        # https://datatracker.ietf.org/doc/html/rfc7009#section-2.2
        return TokenResponse(data={}, status=200)
    return TokenResponse(data={}, status=200)
Revocation handler
def
dispatch(self, request, *args, **kwargs):
def dispatch(self, request, *args, **kwargs):
    """Route the request to the handler named after its lower-cased HTTP method.

    Methods missing from ``http_method_names``, and approved methods for which
    no handler attribute exists, are passed to ``http_method_not_allowed``.
    """
    verb = request.method.lower()
    # Methods outside the approved list go straight to the error handler.
    if verb not in self.http_method_names:
        return self.http_method_not_allowed(request, *args, **kwargs)
    # Approved method: use its handler, or the error handler if none is defined.
    handler = getattr(self, verb, self.http_method_not_allowed)
    return handler(request, *args, **kwargs)