authentik.providers.oauth2.views.introspection
authentik OAuth2 Token Introspection Views
1"""authentik OAuth2 Token Introspection Views""" 2 3from dataclasses import dataclass, field 4 5from django.db.models import Q 6from django.http import HttpRequest, HttpResponse 7from django.utils.decorators import method_decorator 8from django.views import View 9from django.views.decorators.csrf import csrf_exempt 10from structlog.stdlib import get_logger 11 12from authentik.providers.oauth2.errors import TokenIntrospectionError 13from authentik.providers.oauth2.id_token import IDToken 14from authentik.providers.oauth2.models import AccessToken, ClientType, OAuth2Provider, RefreshToken 15from authentik.providers.oauth2.utils import TokenResponse, authenticate_provider 16 17LOGGER = get_logger() 18 19 20@dataclass(slots=True) 21class TokenIntrospectionParams: 22 """Parameters for Token Introspection""" 23 24 token: RefreshToken | AccessToken 25 provider: OAuth2Provider 26 27 id_token: IDToken = field(init=False) 28 29 def __post_init__(self): 30 if self.token.is_expired: 31 LOGGER.debug("Token is not valid") 32 raise TokenIntrospectionError() 33 34 self.id_token = self.token.id_token 35 36 if not self.token.id_token: 37 LOGGER.debug("token not an authentication token", token=self.token) 38 raise TokenIntrospectionError() 39 40 @staticmethod 41 def from_request(request: HttpRequest) -> TokenIntrospectionParams: 42 """Extract required Parameters from HTTP Request""" 43 raw_token = request.POST.get("token") 44 provider = authenticate_provider(request) 45 if not provider: 46 LOGGER.info("Failed to authenticate introspection request") 47 raise TokenIntrospectionError 48 if provider.client_type != ClientType.CONFIDENTIAL: 49 LOGGER.info("Introspection request from public provider, denying.") 50 raise TokenIntrospectionError 51 52 query = Q( 53 Q(provider=provider) | Q(provider__jwt_federation_providers__in=[provider]), 54 token=raw_token, 55 ) 56 57 access_token = AccessToken.objects.filter(query).first() 58 if access_token: 59 return TokenIntrospectionParams(access_token, access_token.provider) 60 refresh_token = RefreshToken.objects.filter(query).first() 61 if refresh_token: 62 return TokenIntrospectionParams(refresh_token, refresh_token.provider) 63 LOGGER.debug("Token does not exist", token=raw_token) 64 raise TokenIntrospectionError() 65 66 67@method_decorator(csrf_exempt, name="dispatch") 68class TokenIntrospectionView(View): 69 """Token Introspection 70 https://datatracker.ietf.org/doc/html/rfc7662""" 71 72 token: RefreshToken | AccessToken 73 params: TokenIntrospectionParams 74 provider: OAuth2Provider 75 76 def post(self, request: HttpRequest) -> HttpResponse: 77 """Introspection handler""" 78 try: 79 self.params = TokenIntrospectionParams.from_request(request) 80 response = {} 81 if self.params.id_token: 82 response.update(self.params.id_token.to_dict()) 83 response["active"] = not self.params.token.is_expired and not self.params.token.revoked 84 response["scope"] = " ".join(self.params.token.scope) 85 response["client_id"] = self.params.provider.client_id 86 return TokenResponse(response) 87 except TokenIntrospectionError: 88 return TokenResponse({"active": False})
LOGGER =
<BoundLoggerLazyProxy(logger=None, wrapper_class=None, processors=None, context_class=None, initial_values={}, logger_factory_args=())>
@dataclass(slots=True)
class
TokenIntrospectionParams:
21@dataclass(slots=True) 22class TokenIntrospectionParams: 23 """Parameters for Token Introspection""" 24 25 token: RefreshToken | AccessToken 26 provider: OAuth2Provider 27 28 id_token: IDToken = field(init=False) 29 30 def __post_init__(self): 31 if self.token.is_expired: 32 LOGGER.debug("Token is not valid") 33 raise TokenIntrospectionError() 34 35 self.id_token = self.token.id_token 36 37 if not self.token.id_token: 38 LOGGER.debug("token not an authentication token", token=self.token) 39 raise TokenIntrospectionError() 40 41 @staticmethod 42 def from_request(request: HttpRequest) -> TokenIntrospectionParams: 43 """Extract required Parameters from HTTP Request""" 44 raw_token = request.POST.get("token") 45 provider = authenticate_provider(request) 46 if not provider: 47 LOGGER.info("Failed to authenticate introspection request") 48 raise TokenIntrospectionError 49 if provider.client_type != ClientType.CONFIDENTIAL: 50 LOGGER.info("Introspection request from public provider, denying.") 51 raise TokenIntrospectionError 52 53 query = Q( 54 Q(provider=provider) | Q(provider__jwt_federation_providers__in=[provider]), 55 token=raw_token, 56 ) 57 58 access_token = AccessToken.objects.filter(query).first() 59 if access_token: 60 return TokenIntrospectionParams(access_token, access_token.provider) 61 refresh_token = RefreshToken.objects.filter(query).first() 62 if refresh_token: 63 return TokenIntrospectionParams(refresh_token, refresh_token.provider) 64 LOGGER.debug("Token does not exist", token=raw_token) 65 raise TokenIntrospectionError()
Parameters for Token Introspection
TokenIntrospectionParams( token: authentik.providers.oauth2.models.RefreshToken | authentik.providers.oauth2.models.AccessToken, provider: authentik.providers.oauth2.models.OAuth2Provider)
token: authentik.providers.oauth2.models.RefreshToken | authentik.providers.oauth2.models.AccessToken
@staticmethod
def
from_request( request: django.http.request.HttpRequest) -> TokenIntrospectionParams:
41 @staticmethod 42 def from_request(request: HttpRequest) -> TokenIntrospectionParams: 43 """Extract required Parameters from HTTP Request""" 44 raw_token = request.POST.get("token") 45 provider = authenticate_provider(request) 46 if not provider: 47 LOGGER.info("Failed to authenticate introspection request") 48 raise TokenIntrospectionError 49 if provider.client_type != ClientType.CONFIDENTIAL: 50 LOGGER.info("Introspection request from public provider, denying.") 51 raise TokenIntrospectionError 52 53 query = Q( 54 Q(provider=provider) | Q(provider__jwt_federation_providers__in=[provider]), 55 token=raw_token, 56 ) 57 58 access_token = AccessToken.objects.filter(query).first() 59 if access_token: 60 return TokenIntrospectionParams(access_token, access_token.provider) 61 refresh_token = RefreshToken.objects.filter(query).first() 62 if refresh_token: 63 return TokenIntrospectionParams(refresh_token, refresh_token.provider) 64 LOGGER.debug("Token does not exist", token=raw_token) 65 raise TokenIntrospectionError()
Extract required Parameters from HTTP Request
@method_decorator(csrf_exempt, name='dispatch')
class
TokenIntrospectionView68@method_decorator(csrf_exempt, name="dispatch") 69class TokenIntrospectionView(View): 70 """Token Introspection 71 https://datatracker.ietf.org/doc/html/rfc7662""" 72 73 token: RefreshToken | AccessToken 74 params: TokenIntrospectionParams 75 provider: OAuth2Provider 76 77 def post(self, request: HttpRequest) -> HttpResponse: 78 """Introspection handler""" 79 try: 80 self.params = TokenIntrospectionParams.from_request(request) 81 response = {} 82 if self.params.id_token: 83 response.update(self.params.id_token.to_dict()) 84 response["active"] = not self.params.token.is_expired and not self.params.token.revoked 85 response["scope"] = " ".join(self.params.token.scope) 86 response["client_id"] = self.params.provider.client_id 87 return TokenResponse(response) 88 except TokenIntrospectionError: 89 return TokenResponse({"active": False})
Token Introspection https://datatracker.ietf.org/doc/html/rfc7662
token: authentik.providers.oauth2.models.RefreshToken | authentik.providers.oauth2.models.AccessToken
params: TokenIntrospectionParams
def
post( self, request: django.http.request.HttpRequest) -> django.http.response.HttpResponse:
77 def post(self, request: HttpRequest) -> HttpResponse: 78 """Introspection handler""" 79 try: 80 self.params = TokenIntrospectionParams.from_request(request) 81 response = {} 82 if self.params.id_token: 83 response.update(self.params.id_token.to_dict()) 84 response["active"] = not self.params.token.is_expired and not self.params.token.revoked 85 response["scope"] = " ".join(self.params.token.scope) 86 response["client_id"] = self.params.provider.client_id 87 return TokenResponse(response) 88 except TokenIntrospectionError: 89 return TokenResponse({"active": False})
Introspection handler
def
dispatch(self, request, *args, **kwargs):
135 def dispatch(self, request, *args, **kwargs): 136 # Try to dispatch to the right method; if a method doesn't exist, 137 # defer to the error handler. Also defer to the error handler if the 138 # request method isn't on the approved list. 139 if request.method.lower() in self.http_method_names: 140 handler = getattr( 141 self, request.method.lower(), self.http_method_not_allowed 142 ) 143 else: 144 handler = self.http_method_not_allowed 145 return handler(request, *args, **kwargs)