authentik.enterprise.stages.account_lockdown.stage
Account lockdown stage logic
1"""Account lockdown stage logic""" 2 3from django.apps import apps 4from django.core.exceptions import FieldDoesNotExist 5from django.db.models import Model, QuerySet 6from django.db.models.query_utils import Q 7from django.db.transaction import atomic 8from django.http import HttpRequest, HttpResponse, HttpResponseRedirect 9from django.urls import reverse 10from django.utils.translation import gettext_lazy as _ 11from dramatiq.actor import Actor 12from dramatiq.composition import group 13from dramatiq.results.errors import ResultTimeout 14 15from authentik.core.models import ( 16 AuthenticatedSession, 17 ExpiringModel, 18 Session, 19 Token, 20 User, 21 UserTypes, 22) 23from authentik.enterprise.stages.account_lockdown.models import AccountLockdownStage 24from authentik.events.models import Event, EventAction 25from authentik.flows.stage import StageView 26from authentik.lib.sync.outgoing.models import OutgoingSyncProvider 27from authentik.lib.sync.outgoing.signals import sync_outgoing_inhibit_dispatch 28from authentik.lib.utils.reflection import class_to_path 29from authentik.lib.utils.time import timedelta_from_string 30from authentik.stages.prompt.stage import PLAN_CONTEXT_PROMPT 31 32PLAN_CONTEXT_LOCKDOWN_REASON = "lockdown_reason" 33LOCKDOWN_EVENT_ACTION_ID = "account_lockdown" 34 35TARGET_REQUIRED_MESSAGE = _("No target user specified for account lockdown") 36PERMISSION_DENIED_MESSAGE = _("You do not have permission to lock down this account.") 37ACCOUNT_LOCKDOWN_FAILED_MESSAGE = _("Account lockdown failed for this account.") 38SELF_SERVICE_COMPLETION_FLOW_REQUIRED_MESSAGE = _( 39 "Self-service account lockdown requires a completion flow." 40) 41 42 43def get_lockdown_target_users() -> QuerySet[User]: 44 """Return users that can be targeted by account lockdown.""" 45 return User.objects.exclude_anonymous().exclude(type=UserTypes.INTERNAL_SERVICE_ACCOUNT) 46 47 48def _get_model_field(model: type[Model], field_name: str): 49 """Get a model field by name, if present.""" 50 try: 51 return model._meta.get_field(field_name) 52 except FieldDoesNotExist: 53 return None 54 55 56def _has_user_field(model: type[Model]) -> bool: 57 """Check if a model has a direct user foreign key.""" 58 field = _get_model_field(model, "user") 59 return bool(field and getattr(field, "remote_field", None) and field.remote_field.model is User) 60 61 62def _has_authenticated_session_field(model: type[Model]) -> bool: 63 """Check if a model is linked to an authenticated session.""" 64 field = _get_model_field(model, "session") 65 return bool( 66 field 67 and getattr(field, "remote_field", None) 68 and field.remote_field.model is AuthenticatedSession 69 ) 70 71 72def _has_provider_field(model: type[Model]) -> bool: 73 """Check if a model is linked to a provider.""" 74 return _get_model_field(model, "provider") is not None 75 76 77def get_lockdown_token_models() -> tuple[type[Model], ...]: 78 """Return token, grant, and provider session models removed by account lockdown.""" 79 token_models: list[type[Model]] = [] 80 for model in apps.get_models(): 81 if model._meta.abstract or not issubclass(model, ExpiringModel): 82 continue 83 if model is Token: 84 token_models.append(model) 85 elif _has_user_field(model) and ( 86 _has_provider_field(model) or _has_authenticated_session_field(model) 87 ): 88 token_models.append(model) 89 elif _has_authenticated_session_field(model): 90 token_models.append(model) 91 return tuple(token_models) 92 93 94def get_lockdown_token_queryset(model: type[Model], user: User) -> QuerySet: 95 """Return account lockdown artifacts for a model and user.""" 96 manager = model.objects.including_expired() 97 if _has_user_field(model): 98 return manager.filter(user=user) 99 return manager.filter(session__user=user) 100 101 102def can_lock_user(actor, user: User) -> bool: 103 """Check whether the actor may lock the target user.""" 104 if not actor.is_authenticated: 105 return False 106 if user.pk == actor.pk: 107 return True 108 return actor.has_perm("authentik_core.change_user", user) 109 110 111def get_outgoing_sync_tasks() -> tuple[tuple[type[OutgoingSyncProvider], Actor], ...]: 112 """Return outgoing sync provider types and their direct sync tasks.""" 113 from authentik.enterprise.providers.google_workspace.models import GoogleWorkspaceProvider 114 from authentik.enterprise.providers.google_workspace.tasks import google_workspace_sync_direct 115 from authentik.enterprise.providers.microsoft_entra.models import MicrosoftEntraProvider 116 from authentik.enterprise.providers.microsoft_entra.tasks import microsoft_entra_sync_direct 117 from authentik.providers.scim.models import SCIMProvider 118 from authentik.providers.scim.tasks import scim_sync_direct 119 120 return ( 121 (SCIMProvider, scim_sync_direct), 122 (GoogleWorkspaceProvider, google_workspace_sync_direct), 123 (MicrosoftEntraProvider, microsoft_entra_sync_direct), 124 ) 125 126 127class AccountLockdownStageView(StageView): 128 """Execute account lockdown actions on the target user.""" 129 130 def is_self_service(self, request: HttpRequest, user: User) -> bool: 131 """Check whether the currently authenticated user is locking their own account.""" 132 return request.user.is_authenticated and user.pk == request.user.pk 133 134 def get_reason(self) -> str: 135 """Get the lockdown reason from the plan context. 136 137 Priority: 138 1. prompt_data[PLAN_CONTEXT_LOCKDOWN_REASON] 139 2. PLAN_CONTEXT_LOCKDOWN_REASON (explicitly set) 140 3. Empty string as fallback 141 """ 142 prompt_data = self.executor.plan.context.get(PLAN_CONTEXT_PROMPT, {}) 143 if PLAN_CONTEXT_LOCKDOWN_REASON in prompt_data: 144 return prompt_data[PLAN_CONTEXT_LOCKDOWN_REASON] 145 return self.executor.plan.context.get(PLAN_CONTEXT_LOCKDOWN_REASON, "") 146 147 def _apply_lockdown_actions(self, stage: AccountLockdownStage, user: User) -> None: 148 """Apply the configured account changes to the target user.""" 149 if stage.deactivate_user: 150 user.is_active = False 151 if stage.set_unusable_password: 152 user.set_unusable_password() 153 if stage.deactivate_user: 154 with sync_outgoing_inhibit_dispatch(): 155 user.save() 156 return 157 user.save() 158 159 def _sync_deactivated_user_to_outgoing_providers(self, user: User) -> None: 160 """Synchronize a deactivated user to outgoing sync providers.""" 161 messages = [] 162 wait_timeout = 0 163 model = class_to_path(User) 164 provider_filter = Q(backchannel_application__isnull=False) | Q(application__isnull=False) 165 166 for provider_model, task_sync_direct in get_outgoing_sync_tasks(): 167 for provider in provider_model.objects.filter(provider_filter): 168 time_limit = int( 169 timedelta_from_string(provider.sync_page_timeout).total_seconds() * 1000 170 ) 171 messages.append( 172 task_sync_direct.message_with_options( 173 args=(model, user.pk, provider.pk), 174 rel_obj=provider, 175 time_limit=time_limit, 176 uid=f"{provider.name}:user:{user.pk}:direct", 177 ) 178 ) 179 wait_timeout += time_limit 180 181 if not messages: 182 return 183 try: 184 group(messages).run().wait(timeout=wait_timeout) 185 except ResultTimeout: 186 self.logger.warning( 187 "Timed out waiting for outgoing sync tasks; tasks remain queued", 188 user=user.username, 189 timeout=wait_timeout, 190 ) 191 192 def _get_lockdown_artifact_querysets( 193 self, stage: AccountLockdownStage, user: User 194 ) -> tuple[QuerySet, ...]: 195 """Return the configured sessions and tokens targeted by lockdown.""" 196 querysets: list[QuerySet] = [] 197 if stage.delete_sessions: 198 querysets.append(Session.objects.filter(authenticatedsession__user=user)) 199 if stage.revoke_tokens: 200 querysets.extend( 201 get_lockdown_token_queryset(model, user) for model in get_lockdown_token_models() 202 ) 203 return tuple(querysets) 204 205 def _delete_lockdown_artifacts(self, stage: AccountLockdownStage, user: User) -> None: 206 """Delete sessions and tokens selected by the lockdown configuration.""" 207 for queryset in self._get_lockdown_artifact_querysets(stage, user): 208 queryset.delete() 209 210 def _has_lockdown_artifacts(self, stage: AccountLockdownStage, user: User) -> bool: 211 """Check whether there are still sessions or tokens to remove.""" 212 return any( 213 queryset.exists() for queryset in self._get_lockdown_artifact_querysets(stage, user) 214 ) 215 216 def _emit_lockdown_event(self, request: HttpRequest, user: User, reason: str) -> None: 217 """Emit the audit event for a completed lockdown.""" 218 # Emit the audit event after the transaction commits. If event creation 219 # fails here, dispatch() would otherwise treat the whole lockdown as 220 # failed even though the account changes have already been committed. 221 try: 222 Event.new( 223 EventAction.USER_WRITE, 224 action_id=LOCKDOWN_EVENT_ACTION_ID, 225 reason=reason, 226 affected_user=user.username, 227 ).from_http(request) 228 except Exception as exc: # noqa: BLE001 229 # Event emission should not make the lockdown itself fail. 230 self.logger.warning( 231 "Failed to emit account lockdown event", 232 user=user.username, 233 exc=exc, 234 ) 235 236 def _lockdown_user( 237 self, 238 request: HttpRequest, 239 stage: AccountLockdownStage, 240 user: User, 241 reason: str, 242 ) -> None: 243 """Execute lockdown actions on a single user.""" 244 with atomic(): 245 user = User.objects.get(pk=user.pk) 246 self._apply_lockdown_actions(stage, user) 247 self._delete_lockdown_artifacts(stage, user) 248 249 # These additional checks/deletes are done to prevent a timing attack that creates tokens 250 # with a compromised token that is simultaneously being deleted. 251 while self._has_lockdown_artifacts(stage, user): 252 with atomic(): 253 self._delete_lockdown_artifacts(stage, user) 254 255 if stage.deactivate_user: 256 try: 257 self._sync_deactivated_user_to_outgoing_providers(user) 258 except Exception as exc: # noqa: BLE001 259 # Local lockdown has already committed. Provider sync failures 260 # must not reopen access or mark the lockdown itself as failed. 261 self.logger.warning( 262 "Failed to sync account lockdown deactivation to outgoing providers", 263 user=user.username, 264 exc=exc, 265 ) 266 self._emit_lockdown_event(request, user, reason) 267 268 def dispatch(self, request: HttpRequest) -> HttpResponse: 269 """Execute account lockdown actions.""" 270 self.request = request 271 stage: AccountLockdownStage = self.executor.current_stage 272 273 pending_user = self.get_pending_user() 274 if not pending_user.is_authenticated: 275 self.logger.warning("No target user found for account lockdown") 276 return self.executor.stage_invalid(TARGET_REQUIRED_MESSAGE) 277 user = get_lockdown_target_users().filter(pk=pending_user.pk).first() 278 if user is None: 279 self.logger.warning("Target user is not eligible for account lockdown") 280 return self.executor.stage_invalid(TARGET_REQUIRED_MESSAGE) 281 if not can_lock_user(request.user, user): 282 self.logger.warning( 283 "Permission denied for account lockdown", 284 actor=getattr(request.user, "username", None), 285 target=user.username, 286 ) 287 return self.executor.stage_invalid(PERMISSION_DENIED_MESSAGE) 288 289 reason = self.get_reason() 290 self_service = self.is_self_service(request, user) 291 if self_service and stage.delete_sessions and not stage.self_service_completion_flow: 292 self.logger.warning("No completion flow configured for self-service account lockdown") 293 return self.executor.stage_invalid(SELF_SERVICE_COMPLETION_FLOW_REQUIRED_MESSAGE) 294 295 self.logger.info( 296 "Executing account lockdown", 297 user=user.username, 298 reason=reason, 299 self_service=self_service, 300 deactivate_user=stage.deactivate_user, 301 set_unusable_password=stage.set_unusable_password, 302 delete_sessions=stage.delete_sessions, 303 revoke_tokens=stage.revoke_tokens, 304 ) 305 306 try: 307 self._lockdown_user(request, stage, user, reason) 308 self.logger.info("Account lockdown completed", user=user.username) 309 except Exception as exc: # noqa: BLE001 310 # Convert unexpected lockdown errors to a flow-stage failure instead 311 # of leaking an exception through the flow executor. 312 self.logger.warning("Account lockdown failed", user=user.username, exc=exc) 313 return self.executor.stage_invalid(ACCOUNT_LOCKDOWN_FAILED_MESSAGE) 314 315 if self_service: 316 if stage.delete_sessions: 317 return self._self_service_completion_response(request) 318 return self.executor.stage_ok() 319 320 return self.executor.stage_ok() 321 322 def _self_service_completion_response(self, request: HttpRequest) -> HttpResponse: 323 """Redirect to completion flow after self-service lockdown. 324 325 Since all sessions are deleted, the user cannot continue in the flow. 326 Redirect them to an unauthenticated completion flow that shows the 327 lockdown message. 328 329 We use a direct HTTP redirect instead of a challenge because the 330 flow executor's challenge handling may try to access the session 331 which we just deleted. 332 """ 333 stage: AccountLockdownStage = self.executor.current_stage 334 completion_flow = stage.self_service_completion_flow 335 if completion_flow: 336 # Flush the current request's session to prevent Django's session 337 # middleware from trying to save a deleted session 338 if hasattr(request, "session"): 339 request.session.flush() 340 redirect_to = reverse( 341 "authentik_core:if-flow", 342 kwargs={"flow_slug": completion_flow.slug}, 343 ) 344 return HttpResponseRedirect(redirect_to) 345 return self.executor.stage_invalid(SELF_SERVICE_COMPLETION_FLOW_REQUIRED_MESSAGE)
PLAN_CONTEXT_LOCKDOWN_REASON =
'lockdown_reason'
LOCKDOWN_EVENT_ACTION_ID =
'account_lockdown'
TARGET_REQUIRED_MESSAGE =
'No target user specified for account lockdown'
PERMISSION_DENIED_MESSAGE =
'You do not have permission to lock down this account.'
ACCOUNT_LOCKDOWN_FAILED_MESSAGE =
'Account lockdown failed for this account.'
SELF_SERVICE_COMPLETION_FLOW_REQUIRED_MESSAGE =
'Self-service account lockdown requires a completion flow.'
def
get_lockdown_target_users() -> django.db.models.query.QuerySet:
44def get_lockdown_target_users() -> QuerySet[User]: 45 """Return users that can be targeted by account lockdown.""" 46 return User.objects.exclude_anonymous().exclude(type=UserTypes.INTERNAL_SERVICE_ACCOUNT)
Return users that can be targeted by account lockdown.
def
get_lockdown_token_models() -> tuple[type[django.db.models.base.Model], ...]:
78def get_lockdown_token_models() -> tuple[type[Model], ...]: 79 """Return token, grant, and provider session models removed by account lockdown.""" 80 token_models: list[type[Model]] = [] 81 for model in apps.get_models(): 82 if model._meta.abstract or not issubclass(model, ExpiringModel): 83 continue 84 if model is Token: 85 token_models.append(model) 86 elif _has_user_field(model) and ( 87 _has_provider_field(model) or _has_authenticated_session_field(model) 88 ): 89 token_models.append(model) 90 elif _has_authenticated_session_field(model): 91 token_models.append(model) 92 return tuple(token_models)
Return token, grant, and provider session models removed by account lockdown.
def
get_lockdown_token_queryset( model: type[django.db.models.base.Model], user: authentik.core.models.User) -> django.db.models.query.QuerySet:
95def get_lockdown_token_queryset(model: type[Model], user: User) -> QuerySet: 96 """Return account lockdown artifacts for a model and user.""" 97 manager = model.objects.including_expired() 98 if _has_user_field(model): 99 return manager.filter(user=user) 100 return manager.filter(session__user=user)
Return account lockdown artifacts for a model and user.
103def can_lock_user(actor, user: User) -> bool: 104 """Check whether the actor may lock the target user.""" 105 if not actor.is_authenticated: 106 return False 107 if user.pk == actor.pk: 108 return True 109 return actor.has_perm("authentik_core.change_user", user)
Check whether the actor may lock the target user.
def
get_outgoing_sync_tasks() -> tuple[tuple[type[authentik.lib.sync.outgoing.models.OutgoingSyncProvider], dramatiq.actor.Actor], ...]:
112def get_outgoing_sync_tasks() -> tuple[tuple[type[OutgoingSyncProvider], Actor], ...]: 113 """Return outgoing sync provider types and their direct sync tasks.""" 114 from authentik.enterprise.providers.google_workspace.models import GoogleWorkspaceProvider 115 from authentik.enterprise.providers.google_workspace.tasks import google_workspace_sync_direct 116 from authentik.enterprise.providers.microsoft_entra.models import MicrosoftEntraProvider 117 from authentik.enterprise.providers.microsoft_entra.tasks import microsoft_entra_sync_direct 118 from authentik.providers.scim.models import SCIMProvider 119 from authentik.providers.scim.tasks import scim_sync_direct 120 121 return ( 122 (SCIMProvider, scim_sync_direct), 123 (GoogleWorkspaceProvider, google_workspace_sync_direct), 124 (MicrosoftEntraProvider, microsoft_entra_sync_direct), 125 )
Return outgoing sync provider types and their direct sync tasks.
128class AccountLockdownStageView(StageView): 129 """Execute account lockdown actions on the target user.""" 130 131 def is_self_service(self, request: HttpRequest, user: User) -> bool: 132 """Check whether the currently authenticated user is locking their own account.""" 133 return request.user.is_authenticated and user.pk == request.user.pk 134 135 def get_reason(self) -> str: 136 """Get the lockdown reason from the plan context. 137 138 Priority: 139 1. prompt_data[PLAN_CONTEXT_LOCKDOWN_REASON] 140 2. PLAN_CONTEXT_LOCKDOWN_REASON (explicitly set) 141 3. Empty string as fallback 142 """ 143 prompt_data = self.executor.plan.context.get(PLAN_CONTEXT_PROMPT, {}) 144 if PLAN_CONTEXT_LOCKDOWN_REASON in prompt_data: 145 return prompt_data[PLAN_CONTEXT_LOCKDOWN_REASON] 146 return self.executor.plan.context.get(PLAN_CONTEXT_LOCKDOWN_REASON, "") 147 148 def _apply_lockdown_actions(self, stage: AccountLockdownStage, user: User) -> None: 149 """Apply the configured account changes to the target user.""" 150 if stage.deactivate_user: 151 user.is_active = False 152 if stage.set_unusable_password: 153 user.set_unusable_password() 154 if stage.deactivate_user: 155 with sync_outgoing_inhibit_dispatch(): 156 user.save() 157 return 158 user.save() 159 160 def _sync_deactivated_user_to_outgoing_providers(self, user: User) -> None: 161 """Synchronize a deactivated user to outgoing sync providers.""" 162 messages = [] 163 wait_timeout = 0 164 model = class_to_path(User) 165 provider_filter = Q(backchannel_application__isnull=False) | Q(application__isnull=False) 166 167 for provider_model, task_sync_direct in get_outgoing_sync_tasks(): 168 for provider in provider_model.objects.filter(provider_filter): 169 time_limit = int( 170 timedelta_from_string(provider.sync_page_timeout).total_seconds() * 1000 171 ) 172 messages.append( 173 task_sync_direct.message_with_options( 174 args=(model, user.pk, provider.pk), 175 rel_obj=provider, 176 time_limit=time_limit, 177 uid=f"{provider.name}:user:{user.pk}:direct", 178 ) 179 ) 180 wait_timeout += time_limit 181 182 if not messages: 183 return 184 try: 185 group(messages).run().wait(timeout=wait_timeout) 186 except ResultTimeout: 187 self.logger.warning( 188 "Timed out waiting for outgoing sync tasks; tasks remain queued", 189 user=user.username, 190 timeout=wait_timeout, 191 ) 192 193 def _get_lockdown_artifact_querysets( 194 self, stage: AccountLockdownStage, user: User 195 ) -> tuple[QuerySet, ...]: 196 """Return the configured sessions and tokens targeted by lockdown.""" 197 querysets: list[QuerySet] = [] 198 if stage.delete_sessions: 199 querysets.append(Session.objects.filter(authenticatedsession__user=user)) 200 if stage.revoke_tokens: 201 querysets.extend( 202 get_lockdown_token_queryset(model, user) for model in get_lockdown_token_models() 203 ) 204 return tuple(querysets) 205 206 def _delete_lockdown_artifacts(self, stage: AccountLockdownStage, user: User) -> None: 207 """Delete sessions and tokens selected by the lockdown configuration.""" 208 for queryset in self._get_lockdown_artifact_querysets(stage, user): 209 queryset.delete() 210 211 def _has_lockdown_artifacts(self, stage: AccountLockdownStage, user: User) -> bool: 212 """Check whether there are still sessions or tokens to remove.""" 213 return any( 214 queryset.exists() for queryset in self._get_lockdown_artifact_querysets(stage, user) 215 ) 216 217 def _emit_lockdown_event(self, request: HttpRequest, user: User, reason: str) -> None: 218 """Emit the audit event for a completed lockdown.""" 219 # Emit the audit event after the transaction commits. If event creation 220 # fails here, dispatch() would otherwise treat the whole lockdown as 221 # failed even though the account changes have already been committed. 222 try: 223 Event.new( 224 EventAction.USER_WRITE, 225 action_id=LOCKDOWN_EVENT_ACTION_ID, 226 reason=reason, 227 affected_user=user.username, 228 ).from_http(request) 229 except Exception as exc: # noqa: BLE001 230 # Event emission should not make the lockdown itself fail. 231 self.logger.warning( 232 "Failed to emit account lockdown event", 233 user=user.username, 234 exc=exc, 235 ) 236 237 def _lockdown_user( 238 self, 239 request: HttpRequest, 240 stage: AccountLockdownStage, 241 user: User, 242 reason: str, 243 ) -> None: 244 """Execute lockdown actions on a single user.""" 245 with atomic(): 246 user = User.objects.get(pk=user.pk) 247 self._apply_lockdown_actions(stage, user) 248 self._delete_lockdown_artifacts(stage, user) 249 250 # These additional checks/deletes are done to prevent a timing attack that creates tokens 251 # with a compromised token that is simultaneously being deleted. 252 while self._has_lockdown_artifacts(stage, user): 253 with atomic(): 254 self._delete_lockdown_artifacts(stage, user) 255 256 if stage.deactivate_user: 257 try: 258 self._sync_deactivated_user_to_outgoing_providers(user) 259 except Exception as exc: # noqa: BLE001 260 # Local lockdown has already committed. Provider sync failures 261 # must not reopen access or mark the lockdown itself as failed. 262 self.logger.warning( 263 "Failed to sync account lockdown deactivation to outgoing providers", 264 user=user.username, 265 exc=exc, 266 ) 267 self._emit_lockdown_event(request, user, reason) 268 269 def dispatch(self, request: HttpRequest) -> HttpResponse: 270 """Execute account lockdown actions.""" 271 self.request = request 272 stage: AccountLockdownStage = self.executor.current_stage 273 274 pending_user = self.get_pending_user() 275 if not pending_user.is_authenticated: 276 self.logger.warning("No target user found for account lockdown") 277 return self.executor.stage_invalid(TARGET_REQUIRED_MESSAGE) 278 user = get_lockdown_target_users().filter(pk=pending_user.pk).first() 279 if user is None: 280 self.logger.warning("Target user is not eligible for account lockdown") 281 return self.executor.stage_invalid(TARGET_REQUIRED_MESSAGE) 282 if not can_lock_user(request.user, user): 283 self.logger.warning( 284 "Permission denied for account lockdown", 285 actor=getattr(request.user, "username", None), 286 target=user.username, 287 ) 288 return self.executor.stage_invalid(PERMISSION_DENIED_MESSAGE) 289 290 reason = self.get_reason() 291 self_service = self.is_self_service(request, user) 292 if self_service and stage.delete_sessions and not stage.self_service_completion_flow: 293 self.logger.warning("No completion flow configured for self-service account lockdown") 294 return self.executor.stage_invalid(SELF_SERVICE_COMPLETION_FLOW_REQUIRED_MESSAGE) 295 296 self.logger.info( 297 "Executing account lockdown", 298 user=user.username, 299 reason=reason, 300 self_service=self_service, 301 deactivate_user=stage.deactivate_user, 302 set_unusable_password=stage.set_unusable_password, 303 delete_sessions=stage.delete_sessions, 304 revoke_tokens=stage.revoke_tokens, 305 ) 306 307 try: 308 self._lockdown_user(request, stage, user, reason) 309 self.logger.info("Account lockdown completed", user=user.username) 310 except Exception as exc: # noqa: BLE001 311 # Convert unexpected lockdown errors to a flow-stage failure instead 312 # of leaking an exception through the flow executor. 313 self.logger.warning("Account lockdown failed", user=user.username, exc=exc) 314 return self.executor.stage_invalid(ACCOUNT_LOCKDOWN_FAILED_MESSAGE) 315 316 if self_service: 317 if stage.delete_sessions: 318 return self._self_service_completion_response(request) 319 return self.executor.stage_ok() 320 321 return self.executor.stage_ok() 322 323 def _self_service_completion_response(self, request: HttpRequest) -> HttpResponse: 324 """Redirect to completion flow after self-service lockdown. 325 326 Since all sessions are deleted, the user cannot continue in the flow. 327 Redirect them to an unauthenticated completion flow that shows the 328 lockdown message. 329 330 We use a direct HTTP redirect instead of a challenge because the 331 flow executor's challenge handling may try to access the session 332 which we just deleted. 333 """ 334 stage: AccountLockdownStage = self.executor.current_stage 335 completion_flow = stage.self_service_completion_flow 336 if completion_flow: 337 # Flush the current request's session to prevent Django's session 338 # middleware from trying to save a deleted session 339 if hasattr(request, "session"): 340 request.session.flush() 341 redirect_to = reverse( 342 "authentik_core:if-flow", 343 kwargs={"flow_slug": completion_flow.slug}, 344 ) 345 return HttpResponseRedirect(redirect_to) 346 return self.executor.stage_invalid(SELF_SERVICE_COMPLETION_FLOW_REQUIRED_MESSAGE)
Execute account lockdown actions on the target user.
def
is_self_service( self, request: django.http.request.HttpRequest, user: authentik.core.models.User) -> bool:
131 def is_self_service(self, request: HttpRequest, user: User) -> bool: 132 """Check whether the currently authenticated user is locking their own account.""" 133 return request.user.is_authenticated and user.pk == request.user.pk
Check whether the currently authenticated user is locking their own account.
def
get_reason(self) -> str:
135 def get_reason(self) -> str: 136 """Get the lockdown reason from the plan context. 137 138 Priority: 139 1. prompt_data[PLAN_CONTEXT_LOCKDOWN_REASON] 140 2. PLAN_CONTEXT_LOCKDOWN_REASON (explicitly set) 141 3. Empty string as fallback 142 """ 143 prompt_data = self.executor.plan.context.get(PLAN_CONTEXT_PROMPT, {}) 144 if PLAN_CONTEXT_LOCKDOWN_REASON in prompt_data: 145 return prompt_data[PLAN_CONTEXT_LOCKDOWN_REASON] 146 return self.executor.plan.context.get(PLAN_CONTEXT_LOCKDOWN_REASON, "")
Get the lockdown reason from the plan context.
Priority:
- prompt_data[PLAN_CONTEXT_LOCKDOWN_REASON]
- PLAN_CONTEXT_LOCKDOWN_REASON (explicitly set)
- Empty string as fallback
def
dispatch( self, request: django.http.request.HttpRequest) -> django.http.response.HttpResponse:
269 def dispatch(self, request: HttpRequest) -> HttpResponse: 270 """Execute account lockdown actions.""" 271 self.request = request 272 stage: AccountLockdownStage = self.executor.current_stage 273 274 pending_user = self.get_pending_user() 275 if not pending_user.is_authenticated: 276 self.logger.warning("No target user found for account lockdown") 277 return self.executor.stage_invalid(TARGET_REQUIRED_MESSAGE) 278 user = get_lockdown_target_users().filter(pk=pending_user.pk).first() 279 if user is None: 280 self.logger.warning("Target user is not eligible for account lockdown") 281 return self.executor.stage_invalid(TARGET_REQUIRED_MESSAGE) 282 if not can_lock_user(request.user, user): 283 self.logger.warning( 284 "Permission denied for account lockdown", 285 actor=getattr(request.user, "username", None), 286 target=user.username, 287 ) 288 return self.executor.stage_invalid(PERMISSION_DENIED_MESSAGE) 289 290 reason = self.get_reason() 291 self_service = self.is_self_service(request, user) 292 if self_service and stage.delete_sessions and not stage.self_service_completion_flow: 293 self.logger.warning("No completion flow configured for self-service account lockdown") 294 return self.executor.stage_invalid(SELF_SERVICE_COMPLETION_FLOW_REQUIRED_MESSAGE) 295 296 self.logger.info( 297 "Executing account lockdown", 298 user=user.username, 299 reason=reason, 300 self_service=self_service, 301 deactivate_user=stage.deactivate_user, 302 set_unusable_password=stage.set_unusable_password, 303 delete_sessions=stage.delete_sessions, 304 revoke_tokens=stage.revoke_tokens, 305 ) 306 307 try: 308 self._lockdown_user(request, stage, user, reason) 309 self.logger.info("Account lockdown completed", user=user.username) 310 except Exception as exc: # noqa: BLE001 311 # Convert unexpected lockdown errors to a flow-stage failure instead 312 # of leaking an exception through the flow executor. 313 self.logger.warning("Account lockdown failed", user=user.username, exc=exc) 314 return self.executor.stage_invalid(ACCOUNT_LOCKDOWN_FAILED_MESSAGE) 315 316 if self_service: 317 if stage.delete_sessions: 318 return self._self_service_completion_response(request) 319 return self.executor.stage_ok() 320 321 return self.executor.stage_ok()
Execute account lockdown actions.