authentik.enterprise.providers.ws_federation.signals
WS-Fed Provider signals
1"""WS-Fed Provider signals""" 2 3from urllib.parse import urlencode, urlparse, urlunparse 4 5from django.dispatch import receiver 6from django.http import HttpRequest 7from django.urls import reverse 8from django.utils import timezone 9from structlog.stdlib import get_logger 10 11from authentik.core.models import AuthenticatedSession, User 12from authentik.enterprise.providers.ws_federation.models import WSFederationProvider 13from authentik.enterprise.providers.ws_federation.processors.constants import ( 14 WS_FED_ACTION_SIGN_OUT_CLEANUP, 15 WS_FED_POST_KEY_ACTION, 16) 17from authentik.flows.models import in_memory_stage 18from authentik.flows.views.executor import FlowExecutorView 19from authentik.providers.iframe_logout import IframeLogoutStageView 20from authentik.providers.saml.models import SAMLBindings, SAMLSession 21from authentik.providers.saml.views.flows import ( 22 PLAN_CONTEXT_SAML_LOGOUT_IFRAME_SESSIONS, 23 PLAN_CONTEXT_SAML_RELAY_STATE, 24) 25from authentik.stages.user_logout.models import UserLogoutStage 26from authentik.stages.user_logout.stage import flow_pre_user_logout 27 28LOGGER = get_logger() 29 30 31@receiver(flow_pre_user_logout) 32def handle_ws_fed_iframe_pre_user_logout( 33 sender, request: HttpRequest, user: User, executor: FlowExecutorView, **kwargs 34): 35 """Handle WS-Fed iframe logout when user logs out via flow""" 36 37 # Only proceed if this is actually a UserLogoutStage 38 if not isinstance(executor.current_stage, UserLogoutStage): 39 return 40 41 if not user.is_authenticated: 42 return 43 44 auth_session = AuthenticatedSession.from_request(request, user) 45 if not auth_session: 46 return 47 48 wsfed_sessions = SAMLSession.objects.filter( 49 session=auth_session, 50 user=user, 51 expires__gt=timezone.now(), 52 expiring=True, 53 # Only get WS-Federation provider sessions 54 provider__wsfederationprovider__isnull=False, 55 ).select_related("provider__wsfederationprovider") 56 57 if not wsfed_sessions.exists(): 58 LOGGER.debug("No sessions requiring IFrame frontchannel logout") 59 return 60 61 saml_sessions = [] 62 63 relay_state = request.build_absolute_uri( 64 reverse("authentik_core:if-flow", kwargs={"flow_slug": executor.flow.slug}) 65 ) 66 67 # Store return URL in plan context as fallback if SP doesn't echo relay_state 68 executor.plan.context[PLAN_CONTEXT_SAML_RELAY_STATE] = relay_state 69 70 for session in wsfed_sessions: 71 provider: WSFederationProvider = session.provider.wsfederationprovider 72 parts = urlparse(str(provider.acs_url)) 73 parts = parts._replace( 74 query=urlencode({WS_FED_POST_KEY_ACTION: WS_FED_ACTION_SIGN_OUT_CLEANUP}) 75 ) 76 logout_data = { 77 "url": urlunparse(parts), 78 "provider_name": provider.name, 79 "binding": SAMLBindings.REDIRECT, 80 } 81 82 saml_sessions.append(logout_data) 83 84 if saml_sessions: 85 executor.plan.context[PLAN_CONTEXT_SAML_LOGOUT_IFRAME_SESSIONS] = saml_sessions 86 # Stage already exists, don't reinject it 87 if not any( 88 binding.stage.view == IframeLogoutStageView for binding in executor.plan.bindings 89 ): 90 iframe_stage = in_memory_stage(IframeLogoutStageView) 91 executor.plan.insert_stage(iframe_stage, index=1) 92 93 LOGGER.debug("WSFed iframe sessions gathered")
LOGGER =
<BoundLoggerLazyProxy(logger=None, wrapper_class=None, processors=None, context_class=None, initial_values={}, logger_factory_args=())>
@receiver(flow_pre_user_logout)
def
handle_ws_fed_iframe_pre_user_logout( sender, request: django.http.request.HttpRequest, user: authentik.core.models.User, executor: authentik.flows.views.executor.FlowExecutorView, **kwargs):
32@receiver(flow_pre_user_logout) 33def handle_ws_fed_iframe_pre_user_logout( 34 sender, request: HttpRequest, user: User, executor: FlowExecutorView, **kwargs 35): 36 """Handle WS-Fed iframe logout when user logs out via flow""" 37 38 # Only proceed if this is actually a UserLogoutStage 39 if not isinstance(executor.current_stage, UserLogoutStage): 40 return 41 42 if not user.is_authenticated: 43 return 44 45 auth_session = AuthenticatedSession.from_request(request, user) 46 if not auth_session: 47 return 48 49 wsfed_sessions = SAMLSession.objects.filter( 50 session=auth_session, 51 user=user, 52 expires__gt=timezone.now(), 53 expiring=True, 54 # Only get WS-Federation provider sessions 55 provider__wsfederationprovider__isnull=False, 56 ).select_related("provider__wsfederationprovider") 57 58 if not wsfed_sessions.exists(): 59 LOGGER.debug("No sessions requiring IFrame frontchannel logout") 60 return 61 62 saml_sessions = [] 63 64 relay_state = request.build_absolute_uri( 65 reverse("authentik_core:if-flow", kwargs={"flow_slug": executor.flow.slug}) 66 ) 67 68 # Store return URL in plan context as fallback if SP doesn't echo relay_state 69 executor.plan.context[PLAN_CONTEXT_SAML_RELAY_STATE] = relay_state 70 71 for session in wsfed_sessions: 72 provider: WSFederationProvider = session.provider.wsfederationprovider 73 parts = urlparse(str(provider.acs_url)) 74 parts = parts._replace( 75 query=urlencode({WS_FED_POST_KEY_ACTION: WS_FED_ACTION_SIGN_OUT_CLEANUP}) 76 ) 77 logout_data = { 78 "url": urlunparse(parts), 79 "provider_name": provider.name, 80 "binding": SAMLBindings.REDIRECT, 81 } 82 83 saml_sessions.append(logout_data) 84 85 if saml_sessions: 86 executor.plan.context[PLAN_CONTEXT_SAML_LOGOUT_IFRAME_SESSIONS] = saml_sessions 87 # Stage already exists, don't reinject it 88 if not any( 89 binding.stage.view == IframeLogoutStageView for binding in executor.plan.bindings 90 ): 91 iframe_stage = in_memory_stage(IframeLogoutStageView) 92 executor.plan.insert_stage(iframe_stage, index=1) 93 94 LOGGER.debug("WSFed iframe sessions gathered")
Handle WS-Fed iframe logout when user logs out via flow