authentik.providers.oauth2.signals

  1from urllib.parse import parse_qs, urlencode, urlparse, urlunparse
  2
  3from django.db.models.signals import post_save, pre_delete
  4from django.dispatch import receiver
  5from structlog.stdlib import get_logger
  6
  7from authentik.common.oauth.constants import PLAN_CONTEXT_OIDC_LOGOUT_IFRAME_SESSIONS
  8from authentik.core.models import AuthenticatedSession, User
  9from authentik.flows.models import in_memory_stage
 10from authentik.outposts.tasks import hash_session_key
 11from authentik.providers.iframe_logout import IframeLogoutStageView
 12from authentik.providers.oauth2.models import (
 13    AccessToken,
 14    DeviceToken,
 15    OAuth2LogoutMethod,
 16    RefreshToken,
 17)
 18from authentik.providers.oauth2.tasks import backchannel_logout_notification_dispatch
 19from authentik.stages.user_logout.models import UserLogoutStage
 20from authentik.stages.user_logout.stage import flow_pre_user_logout
 21
 22LOGGER = get_logger()
 23
 24
 25@receiver(flow_pre_user_logout)
 26def handle_flow_pre_user_logout(sender, request, user, executor, **kwargs):
 27    """Handle SAML and OIDC frontchannel logout when user logs out via flow"""
 28
 29    # Only proceed if this is actually a UserLogoutStage
 30    if not isinstance(executor.current_stage, UserLogoutStage):
 31        return
 32
 33    if not user.is_authenticated:
 34        return
 35
 36    auth_session = AuthenticatedSession.from_request(request, user)
 37    if not auth_session:
 38        return
 39
 40    oidc_access_tokens = (
 41        AccessToken.objects.filter(
 42            user=user,
 43            session=auth_session,
 44            provider__logout_method=OAuth2LogoutMethod.FRONTCHANNEL,
 45        )
 46        .exclude(provider__logout_uri="")
 47        .select_related("provider")
 48    )
 49
 50    if not oidc_access_tokens.exists():
 51        LOGGER.debug("No sessions requiring IFrame frontchannel logout")
 52        return
 53
 54    oidc_sessions = []
 55
 56    for token in oidc_access_tokens:
 57        # Parse the logout URI and add query parameters
 58        parsed_url = urlparse(token.provider.logout_uri)
 59
 60        query_params = {}
 61        query_params["iss"] = token.provider.get_issuer(request)
 62        if auth_session.session:
 63            query_params["sid"] = hash_session_key(auth_session.session.session_key)
 64
 65        # Combine existing query params with new ones
 66        if parsed_url.query:
 67            existing_params = parse_qs(parsed_url.query, keep_blank_values=True)
 68            for key, value in existing_params.items():
 69                if key not in query_params:
 70                    query_params[key] = value[0] if len(value) == 1 else value
 71
 72        # Build the final URL with query parameters
 73        logout_url = urlunparse(
 74            (
 75                parsed_url.scheme,
 76                parsed_url.netloc,
 77                parsed_url.path,
 78                parsed_url.params,
 79                urlencode(query_params),
 80                parsed_url.fragment,
 81            )
 82        )
 83
 84        logout_data = {
 85            "url": logout_url,
 86            "provider_name": token.provider.name,
 87            "binding": "redirect",
 88            "provider_type": "oidc",
 89        }
 90        oidc_sessions.append(logout_data)
 91
 92    if oidc_sessions:
 93        executor.plan.context[PLAN_CONTEXT_OIDC_LOGOUT_IFRAME_SESSIONS] = oidc_sessions
 94
 95        # Stage already exists, don't reinject it
 96        if not any(
 97            binding.stage.view == IframeLogoutStageView for binding in executor.plan.bindings
 98        ):
 99            iframe_stage = in_memory_stage(IframeLogoutStageView)
100            executor.plan.insert_stage(iframe_stage, index=1)
101
102        LOGGER.debug("Oauth iframe sessions gathered")
103
104
105@receiver(pre_delete, sender=AuthenticatedSession)
106def user_session_deleted_oauth_backchannel_logout_and_tokens_removal(
107    sender, instance: AuthenticatedSession, **_
108):
109    """Revoke tokens upon user logout"""
110    LOGGER.debug("Sending back-channel logout notifications signal!", session=instance)
111
112    access_tokens = AccessToken.objects.select_related("provider").filter(
113        user=instance.user,
114        session__session__session_key=instance.session.session_key,
115    )
116
117    # Only send backchannel logout notifications for providers that have
118    # logout_uri configured and backchannel logout method set
119    backchannel_tokens = [
120        (
121            token.provider_id,
122            token.id_token.iss,
123            token.id_token.sub,
124            instance.session.session_key,
125        )
126        for token in access_tokens
127        if token.provider.logout_uri
128        and token.provider.logout_method == OAuth2LogoutMethod.BACKCHANNEL
129    ]
130
131    if backchannel_tokens:
132        backchannel_logout_notification_dispatch.send(revocations=backchannel_tokens)
133
134    access_tokens.delete()
135
136
137@receiver(post_save, sender=User)
138def user_deactivated(sender, instance: User, **_):
139    """Remove user tokens when deactivated"""
140    if instance.is_active:
141        return
142    AccessToken.objects.including_expired().filter(user=instance).delete()
143    RefreshToken.objects.including_expired().filter(user=instance).delete()
144    DeviceToken.objects.including_expired().filter(user=instance).delete()
LOGGER = <BoundLoggerLazyProxy(logger=None, wrapper_class=None, processors=None, context_class=None, initial_values={}, logger_factory_args=())>
@receiver(flow_pre_user_logout)
def handle_flow_pre_user_logout(sender, request, user, executor, **kwargs):
 26@receiver(flow_pre_user_logout)
 27def handle_flow_pre_user_logout(sender, request, user, executor, **kwargs):
 28    """Handle SAML and OIDC frontchannel logout when user logs out via flow"""
 29
 30    # Only proceed if this is actually a UserLogoutStage
 31    if not isinstance(executor.current_stage, UserLogoutStage):
 32        return
 33
 34    if not user.is_authenticated:
 35        return
 36
 37    auth_session = AuthenticatedSession.from_request(request, user)
 38    if not auth_session:
 39        return
 40
 41    oidc_access_tokens = (
 42        AccessToken.objects.filter(
 43            user=user,
 44            session=auth_session,
 45            provider__logout_method=OAuth2LogoutMethod.FRONTCHANNEL,
 46        )
 47        .exclude(provider__logout_uri="")
 48        .select_related("provider")
 49    )
 50
 51    if not oidc_access_tokens.exists():
 52        LOGGER.debug("No sessions requiring IFrame frontchannel logout")
 53        return
 54
 55    oidc_sessions = []
 56
 57    for token in oidc_access_tokens:
 58        # Parse the logout URI and add query parameters
 59        parsed_url = urlparse(token.provider.logout_uri)
 60
 61        query_params = {}
 62        query_params["iss"] = token.provider.get_issuer(request)
 63        if auth_session.session:
 64            query_params["sid"] = hash_session_key(auth_session.session.session_key)
 65
 66        # Combine existing query params with new ones
 67        if parsed_url.query:
 68            existing_params = parse_qs(parsed_url.query, keep_blank_values=True)
 69            for key, value in existing_params.items():
 70                if key not in query_params:
 71                    query_params[key] = value[0] if len(value) == 1 else value
 72
 73        # Build the final URL with query parameters
 74        logout_url = urlunparse(
 75            (
 76                parsed_url.scheme,
 77                parsed_url.netloc,
 78                parsed_url.path,
 79                parsed_url.params,
 80                urlencode(query_params),
 81                parsed_url.fragment,
 82            )
 83        )
 84
 85        logout_data = {
 86            "url": logout_url,
 87            "provider_name": token.provider.name,
 88            "binding": "redirect",
 89            "provider_type": "oidc",
 90        }
 91        oidc_sessions.append(logout_data)
 92
 93    if oidc_sessions:
 94        executor.plan.context[PLAN_CONTEXT_OIDC_LOGOUT_IFRAME_SESSIONS] = oidc_sessions
 95
 96        # Stage already exists, don't reinject it
 97        if not any(
 98            binding.stage.view == IframeLogoutStageView for binding in executor.plan.bindings
 99        ):
100            iframe_stage = in_memory_stage(IframeLogoutStageView)
101            executor.plan.insert_stage(iframe_stage, index=1)
102
103        LOGGER.debug("Oauth iframe sessions gathered")

Handle SAML and OIDC frontchannel logout when user logs out via flow

@receiver(pre_delete, sender=AuthenticatedSession)
def user_session_deleted_oauth_backchannel_logout_and_tokens_removal(sender, instance: authentik.core.models.AuthenticatedSession, **_):
106@receiver(pre_delete, sender=AuthenticatedSession)
107def user_session_deleted_oauth_backchannel_logout_and_tokens_removal(
108    sender, instance: AuthenticatedSession, **_
109):
110    """Revoke tokens upon user logout"""
111    LOGGER.debug("Sending back-channel logout notifications signal!", session=instance)
112
113    access_tokens = AccessToken.objects.select_related("provider").filter(
114        user=instance.user,
115        session__session__session_key=instance.session.session_key,
116    )
117
118    # Only send backchannel logout notifications for providers that have
119    # logout_uri configured and backchannel logout method set
120    backchannel_tokens = [
121        (
122            token.provider_id,
123            token.id_token.iss,
124            token.id_token.sub,
125            instance.session.session_key,
126        )
127        for token in access_tokens
128        if token.provider.logout_uri
129        and token.provider.logout_method == OAuth2LogoutMethod.BACKCHANNEL
130    ]
131
132    if backchannel_tokens:
133        backchannel_logout_notification_dispatch.send(revocations=backchannel_tokens)
134
135    access_tokens.delete()

Revoke tokens upon user logout

@receiver(post_save, sender=User)
def user_deactivated(sender, instance: authentik.core.models.User, **_):
138@receiver(post_save, sender=User)
139def user_deactivated(sender, instance: User, **_):
140    """Remove user tokens when deactivated"""
141    if instance.is_active:
142        return
143    AccessToken.objects.including_expired().filter(user=instance).delete()
144    RefreshToken.objects.including_expired().filter(user=instance).delete()
145    DeviceToken.objects.including_expired().filter(user=instance).delete()

Remove user tokens when deactivated