authentik.providers.oauth2.signals
"""Signal receivers for the OAuth2 provider: logout propagation and token cleanup."""

from urllib.parse import parse_qs, urlencode, urlparse, urlunparse

from django.db.models.signals import post_save, pre_delete
from django.dispatch import receiver
from structlog.stdlib import get_logger

from authentik.common.oauth.constants import PLAN_CONTEXT_OIDC_LOGOUT_IFRAME_SESSIONS
from authentik.core.models import AuthenticatedSession, User
from authentik.flows.models import in_memory_stage
from authentik.outposts.tasks import hash_session_key
from authentik.providers.iframe_logout import IframeLogoutStageView
from authentik.providers.oauth2.models import (
    AccessToken,
    DeviceToken,
    OAuth2LogoutMethod,
    RefreshToken,
)
from authentik.providers.oauth2.tasks import backchannel_logout_notification_dispatch
from authentik.stages.user_logout.models import UserLogoutStage
from authentik.stages.user_logout.stage import flow_pre_user_logout

LOGGER = get_logger()


def _build_frontchannel_logout_url(logout_uri, issuer, session_id=None):
    """Return `logout_uri` with the logout parameters merged into its query string.

    `iss` (and `sid`, when `session_id` is given) take precedence over parameters
    of the same name already present in `logout_uri`. All other existing
    parameters are kept, including blank and repeated ones.
    """
    parsed_url = urlparse(logout_uri)

    query_params = {"iss": issuer}
    if session_id is not None:
        query_params["sid"] = session_id

    # parse_qs maps every key to a list of values; keep the lists untouched and
    # let urlencode(doseq=True) expand them, so `a=1&a=2` stays `a=1&a=2`
    # instead of being encoded as the repr of a Python list.
    for key, values in parse_qs(parsed_url.query, keep_blank_values=True).items():
        query_params.setdefault(key, values)

    return urlunparse(parsed_url._replace(query=urlencode(query_params, doseq=True)))


@receiver(flow_pre_user_logout)
def handle_flow_pre_user_logout(sender, request, user, executor, **kwargs):
    """Queue OIDC frontchannel logout iframes when a user logs out via a flow.

    Collects the providers that use the frontchannel logout method, have a
    logout URI configured and hold an access token bound to the current
    session. Their logout URLs are stored in the flow plan context under
    PLAN_CONTEXT_OIDC_LOGOUT_IFRAME_SESSIONS, and an in-memory
    IframeLogoutStageView stage is inserted unless the plan already has one.
    """
    # Only proceed if this is actually a UserLogoutStage
    if not isinstance(executor.current_stage, UserLogoutStage):
        return

    if not user.is_authenticated:
        return

    auth_session = AuthenticatedSession.from_request(request, user)
    if not auth_session:
        return

    oidc_access_tokens = (
        AccessToken.objects.filter(
            user=user,
            session=auth_session,
            provider__logout_method=OAuth2LogoutMethod.FRONTCHANNEL,
        )
        .exclude(provider__logout_uri="")
        .select_related("provider")
    )

    oidc_sessions = []
    seen_provider_ids = set()
    session_id = None

    for token in oidc_access_tokens:
        # Several access tokens of one provider can share this session; they
        # would all produce the exact same entry, so emit it only once.
        if token.provider_id in seen_provider_ids:
            continue
        seen_provider_ids.add(token.provider_id)

        # The hashed session key is the same for every token; compute it lazily once.
        if session_id is None and auth_session.session:
            session_id = hash_session_key(auth_session.session.session_key)

        oidc_sessions.append(
            {
                "url": _build_frontchannel_logout_url(
                    token.provider.logout_uri,
                    token.provider.get_issuer(request),
                    session_id,
                ),
                "provider_name": token.provider.name,
                "binding": "redirect",
                "provider_type": "oidc",
            }
        )

    if not oidc_sessions:
        LOGGER.debug("No sessions requiring IFrame frontchannel logout")
        return

    executor.plan.context[PLAN_CONTEXT_OIDC_LOGOUT_IFRAME_SESSIONS] = oidc_sessions

    # Stage already exists, don't reinject it
    if not any(
        binding.stage.view == IframeLogoutStageView for binding in executor.plan.bindings
    ):
        iframe_stage = in_memory_stage(IframeLogoutStageView)
        executor.plan.insert_stage(iframe_stage, index=1)

    LOGGER.debug("Oauth iframe sessions gathered")


@receiver(pre_delete, sender=AuthenticatedSession)
def user_session_deleted_oauth_backchannel_logout_and_tokens_removal(
    sender, instance: AuthenticatedSession, **_
):
    """Dispatch backchannel logout notifications and delete the session's access tokens.

    Runs before an AuthenticatedSession is deleted.
    """
    LOGGER.debug("Sending back-channel logout notifications signal!", session=instance)

    access_tokens = AccessToken.objects.select_related("provider").filter(
        user=instance.user,
        session__session__session_key=instance.session.session_key,
    )

    # Only send backchannel logout notifications for providers that have
    # logout_uri configured and backchannel logout method set
    backchannel_tokens = [
        (
            token.provider_id,
            token.id_token.iss,
            token.id_token.sub,
            instance.session.session_key,
        )
        for token in access_tokens
        if token.provider.logout_uri
        and token.provider.logout_method == OAuth2LogoutMethod.BACKCHANNEL
    ]

    if backchannel_tokens:
        backchannel_logout_notification_dispatch.send(revocations=backchannel_tokens)

    access_tokens.delete()


@receiver(post_save, sender=User)
def user_deactivated(sender, instance: User, **_):
    """Remove a user's access, refresh and device tokens once the user is inactive.

    Runs on every save of a User; active users are left untouched.
    """
    if instance.is_active:
        return
    AccessToken.objects.including_expired().filter(user=instance).delete()
    RefreshToken.objects.including_expired().filter(user=instance).delete()
    DeviceToken.objects.including_expired().filter(user=instance).delete()
LOGGER =
<BoundLoggerLazyProxy(logger=None, wrapper_class=None, processors=None, context_class=None, initial_values={}, logger_factory_args=())>
@receiver(flow_pre_user_logout)
def
handle_flow_pre_user_logout(sender, request, user, executor, **kwargs):
def _build_frontchannel_logout_url(logout_uri, issuer, session_id=None):
    """Return `logout_uri` with the logout parameters merged into its query string.

    `iss` (and `sid`, when `session_id` is given) take precedence over parameters
    of the same name already present in `logout_uri`. All other existing
    parameters are kept, including blank and repeated ones.
    """
    parsed_url = urlparse(logout_uri)

    query_params = {"iss": issuer}
    if session_id is not None:
        query_params["sid"] = session_id

    # parse_qs maps every key to a list of values; keep the lists untouched and
    # let urlencode(doseq=True) expand them, so `a=1&a=2` stays `a=1&a=2`
    # instead of being encoded as the repr of a Python list.
    for key, values in parse_qs(parsed_url.query, keep_blank_values=True).items():
        query_params.setdefault(key, values)

    return urlunparse(parsed_url._replace(query=urlencode(query_params, doseq=True)))


@receiver(flow_pre_user_logout)
def handle_flow_pre_user_logout(sender, request, user, executor, **kwargs):
    """Queue OIDC frontchannel logout iframes when a user logs out via a flow.

    Collects the providers that use the frontchannel logout method, have a
    logout URI configured and hold an access token bound to the current
    session. Their logout URLs are stored in the flow plan context under
    PLAN_CONTEXT_OIDC_LOGOUT_IFRAME_SESSIONS, and an in-memory
    IframeLogoutStageView stage is inserted unless the plan already has one.
    """
    # Only proceed if this is actually a UserLogoutStage
    if not isinstance(executor.current_stage, UserLogoutStage):
        return

    if not user.is_authenticated:
        return

    auth_session = AuthenticatedSession.from_request(request, user)
    if not auth_session:
        return

    oidc_access_tokens = (
        AccessToken.objects.filter(
            user=user,
            session=auth_session,
            provider__logout_method=OAuth2LogoutMethod.FRONTCHANNEL,
        )
        .exclude(provider__logout_uri="")
        .select_related("provider")
    )

    oidc_sessions = []
    seen_provider_ids = set()
    session_id = None

    for token in oidc_access_tokens:
        # Several access tokens of one provider can share this session; they
        # would all produce the exact same entry, so emit it only once.
        if token.provider_id in seen_provider_ids:
            continue
        seen_provider_ids.add(token.provider_id)

        # The hashed session key is the same for every token; compute it lazily once.
        if session_id is None and auth_session.session:
            session_id = hash_session_key(auth_session.session.session_key)

        oidc_sessions.append(
            {
                "url": _build_frontchannel_logout_url(
                    token.provider.logout_uri,
                    token.provider.get_issuer(request),
                    session_id,
                ),
                "provider_name": token.provider.name,
                "binding": "redirect",
                "provider_type": "oidc",
            }
        )

    if not oidc_sessions:
        LOGGER.debug("No sessions requiring IFrame frontchannel logout")
        return

    executor.plan.context[PLAN_CONTEXT_OIDC_LOGOUT_IFRAME_SESSIONS] = oidc_sessions

    # Stage already exists, don't reinject it
    if not any(
        binding.stage.view == IframeLogoutStageView for binding in executor.plan.bindings
    ):
        iframe_stage = in_memory_stage(IframeLogoutStageView)
        executor.plan.insert_stage(iframe_stage, index=1)

    LOGGER.debug("Oauth iframe sessions gathered")
Handle OIDC frontchannel logout when a user logs out via a flow
@receiver(pre_delete, sender=AuthenticatedSession)
def
user_session_deleted_oauth_backchannel_logout_and_tokens_removal(sender, instance: authentik.core.models.AuthenticatedSession, **_):
@receiver(pre_delete, sender=AuthenticatedSession)
def user_session_deleted_oauth_backchannel_logout_and_tokens_removal(
    sender, instance: AuthenticatedSession, **_
):
    """Dispatch backchannel logout notifications and delete the session's access tokens.

    Runs before an AuthenticatedSession is deleted.
    """
    LOGGER.debug("Sending back-channel logout notifications signal!", session=instance)

    session_tokens = AccessToken.objects.select_related("provider").filter(
        user=instance.user,
        session__session__session_key=instance.session.session_key,
    )

    # A notification is only due when the provider has a logout_uri
    # configured and uses the backchannel logout method.
    revocations = []
    for access_token in session_tokens:
        if not access_token.provider.logout_uri:
            continue
        if access_token.provider.logout_method != OAuth2LogoutMethod.BACKCHANNEL:
            continue
        revocations.append(
            (
                access_token.provider_id,
                access_token.id_token.iss,
                access_token.id_token.sub,
                instance.session.session_key,
            )
        )

    if revocations:
        backchannel_logout_notification_dispatch.send(revocations=revocations)

    session_tokens.delete()
Revoke tokens upon user logout
@receiver(post_save, sender=User)
def
user_deactivated(sender, instance: authentik.core.models.User, **_):
@receiver(post_save, sender=User)
def user_deactivated(sender, instance: User, **_):
    """Delete every OAuth2 token of a user that has been saved as inactive."""
    if instance.is_active:
        return
    # Same order as before: access tokens, then refresh tokens, then device tokens.
    for token_model in (AccessToken, RefreshToken, DeviceToken):
        token_model.objects.including_expired().filter(user=instance).delete()
Remove user tokens when deactivated