authentik.providers.saml.signals

SAML Provider signals

  1"""SAML Provider signals"""
  2
  3from django.db.models.signals import post_save, pre_delete
  4from django.dispatch import receiver
  5from django.http import HttpRequest
  6from django.urls import reverse
  7from structlog.stdlib import get_logger
  8
  9from authentik.core.models import AuthenticatedSession, User
 10from authentik.flows.models import in_memory_stage
 11from authentik.flows.views.executor import FlowExecutorView
 12from authentik.providers.iframe_logout import IframeLogoutStageView
 13from authentik.providers.saml.models import SAMLBindings, SAMLLogoutMethods, SAMLSession
 14from authentik.providers.saml.native_logout import NativeLogoutStageView
 15from authentik.providers.saml.processors.logout_request import LogoutRequestProcessor
 16from authentik.providers.saml.tasks import send_saml_logout_request
 17from authentik.providers.saml.views.flows import (
 18    PLAN_CONTEXT_SAML_LOGOUT_IFRAME_SESSIONS,
 19    PLAN_CONTEXT_SAML_LOGOUT_NATIVE_SESSIONS,
 20    PLAN_CONTEXT_SAML_RELAY_STATE,
 21)
 22from authentik.stages.user_logout.models import UserLogoutStage
 23from authentik.stages.user_logout.stage import flow_pre_user_logout
 24
 25LOGGER = get_logger()
 26
 27
 28@receiver(flow_pre_user_logout)
 29def handle_saml_iframe_pre_user_logout(
 30    sender, request: HttpRequest, user: User, executor: FlowExecutorView, **kwargs
 31):
 32    """Handle SAML iframe logout when user logs out via flow"""
 33
 34    # Only proceed if this is actually a UserLogoutStage
 35    if not isinstance(executor.current_stage, UserLogoutStage):
 36        return
 37
 38    if not user.is_authenticated:
 39        return
 40
 41    auth_session = AuthenticatedSession.from_request(request, user)
 42    if not auth_session:
 43        return
 44
 45    iframe_saml_sessions = (
 46        SAMLSession.objects.filter(
 47            session=auth_session,
 48            user=user,
 49            provider__logout_method=SAMLLogoutMethods.FRONTCHANNEL_IFRAME,
 50        )
 51        .exclude(provider__sls_url="")
 52        .select_related("provider")
 53    )
 54
 55    if not iframe_saml_sessions.exists():
 56        LOGGER.debug("No sessions requiring IFrame frontchannel logout")
 57        return
 58
 59    saml_sessions = []
 60
 61    relay_state = request.build_absolute_uri(
 62        reverse("authentik_core:if-flow", kwargs={"flow_slug": executor.flow.slug})
 63    )
 64
 65    # Store return URL in plan context as fallback if SP doesn't echo relay_state
 66    executor.plan.context[PLAN_CONTEXT_SAML_RELAY_STATE] = relay_state
 67
 68    for session in iframe_saml_sessions:
 69        try:
 70            processor = LogoutRequestProcessor(
 71                provider=session.provider,
 72                user=None,  # User context not needed for logout URL generation
 73                destination=session.provider.sls_url,
 74                name_id=session.name_id,
 75                name_id_format=session.name_id_format,
 76                session_index=session.session_index,
 77                relay_state=relay_state,
 78            )
 79
 80            if session.provider.sls_binding == SAMLBindings.POST:
 81                form_data = processor.get_post_form_data()
 82                logout_data = {
 83                    "url": session.provider.sls_url,
 84                    "saml_request": form_data["SAMLRequest"],
 85                    "provider_name": session.provider.name,
 86                    "binding": SAMLBindings.POST,
 87                }
 88            else:
 89                logout_url = processor.get_redirect_url()
 90                logout_data = {
 91                    "url": logout_url,
 92                    "provider_name": session.provider.name,
 93                    "binding": SAMLBindings.REDIRECT,
 94                }
 95
 96            saml_sessions.append(logout_data)
 97        except (KeyError, AttributeError) as exc:
 98            LOGGER.warning(
 99                "Failed to generate SAML logout URL",
100                provider=session.provider.name,
101                exc=exc,
102            )
103
104    if saml_sessions:
105        executor.plan.context[PLAN_CONTEXT_SAML_LOGOUT_IFRAME_SESSIONS] = saml_sessions
106        # Stage already exists, don't reinject it
107        if not any(
108            binding.stage.view == IframeLogoutStageView for binding in executor.plan.bindings
109        ):
110            iframe_stage = in_memory_stage(IframeLogoutStageView)
111            executor.plan.insert_stage(iframe_stage, index=1)
112
113        LOGGER.debug("saml iframe sessions gathered")
114
115
116@receiver(flow_pre_user_logout)
117def handle_flow_pre_user_logout(
118    sender, request: HttpRequest, user: User, executor: FlowExecutorView, **kwargs
119):
120    """Handle SAML native logout when user logs out via logout flow"""
121
122    # Only proceed if this is actually a UserLogoutStage
123    if not isinstance(executor.current_stage, UserLogoutStage):
124        return
125
126    if not user.is_authenticated:
127        return
128
129    auth_session = AuthenticatedSession.from_request(request, user)
130    if not auth_session:
131        return
132
133    native_saml_sessions = (
134        SAMLSession.objects.filter(
135            session=auth_session,
136            user=user,
137            provider__logout_method=SAMLLogoutMethods.FRONTCHANNEL_NATIVE,
138        )
139        .exclude(provider__sls_url="")
140        .select_related("provider")
141    )
142
143    if not native_saml_sessions.exists():
144        return
145
146    native_sessions = []
147
148    # Generate return URL back to the flow using the interface URL
149    relay_state = request.build_absolute_uri(
150        reverse("authentik_core:if-flow", kwargs={"flow_slug": executor.flow.slug})
151    )
152
153    # Store return URL in plan context as fallback if SP doesn't echo relay_state
154    executor.plan.context[PLAN_CONTEXT_SAML_RELAY_STATE] = relay_state
155
156    for session in native_saml_sessions:
157        try:
158            processor = LogoutRequestProcessor(
159                provider=session.provider,
160                user=None,  # User is already logged out
161                destination=session.provider.sls_url,
162                name_id=session.name_id,
163                name_id_format=session.name_id_format,
164                session_index=session.session_index,
165                relay_state=relay_state,
166            )
167
168            if session.provider.sls_binding == SAMLBindings.POST:
169                form_data = processor.get_post_form_data()
170                logout_data = {
171                    "post_url": session.provider.sls_url,
172                    "saml_request": form_data["SAMLRequest"],
173                    "saml_relay_state": form_data["RelayState"],
174                    "provider_name": session.provider.name,
175                    "saml_binding": SAMLBindings.POST,
176                }
177            else:
178                logout_url = processor.get_redirect_url()
179                logout_data = {
180                    "redirect_url": logout_url,
181                    "provider_name": session.provider.name,
182                    "saml_binding": SAMLBindings.REDIRECT,
183                }
184
185            native_sessions.append(logout_data)
186        except (KeyError, AttributeError) as exc:
187            LOGGER.warning(
188                "Failed to generate SAML native logout data",
189                provider=session.provider.name,
190                exc=exc,
191            )
192
193    if native_sessions:
194        executor.plan.context[PLAN_CONTEXT_SAML_LOGOUT_NATIVE_SESSIONS] = native_sessions
195        native_logout_stage = in_memory_stage(NativeLogoutStageView)
196        executor.plan.insert_stage(native_logout_stage, index=2)
197
198
199@receiver(pre_delete, sender=AuthenticatedSession)
200def user_session_deleted_saml_logout(sender, instance: AuthenticatedSession, **_):
201    """Send SAML backchannel logout requests when user session is deleted"""
202
203    backchannel_saml_sessions = (
204        SAMLSession.objects.filter(
205            session=instance,
206            provider__logout_method=SAMLLogoutMethods.BACKCHANNEL,
207            provider__sls_binding=SAMLBindings.POST,
208        )
209        .exclude(provider__sls_url="")
210        .select_related("provider", "user")
211    )
212
213    for saml_session in backchannel_saml_sessions:
214        LOGGER.info(
215            "Triggering backchannel SAML logout for deleted user session",
216            user=saml_session.user,
217            provider=saml_session.provider.name,
218            session_index=saml_session.session_index,
219        )
220
221        send_saml_logout_request.send(
222            provider_pk=saml_session.provider.pk,
223            sls_url=saml_session.provider.sls_url,
224            name_id=saml_session.name_id,
225            name_id_format=saml_session.name_id_format,
226            session_index=saml_session.session_index,
227        )
228
229
230@receiver(post_save, sender=User)
231def user_deactivated_saml_logout(sender, instance: User, **kwargs):
232    """Send SAML backchannel logout requests when user is deactivated"""
233    if instance.is_active:
234        return
235
236    backchannel_saml_sessions = (
237        SAMLSession.objects.filter(
238            user=instance,
239            provider__logout_method=SAMLLogoutMethods.BACKCHANNEL,
240            provider__sls_binding=SAMLBindings.POST,
241        )
242        .exclude(provider__sls_url="")
243        .select_related("provider")
244    )
245
246    for saml_session in backchannel_saml_sessions:
247        LOGGER.info(
248            "Triggering backchannel SAML logout for deactivated user",
249            user=instance,
250            provider=saml_session.provider.name,
251            session_index=saml_session.session_index,
252        )
253
254        send_saml_logout_request.send(
255            provider_pk=saml_session.provider.pk,
256            sls_url=saml_session.provider.sls_url,
257            name_id=saml_session.name_id,
258            name_id_format=saml_session.name_id_format,
259            session_index=saml_session.session_index,
260        )
LOGGER = <BoundLoggerLazyProxy(logger=None, wrapper_class=None, processors=None, context_class=None, initial_values={}, logger_factory_args=())>
@receiver(flow_pre_user_logout)
def handle_saml_iframe_pre_user_logout( sender, request: django.http.request.HttpRequest, user: authentik.core.models.User, executor: authentik.flows.views.executor.FlowExecutorView, **kwargs):
 29@receiver(flow_pre_user_logout)
 30def handle_saml_iframe_pre_user_logout(
 31    sender, request: HttpRequest, user: User, executor: FlowExecutorView, **kwargs
 32):
 33    """Handle SAML iframe logout when user logs out via flow"""
 34
 35    # Only proceed if this is actually a UserLogoutStage
 36    if not isinstance(executor.current_stage, UserLogoutStage):
 37        return
 38
 39    if not user.is_authenticated:
 40        return
 41
 42    auth_session = AuthenticatedSession.from_request(request, user)
 43    if not auth_session:
 44        return
 45
 46    iframe_saml_sessions = (
 47        SAMLSession.objects.filter(
 48            session=auth_session,
 49            user=user,
 50            provider__logout_method=SAMLLogoutMethods.FRONTCHANNEL_IFRAME,
 51        )
 52        .exclude(provider__sls_url="")
 53        .select_related("provider")
 54    )
 55
 56    if not iframe_saml_sessions.exists():
 57        LOGGER.debug("No sessions requiring IFrame frontchannel logout")
 58        return
 59
 60    saml_sessions = []
 61
 62    relay_state = request.build_absolute_uri(
 63        reverse("authentik_core:if-flow", kwargs={"flow_slug": executor.flow.slug})
 64    )
 65
 66    # Store return URL in plan context as fallback if SP doesn't echo relay_state
 67    executor.plan.context[PLAN_CONTEXT_SAML_RELAY_STATE] = relay_state
 68
 69    for session in iframe_saml_sessions:
 70        try:
 71            processor = LogoutRequestProcessor(
 72                provider=session.provider,
 73                user=None,  # User context not needed for logout URL generation
 74                destination=session.provider.sls_url,
 75                name_id=session.name_id,
 76                name_id_format=session.name_id_format,
 77                session_index=session.session_index,
 78                relay_state=relay_state,
 79            )
 80
 81            if session.provider.sls_binding == SAMLBindings.POST:
 82                form_data = processor.get_post_form_data()
 83                logout_data = {
 84                    "url": session.provider.sls_url,
 85                    "saml_request": form_data["SAMLRequest"],
 86                    "provider_name": session.provider.name,
 87                    "binding": SAMLBindings.POST,
 88                }
 89            else:
 90                logout_url = processor.get_redirect_url()
 91                logout_data = {
 92                    "url": logout_url,
 93                    "provider_name": session.provider.name,
 94                    "binding": SAMLBindings.REDIRECT,
 95                }
 96
 97            saml_sessions.append(logout_data)
 98        except (KeyError, AttributeError) as exc:
 99            LOGGER.warning(
100                "Failed to generate SAML logout URL",
101                provider=session.provider.name,
102                exc=exc,
103            )
104
105    if saml_sessions:
106        executor.plan.context[PLAN_CONTEXT_SAML_LOGOUT_IFRAME_SESSIONS] = saml_sessions
107        # Stage already exists, don't reinject it
108        if not any(
109            binding.stage.view == IframeLogoutStageView for binding in executor.plan.bindings
110        ):
111            iframe_stage = in_memory_stage(IframeLogoutStageView)
112            executor.plan.insert_stage(iframe_stage, index=1)
113
114        LOGGER.debug("saml iframe sessions gathered")

Handle SAML iframe logout when user logs out via flow

@receiver(flow_pre_user_logout)
def handle_flow_pre_user_logout( sender, request: django.http.request.HttpRequest, user: authentik.core.models.User, executor: authentik.flows.views.executor.FlowExecutorView, **kwargs):
117@receiver(flow_pre_user_logout)
118def handle_flow_pre_user_logout(
119    sender, request: HttpRequest, user: User, executor: FlowExecutorView, **kwargs
120):
121    """Handle SAML native logout when user logs out via logout flow"""
122
123    # Only proceed if this is actually a UserLogoutStage
124    if not isinstance(executor.current_stage, UserLogoutStage):
125        return
126
127    if not user.is_authenticated:
128        return
129
130    auth_session = AuthenticatedSession.from_request(request, user)
131    if not auth_session:
132        return
133
134    native_saml_sessions = (
135        SAMLSession.objects.filter(
136            session=auth_session,
137            user=user,
138            provider__logout_method=SAMLLogoutMethods.FRONTCHANNEL_NATIVE,
139        )
140        .exclude(provider__sls_url="")
141        .select_related("provider")
142    )
143
144    if not native_saml_sessions.exists():
145        return
146
147    native_sessions = []
148
149    # Generate return URL back to the flow using the interface URL
150    relay_state = request.build_absolute_uri(
151        reverse("authentik_core:if-flow", kwargs={"flow_slug": executor.flow.slug})
152    )
153
154    # Store return URL in plan context as fallback if SP doesn't echo relay_state
155    executor.plan.context[PLAN_CONTEXT_SAML_RELAY_STATE] = relay_state
156
157    for session in native_saml_sessions:
158        try:
159            processor = LogoutRequestProcessor(
160                provider=session.provider,
161                user=None,  # User is already logged out
162                destination=session.provider.sls_url,
163                name_id=session.name_id,
164                name_id_format=session.name_id_format,
165                session_index=session.session_index,
166                relay_state=relay_state,
167            )
168
169            if session.provider.sls_binding == SAMLBindings.POST:
170                form_data = processor.get_post_form_data()
171                logout_data = {
172                    "post_url": session.provider.sls_url,
173                    "saml_request": form_data["SAMLRequest"],
174                    "saml_relay_state": form_data["RelayState"],
175                    "provider_name": session.provider.name,
176                    "saml_binding": SAMLBindings.POST,
177                }
178            else:
179                logout_url = processor.get_redirect_url()
180                logout_data = {
181                    "redirect_url": logout_url,
182                    "provider_name": session.provider.name,
183                    "saml_binding": SAMLBindings.REDIRECT,
184                }
185
186            native_sessions.append(logout_data)
187        except (KeyError, AttributeError) as exc:
188            LOGGER.warning(
189                "Failed to generate SAML native logout data",
190                provider=session.provider.name,
191                exc=exc,
192            )
193
194    if native_sessions:
195        executor.plan.context[PLAN_CONTEXT_SAML_LOGOUT_NATIVE_SESSIONS] = native_sessions
196        native_logout_stage = in_memory_stage(NativeLogoutStageView)
197        executor.plan.insert_stage(native_logout_stage, index=2)

Handle SAML native logout when user logs out via logout flow

@receiver(pre_delete, sender=AuthenticatedSession)
def user_session_deleted_saml_logout(sender, instance: authentik.core.models.AuthenticatedSession, **_):
200@receiver(pre_delete, sender=AuthenticatedSession)
201def user_session_deleted_saml_logout(sender, instance: AuthenticatedSession, **_):
202    """Send SAML backchannel logout requests when user session is deleted"""
203
204    backchannel_saml_sessions = (
205        SAMLSession.objects.filter(
206            session=instance,
207            provider__logout_method=SAMLLogoutMethods.BACKCHANNEL,
208            provider__sls_binding=SAMLBindings.POST,
209        )
210        .exclude(provider__sls_url="")
211        .select_related("provider", "user")
212    )
213
214    for saml_session in backchannel_saml_sessions:
215        LOGGER.info(
216            "Triggering backchannel SAML logout for deleted user session",
217            user=saml_session.user,
218            provider=saml_session.provider.name,
219            session_index=saml_session.session_index,
220        )
221
222        send_saml_logout_request.send(
223            provider_pk=saml_session.provider.pk,
224            sls_url=saml_session.provider.sls_url,
225            name_id=saml_session.name_id,
226            name_id_format=saml_session.name_id_format,
227            session_index=saml_session.session_index,
228        )

Send SAML backchannel logout requests when user session is deleted

@receiver(post_save, sender=User)
def user_deactivated_saml_logout(sender, instance: authentik.core.models.User, **kwargs):
231@receiver(post_save, sender=User)
232def user_deactivated_saml_logout(sender, instance: User, **kwargs):
233    """Send SAML backchannel logout requests when user is deactivated"""
234    if instance.is_active:
235        return
236
237    backchannel_saml_sessions = (
238        SAMLSession.objects.filter(
239            user=instance,
240            provider__logout_method=SAMLLogoutMethods.BACKCHANNEL,
241            provider__sls_binding=SAMLBindings.POST,
242        )
243        .exclude(provider__sls_url="")
244        .select_related("provider")
245    )
246
247    for saml_session in backchannel_saml_sessions:
248        LOGGER.info(
249            "Triggering backchannel SAML logout for deactivated user",
250            user=instance,
251            provider=saml_session.provider.name,
252            session_index=saml_session.session_index,
253        )
254
255        send_saml_logout_request.send(
256            provider_pk=saml_session.provider.pk,
257            sls_url=saml_session.provider.sls_url,
258            name_id=saml_session.name_id,
259            name_id_format=saml_session.name_id_format,
260            session_index=saml_session.session_index,
261        )

Send SAML backchannel logout requests when user is deactivated