authentik.enterprise.providers.ws_federation.signals

WS-Fed Provider signals

 1"""WS-Fed Provider signals"""
 2
 3from urllib.parse import urlencode, urlparse, urlunparse
 4
 5from django.dispatch import receiver
 6from django.http import HttpRequest
 7from django.urls import reverse
 8from django.utils import timezone
 9from structlog.stdlib import get_logger
10
11from authentik.core.models import AuthenticatedSession, User
12from authentik.enterprise.providers.ws_federation.models import WSFederationProvider
13from authentik.enterprise.providers.ws_federation.processors.constants import (
14    WS_FED_ACTION_SIGN_OUT_CLEANUP,
15    WS_FED_POST_KEY_ACTION,
16)
17from authentik.flows.models import in_memory_stage
18from authentik.flows.views.executor import FlowExecutorView
19from authentik.providers.iframe_logout import IframeLogoutStageView
20from authentik.providers.saml.models import SAMLBindings, SAMLSession
21from authentik.providers.saml.views.flows import (
22    PLAN_CONTEXT_SAML_LOGOUT_IFRAME_SESSIONS,
23    PLAN_CONTEXT_SAML_RELAY_STATE,
24)
25from authentik.stages.user_logout.models import UserLogoutStage
26from authentik.stages.user_logout.stage import flow_pre_user_logout
27
28LOGGER = get_logger()
29
30
31@receiver(flow_pre_user_logout)
32def handle_ws_fed_iframe_pre_user_logout(
33    sender, request: HttpRequest, user: User, executor: FlowExecutorView, **kwargs
34):
35    """Handle WS-Fed iframe logout when user logs out via flow"""
36
37    # Only proceed if this is actually a UserLogoutStage
38    if not isinstance(executor.current_stage, UserLogoutStage):
39        return
40
41    if not user.is_authenticated:
42        return
43
44    auth_session = AuthenticatedSession.from_request(request, user)
45    if not auth_session:
46        return
47
48    wsfed_sessions = SAMLSession.objects.filter(
49        session=auth_session,
50        user=user,
51        expires__gt=timezone.now(),
52        expiring=True,
53        # Only get WS-Federation provider sessions
54        provider__wsfederationprovider__isnull=False,
55    ).select_related("provider__wsfederationprovider")
56
57    if not wsfed_sessions.exists():
58        LOGGER.debug("No sessions requiring IFrame frontchannel logout")
59        return
60
61    saml_sessions = []
62
63    relay_state = request.build_absolute_uri(
64        reverse("authentik_core:if-flow", kwargs={"flow_slug": executor.flow.slug})
65    )
66
67    # Store return URL in plan context as fallback if SP doesn't echo relay_state
68    executor.plan.context[PLAN_CONTEXT_SAML_RELAY_STATE] = relay_state
69
70    for session in wsfed_sessions:
71        provider: WSFederationProvider = session.provider.wsfederationprovider
72        parts = urlparse(str(provider.acs_url))
73        parts = parts._replace(
74            query=urlencode({WS_FED_POST_KEY_ACTION: WS_FED_ACTION_SIGN_OUT_CLEANUP})
75        )
76        logout_data = {
77            "url": urlunparse(parts),
78            "provider_name": provider.name,
79            "binding": SAMLBindings.REDIRECT,
80        }
81
82        saml_sessions.append(logout_data)
83
84    if saml_sessions:
85        executor.plan.context[PLAN_CONTEXT_SAML_LOGOUT_IFRAME_SESSIONS] = saml_sessions
86        # Stage already exists, don't reinject it
87        if not any(
88            binding.stage.view == IframeLogoutStageView for binding in executor.plan.bindings
89        ):
90            iframe_stage = in_memory_stage(IframeLogoutStageView)
91            executor.plan.insert_stage(iframe_stage, index=1)
92
93        LOGGER.debug("WSFed iframe sessions gathered")
LOGGER = <BoundLoggerLazyProxy(logger=None, wrapper_class=None, processors=None, context_class=None, initial_values={}, logger_factory_args=())>
@receiver(flow_pre_user_logout)
def handle_ws_fed_iframe_pre_user_logout( sender, request: django.http.request.HttpRequest, user: authentik.core.models.User, executor: authentik.flows.views.executor.FlowExecutorView, **kwargs):
32@receiver(flow_pre_user_logout)
33def handle_ws_fed_iframe_pre_user_logout(
34    sender, request: HttpRequest, user: User, executor: FlowExecutorView, **kwargs
35):
36    """Handle WS-Fed iframe logout when user logs out via flow"""
37
38    # Only proceed if this is actually a UserLogoutStage
39    if not isinstance(executor.current_stage, UserLogoutStage):
40        return
41
42    if not user.is_authenticated:
43        return
44
45    auth_session = AuthenticatedSession.from_request(request, user)
46    if not auth_session:
47        return
48
49    wsfed_sessions = SAMLSession.objects.filter(
50        session=auth_session,
51        user=user,
52        expires__gt=timezone.now(),
53        expiring=True,
54        # Only get WS-Federation provider sessions
55        provider__wsfederationprovider__isnull=False,
56    ).select_related("provider__wsfederationprovider")
57
58    if not wsfed_sessions.exists():
59        LOGGER.debug("No sessions requiring IFrame frontchannel logout")
60        return
61
62    saml_sessions = []
63
64    relay_state = request.build_absolute_uri(
65        reverse("authentik_core:if-flow", kwargs={"flow_slug": executor.flow.slug})
66    )
67
68    # Store return URL in plan context as fallback if SP doesn't echo relay_state
69    executor.plan.context[PLAN_CONTEXT_SAML_RELAY_STATE] = relay_state
70
71    for session in wsfed_sessions:
72        provider: WSFederationProvider = session.provider.wsfederationprovider
73        parts = urlparse(str(provider.acs_url))
74        parts = parts._replace(
75            query=urlencode({WS_FED_POST_KEY_ACTION: WS_FED_ACTION_SIGN_OUT_CLEANUP})
76        )
77        logout_data = {
78            "url": urlunparse(parts),
79            "provider_name": provider.name,
80            "binding": SAMLBindings.REDIRECT,
81        }
82
83        saml_sessions.append(logout_data)
84
85    if saml_sessions:
86        executor.plan.context[PLAN_CONTEXT_SAML_LOGOUT_IFRAME_SESSIONS] = saml_sessions
87        # Stage already exists, don't reinject it
88        if not any(
89            binding.stage.view == IframeLogoutStageView for binding in executor.plan.bindings
90        ):
91            iframe_stage = in_memory_stage(IframeLogoutStageView)
92            executor.plan.insert_stage(iframe_stage, index=1)
93
94        LOGGER.debug("WSFed iframe sessions gathered")

Handle WS-Fed iframe logout when user logs out via flow