authentik.providers.oauth2.views.end_session
oauth2 provider end_session Views
"""oauth2 provider end_session Views"""

from re import error as RegexError
from re import fullmatch
from urllib.parse import quote, urlparse

from django.http import Http404, HttpRequest, HttpResponse
from django.shortcuts import get_object_or_404
from jwt import PyJWTError
from jwt import decode as jwt_decode

from authentik.common.oauth.constants import (
    FORBIDDEN_URI_SCHEMES,
    OAUTH2_BINDING,
    PLAN_CONTEXT_OIDC_LOGOUT_IFRAME_SESSIONS,
    PLAN_CONTEXT_POST_LOGOUT_REDIRECT_URI,
)
from authentik.core.models import Application, AuthenticatedSession
from authentik.flows.models import Flow, in_memory_stage
from authentik.flows.planner import (
    PLAN_CONTEXT_APPLICATION,
    FlowPlanner,
)
from authentik.flows.stage import SessionEndStage
from authentik.flows.views.executor import SESSION_KEY_PLAN
from authentik.lib.views import bad_request_message
from authentik.policies.views import PolicyAccessView
from authentik.providers.iframe_logout import IframeLogoutStageView
from authentik.providers.oauth2.errors import TokenError
from authentik.providers.oauth2.models import (
    AccessToken,
    JWTAlgorithms,
    OAuth2LogoutMethod,
    OAuth2Provider,
    RedirectURIMatchingMode,
)
from authentik.providers.oauth2.tasks import send_backchannel_logout_request
from authentik.providers.oauth2.utils import build_frontchannel_logout_url


class EndSessionView(PolicyAccessView):
    """OIDC RP-Initiated Logout endpoint"""

    # Invalidation flow to execute; assigned by resolve_provider_application()
    flow: Flow
    # Accepted redirect target (with state appended when given) or None;
    # assigned by validate()
    post_logout_redirect_uri: str | None

    def resolve_provider_application(self):
        """Resolve the application, its provider and the invalidation flow.

        Raises:
            Http404: when no application has the requested slug, the application
                has no provider, or neither the provider nor the brand defines an
                invalidation flow.
        """
        self.application = get_object_or_404(Application, slug=self.kwargs["application_slug"])
        self.provider = self.application.get_provider()
        if not self.provider:
            raise Http404
        # The provider's own invalidation flow wins; the brand's flow is the fallback
        self.flow = self.provider.invalidation_flow or self.request.brand.flow_invalidation
        if not self.flow:
            raise Http404

    def validate(self):
        """Validate the end-session parameters of the current request.

        Reads ``state``, ``post_logout_redirect_uri`` and ``id_token_hint`` from the
        POST body for POST requests and from the query string otherwise, then sets
        ``self.post_logout_redirect_uri`` to the accepted redirect URI (with
        ``state`` appended when given) or to ``None``.

        Raises:
            TokenError: ``invalid_request`` when ``id_token_hint`` cannot be decoded,
                or when ``post_logout_redirect_uri`` is sent without an
                ``id_token_hint``, uses a forbidden scheme, or matches none of the
                provider's registered post-logout redirect URIs.
        """
        # Parse end session parameters
        query_dict = self.request.POST if self.request.method == "POST" else self.request.GET
        state = query_dict.get("state")
        request_redirect_uri = query_dict.get("post_logout_redirect_uri")
        id_token_hint = query_dict.get("id_token_hint")
        self.post_logout_redirect_uri = None

        # OIDC Certification: verify id_token_hint whenever it is supplied
        if id_token_hint:
            # Load a fresh provider instance that's not part of the flow
            # since it'll have the cryptography Certificate that can't be pickled
            provider = OAuth2Provider.objects.get(pk=self.provider.pk)
            key, alg = provider.jwt_key
            if alg != JWTAlgorithms.HS256:
                key = provider.signing_key.public_key
            try:
                jwt_decode(
                    id_token_hint,
                    key,
                    algorithms=[alg],
                    audience=provider.client_id,
                    issuer=provider.get_issuer(self.request),
                    # ID Tokens are short-lived; a logout request arriving
                    # after expiry is still legitimate and must succeed.
                    options={"verify_exp": False},
                )
            except PyJWTError:
                raise TokenError("invalid_request").with_cause(
                    "id_token_hint_decode_failed"
                ) from None

        # Validate post_logout_redirect_uri against registered URIs
        if request_redirect_uri:
            # OIDC Certification: id_token_hint required with post_logout_redirect_uri
            if not id_token_hint:
                raise TokenError("invalid_request").with_cause("id_token_hint_missing")
            if urlparse(request_redirect_uri).scheme in FORBIDDEN_URI_SCHEMES:
                raise TokenError("invalid_request").with_cause("post_logout_redirect_uri")
            for allowed in self.provider.post_logout_redirect_uris:
                if allowed.matching_mode == RedirectURIMatchingMode.STRICT:
                    if request_redirect_uri == allowed.url:
                        self.post_logout_redirect_uri = request_redirect_uri
                        break
                elif allowed.matching_mode == RedirectURIMatchingMode.REGEX:
                    try:
                        matched = fullmatch(allowed.url, request_redirect_uri)
                    except RegexError:
                        # A registered pattern that does not compile can never
                        # match; skip it so it neither crashes the request nor
                        # prevents later registered URIs from being tried
                        continue
                    if matched:
                        self.post_logout_redirect_uri = request_redirect_uri
                        break
            # OIDC Certification: OP MUST NOT perform post-logout redirection
            # if the supplied URI does not exactly match a registered one
            if self.post_logout_redirect_uri is None:
                raise TokenError("invalid_request").with_cause("invalid_post_logout_redirect_uri")

        # Append state to the redirect URI if both are present
        if self.post_logout_redirect_uri and state:
            separator = "&" if "?" in self.post_logout_redirect_uri else "?"
            self.post_logout_redirect_uri = (
                f"{self.post_logout_redirect_uri}{separator}state={quote(state, safe='')}"
            )

    # If IFrame provider logout happens when a saml provider has redirect
    # logout enabled, the flow won't make it back without this dispatch
    def dispatch(self, request: HttpRequest, *args, **kwargs) -> HttpResponse:
        """Check for active logout flow before policy checks"""

        # Check if we're already in an active logout flow
        # (being called from an iframe during single logout)
        if SESSION_KEY_PLAN in request.session:
            return HttpResponse(
                "<html><body>Logout successful</body></html>", content_type="text/html", status=200
            )

        return super().dispatch(request, *args, **kwargs)

    def get(self, request: HttpRequest, *args, **kwargs) -> HttpResponse:
        """Dispatch the flow planner for the invalidation flow.

        Validates the request, triggers the provider's configured front- or
        back-channel logout, deletes the user's access tokens for this provider
        (restricted to the current authenticated session when one is found) and
        returns the redirect produced by the flow plan.

        Returns:
            The ``bad_request_message`` response when validation raises
            ``TokenError``, otherwise the result of ``plan.to_redirect``.
        """
        try:
            self.validate()
        except TokenError as exc:
            return bad_request_message(
                self.request,
                exc.description,
            )
        planner = FlowPlanner(self.flow)
        planner.allow_empty_flows = True

        context = {
            PLAN_CONTEXT_APPLICATION: self.application,
        }

        auth_session = AuthenticatedSession.from_request(request, request.user)

        if self.post_logout_redirect_uri:
            context[PLAN_CONTEXT_POST_LOGOUT_REDIRECT_URI] = self.post_logout_redirect_uri

        session_key = (
            auth_session.session.session_key if auth_session and auth_session.session else None
        )

        frontchannel_logout_url = None
        if self.provider.logout_method == OAuth2LogoutMethod.FRONTCHANNEL:
            frontchannel_logout_url = build_frontchannel_logout_url(
                self.provider, request, session_key
            )

        if (
            self.provider.logout_method == OAuth2LogoutMethod.BACKCHANNEL
            and self.provider.logout_uri
        ):
            # The token's ID token supplies the iss and sub sent with the logout request
            access_token = AccessToken.objects.filter(
                user=request.user,
                provider=self.provider,
                session=auth_session,
            ).first()
            if access_token and access_token.id_token:
                send_backchannel_logout_request.send(
                    self.provider.pk,
                    access_token.id_token.iss,
                    access_token.id_token.sub,
                    session_key,
                )
                # Delete the token to prevent duplicate backchannel logout
                # when UserLogoutStage triggers the session deletion signal
                access_token.delete()

        if frontchannel_logout_url:
            context[PLAN_CONTEXT_OIDC_LOGOUT_IFRAME_SESSIONS] = [
                {
                    "url": frontchannel_logout_url,
                    "provider_name": self.provider.name,
                    "binding": OAUTH2_BINDING,
                    "provider_type": (
                        f"{self.provider._meta.app_label}.{self.provider._meta.model_name}"
                    ),
                }
            ]

        # Remove the user's remaining access tokens for this provider, limited
        # to the current authenticated session when there is one
        access_tokens = AccessToken.objects.filter(
            user=request.user,
            provider=self.provider,
        )
        if auth_session:
            access_tokens = access_tokens.filter(session=auth_session)
        access_tokens.delete()

        plan = planner.plan(request, context)

        if frontchannel_logout_url:
            plan.insert_stage(in_memory_stage(IframeLogoutStageView))

        plan.append_stage(in_memory_stage(SessionEndStage))
        return plan.to_redirect(self.request, self.flow)

    def post(self, request: HttpRequest, *args, **kwargs) -> HttpResponse:
        """Handle POST requests for logout (same as GET per OIDC spec)"""
        return self.get(request, *args, **kwargs)
class EndSessionView(PolicyAccessView):
    """OIDC RP-Initiated Logout endpoint"""

    # Invalidation flow to execute; assigned by resolve_provider_application()
    flow: Flow
    # Accepted redirect target (with state appended when given) or None;
    # assigned by validate()
    post_logout_redirect_uri: str | None

    def resolve_provider_application(self):
        """Resolve the application, its provider and the invalidation flow.

        Raises:
            Http404: when no application has the requested slug, the application
                has no provider, or neither the provider nor the brand defines an
                invalidation flow.
        """
        self.application = get_object_or_404(Application, slug=self.kwargs["application_slug"])
        self.provider = self.application.get_provider()
        if not self.provider:
            raise Http404
        # The provider's own invalidation flow wins; the brand's flow is the fallback
        self.flow = self.provider.invalidation_flow or self.request.brand.flow_invalidation
        if not self.flow:
            raise Http404

    def validate(self):
        """Validate the end-session parameters of the current request.

        Reads ``state``, ``post_logout_redirect_uri`` and ``id_token_hint`` from the
        POST body for POST requests and from the query string otherwise, then sets
        ``self.post_logout_redirect_uri`` to the accepted redirect URI (with
        ``state`` appended when given) or to ``None``.

        Raises:
            TokenError: ``invalid_request`` when ``id_token_hint`` cannot be decoded,
                or when ``post_logout_redirect_uri`` is sent without an
                ``id_token_hint``, uses a forbidden scheme, or matches none of the
                provider's registered post-logout redirect URIs.
        """
        # Imported here so the invalid-pattern handler below does not depend
        # on a module-level import
        import re

        # Parse end session parameters
        query_dict = self.request.POST if self.request.method == "POST" else self.request.GET
        state = query_dict.get("state")
        request_redirect_uri = query_dict.get("post_logout_redirect_uri")
        id_token_hint = query_dict.get("id_token_hint")
        self.post_logout_redirect_uri = None

        # OIDC Certification: verify id_token_hint whenever it is supplied
        if id_token_hint:
            # Load a fresh provider instance that's not part of the flow
            # since it'll have the cryptography Certificate that can't be pickled
            provider = OAuth2Provider.objects.get(pk=self.provider.pk)
            key, alg = provider.jwt_key
            if alg != JWTAlgorithms.HS256:
                key = provider.signing_key.public_key
            try:
                jwt_decode(
                    id_token_hint,
                    key,
                    algorithms=[alg],
                    audience=provider.client_id,
                    issuer=provider.get_issuer(self.request),
                    # ID Tokens are short-lived; a logout request arriving
                    # after expiry is still legitimate and must succeed.
                    options={"verify_exp": False},
                )
            except PyJWTError:
                raise TokenError("invalid_request").with_cause(
                    "id_token_hint_decode_failed"
                ) from None

        # Validate post_logout_redirect_uri against registered URIs
        if request_redirect_uri:
            # OIDC Certification: id_token_hint required with post_logout_redirect_uri
            if not id_token_hint:
                raise TokenError("invalid_request").with_cause("id_token_hint_missing")
            if urlparse(request_redirect_uri).scheme in FORBIDDEN_URI_SCHEMES:
                raise TokenError("invalid_request").with_cause("post_logout_redirect_uri")
            for allowed in self.provider.post_logout_redirect_uris:
                if allowed.matching_mode == RedirectURIMatchingMode.STRICT:
                    if request_redirect_uri == allowed.url:
                        self.post_logout_redirect_uri = request_redirect_uri
                        break
                elif allowed.matching_mode == RedirectURIMatchingMode.REGEX:
                    try:
                        matched = fullmatch(allowed.url, request_redirect_uri)
                    except re.error:
                        # A registered pattern that does not compile can never
                        # match; skip it so it neither crashes the request nor
                        # prevents later registered URIs from being tried
                        continue
                    if matched:
                        self.post_logout_redirect_uri = request_redirect_uri
                        break
            # OIDC Certification: OP MUST NOT perform post-logout redirection
            # if the supplied URI does not exactly match a registered one
            if self.post_logout_redirect_uri is None:
                raise TokenError("invalid_request").with_cause("invalid_post_logout_redirect_uri")

        # Append state to the redirect URI if both are present
        if self.post_logout_redirect_uri and state:
            separator = "&" if "?" in self.post_logout_redirect_uri else "?"
            self.post_logout_redirect_uri = (
                f"{self.post_logout_redirect_uri}{separator}state={quote(state, safe='')}"
            )

    # If IFrame provider logout happens when a saml provider has redirect
    # logout enabled, the flow won't make it back without this dispatch
    def dispatch(self, request: HttpRequest, *args, **kwargs) -> HttpResponse:
        """Check for active logout flow before policy checks"""

        # Check if we're already in an active logout flow
        # (being called from an iframe during single logout)
        if SESSION_KEY_PLAN in request.session:
            return HttpResponse(
                "<html><body>Logout successful</body></html>", content_type="text/html", status=200
            )

        return super().dispatch(request, *args, **kwargs)

    def get(self, request: HttpRequest, *args, **kwargs) -> HttpResponse:
        """Dispatch the flow planner for the invalidation flow.

        Validates the request, triggers the provider's configured front- or
        back-channel logout, deletes the user's access tokens for this provider
        (restricted to the current authenticated session when one is found) and
        returns the redirect produced by the flow plan.

        Returns:
            The ``bad_request_message`` response when validation raises
            ``TokenError``, otherwise the result of ``plan.to_redirect``.
        """
        try:
            self.validate()
        except TokenError as exc:
            return bad_request_message(
                self.request,
                exc.description,
            )
        planner = FlowPlanner(self.flow)
        planner.allow_empty_flows = True

        context = {
            PLAN_CONTEXT_APPLICATION: self.application,
        }

        auth_session = AuthenticatedSession.from_request(request, request.user)

        if self.post_logout_redirect_uri:
            context[PLAN_CONTEXT_POST_LOGOUT_REDIRECT_URI] = self.post_logout_redirect_uri

        session_key = (
            auth_session.session.session_key if auth_session and auth_session.session else None
        )

        frontchannel_logout_url = None
        if self.provider.logout_method == OAuth2LogoutMethod.FRONTCHANNEL:
            frontchannel_logout_url = build_frontchannel_logout_url(
                self.provider, request, session_key
            )

        if (
            self.provider.logout_method == OAuth2LogoutMethod.BACKCHANNEL
            and self.provider.logout_uri
        ):
            # The token's ID token supplies the iss and sub sent with the logout request
            access_token = AccessToken.objects.filter(
                user=request.user,
                provider=self.provider,
                session=auth_session,
            ).first()
            if access_token and access_token.id_token:
                send_backchannel_logout_request.send(
                    self.provider.pk,
                    access_token.id_token.iss,
                    access_token.id_token.sub,
                    session_key,
                )
                # Delete the token to prevent duplicate backchannel logout
                # when UserLogoutStage triggers the session deletion signal
                access_token.delete()

        if frontchannel_logout_url:
            context[PLAN_CONTEXT_OIDC_LOGOUT_IFRAME_SESSIONS] = [
                {
                    "url": frontchannel_logout_url,
                    "provider_name": self.provider.name,
                    "binding": OAUTH2_BINDING,
                    "provider_type": (
                        f"{self.provider._meta.app_label}.{self.provider._meta.model_name}"
                    ),
                }
            ]

        # Remove the user's remaining access tokens for this provider, limited
        # to the current authenticated session when there is one
        access_tokens = AccessToken.objects.filter(
            user=request.user,
            provider=self.provider,
        )
        if auth_session:
            access_tokens = access_tokens.filter(session=auth_session)
        access_tokens.delete()

        plan = planner.plan(request, context)

        if frontchannel_logout_url:
            plan.insert_stage(in_memory_stage(IframeLogoutStageView))

        plan.append_stage(in_memory_stage(SessionEndStage))
        return plan.to_redirect(self.request, self.flow)

    def post(self, request: HttpRequest, *args, **kwargs) -> HttpResponse:
        """Handle POST requests for logout (same as GET per OIDC spec)"""
        return self.get(request, *args, **kwargs)
OIDC RP-Initiated Logout endpoint
def
resolve_provider_application(self):
def resolve_provider_application(self):
    """Populate ``self.application``, ``self.provider`` and ``self.flow``.

    The application is looked up by the ``application_slug`` URL keyword
    argument. The flow is the provider's invalidation flow, falling back to
    the invalidation flow of the request's brand.

    Raises:
        Http404: when the application does not exist, it has no provider, or
            no invalidation flow is configured on either provider or brand.
    """
    slug = self.kwargs["application_slug"]
    application = get_object_or_404(Application, slug=slug)
    self.application = application

    provider = application.get_provider()
    self.provider = provider
    if not provider:
        raise Http404

    # Prefer the flow configured on the provider, otherwise use the brand's
    flow = provider.invalidation_flow
    if not flow:
        flow = self.request.brand.flow_invalidation
    self.flow = flow
    if not flow:
        raise Http404
Resolve self.provider and self.application. *.DoesNotExist exceptions cause a normal AccessDenied view to be shown. An Http404 exception is not caught and will be returned directly.
def
validate(self):
def validate(self):
    """Validate the end-session parameters of the current request.

    Reads ``state``, ``post_logout_redirect_uri`` and ``id_token_hint`` from the
    POST body for POST requests and from the query string otherwise, then sets
    ``self.post_logout_redirect_uri`` to the accepted redirect URI (with
    ``state`` appended when given) or to ``None``.

    Raises:
        TokenError: ``invalid_request`` when ``id_token_hint`` cannot be decoded,
            or when ``post_logout_redirect_uri`` is sent without an
            ``id_token_hint``, uses a forbidden scheme, or matches none of the
            provider's registered post-logout redirect URIs.
    """
    # Imported here so the invalid-pattern handler below does not depend
    # on a module-level import
    import re

    # Parse end session parameters
    query_dict = self.request.POST if self.request.method == "POST" else self.request.GET
    state = query_dict.get("state")
    request_redirect_uri = query_dict.get("post_logout_redirect_uri")
    id_token_hint = query_dict.get("id_token_hint")
    self.post_logout_redirect_uri = None

    # OIDC Certification: verify id_token_hint whenever it is supplied
    if id_token_hint:
        # Load a fresh provider instance that's not part of the flow
        # since it'll have the cryptography Certificate that can't be pickled
        provider = OAuth2Provider.objects.get(pk=self.provider.pk)
        key, alg = provider.jwt_key
        if alg != JWTAlgorithms.HS256:
            key = provider.signing_key.public_key
        try:
            jwt_decode(
                id_token_hint,
                key,
                algorithms=[alg],
                audience=provider.client_id,
                issuer=provider.get_issuer(self.request),
                # ID Tokens are short-lived; a logout request arriving
                # after expiry is still legitimate and must succeed.
                options={"verify_exp": False},
            )
        except PyJWTError:
            raise TokenError("invalid_request").with_cause(
                "id_token_hint_decode_failed"
            ) from None

    # Validate post_logout_redirect_uri against registered URIs
    if request_redirect_uri:
        # OIDC Certification: id_token_hint required with post_logout_redirect_uri
        if not id_token_hint:
            raise TokenError("invalid_request").with_cause("id_token_hint_missing")
        if urlparse(request_redirect_uri).scheme in FORBIDDEN_URI_SCHEMES:
            raise TokenError("invalid_request").with_cause("post_logout_redirect_uri")
        for allowed in self.provider.post_logout_redirect_uris:
            if allowed.matching_mode == RedirectURIMatchingMode.STRICT:
                if request_redirect_uri == allowed.url:
                    self.post_logout_redirect_uri = request_redirect_uri
                    break
            elif allowed.matching_mode == RedirectURIMatchingMode.REGEX:
                try:
                    matched = fullmatch(allowed.url, request_redirect_uri)
                except re.error:
                    # A registered pattern that does not compile can never
                    # match; skip it so it neither crashes the request nor
                    # prevents later registered URIs from being tried
                    continue
                if matched:
                    self.post_logout_redirect_uri = request_redirect_uri
                    break
        # OIDC Certification: OP MUST NOT perform post-logout redirection
        # if the supplied URI does not exactly match a registered one
        if self.post_logout_redirect_uri is None:
            raise TokenError("invalid_request").with_cause("invalid_post_logout_redirect_uri")

    # Append state to the redirect URI if both are present
    if self.post_logout_redirect_uri and state:
        separator = "&" if "?" in self.post_logout_redirect_uri else "?"
        self.post_logout_redirect_uri = (
            f"{self.post_logout_redirect_uri}{separator}state={quote(state, safe='')}"
        )
def
dispatch( self, request: django.http.request.HttpRequest, *args, **kwargs) -> django.http.response.HttpResponse:
def dispatch(self, request: HttpRequest, *args, **kwargs) -> HttpResponse:
    """Answer immediately when a flow plan is already stored in the session.

    This check runs before the parent view's dispatch (and therefore before
    its policy checks). Without a stored plan the request is handed to the
    parent implementation unchanged.
    """
    # No plan in the session: let the parent view handle the request
    if SESSION_KEY_PLAN not in request.session:
        return super().dispatch(request, *args, **kwargs)

    # A plan is present, so we're already in an active logout flow
    # (being called from an iframe during single logout)
    return HttpResponse(
        "<html><body>Logout successful</body></html>",
        content_type="text/html",
        status=200,
    )
Check for active logout flow before policy checks
def
get( self, request: django.http.request.HttpRequest, *args, **kwargs) -> django.http.response.HttpResponse:
def get(self, request: HttpRequest, *args, **kwargs) -> HttpResponse:
    """Plan the invalidation flow and return its redirect.

    Validates the request, triggers the provider's configured front- or
    back-channel logout, deletes the user's access tokens for this provider
    (restricted to the current authenticated session when one is found) and
    hands over to the flow plan.

    Returns:
        The ``bad_request_message`` response when validation raises
        ``TokenError``, otherwise the result of ``plan.to_redirect``.
    """
    try:
        self.validate()
    except TokenError as exc:
        return bad_request_message(self.request, exc.description)

    provider = self.provider
    user = request.user

    planner = FlowPlanner(self.flow)
    planner.allow_empty_flows = True
    context = {PLAN_CONTEXT_APPLICATION: self.application}

    auth_session = AuthenticatedSession.from_request(request, user)

    redirect_uri = self.post_logout_redirect_uri
    if redirect_uri:
        context[PLAN_CONTEXT_POST_LOGOUT_REDIRECT_URI] = redirect_uri

    # Session key is only known when an authenticated session with a session exists
    session_key = None
    if auth_session and auth_session.session:
        session_key = auth_session.session.session_key

    logout_method = provider.logout_method

    frontchannel_logout_url = None
    if logout_method == OAuth2LogoutMethod.FRONTCHANNEL:
        frontchannel_logout_url = build_frontchannel_logout_url(provider, request, session_key)

    if logout_method == OAuth2LogoutMethod.BACKCHANNEL and provider.logout_uri:
        # The token's ID token supplies the iss and sub sent with the logout request
        session_tokens = AccessToken.objects.filter(
            user=user,
            provider=provider,
            session=auth_session,
        )
        token = session_tokens.first()
        id_token = token.id_token if token else None
        if id_token:
            send_backchannel_logout_request.send(
                provider.pk,
                id_token.iss,
                id_token.sub,
                session_key,
            )
            # Delete the token to prevent duplicate backchannel logout
            # when UserLogoutStage triggers the session deletion signal
            token.delete()

    if frontchannel_logout_url:
        provider_type = f"{provider._meta.app_label}.{provider._meta.model_name}"
        iframe_session = {
            "url": frontchannel_logout_url,
            "provider_name": provider.name,
            "binding": OAUTH2_BINDING,
            "provider_type": provider_type,
        }
        context[PLAN_CONTEXT_OIDC_LOGOUT_IFRAME_SESSIONS] = [iframe_session]

    # Remove the user's remaining access tokens for this provider, limited
    # to the current authenticated session when there is one
    stale_tokens = AccessToken.objects.filter(user=user, provider=provider)
    if auth_session:
        stale_tokens = stale_tokens.filter(session=auth_session)
    stale_tokens.delete()

    plan = planner.plan(request, context)
    if frontchannel_logout_url:
        plan.insert_stage(in_memory_stage(IframeLogoutStageView))
    plan.append_stage(in_memory_stage(SessionEndStage))

    return plan.to_redirect(self.request, self.flow)
Dispatch the flow planner for the invalidation flow
def
post( self, request: django.http.request.HttpRequest, *args, **kwargs) -> django.http.response.HttpResponse:
def post(self, request: HttpRequest, *args, **kwargs) -> HttpResponse:
    """Serve POST logout requests through the GET handler.

    POST is handled the same as GET per the OIDC spec, so all arguments are
    forwarded to ``get`` unchanged.
    """
    response = self.get(request, *args, **kwargs)
    return response
Handle POST requests for logout (same as GET per OIDC spec)