authentik.providers.oauth2.views.end_session

oauth2 provider end_session Views

  1"""oauth2 provider end_session Views"""
  2
  3from re import fullmatch
  4from urllib.parse import quote, urlparse
  5
  6from django.http import Http404, HttpRequest, HttpResponse
  7from django.shortcuts import get_object_or_404
  8from jwt import PyJWTError
  9from jwt import decode as jwt_decode
 10
 11from authentik.common.oauth.constants import (
 12    FORBIDDEN_URI_SCHEMES,
 13    OAUTH2_BINDING,
 14    PLAN_CONTEXT_OIDC_LOGOUT_IFRAME_SESSIONS,
 15    PLAN_CONTEXT_POST_LOGOUT_REDIRECT_URI,
 16)
 17from authentik.core.models import Application, AuthenticatedSession
 18from authentik.flows.models import Flow, in_memory_stage
 19from authentik.flows.planner import (
 20    PLAN_CONTEXT_APPLICATION,
 21    FlowPlanner,
 22)
 23from authentik.flows.stage import SessionEndStage
 24from authentik.flows.views.executor import SESSION_KEY_PLAN
 25from authentik.lib.views import bad_request_message
 26from authentik.policies.views import PolicyAccessView
 27from authentik.providers.iframe_logout import IframeLogoutStageView
 28from authentik.providers.oauth2.errors import TokenError
 29from authentik.providers.oauth2.models import (
 30    AccessToken,
 31    JWTAlgorithms,
 32    OAuth2LogoutMethod,
 33    OAuth2Provider,
 34    RedirectURIMatchingMode,
 35)
 36from authentik.providers.oauth2.tasks import send_backchannel_logout_request
 37from authentik.providers.oauth2.utils import build_frontchannel_logout_url
 38
 39
 40class EndSessionView(PolicyAccessView):
 41    """OIDC RP-Initiated Logout endpoint"""
 42
 43    flow: Flow
 44    post_logout_redirect_uri: str | None
 45
 46    def resolve_provider_application(self):
 47        self.application = get_object_or_404(Application, slug=self.kwargs["application_slug"])
 48        self.provider = self.application.get_provider()
 49        if not self.provider:
 50            raise Http404
 51        self.flow = self.provider.invalidation_flow or self.request.brand.flow_invalidation
 52        if not self.flow:
 53            raise Http404
 54
 55    def validate(self):
 56        # Parse end session parameters
 57        query_dict = self.request.POST if self.request.method == "POST" else self.request.GET
 58        state = query_dict.get("state")
 59        request_redirect_uri = query_dict.get("post_logout_redirect_uri")
 60        id_token_hint = query_dict.get("id_token_hint")
 61        self.post_logout_redirect_uri = None
 62
 63        # OIDC Certification: Verify id_token_hint. If invalid or missing, throw an error
 64        if id_token_hint:
 65            # Load a fresh provider instance that's not part of the flow
 66            # since it'll have the cryptography Certificate that can't be pickled
 67            provider = OAuth2Provider.objects.get(pk=self.provider.pk)
 68            key, alg = provider.jwt_key
 69            if alg != JWTAlgorithms.HS256:
 70                key = provider.signing_key.public_key
 71            try:
 72                jwt_decode(
 73                    id_token_hint,
 74                    key,
 75                    algorithms=[alg],
 76                    audience=provider.client_id,
 77                    issuer=provider.get_issuer(self.request),
 78                    # ID Tokens are short-lived; a logout request arriving
 79                    # after expiry is still legitimate and must succeed.
 80                    options={"verify_exp": False},
 81                )
 82            except PyJWTError:
 83                raise TokenError("invalid_request").with_cause(
 84                    "id_token_hint_decode_failed"
 85                ) from None
 86
 87        # Validate post_logout_redirect_uri against registered URIs
 88        if request_redirect_uri:
 89            # OIDC Certification: id_token_hint required with post_logout_redirect_uri
 90            if not id_token_hint:
 91                raise TokenError("invalid_request").with_cause("id_token_hint_missing")
 92            if urlparse(request_redirect_uri).scheme in FORBIDDEN_URI_SCHEMES:
 93                raise TokenError("invalid_request").with_cause("post_logout_redirect_uri")
 94            for allowed in self.provider.post_logout_redirect_uris:
 95                if allowed.matching_mode == RedirectURIMatchingMode.STRICT:
 96                    if request_redirect_uri == allowed.url:
 97                        self.post_logout_redirect_uri = request_redirect_uri
 98                        break
 99                elif allowed.matching_mode == RedirectURIMatchingMode.REGEX:
100                    if fullmatch(allowed.url, request_redirect_uri):
101                        self.post_logout_redirect_uri = request_redirect_uri
102                        break
103            # OIDC Certification: OP MUST NOT perform post-logout redirection
104            # if the supplied URI does not exactly match a registered one
105            if self.post_logout_redirect_uri is None:
106                raise TokenError("invalid_request").with_cause("invalid_post_logout_redirect_uri")
107
108        # Append state to the redirect URI if both are present
109        if self.post_logout_redirect_uri and state:
110            separator = "&" if "?" in self.post_logout_redirect_uri else "?"
111            self.post_logout_redirect_uri = (
112                f"{self.post_logout_redirect_uri}{separator}state={quote(state, safe='')}"
113            )
114
115    # If IFrame provider logout happens when a saml provider has redirect
116    # logout enabled, the flow won't make it back without this dispatch
117    def dispatch(self, request: HttpRequest, *args, **kwargs) -> HttpResponse:
118        """Check for active logout flow before policy checks"""
119
120        # Check if we're already in an active logout flow
121        # (being called from an iframe during single logout)
122        if SESSION_KEY_PLAN in request.session:
123            return HttpResponse(
124                "<html><body>Logout successful</body></html>", content_type="text/html", status=200
125            )
126
127        return super().dispatch(request, *args, **kwargs)
128
129    def get(self, request: HttpRequest, *args, **kwargs) -> HttpResponse:
130        """Dispatch the flow planner for the invalidation flow"""
131        try:
132            self.validate()
133        except TokenError as exc:
134            return bad_request_message(
135                self.request,
136                exc.description,
137            )
138        planner = FlowPlanner(self.flow)
139        planner.allow_empty_flows = True
140
141        context = {
142            PLAN_CONTEXT_APPLICATION: self.application,
143        }
144
145        auth_session = AuthenticatedSession.from_request(request, request.user)
146
147        if self.post_logout_redirect_uri:
148            context[PLAN_CONTEXT_POST_LOGOUT_REDIRECT_URI] = self.post_logout_redirect_uri
149
150        session_key = (
151            auth_session.session.session_key if auth_session and auth_session.session else None
152        )
153
154        frontchannel_logout_url = None
155        if self.provider.logout_method == OAuth2LogoutMethod.FRONTCHANNEL:
156            frontchannel_logout_url = build_frontchannel_logout_url(
157                self.provider, request, session_key
158            )
159
160        if (
161            self.provider.logout_method == OAuth2LogoutMethod.BACKCHANNEL
162            and self.provider.logout_uri
163        ):
164            access_token = AccessToken.objects.filter(
165                user=request.user,
166                provider=self.provider,
167                session=auth_session,
168            ).first()
169            if access_token and access_token.id_token:
170                send_backchannel_logout_request.send(
171                    self.provider.pk,
172                    access_token.id_token.iss,
173                    access_token.id_token.sub,
174                    session_key,
175                )
176                # Delete the token to prevent duplicate backchannel logout
177                # when UserLogoutStage triggers the session deletion signal
178                access_token.delete()
179
180        if frontchannel_logout_url:
181            context[PLAN_CONTEXT_OIDC_LOGOUT_IFRAME_SESSIONS] = [
182                {
183                    "url": frontchannel_logout_url,
184                    "provider_name": self.provider.name,
185                    "binding": OAUTH2_BINDING,
186                    "provider_type": (
187                        f"{self.provider._meta.app_label}.{self.provider._meta.model_name}"
188                    ),
189                }
190            ]
191
192        access_tokens = AccessToken.objects.filter(
193            user=request.user,
194            provider=self.provider,
195        )
196        if auth_session:
197            access_tokens = access_tokens.filter(session=auth_session)
198        access_tokens.delete()
199
200        plan = planner.plan(request, context)
201
202        if frontchannel_logout_url:
203            plan.insert_stage(in_memory_stage(IframeLogoutStageView))
204
205        plan.append_stage(in_memory_stage(SessionEndStage))
206        return plan.to_redirect(self.request, self.flow)
207
208    def post(self, request: HttpRequest, *args, **kwargs) -> HttpResponse:
209        """Handle POST requests for logout (same as GET per OIDC spec)"""
210        return self.get(request, *args, **kwargs)
class EndSessionView(authentik.policies.views.PolicyAccessView):
 41class EndSessionView(PolicyAccessView):
 42    """OIDC RP-Initiated Logout endpoint"""
 43
 44    flow: Flow
 45    post_logout_redirect_uri: str | None
 46
 47    def resolve_provider_application(self):
 48        self.application = get_object_or_404(Application, slug=self.kwargs["application_slug"])
 49        self.provider = self.application.get_provider()
 50        if not self.provider:
 51            raise Http404
 52        self.flow = self.provider.invalidation_flow or self.request.brand.flow_invalidation
 53        if not self.flow:
 54            raise Http404
 55
 56    def validate(self):
 57        # Parse end session parameters
 58        query_dict = self.request.POST if self.request.method == "POST" else self.request.GET
 59        state = query_dict.get("state")
 60        request_redirect_uri = query_dict.get("post_logout_redirect_uri")
 61        id_token_hint = query_dict.get("id_token_hint")
 62        self.post_logout_redirect_uri = None
 63
 64        # OIDC Certification: Verify id_token_hint. If invalid or missing, throw an error
 65        if id_token_hint:
 66            # Load a fresh provider instance that's not part of the flow
 67            # since it'll have the cryptography Certificate that can't be pickled
 68            provider = OAuth2Provider.objects.get(pk=self.provider.pk)
 69            key, alg = provider.jwt_key
 70            if alg != JWTAlgorithms.HS256:
 71                key = provider.signing_key.public_key
 72            try:
 73                jwt_decode(
 74                    id_token_hint,
 75                    key,
 76                    algorithms=[alg],
 77                    audience=provider.client_id,
 78                    issuer=provider.get_issuer(self.request),
 79                    # ID Tokens are short-lived; a logout request arriving
 80                    # after expiry is still legitimate and must succeed.
 81                    options={"verify_exp": False},
 82                )
 83            except PyJWTError:
 84                raise TokenError("invalid_request").with_cause(
 85                    "id_token_hint_decode_failed"
 86                ) from None
 87
 88        # Validate post_logout_redirect_uri against registered URIs
 89        if request_redirect_uri:
 90            # OIDC Certification: id_token_hint required with post_logout_redirect_uri
 91            if not id_token_hint:
 92                raise TokenError("invalid_request").with_cause("id_token_hint_missing")
 93            if urlparse(request_redirect_uri).scheme in FORBIDDEN_URI_SCHEMES:
 94                raise TokenError("invalid_request").with_cause("post_logout_redirect_uri")
 95            for allowed in self.provider.post_logout_redirect_uris:
 96                if allowed.matching_mode == RedirectURIMatchingMode.STRICT:
 97                    if request_redirect_uri == allowed.url:
 98                        self.post_logout_redirect_uri = request_redirect_uri
 99                        break
100                elif allowed.matching_mode == RedirectURIMatchingMode.REGEX:
101                    if fullmatch(allowed.url, request_redirect_uri):
102                        self.post_logout_redirect_uri = request_redirect_uri
103                        break
104            # OIDC Certification: OP MUST NOT perform post-logout redirection
105            # if the supplied URI does not exactly match a registered one
106            if self.post_logout_redirect_uri is None:
107                raise TokenError("invalid_request").with_cause("invalid_post_logout_redirect_uri")
108
109        # Append state to the redirect URI if both are present
110        if self.post_logout_redirect_uri and state:
111            separator = "&" if "?" in self.post_logout_redirect_uri else "?"
112            self.post_logout_redirect_uri = (
113                f"{self.post_logout_redirect_uri}{separator}state={quote(state, safe='')}"
114            )
115
116    # If IFrame provider logout happens when a saml provider has redirect
117    # logout enabled, the flow won't make it back without this dispatch
118    def dispatch(self, request: HttpRequest, *args, **kwargs) -> HttpResponse:
119        """Check for active logout flow before policy checks"""
120
121        # Check if we're already in an active logout flow
122        # (being called from an iframe during single logout)
123        if SESSION_KEY_PLAN in request.session:
124            return HttpResponse(
125                "<html><body>Logout successful</body></html>", content_type="text/html", status=200
126            )
127
128        return super().dispatch(request, *args, **kwargs)
129
130    def get(self, request: HttpRequest, *args, **kwargs) -> HttpResponse:
131        """Dispatch the flow planner for the invalidation flow"""
132        try:
133            self.validate()
134        except TokenError as exc:
135            return bad_request_message(
136                self.request,
137                exc.description,
138            )
139        planner = FlowPlanner(self.flow)
140        planner.allow_empty_flows = True
141
142        context = {
143            PLAN_CONTEXT_APPLICATION: self.application,
144        }
145
146        auth_session = AuthenticatedSession.from_request(request, request.user)
147
148        if self.post_logout_redirect_uri:
149            context[PLAN_CONTEXT_POST_LOGOUT_REDIRECT_URI] = self.post_logout_redirect_uri
150
151        session_key = (
152            auth_session.session.session_key if auth_session and auth_session.session else None
153        )
154
155        frontchannel_logout_url = None
156        if self.provider.logout_method == OAuth2LogoutMethod.FRONTCHANNEL:
157            frontchannel_logout_url = build_frontchannel_logout_url(
158                self.provider, request, session_key
159            )
160
161        if (
162            self.provider.logout_method == OAuth2LogoutMethod.BACKCHANNEL
163            and self.provider.logout_uri
164        ):
165            access_token = AccessToken.objects.filter(
166                user=request.user,
167                provider=self.provider,
168                session=auth_session,
169            ).first()
170            if access_token and access_token.id_token:
171                send_backchannel_logout_request.send(
172                    self.provider.pk,
173                    access_token.id_token.iss,
174                    access_token.id_token.sub,
175                    session_key,
176                )
177                # Delete the token to prevent duplicate backchannel logout
178                # when UserLogoutStage triggers the session deletion signal
179                access_token.delete()
180
181        if frontchannel_logout_url:
182            context[PLAN_CONTEXT_OIDC_LOGOUT_IFRAME_SESSIONS] = [
183                {
184                    "url": frontchannel_logout_url,
185                    "provider_name": self.provider.name,
186                    "binding": OAUTH2_BINDING,
187                    "provider_type": (
188                        f"{self.provider._meta.app_label}.{self.provider._meta.model_name}"
189                    ),
190                }
191            ]
192
193        access_tokens = AccessToken.objects.filter(
194            user=request.user,
195            provider=self.provider,
196        )
197        if auth_session:
198            access_tokens = access_tokens.filter(session=auth_session)
199        access_tokens.delete()
200
201        plan = planner.plan(request, context)
202
203        if frontchannel_logout_url:
204            plan.insert_stage(in_memory_stage(IframeLogoutStageView))
205
206        plan.append_stage(in_memory_stage(SessionEndStage))
207        return plan.to_redirect(self.request, self.flow)
208
209    def post(self, request: HttpRequest, *args, **kwargs) -> HttpResponse:
210        """Handle POST requests for logout (same as GET per OIDC spec)"""
211        return self.get(request, *args, **kwargs)

OIDC RP-Initiated Logout endpoint

post_logout_redirect_uri: str | None
def resolve_provider_application(self):
47    def resolve_provider_application(self):
48        self.application = get_object_or_404(Application, slug=self.kwargs["application_slug"])
49        self.provider = self.application.get_provider()
50        if not self.provider:
51            raise Http404
52        self.flow = self.provider.invalidation_flow or self.request.brand.flow_invalidation
53        if not self.flow:
54            raise Http404

Resolve self.provider and self.application. *.DoesNotExist exceptions cause a normal AccessDenied view to be shown. An Http404 exception is not caught and is returned directly.

def validate(self):
 56    def validate(self):
 57        # Parse end session parameters
 58        query_dict = self.request.POST if self.request.method == "POST" else self.request.GET
 59        state = query_dict.get("state")
 60        request_redirect_uri = query_dict.get("post_logout_redirect_uri")
 61        id_token_hint = query_dict.get("id_token_hint")
 62        self.post_logout_redirect_uri = None
 63
 64        # OIDC Certification: Verify id_token_hint. If invalid or missing, throw an error
 65        if id_token_hint:
 66            # Load a fresh provider instance that's not part of the flow
 67            # since it'll have the cryptography Certificate that can't be pickled
 68            provider = OAuth2Provider.objects.get(pk=self.provider.pk)
 69            key, alg = provider.jwt_key
 70            if alg != JWTAlgorithms.HS256:
 71                key = provider.signing_key.public_key
 72            try:
 73                jwt_decode(
 74                    id_token_hint,
 75                    key,
 76                    algorithms=[alg],
 77                    audience=provider.client_id,
 78                    issuer=provider.get_issuer(self.request),
 79                    # ID Tokens are short-lived; a logout request arriving
 80                    # after expiry is still legitimate and must succeed.
 81                    options={"verify_exp": False},
 82                )
 83            except PyJWTError:
 84                raise TokenError("invalid_request").with_cause(
 85                    "id_token_hint_decode_failed"
 86                ) from None
 87
 88        # Validate post_logout_redirect_uri against registered URIs
 89        if request_redirect_uri:
 90            # OIDC Certification: id_token_hint required with post_logout_redirect_uri
 91            if not id_token_hint:
 92                raise TokenError("invalid_request").with_cause("id_token_hint_missing")
 93            if urlparse(request_redirect_uri).scheme in FORBIDDEN_URI_SCHEMES:
 94                raise TokenError("invalid_request").with_cause("post_logout_redirect_uri")
 95            for allowed in self.provider.post_logout_redirect_uris:
 96                if allowed.matching_mode == RedirectURIMatchingMode.STRICT:
 97                    if request_redirect_uri == allowed.url:
 98                        self.post_logout_redirect_uri = request_redirect_uri
 99                        break
100                elif allowed.matching_mode == RedirectURIMatchingMode.REGEX:
101                    if fullmatch(allowed.url, request_redirect_uri):
102                        self.post_logout_redirect_uri = request_redirect_uri
103                        break
104            # OIDC Certification: OP MUST NOT perform post-logout redirection
105            # if the supplied URI does not exactly match a registered one
106            if self.post_logout_redirect_uri is None:
107                raise TokenError("invalid_request").with_cause("invalid_post_logout_redirect_uri")
108
109        # Append state to the redirect URI if both are present
110        if self.post_logout_redirect_uri and state:
111            separator = "&" if "?" in self.post_logout_redirect_uri else "?"
112            self.post_logout_redirect_uri = (
113                f"{self.post_logout_redirect_uri}{separator}state={quote(state, safe='')}"
114            )
def dispatch( self, request: django.http.request.HttpRequest, *args, **kwargs) -> django.http.response.HttpResponse:
118    def dispatch(self, request: HttpRequest, *args, **kwargs) -> HttpResponse:
119        """Check for active logout flow before policy checks"""
120
121        # Check if we're already in an active logout flow
122        # (being called from an iframe during single logout)
123        if SESSION_KEY_PLAN in request.session:
124            return HttpResponse(
125                "<html><body>Logout successful</body></html>", content_type="text/html", status=200
126            )
127
128        return super().dispatch(request, *args, **kwargs)

Check for active logout flow before policy checks

def get( self, request: django.http.request.HttpRequest, *args, **kwargs) -> django.http.response.HttpResponse:
130    def get(self, request: HttpRequest, *args, **kwargs) -> HttpResponse:
131        """Dispatch the flow planner for the invalidation flow"""
132        try:
133            self.validate()
134        except TokenError as exc:
135            return bad_request_message(
136                self.request,
137                exc.description,
138            )
139        planner = FlowPlanner(self.flow)
140        planner.allow_empty_flows = True
141
142        context = {
143            PLAN_CONTEXT_APPLICATION: self.application,
144        }
145
146        auth_session = AuthenticatedSession.from_request(request, request.user)
147
148        if self.post_logout_redirect_uri:
149            context[PLAN_CONTEXT_POST_LOGOUT_REDIRECT_URI] = self.post_logout_redirect_uri
150
151        session_key = (
152            auth_session.session.session_key if auth_session and auth_session.session else None
153        )
154
155        frontchannel_logout_url = None
156        if self.provider.logout_method == OAuth2LogoutMethod.FRONTCHANNEL:
157            frontchannel_logout_url = build_frontchannel_logout_url(
158                self.provider, request, session_key
159            )
160
161        if (
162            self.provider.logout_method == OAuth2LogoutMethod.BACKCHANNEL
163            and self.provider.logout_uri
164        ):
165            access_token = AccessToken.objects.filter(
166                user=request.user,
167                provider=self.provider,
168                session=auth_session,
169            ).first()
170            if access_token and access_token.id_token:
171                send_backchannel_logout_request.send(
172                    self.provider.pk,
173                    access_token.id_token.iss,
174                    access_token.id_token.sub,
175                    session_key,
176                )
177                # Delete the token to prevent duplicate backchannel logout
178                # when UserLogoutStage triggers the session deletion signal
179                access_token.delete()
180
181        if frontchannel_logout_url:
182            context[PLAN_CONTEXT_OIDC_LOGOUT_IFRAME_SESSIONS] = [
183                {
184                    "url": frontchannel_logout_url,
185                    "provider_name": self.provider.name,
186                    "binding": OAUTH2_BINDING,
187                    "provider_type": (
188                        f"{self.provider._meta.app_label}.{self.provider._meta.model_name}"
189                    ),
190                }
191            ]
192
193        access_tokens = AccessToken.objects.filter(
194            user=request.user,
195            provider=self.provider,
196        )
197        if auth_session:
198            access_tokens = access_tokens.filter(session=auth_session)
199        access_tokens.delete()
200
201        plan = planner.plan(request, context)
202
203        if frontchannel_logout_url:
204            plan.insert_stage(in_memory_stage(IframeLogoutStageView))
205
206        plan.append_stage(in_memory_stage(SessionEndStage))
207        return plan.to_redirect(self.request, self.flow)

Dispatch the flow planner for the invalidation flow

def post( self, request: django.http.request.HttpRequest, *args, **kwargs) -> django.http.response.HttpResponse:
209    def post(self, request: HttpRequest, *args, **kwargs) -> HttpResponse:
210        """Handle POST requests for logout (same as GET per OIDC spec)"""
211        return self.get(request, *args, **kwargs)

Handle POST requests for logout (same as GET per OIDC spec)