authentik.providers.saml.signals

SAML Provider signals

  1"""SAML Provider signals"""
  2
  3from django.db.models.signals import post_save, pre_delete
  4from django.dispatch import receiver
  5from django.http import HttpRequest
  6from django.urls import reverse
  7from structlog.stdlib import get_logger
  8
  9from authentik.core.models import AuthenticatedSession, User
 10from authentik.flows.models import in_memory_stage
 11from authentik.flows.views.executor import FlowExecutorView
 12from authentik.providers.iframe_logout import IframeLogoutStageView
 13from authentik.providers.saml.models import SAMLBindings, SAMLLogoutMethods, SAMLSession
 14from authentik.providers.saml.native_logout import NativeLogoutStageView
 15from authentik.providers.saml.processors.logout_request import LogoutRequestProcessor
 16from authentik.providers.saml.tasks import send_saml_logout_request
 17from authentik.providers.saml.views.flows import (
 18    PLAN_CONTEXT_SAML_LOGOUT_IFRAME_SESSIONS,
 19    PLAN_CONTEXT_SAML_LOGOUT_NATIVE_SESSIONS,
 20    PLAN_CONTEXT_SAML_RELAY_STATE,
 21)
 22from authentik.stages.user_logout.models import UserLogoutStage
 23from authentik.stages.user_logout.stage import flow_pre_user_logout
 24
 25LOGGER = get_logger()
 26
 27
 28@receiver(flow_pre_user_logout)
 29def handle_saml_iframe_pre_user_logout(
 30    sender, request: HttpRequest, user: User, executor: FlowExecutorView, **kwargs
 31):
 32    """Handle SAML iframe logout when user logs out via flow"""
 33
 34    # Only proceed if this is actually a UserLogoutStage
 35    if not isinstance(executor.current_stage, UserLogoutStage):
 36        return
 37
 38    if not user.is_authenticated:
 39        return
 40
 41    auth_session = AuthenticatedSession.from_request(request, user)
 42    if not auth_session:
 43        return
 44
 45    iframe_saml_sessions = (
 46        SAMLSession.objects.filter(
 47            session=auth_session,
 48            user=user,
 49            provider__logout_method=SAMLLogoutMethods.FRONTCHANNEL_IFRAME,
 50        )
 51        .exclude(provider__sls_url="")
 52        .select_related("provider")
 53    )
 54
 55    if not iframe_saml_sessions.exists():
 56        LOGGER.debug("No sessions requiring IFrame frontchannel logout")
 57        return
 58
 59    saml_sessions = []
 60
 61    relay_state = request.build_absolute_uri(
 62        reverse("authentik_core:if-flow", kwargs={"flow_slug": executor.flow.slug})
 63    )
 64
 65    # Store return URL in plan context as fallback if SP doesn't echo relay_state
 66    executor.plan.context[PLAN_CONTEXT_SAML_RELAY_STATE] = relay_state
 67
 68    for session in iframe_saml_sessions:
 69        try:
 70            processor = LogoutRequestProcessor(
 71                provider=session.provider,
 72                user=None,  # User context not needed for logout URL generation
 73                destination=session.provider.sls_url,
 74                name_id=session.name_id,
 75                name_id_format=session.name_id_format,
 76                session_index=session.session_index,
 77                relay_state=relay_state,
 78                issuer=session.issuer,
 79            )
 80
 81            if session.provider.sls_binding == SAMLBindings.POST:
 82                form_data = processor.get_post_form_data()
 83                logout_data = {
 84                    "url": session.provider.sls_url,
 85                    "saml_request": form_data["SAMLRequest"],
 86                    "provider_name": session.provider.name,
 87                    "binding": SAMLBindings.POST,
 88                }
 89            else:
 90                logout_url = processor.get_redirect_url()
 91                logout_data = {
 92                    "url": logout_url,
 93                    "provider_name": session.provider.name,
 94                    "binding": SAMLBindings.REDIRECT,
 95                }
 96
 97            saml_sessions.append(logout_data)
 98        except (KeyError, AttributeError) as exc:
 99            LOGGER.warning(
100                "Failed to generate SAML logout URL",
101                provider=session.provider.name,
102                exc=exc,
103            )
104
105    if saml_sessions:
106        executor.plan.context[PLAN_CONTEXT_SAML_LOGOUT_IFRAME_SESSIONS] = saml_sessions
107        # Stage already exists, don't reinject it
108        if not any(
109            binding.stage.view == IframeLogoutStageView for binding in executor.plan.bindings
110        ):
111            iframe_stage = in_memory_stage(IframeLogoutStageView)
112            executor.plan.insert_stage(iframe_stage, index=1)
113
114        LOGGER.debug("saml iframe sessions gathered")
115
116
@receiver(flow_pre_user_logout)
def handle_flow_pre_user_logout(
    sender, request: HttpRequest, user: User, executor: FlowExecutorView, **kwargs
):
    """Handle SAML native logout when user logs out via logout flow.

    Collects the SAML sessions bound to the request's authenticated session
    whose provider uses the ``FRONTCHANNEL_NATIVE`` logout method and has a
    non-empty SLS URL. For each one a logout payload is built (POST form data
    including the relay state, or a redirect URL, depending on the provider's
    SLS binding). The payloads are stored in the flow plan context and a
    ``NativeLogoutStageView`` stage is inserted at index 2 of the plan.

    Args:
        sender: Signal sender (unused).
        request: Request driving the logout flow; used to resolve the session
            and to build the absolute return URL.
        user: User that is being logged out.
        executor: Flow executor whose plan context and stages are modified.
        **kwargs: Additional signal arguments (unused).
    """

    # Only proceed if this is actually a UserLogoutStage
    if not isinstance(executor.current_stage, UserLogoutStage):
        return

    if not user.is_authenticated:
        return

    auth_session = AuthenticatedSession.from_request(request, user)
    if not auth_session:
        return

    native_saml_sessions = (
        SAMLSession.objects.filter(
            session=auth_session,
            user=user,
            provider__logout_method=SAMLLogoutMethods.FRONTCHANNEL_NATIVE,
        )
        .exclude(provider__sls_url="")
        .select_related("provider")
    )

    # Truth-testing the queryset evaluates it once and caches the rows, so the
    # loop below reuses them instead of paying for exists() plus a second fetch.
    if not native_saml_sessions:
        return

    native_sessions = []

    # Generate return URL back to the flow using the interface URL
    relay_state = request.build_absolute_uri(
        reverse("authentik_core:if-flow", kwargs={"flow_slug": executor.flow.slug})
    )

    # Store return URL in plan context as fallback if SP doesn't echo relay_state
    executor.plan.context[PLAN_CONTEXT_SAML_RELAY_STATE] = relay_state

    for session in native_saml_sessions:
        try:
            processor = LogoutRequestProcessor(
                provider=session.provider,
                user=None,  # User is already logged out
                destination=session.provider.sls_url,
                name_id=session.name_id,
                name_id_format=session.name_id_format,
                session_index=session.session_index,
                relay_state=relay_state,
                issuer=session.issuer,
            )

            if session.provider.sls_binding == SAMLBindings.POST:
                # POST binding: request and relay state are submitted as form fields
                form_data = processor.get_post_form_data()
                logout_data = {
                    "post_url": session.provider.sls_url,
                    "saml_request": form_data["SAMLRequest"],
                    "saml_relay_state": form_data["RelayState"],
                    "provider_name": session.provider.name,
                    "saml_binding": SAMLBindings.POST,
                }
            else:
                # Any other binding is treated as redirect: the URL carries the request
                logout_url = processor.get_redirect_url()
                logout_data = {
                    "redirect_url": logout_url,
                    "provider_name": session.provider.name,
                    "saml_binding": SAMLBindings.REDIRECT,
                }

            native_sessions.append(logout_data)
        except (KeyError, AttributeError) as exc:
            # Best effort: a session that fails to build is skipped, the rest proceed
            LOGGER.warning(
                "Failed to generate SAML native logout data",
                provider=session.provider.name,
                exc=exc,
            )

    if native_sessions:
        executor.plan.context[PLAN_CONTEXT_SAML_LOGOUT_NATIVE_SESSIONS] = native_sessions
        native_logout_stage = in_memory_stage(NativeLogoutStageView)
        executor.plan.insert_stage(native_logout_stage, index=2)
199
200
@receiver(pre_delete, sender=AuthenticatedSession)
def user_session_deleted_saml_logout(sender, instance: AuthenticatedSession, **_):
    """Request backchannel SAML logout for a user session that is being deleted.

    Every SAML session attached to ``instance`` whose provider uses the
    backchannel logout method with POST binding and a non-empty SLS URL gets
    one ``send_saml_logout_request.send(...)`` call.
    """
    pending = SAMLSession.objects.filter(
        session=instance,
        provider__logout_method=SAMLLogoutMethods.BACKCHANNEL,
        provider__sls_binding=SAMLBindings.POST,
    )
    # Providers without an SLS URL have nowhere to send the request
    pending = pending.exclude(provider__sls_url="")
    pending = pending.select_related("provider", "user")

    for entry in pending:
        target = entry.provider

        LOGGER.info(
            "Triggering backchannel SAML logout for deleted user session",
            user=entry.user,
            provider=target.name,
            session_index=entry.session_index,
        )

        send_saml_logout_request.send(
            provider_pk=target.pk,
            sls_url=target.sls_url,
            name_id=entry.name_id,
            name_id_format=entry.name_id_format,
            session_index=entry.session_index,
            issuer=entry.issuer,
        )
231
232
@receiver(post_save, sender=User)
def user_deactivated_saml_logout(sender, instance: User, **kwargs):
    """Request backchannel SAML logout for all sessions of an inactive user.

    Runs after a ``User`` is saved and does nothing while the user is active.
    Otherwise every SAML session of the user whose provider uses the
    backchannel logout method with POST binding and a non-empty SLS URL gets
    one ``send_saml_logout_request.send(...)`` call.
    """
    if instance.is_active:
        return

    pending = SAMLSession.objects.filter(
        user=instance,
        provider__logout_method=SAMLLogoutMethods.BACKCHANNEL,
        provider__sls_binding=SAMLBindings.POST,
    )
    # Providers without an SLS URL have nowhere to send the request
    pending = pending.exclude(provider__sls_url="")
    pending = pending.select_related("provider")

    for entry in pending:
        target = entry.provider

        LOGGER.info(
            "Triggering backchannel SAML logout for deactivated user",
            user=instance,
            provider=target.name,
            session_index=entry.session_index,
        )

        send_saml_logout_request.send(
            provider_pk=target.pk,
            sls_url=target.sls_url,
            name_id=entry.name_id,
            name_id_format=entry.name_id_format,
            session_index=entry.session_index,
            issuer=entry.issuer,
        )
LOGGER = <BoundLoggerLazyProxy(logger=None, wrapper_class=None, processors=None, context_class=None, initial_values={}, logger_factory_args=())>
@receiver(flow_pre_user_logout)
def handle_saml_iframe_pre_user_logout( sender, request: django.http.request.HttpRequest, user: authentik.core.models.User, executor: authentik.flows.views.executor.FlowExecutorView, **kwargs):
 29@receiver(flow_pre_user_logout)
 30def handle_saml_iframe_pre_user_logout(
 31    sender, request: HttpRequest, user: User, executor: FlowExecutorView, **kwargs
 32):
 33    """Handle SAML iframe logout when user logs out via flow"""
 34
 35    # Only proceed if this is actually a UserLogoutStage
 36    if not isinstance(executor.current_stage, UserLogoutStage):
 37        return
 38
 39    if not user.is_authenticated:
 40        return
 41
 42    auth_session = AuthenticatedSession.from_request(request, user)
 43    if not auth_session:
 44        return
 45
 46    iframe_saml_sessions = (
 47        SAMLSession.objects.filter(
 48            session=auth_session,
 49            user=user,
 50            provider__logout_method=SAMLLogoutMethods.FRONTCHANNEL_IFRAME,
 51        )
 52        .exclude(provider__sls_url="")
 53        .select_related("provider")
 54    )
 55
 56    if not iframe_saml_sessions.exists():
 57        LOGGER.debug("No sessions requiring IFrame frontchannel logout")
 58        return
 59
 60    saml_sessions = []
 61
 62    relay_state = request.build_absolute_uri(
 63        reverse("authentik_core:if-flow", kwargs={"flow_slug": executor.flow.slug})
 64    )
 65
 66    # Store return URL in plan context as fallback if SP doesn't echo relay_state
 67    executor.plan.context[PLAN_CONTEXT_SAML_RELAY_STATE] = relay_state
 68
 69    for session in iframe_saml_sessions:
 70        try:
 71            processor = LogoutRequestProcessor(
 72                provider=session.provider,
 73                user=None,  # User context not needed for logout URL generation
 74                destination=session.provider.sls_url,
 75                name_id=session.name_id,
 76                name_id_format=session.name_id_format,
 77                session_index=session.session_index,
 78                relay_state=relay_state,
 79                issuer=session.issuer,
 80            )
 81
 82            if session.provider.sls_binding == SAMLBindings.POST:
 83                form_data = processor.get_post_form_data()
 84                logout_data = {
 85                    "url": session.provider.sls_url,
 86                    "saml_request": form_data["SAMLRequest"],
 87                    "provider_name": session.provider.name,
 88                    "binding": SAMLBindings.POST,
 89                }
 90            else:
 91                logout_url = processor.get_redirect_url()
 92                logout_data = {
 93                    "url": logout_url,
 94                    "provider_name": session.provider.name,
 95                    "binding": SAMLBindings.REDIRECT,
 96                }
 97
 98            saml_sessions.append(logout_data)
 99        except (KeyError, AttributeError) as exc:
100            LOGGER.warning(
101                "Failed to generate SAML logout URL",
102                provider=session.provider.name,
103                exc=exc,
104            )
105
106    if saml_sessions:
107        executor.plan.context[PLAN_CONTEXT_SAML_LOGOUT_IFRAME_SESSIONS] = saml_sessions
108        # Stage already exists, don't reinject it
109        if not any(
110            binding.stage.view == IframeLogoutStageView for binding in executor.plan.bindings
111        ):
112            iframe_stage = in_memory_stage(IframeLogoutStageView)
113            executor.plan.insert_stage(iframe_stage, index=1)
114
115        LOGGER.debug("saml iframe sessions gathered")

Handle SAML iframe logout when user logs out via flow

@receiver(flow_pre_user_logout)
def handle_flow_pre_user_logout( sender, request: django.http.request.HttpRequest, user: authentik.core.models.User, executor: authentik.flows.views.executor.FlowExecutorView, **kwargs):
@receiver(flow_pre_user_logout)
def handle_flow_pre_user_logout(
    sender, request: HttpRequest, user: User, executor: FlowExecutorView, **kwargs
):
    """Handle SAML native logout when user logs out via logout flow.

    Collects the SAML sessions bound to the request's authenticated session
    whose provider uses the ``FRONTCHANNEL_NATIVE`` logout method and has a
    non-empty SLS URL. For each one a logout payload is built (POST form data
    including the relay state, or a redirect URL, depending on the provider's
    SLS binding). The payloads are stored in the flow plan context and a
    ``NativeLogoutStageView`` stage is inserted at index 2 of the plan.

    Args:
        sender: Signal sender (unused).
        request: Request driving the logout flow; used to resolve the session
            and to build the absolute return URL.
        user: User that is being logged out.
        executor: Flow executor whose plan context and stages are modified.
        **kwargs: Additional signal arguments (unused).
    """

    # Only proceed if this is actually a UserLogoutStage
    if not isinstance(executor.current_stage, UserLogoutStage):
        return

    if not user.is_authenticated:
        return

    auth_session = AuthenticatedSession.from_request(request, user)
    if not auth_session:
        return

    native_saml_sessions = (
        SAMLSession.objects.filter(
            session=auth_session,
            user=user,
            provider__logout_method=SAMLLogoutMethods.FRONTCHANNEL_NATIVE,
        )
        .exclude(provider__sls_url="")
        .select_related("provider")
    )

    # Truth-testing the queryset evaluates it once and caches the rows, so the
    # loop below reuses them instead of paying for exists() plus a second fetch.
    if not native_saml_sessions:
        return

    native_sessions = []

    # Generate return URL back to the flow using the interface URL
    relay_state = request.build_absolute_uri(
        reverse("authentik_core:if-flow", kwargs={"flow_slug": executor.flow.slug})
    )

    # Store return URL in plan context as fallback if SP doesn't echo relay_state
    executor.plan.context[PLAN_CONTEXT_SAML_RELAY_STATE] = relay_state

    for session in native_saml_sessions:
        try:
            processor = LogoutRequestProcessor(
                provider=session.provider,
                user=None,  # User is already logged out
                destination=session.provider.sls_url,
                name_id=session.name_id,
                name_id_format=session.name_id_format,
                session_index=session.session_index,
                relay_state=relay_state,
                issuer=session.issuer,
            )

            if session.provider.sls_binding == SAMLBindings.POST:
                # POST binding: request and relay state are submitted as form fields
                form_data = processor.get_post_form_data()
                logout_data = {
                    "post_url": session.provider.sls_url,
                    "saml_request": form_data["SAMLRequest"],
                    "saml_relay_state": form_data["RelayState"],
                    "provider_name": session.provider.name,
                    "saml_binding": SAMLBindings.POST,
                }
            else:
                # Any other binding is treated as redirect: the URL carries the request
                logout_url = processor.get_redirect_url()
                logout_data = {
                    "redirect_url": logout_url,
                    "provider_name": session.provider.name,
                    "saml_binding": SAMLBindings.REDIRECT,
                }

            native_sessions.append(logout_data)
        except (KeyError, AttributeError) as exc:
            # Best effort: a session that fails to build is skipped, the rest proceed
            LOGGER.warning(
                "Failed to generate SAML native logout data",
                provider=session.provider.name,
                exc=exc,
            )

    if native_sessions:
        executor.plan.context[PLAN_CONTEXT_SAML_LOGOUT_NATIVE_SESSIONS] = native_sessions
        native_logout_stage = in_memory_stage(NativeLogoutStageView)
        executor.plan.insert_stage(native_logout_stage, index=2)

Handle SAML native logout when user logs out via logout flow

@receiver(pre_delete, sender=AuthenticatedSession)
def user_session_deleted_saml_logout(sender, instance: authentik.core.models.AuthenticatedSession, **_):
@receiver(pre_delete, sender=AuthenticatedSession)
def user_session_deleted_saml_logout(sender, instance: AuthenticatedSession, **_):
    """Request backchannel SAML logout for a user session that is being deleted.

    Every SAML session attached to ``instance`` whose provider uses the
    backchannel logout method with POST binding and a non-empty SLS URL gets
    one ``send_saml_logout_request.send(...)`` call.
    """
    pending = SAMLSession.objects.filter(
        session=instance,
        provider__logout_method=SAMLLogoutMethods.BACKCHANNEL,
        provider__sls_binding=SAMLBindings.POST,
    )
    # Providers without an SLS URL have nowhere to send the request
    pending = pending.exclude(provider__sls_url="")
    pending = pending.select_related("provider", "user")

    for entry in pending:
        target = entry.provider

        LOGGER.info(
            "Triggering backchannel SAML logout for deleted user session",
            user=entry.user,
            provider=target.name,
            session_index=entry.session_index,
        )

        send_saml_logout_request.send(
            provider_pk=target.pk,
            sls_url=target.sls_url,
            name_id=entry.name_id,
            name_id_format=entry.name_id_format,
            session_index=entry.session_index,
            issuer=entry.issuer,
        )

Send SAML backchannel logout requests when user session is deleted

@receiver(post_save, sender=User)
def user_deactivated_saml_logout(sender, instance: authentik.core.models.User, **kwargs):
@receiver(post_save, sender=User)
def user_deactivated_saml_logout(sender, instance: User, **kwargs):
    """Request backchannel SAML logout for all sessions of an inactive user.

    Runs after a ``User`` is saved and does nothing while the user is active.
    Otherwise every SAML session of the user whose provider uses the
    backchannel logout method with POST binding and a non-empty SLS URL gets
    one ``send_saml_logout_request.send(...)`` call.
    """
    if instance.is_active:
        return

    pending = SAMLSession.objects.filter(
        user=instance,
        provider__logout_method=SAMLLogoutMethods.BACKCHANNEL,
        provider__sls_binding=SAMLBindings.POST,
    )
    # Providers without an SLS URL have nowhere to send the request
    pending = pending.exclude(provider__sls_url="")
    pending = pending.select_related("provider")

    for entry in pending:
        target = entry.provider

        LOGGER.info(
            "Triggering backchannel SAML logout for deactivated user",
            user=instance,
            provider=target.name,
            session_index=entry.session_index,
        )

        send_saml_logout_request.send(
            provider_pk=target.pk,
            sls_url=target.sls_url,
            name_id=entry.name_id,
            name_id_format=entry.name_id_format,
            session_index=entry.session_index,
            issuer=entry.issuer,
        )

Send SAML backchannel logout requests when user is deactivated