authentik.providers.oauth2.views.introspection
authentik OAuth2 Token Introspection Views
1"""authentik OAuth2 Token Introspection Views""" 2 3from dataclasses import dataclass, field 4 5from django.http import HttpRequest, HttpResponse 6from django.utils.decorators import method_decorator 7from django.views import View 8from django.views.decorators.csrf import csrf_exempt 9from structlog.stdlib import get_logger 10 11from authentik.providers.oauth2.errors import TokenIntrospectionError 12from authentik.providers.oauth2.id_token import IDToken 13from authentik.providers.oauth2.models import AccessToken, OAuth2Provider, RefreshToken 14from authentik.providers.oauth2.utils import TokenResponse, authenticate_provider 15 16LOGGER = get_logger() 17 18 19@dataclass(slots=True) 20class TokenIntrospectionParams: 21 """Parameters for Token Introspection""" 22 23 token: RefreshToken | AccessToken 24 provider: OAuth2Provider 25 26 id_token: IDToken = field(init=False) 27 28 def __post_init__(self): 29 if self.token.is_expired: 30 LOGGER.debug("Token is not valid") 31 raise TokenIntrospectionError() 32 33 self.id_token = self.token.id_token 34 35 if not self.token.id_token: 36 LOGGER.debug( 37 "token not an authentication token", 38 token=self.token, 39 ) 40 raise TokenIntrospectionError() 41 42 @staticmethod 43 def from_request(request: HttpRequest) -> TokenIntrospectionParams: 44 """Extract required Parameters from HTTP Request""" 45 raw_token = request.POST.get("token") 46 provider = authenticate_provider(request) 47 if not provider: 48 raise TokenIntrospectionError 49 50 access_token = AccessToken.objects.filter(token=raw_token, provider=provider).first() 51 if access_token: 52 return TokenIntrospectionParams(access_token, provider) 53 refresh_token = RefreshToken.objects.filter(token=raw_token, provider=provider).first() 54 if refresh_token: 55 return TokenIntrospectionParams(refresh_token, provider) 56 LOGGER.debug("Token does not exist", token=raw_token) 57 raise TokenIntrospectionError() 58 59 60@method_decorator(csrf_exempt, name="dispatch") 61class TokenIntrospectionView(View): 62 """Token Introspection 63 https://datatracker.ietf.org/doc/html/rfc7662""" 64 65 token: RefreshToken | AccessToken 66 params: TokenIntrospectionParams 67 provider: OAuth2Provider 68 69 def post(self, request: HttpRequest) -> HttpResponse: 70 """Introspection handler""" 71 try: 72 self.params = TokenIntrospectionParams.from_request(request) 73 response = {} 74 if self.params.id_token: 75 response.update(self.params.id_token.to_dict()) 76 response["active"] = not self.params.token.is_expired and not self.params.token.revoked 77 response["scope"] = " ".join(self.params.token.scope) 78 response["client_id"] = self.params.provider.client_id 79 return TokenResponse(response) 80 except TokenIntrospectionError: 81 return TokenResponse({"active": False})
LOGGER =
<BoundLoggerLazyProxy(logger=None, wrapper_class=None, processors=None, context_class=None, initial_values={}, logger_factory_args=())>
@dataclass(slots=True)
class
TokenIntrospectionParams:
20@dataclass(slots=True) 21class TokenIntrospectionParams: 22 """Parameters for Token Introspection""" 23 24 token: RefreshToken | AccessToken 25 provider: OAuth2Provider 26 27 id_token: IDToken = field(init=False) 28 29 def __post_init__(self): 30 if self.token.is_expired: 31 LOGGER.debug("Token is not valid") 32 raise TokenIntrospectionError() 33 34 self.id_token = self.token.id_token 35 36 if not self.token.id_token: 37 LOGGER.debug( 38 "token not an authentication token", 39 token=self.token, 40 ) 41 raise TokenIntrospectionError() 42 43 @staticmethod 44 def from_request(request: HttpRequest) -> TokenIntrospectionParams: 45 """Extract required Parameters from HTTP Request""" 46 raw_token = request.POST.get("token") 47 provider = authenticate_provider(request) 48 if not provider: 49 raise TokenIntrospectionError 50 51 access_token = AccessToken.objects.filter(token=raw_token, provider=provider).first() 52 if access_token: 53 return TokenIntrospectionParams(access_token, provider) 54 refresh_token = RefreshToken.objects.filter(token=raw_token, provider=provider).first() 55 if refresh_token: 56 return TokenIntrospectionParams(refresh_token, provider) 57 LOGGER.debug("Token does not exist", token=raw_token) 58 raise TokenIntrospectionError()
Parameters for Token Introspection
TokenIntrospectionParams( token: authentik.providers.oauth2.models.RefreshToken | authentik.providers.oauth2.models.AccessToken, provider: authentik.providers.oauth2.models.OAuth2Provider)
token: authentik.providers.oauth2.models.RefreshToken | authentik.providers.oauth2.models.AccessToken
@staticmethod
def
from_request( request: django.http.request.HttpRequest) -> TokenIntrospectionParams:
43 @staticmethod 44 def from_request(request: HttpRequest) -> TokenIntrospectionParams: 45 """Extract required Parameters from HTTP Request""" 46 raw_token = request.POST.get("token") 47 provider = authenticate_provider(request) 48 if not provider: 49 raise TokenIntrospectionError 50 51 access_token = AccessToken.objects.filter(token=raw_token, provider=provider).first() 52 if access_token: 53 return TokenIntrospectionParams(access_token, provider) 54 refresh_token = RefreshToken.objects.filter(token=raw_token, provider=provider).first() 55 if refresh_token: 56 return TokenIntrospectionParams(refresh_token, provider) 57 LOGGER.debug("Token does not exist", token=raw_token) 58 raise TokenIntrospectionError()
Extract required Parameters from HTTP Request
@method_decorator(csrf_exempt, name='dispatch')
class
TokenIntrospectionView61@method_decorator(csrf_exempt, name="dispatch") 62class TokenIntrospectionView(View): 63 """Token Introspection 64 https://datatracker.ietf.org/doc/html/rfc7662""" 65 66 token: RefreshToken | AccessToken 67 params: TokenIntrospectionParams 68 provider: OAuth2Provider 69 70 def post(self, request: HttpRequest) -> HttpResponse: 71 """Introspection handler""" 72 try: 73 self.params = TokenIntrospectionParams.from_request(request) 74 response = {} 75 if self.params.id_token: 76 response.update(self.params.id_token.to_dict()) 77 response["active"] = not self.params.token.is_expired and not self.params.token.revoked 78 response["scope"] = " ".join(self.params.token.scope) 79 response["client_id"] = self.params.provider.client_id 80 return TokenResponse(response) 81 except TokenIntrospectionError: 82 return TokenResponse({"active": False})
Token Introspection https://datatracker.ietf.org/doc/html/rfc7662
token: authentik.providers.oauth2.models.RefreshToken | authentik.providers.oauth2.models.AccessToken
params: TokenIntrospectionParams
def
post( self, request: django.http.request.HttpRequest) -> django.http.response.HttpResponse:
70 def post(self, request: HttpRequest) -> HttpResponse: 71 """Introspection handler""" 72 try: 73 self.params = TokenIntrospectionParams.from_request(request) 74 response = {} 75 if self.params.id_token: 76 response.update(self.params.id_token.to_dict()) 77 response["active"] = not self.params.token.is_expired and not self.params.token.revoked 78 response["scope"] = " ".join(self.params.token.scope) 79 response["client_id"] = self.params.provider.client_id 80 return TokenResponse(response) 81 except TokenIntrospectionError: 82 return TokenResponse({"active": False})
Introspection handler
def
dispatch(self, request, *args, **kwargs):
135 def dispatch(self, request, *args, **kwargs): 136 # Try to dispatch to the right method; if a method doesn't exist, 137 # defer to the error handler. Also defer to the error handler if the 138 # request method isn't on the approved list. 139 if request.method.lower() in self.http_method_names: 140 handler = getattr( 141 self, request.method.lower(), self.http_method_not_allowed 142 ) 143 else: 144 handler = self.http_method_not_allowed 145 return handler(request, *args, **kwargs)