authentik.providers.oauth2.signals
from django.db.models.signals import post_save, pre_delete
from django.dispatch import receiver
from structlog.stdlib import get_logger

from authentik.common.oauth.constants import (
    OAUTH2_BINDING,
    PLAN_CONTEXT_OIDC_LOGOUT_IFRAME_SESSIONS,
)
from authentik.core.models import AuthenticatedSession, User
from authentik.flows.models import in_memory_stage
from authentik.providers.iframe_logout import IframeLogoutStageView
from authentik.providers.oauth2.models import (
    AccessToken,
    DeviceToken,
    OAuth2LogoutMethod,
    RefreshToken,
)
from authentik.providers.oauth2.tasks import backchannel_logout_notification_dispatch
from authentik.providers.oauth2.utils import build_frontchannel_logout_url
from authentik.stages.user_logout.models import UserLogoutStage
from authentik.stages.user_logout.stage import flow_pre_user_logout

LOGGER = get_logger()


@receiver(flow_pre_user_logout)
def handle_flow_pre_user_logout(sender, request, user, executor, **kwargs):
    """Prepare OIDC frontchannel logout when a user logs out via a flow.

    Collects one frontchannel logout URL per OAuth2 provider that holds an
    access token bound to the user's current session, stores the resulting
    list in the flow plan context and makes sure the iframe logout stage is
    part of the plan.

    Args:
        sender: Signal sender (not used here).
        request: HTTP request that triggered the logout.
        user: User that is being logged out.
        executor: Flow executor; its ``current_stage`` and ``plan`` are used.
        **kwargs: Additional signal arguments (not used here).
    """

    # Only proceed if this is actually a UserLogoutStage
    if not isinstance(executor.current_stage, UserLogoutStage):
        return

    if not user.is_authenticated:
        return

    auth_session = AuthenticatedSession.from_request(request, user)
    if not auth_session:
        return

    # Evaluate the queryset exactly once; a separate ``.exists()`` call
    # followed by iteration would hit the database twice.
    oidc_access_tokens = list(
        AccessToken.objects.filter(
            user=user,
            session=auth_session,
            provider__logout_method=OAuth2LogoutMethod.FRONTCHANNEL,
        )
        .exclude(provider__logout_uri="")
        .select_related("provider")
    )

    if not oidc_access_tokens:
        LOGGER.debug("No sessions requiring IFrame frontchannel logout")
        return

    session_key = auth_session.session.session_key if auth_session.session else None
    oidc_sessions = []
    seen_provider_ids = set()

    for token in oidc_access_tokens:
        # Several tokens of one session can belong to the same provider. The
        # logout URL is built from provider, request and session key only, so
        # one entry per provider is enough and avoids duplicate iframes.
        if token.provider_id in seen_provider_ids:
            continue
        seen_provider_ids.add(token.provider_id)

        provider = token.provider
        logout_url = build_frontchannel_logout_url(provider, request, session_key)
        if not logout_url:
            continue
        oidc_sessions.append(
            {
                "url": logout_url,
                "provider_name": provider.name,
                "binding": OAUTH2_BINDING,
                "provider_type": (f"{provider._meta.app_label}.{provider._meta.model_name}"),
            }
        )

    if not oidc_sessions:
        return

    executor.plan.context[PLAN_CONTEXT_OIDC_LOGOUT_IFRAME_SESSIONS] = oidc_sessions

    # Stage already exists, don't reinject it
    if not any(binding.stage.view == IframeLogoutStageView for binding in executor.plan.bindings):
        iframe_stage = in_memory_stage(IframeLogoutStageView)
        executor.plan.insert_stage(iframe_stage, index=1)

    LOGGER.debug("Oauth iframe sessions gathered")


@receiver(pre_delete, sender=AuthenticatedSession)
def user_session_deleted_oauth_backchannel_logout_and_tokens_removal(
    sender, instance: AuthenticatedSession, **_
):
    """Send backchannel logout notifications and delete the session's access tokens.

    Runs before an ``AuthenticatedSession`` is deleted.

    Args:
        sender: Model class that sent the signal (not used here).
        instance: The authenticated session that is about to be deleted.
        **_: Additional signal arguments (ignored).
    """
    LOGGER.debug("Sending back-channel logout notifications signal!", session=instance)

    access_tokens = AccessToken.objects.select_related("provider").filter(
        user=instance.user,
        session__session__session_key=instance.session.session_key,
    )

    # Only send backchannel logout notifications for providers that have
    # logout_uri configured and backchannel logout method set
    backchannel_tokens = [
        (
            token.provider_id,
            token.id_token.iss,
            token.id_token.sub,
            instance.session.session_key,
        )
        for token in access_tokens
        if token.provider.logout_uri
        and token.provider.logout_method == OAuth2LogoutMethod.BACKCHANNEL
    ]

    if backchannel_tokens:
        backchannel_logout_notification_dispatch.send(revocations=backchannel_tokens)

    # All matched tokens are removed, not only the ones that were notified
    access_tokens.delete()


@receiver(post_save, sender=User)
def user_deactivated(sender, instance: User, **_):
    """Remove a user's access, refresh and device tokens when the user is inactive.

    Runs after every save of a ``User``; active users are left untouched.

    Args:
        sender: Model class that sent the signal (not used here).
        instance: The user that was saved.
        **_: Additional signal arguments (ignored).
    """
    if instance.is_active:
        return
    AccessToken.objects.including_expired().filter(user=instance).delete()
    RefreshToken.objects.including_expired().filter(user=instance).delete()
    DeviceToken.objects.including_expired().filter(user=instance).delete()
LOGGER =
<BoundLoggerLazyProxy(logger=None, wrapper_class=None, processors=None, context_class=None, initial_values={}, logger_factory_args=())>
@receiver(flow_pre_user_logout)
def
handle_flow_pre_user_logout(sender, request, user, executor, **kwargs):
@receiver(flow_pre_user_logout)
def handle_flow_pre_user_logout(sender, request, user, executor, **kwargs):
    """Prepare OIDC frontchannel logout when a user logs out via a flow.

    Collects one frontchannel logout URL per OAuth2 provider that holds an
    access token bound to the user's current session, stores the resulting
    list in the flow plan context and makes sure the iframe logout stage is
    part of the plan.

    Args:
        sender: Signal sender (not used here).
        request: HTTP request that triggered the logout.
        user: User that is being logged out.
        executor: Flow executor; its ``current_stage`` and ``plan`` are used.
        **kwargs: Additional signal arguments (not used here).
    """

    # Only proceed if this is actually a UserLogoutStage
    if not isinstance(executor.current_stage, UserLogoutStage):
        return

    if not user.is_authenticated:
        return

    auth_session = AuthenticatedSession.from_request(request, user)
    if not auth_session:
        return

    # Evaluate the queryset exactly once; a separate ``.exists()`` call
    # followed by iteration would hit the database twice.
    oidc_access_tokens = list(
        AccessToken.objects.filter(
            user=user,
            session=auth_session,
            provider__logout_method=OAuth2LogoutMethod.FRONTCHANNEL,
        )
        .exclude(provider__logout_uri="")
        .select_related("provider")
    )

    if not oidc_access_tokens:
        LOGGER.debug("No sessions requiring IFrame frontchannel logout")
        return

    session_key = auth_session.session.session_key if auth_session.session else None
    oidc_sessions = []
    seen_provider_ids = set()

    for token in oidc_access_tokens:
        # Several tokens of one session can belong to the same provider. The
        # logout URL is built from provider, request and session key only, so
        # one entry per provider is enough and avoids duplicate iframes.
        if token.provider_id in seen_provider_ids:
            continue
        seen_provider_ids.add(token.provider_id)

        provider = token.provider
        logout_url = build_frontchannel_logout_url(provider, request, session_key)
        if not logout_url:
            continue
        oidc_sessions.append(
            {
                "url": logout_url,
                "provider_name": provider.name,
                "binding": OAUTH2_BINDING,
                "provider_type": (f"{provider._meta.app_label}.{provider._meta.model_name}"),
            }
        )

    if not oidc_sessions:
        return

    executor.plan.context[PLAN_CONTEXT_OIDC_LOGOUT_IFRAME_SESSIONS] = oidc_sessions

    # Stage already exists, don't reinject it
    if not any(binding.stage.view == IframeLogoutStageView for binding in executor.plan.bindings):
        iframe_stage = in_memory_stage(IframeLogoutStageView)
        executor.plan.insert_stage(iframe_stage, index=1)

    LOGGER.debug("Oauth iframe sessions gathered")
Handle OIDC frontchannel logout when user logs out via flow
@receiver(pre_delete, sender=AuthenticatedSession)
def
user_session_deleted_oauth_backchannel_logout_and_tokens_removal(sender, instance: authentik.core.models.AuthenticatedSession, **_):
@receiver(pre_delete, sender=AuthenticatedSession)
def user_session_deleted_oauth_backchannel_logout_and_tokens_removal(
    sender, instance: AuthenticatedSession, **_
):
    """Send backchannel logout notifications and delete the session's access tokens.

    Runs before an ``AuthenticatedSession`` is deleted.

    Args:
        sender: Model class that sent the signal (not used here).
        instance: The authenticated session that is about to be deleted.
        **_: Additional signal arguments (ignored).
    """
    LOGGER.debug("Sending back-channel logout notifications signal!", session=instance)

    session_key = instance.session.session_key
    session_tokens = AccessToken.objects.select_related("provider").filter(
        user=instance.user,
        session__session__session_key=session_key,
    )

    # A notification is only due for providers that have a logout_uri
    # configured and use the backchannel logout method.
    revocations = []
    for token in session_tokens:
        provider = token.provider
        if not provider.logout_uri:
            continue
        if provider.logout_method != OAuth2LogoutMethod.BACKCHANNEL:
            continue
        revocations.append(
            (token.provider_id, token.id_token.iss, token.id_token.sub, session_key)
        )

    if revocations:
        backchannel_logout_notification_dispatch.send(revocations=revocations)

    # Every matched token is removed, whether or not a notification was sent
    session_tokens.delete()
Revoke tokens upon user logout
@receiver(post_save, sender=User)
def
user_deactivated(sender, instance: authentik.core.models.User, **_):
@receiver(post_save, sender=User)
def user_deactivated(sender, instance: User, **_):
    """Remove a user's access, refresh and device tokens when the user is inactive.

    Runs after every save of a ``User``; active users are left untouched.

    Args:
        sender: Model class that sent the signal (not used here).
        instance: The user that was saved.
        **_: Additional signal arguments (ignored).
    """
    if not instance.is_active:
        # Same deletion order as before: access, refresh, then device tokens
        for token_model in (AccessToken, RefreshToken, DeviceToken):
            token_model.objects.including_expired().filter(user=instance).delete()
Remove user tokens when deactivated