authentik.providers.oauth2.signals

  1from django.db.models.signals import post_save, pre_delete
  2from django.dispatch import receiver
  3from structlog.stdlib import get_logger
  4
  5from authentik.common.oauth.constants import (
  6    OAUTH2_BINDING,
  7    PLAN_CONTEXT_OIDC_LOGOUT_IFRAME_SESSIONS,
  8)
  9from authentik.core.models import AuthenticatedSession, User
 10from authentik.flows.models import in_memory_stage
 11from authentik.providers.iframe_logout import IframeLogoutStageView
 12from authentik.providers.oauth2.models import (
 13    AccessToken,
 14    DeviceToken,
 15    OAuth2LogoutMethod,
 16    RefreshToken,
 17)
 18from authentik.providers.oauth2.tasks import backchannel_logout_notification_dispatch
 19from authentik.providers.oauth2.utils import build_frontchannel_logout_url
 20from authentik.stages.user_logout.models import UserLogoutStage
 21from authentik.stages.user_logout.stage import flow_pre_user_logout
 22
 23LOGGER = get_logger()
 24
 25
 26@receiver(flow_pre_user_logout)
 27def handle_flow_pre_user_logout(sender, request, user, executor, **kwargs):
 28    """Handle SAML and OIDC frontchannel logout when user logs out via flow"""
 29
 30    # Only proceed if this is actually a UserLogoutStage
 31    if not isinstance(executor.current_stage, UserLogoutStage):
 32        return
 33
 34    if not user.is_authenticated:
 35        return
 36
 37    auth_session = AuthenticatedSession.from_request(request, user)
 38    if not auth_session:
 39        return
 40
 41    oidc_access_tokens = (
 42        AccessToken.objects.filter(
 43            user=user,
 44            session=auth_session,
 45            provider__logout_method=OAuth2LogoutMethod.FRONTCHANNEL,
 46        )
 47        .exclude(provider__logout_uri="")
 48        .select_related("provider")
 49    )
 50
 51    if not oidc_access_tokens.exists():
 52        LOGGER.debug("No sessions requiring IFrame frontchannel logout")
 53        return
 54
 55    session_key = auth_session.session.session_key if auth_session.session else None
 56    oidc_sessions = []
 57
 58    for token in oidc_access_tokens:
 59        logout_url = build_frontchannel_logout_url(token.provider, request, session_key)
 60        if logout_url:
 61            oidc_sessions.append(
 62                {
 63                    "url": logout_url,
 64                    "provider_name": token.provider.name,
 65                    "binding": OAUTH2_BINDING,
 66                    "provider_type": (
 67                        f"{token.provider._meta.app_label}.{token.provider._meta.model_name}"
 68                    ),
 69                }
 70            )
 71
 72    if oidc_sessions:
 73        executor.plan.context[PLAN_CONTEXT_OIDC_LOGOUT_IFRAME_SESSIONS] = oidc_sessions
 74
 75        # Stage already exists, don't reinject it
 76        if not any(
 77            binding.stage.view == IframeLogoutStageView for binding in executor.plan.bindings
 78        ):
 79            iframe_stage = in_memory_stage(IframeLogoutStageView)
 80            executor.plan.insert_stage(iframe_stage, index=1)
 81
 82        LOGGER.debug("Oauth iframe sessions gathered")
 83
 84
 85@receiver(pre_delete, sender=AuthenticatedSession)
 86def user_session_deleted_oauth_backchannel_logout_and_tokens_removal(
 87    sender, instance: AuthenticatedSession, **_
 88):
 89    """Revoke tokens upon user logout"""
 90    LOGGER.debug("Sending back-channel logout notifications signal!", session=instance)
 91
 92    access_tokens = AccessToken.objects.select_related("provider").filter(
 93        user=instance.user,
 94        session__session__session_key=instance.session.session_key,
 95    )
 96
 97    # Only send backchannel logout notifications for providers that have
 98    # logout_uri configured and backchannel logout method set
 99    backchannel_tokens = [
100        (
101            token.provider_id,
102            token.id_token.iss,
103            token.id_token.sub,
104            instance.session.session_key,
105        )
106        for token in access_tokens
107        if token.provider.logout_uri
108        and token.provider.logout_method == OAuth2LogoutMethod.BACKCHANNEL
109    ]
110
111    if backchannel_tokens:
112        backchannel_logout_notification_dispatch.send(revocations=backchannel_tokens)
113
114    access_tokens.delete()
115
116
@receiver(post_save, sender=User)
def user_deactivated(sender, instance: User, **_):
    """Delete the access, refresh and device tokens of a user saved as inactive

    Active users are left untouched. The ``including_expired()`` manager
    method is used for the lookup, presumably so that expired tokens are
    removed as well.
    """
    if not instance.is_active:
        # Same deletion order as the models are listed here
        for token_model in (AccessToken, RefreshToken, DeviceToken):
            token_model.objects.including_expired().filter(user=instance).delete()
LOGGER = <BoundLoggerLazyProxy(logger=None, wrapper_class=None, processors=None, context_class=None, initial_values={}, logger_factory_args=())>
@receiver(flow_pre_user_logout)
def handle_flow_pre_user_logout(sender, request, user, executor, **kwargs):
@receiver(flow_pre_user_logout)
def handle_flow_pre_user_logout(sender, request, user, executor, **kwargs):
    """Handle OIDC frontchannel logout when user logs out via flow

    For every access token bound to the current authenticated session whose
    provider uses the frontchannel logout method and has a non-empty logout
    URI, a logout URL is built. The resulting entries are stored in the flow
    plan context under ``PLAN_CONTEXT_OIDC_LOGOUT_IFRAME_SESSIONS`` and an
    iframe logout stage is inserted into the plan unless one is already bound.

    Nothing happens when the current stage is not a ``UserLogoutStage``, the
    user is not authenticated, no authenticated session is found for the
    request, or no matching tokens exist.
    """

    # Only proceed if this is actually a UserLogoutStage
    if not isinstance(executor.current_stage, UserLogoutStage):
        return

    if not user.is_authenticated:
        return

    auth_session = AuthenticatedSession.from_request(request, user)
    if not auth_session:
        return

    # Evaluate the queryset exactly once: an `.exists()` check followed by
    # iteration would issue two database queries for the same rows.
    oidc_access_tokens = list(
        AccessToken.objects.filter(
            user=user,
            session=auth_session,
            provider__logout_method=OAuth2LogoutMethod.FRONTCHANNEL,
        )
        .exclude(provider__logout_uri="")
        .select_related("provider")
    )

    if not oidc_access_tokens:
        LOGGER.debug("No sessions requiring IFrame frontchannel logout")
        return

    session_key = auth_session.session.session_key if auth_session.session else None
    oidc_sessions = []

    for token in oidc_access_tokens:
        logout_url = build_frontchannel_logout_url(token.provider, request, session_key)
        # Tokens for which no logout URL is returned are skipped
        if logout_url:
            oidc_sessions.append(
                {
                    "url": logout_url,
                    "provider_name": token.provider.name,
                    "binding": OAUTH2_BINDING,
                    "provider_type": (
                        f"{token.provider._meta.app_label}.{token.provider._meta.model_name}"
                    ),
                }
            )

    if oidc_sessions:
        executor.plan.context[PLAN_CONTEXT_OIDC_LOGOUT_IFRAME_SESSIONS] = oidc_sessions

        # Stage already exists, don't reinject it
        if not any(
            binding.stage.view == IframeLogoutStageView for binding in executor.plan.bindings
        ):
            iframe_stage = in_memory_stage(IframeLogoutStageView)
            executor.plan.insert_stage(iframe_stage, index=1)

        LOGGER.debug("Oauth iframe sessions gathered")

Handle OIDC frontchannel logout when the user logs out via a flow

@receiver(pre_delete, sender=AuthenticatedSession)
def user_session_deleted_oauth_backchannel_logout_and_tokens_removal(sender, instance: authentik.core.models.AuthenticatedSession, **_):
 86@receiver(pre_delete, sender=AuthenticatedSession)
 87def user_session_deleted_oauth_backchannel_logout_and_tokens_removal(
 88    sender, instance: AuthenticatedSession, **_
 89):
 90    """Revoke tokens upon user logout"""
 91    LOGGER.debug("Sending back-channel logout notifications signal!", session=instance)
 92
 93    access_tokens = AccessToken.objects.select_related("provider").filter(
 94        user=instance.user,
 95        session__session__session_key=instance.session.session_key,
 96    )
 97
 98    # Only send backchannel logout notifications for providers that have
 99    # logout_uri configured and backchannel logout method set
100    backchannel_tokens = [
101        (
102            token.provider_id,
103            token.id_token.iss,
104            token.id_token.sub,
105            instance.session.session_key,
106        )
107        for token in access_tokens
108        if token.provider.logout_uri
109        and token.provider.logout_method == OAuth2LogoutMethod.BACKCHANNEL
110    ]
111
112    if backchannel_tokens:
113        backchannel_logout_notification_dispatch.send(revocations=backchannel_tokens)
114
115    access_tokens.delete()

Revoke tokens upon user logout

@receiver(post_save, sender=User)
def user_deactivated(sender, instance: authentik.core.models.User, **_):
@receiver(post_save, sender=User)
def user_deactivated(sender, instance: User, **_):
    """Delete the access, refresh and device tokens of a user saved as inactive

    Active users are left untouched. The ``including_expired()`` manager
    method is used for the lookup, presumably so that expired tokens are
    removed as well.
    """
    if not instance.is_active:
        # Same deletion order as the models are listed here
        for token_model in (AccessToken, RefreshToken, DeviceToken):
            token_model.objects.including_expired().filter(user=instance).delete()

Remove user tokens when deactivated