authentik.enterprise.stages.account_lockdown.stage

Account lockdown stage logic

  1"""Account lockdown stage logic"""
  2
  3from django.apps import apps
  4from django.core.exceptions import FieldDoesNotExist
  5from django.db.models import Model, QuerySet
  6from django.db.models.query_utils import Q
  7from django.db.transaction import atomic
  8from django.http import HttpRequest, HttpResponse, HttpResponseRedirect
  9from django.urls import reverse
 10from django.utils.translation import gettext_lazy as _
 11from dramatiq.actor import Actor
 12from dramatiq.composition import group
 13from dramatiq.results.errors import ResultTimeout
 14
 15from authentik.core.models import (
 16    AuthenticatedSession,
 17    ExpiringModel,
 18    Session,
 19    Token,
 20    User,
 21    UserTypes,
 22)
 23from authentik.enterprise.stages.account_lockdown.models import AccountLockdownStage
 24from authentik.events.models import Event, EventAction
 25from authentik.flows.stage import StageView
 26from authentik.lib.sync.outgoing.models import OutgoingSyncProvider
 27from authentik.lib.sync.outgoing.signals import sync_outgoing_inhibit_dispatch
 28from authentik.lib.utils.reflection import class_to_path
 29from authentik.lib.utils.time import timedelta_from_string
 30from authentik.stages.prompt.stage import PLAN_CONTEXT_PROMPT
 31
# Key under which the lockdown reason is looked up, both inside the prompt data
# and directly in the flow plan context (see AccountLockdownStageView.get_reason).
PLAN_CONTEXT_LOCKDOWN_REASON = "lockdown_reason"
# Value passed as ``action_id`` on the audit event emitted after a lockdown.
LOCKDOWN_EVENT_ACTION_ID = "account_lockdown"

# Lazily translated messages handed to ``executor.stage_invalid`` when the
# stage refuses to run or fails.
TARGET_REQUIRED_MESSAGE = _("No target user specified for account lockdown")
PERMISSION_DENIED_MESSAGE = _("You do not have permission to lock down this account.")
ACCOUNT_LOCKDOWN_FAILED_MESSAGE = _("Account lockdown failed for this account.")
SELF_SERVICE_COMPLETION_FLOW_REQUIRED_MESSAGE = _(
    "Self-service account lockdown requires a completion flow."
)
 41
 42
 43def get_lockdown_target_users() -> QuerySet[User]:
 44    """Return users that can be targeted by account lockdown."""
 45    return User.objects.exclude_anonymous().exclude(type=UserTypes.INTERNAL_SERVICE_ACCOUNT)
 46
 47
 48def _get_model_field(model: type[Model], field_name: str):
 49    """Get a model field by name, if present."""
 50    try:
 51        return model._meta.get_field(field_name)
 52    except FieldDoesNotExist:
 53        return None
 54
 55
 56def _has_user_field(model: type[Model]) -> bool:
 57    """Check if a model has a direct user foreign key."""
 58    field = _get_model_field(model, "user")
 59    return bool(field and getattr(field, "remote_field", None) and field.remote_field.model is User)
 60
 61
 62def _has_authenticated_session_field(model: type[Model]) -> bool:
 63    """Check if a model is linked to an authenticated session."""
 64    field = _get_model_field(model, "session")
 65    return bool(
 66        field
 67        and getattr(field, "remote_field", None)
 68        and field.remote_field.model is AuthenticatedSession
 69    )
 70
 71
 72def _has_provider_field(model: type[Model]) -> bool:
 73    """Check if a model is linked to a provider."""
 74    return _get_model_field(model, "provider") is not None
 75
 76
 77def get_lockdown_token_models() -> tuple[type[Model], ...]:
 78    """Return token, grant, and provider session models removed by account lockdown."""
 79    token_models: list[type[Model]] = []
 80    for model in apps.get_models():
 81        if model._meta.abstract or not issubclass(model, ExpiringModel):
 82            continue
 83        if model is Token:
 84            token_models.append(model)
 85        elif _has_user_field(model) and (
 86            _has_provider_field(model) or _has_authenticated_session_field(model)
 87        ):
 88            token_models.append(model)
 89        elif _has_authenticated_session_field(model):
 90            token_models.append(model)
 91    return tuple(token_models)
 92
 93
 94def get_lockdown_token_queryset(model: type[Model], user: User) -> QuerySet:
 95    """Return account lockdown artifacts for a model and user."""
 96    manager = model.objects.including_expired()
 97    if _has_user_field(model):
 98        return manager.filter(user=user)
 99    return manager.filter(session__user=user)
100
101
def can_lock_user(actor, user: User) -> bool:
    """Check whether the actor may lock the target user.

    Args:
        actor: The requesting principal (typically ``request.user``). It is
            expected to expose ``is_authenticated``, ``pk`` and ``has_perm``.
        user: The account that would be locked down.

    Returns:
        ``True`` when the actor targets their own account or holds the
        ``authentik_core.change_user`` permission on ``user``; ``False`` for
        unauthenticated actors, including ``None`` or any object without an
        ``is_authenticated`` attribute.
    """
    # Fail closed: a missing actor (or one without ``is_authenticated``) is
    # treated as unauthenticated instead of raising AttributeError.
    if not getattr(actor, "is_authenticated", False):
        return False
    # Locking down one's own account never needs an extra permission.
    if user.pk == actor.pk:
        return True
    return actor.has_perm("authentik_core.change_user", user)
109
110
def get_outgoing_sync_tasks() -> tuple[tuple[type[OutgoingSyncProvider], Actor], ...]:
    """Pair every outgoing sync provider model with its direct sync task.

    Returns:
        ``(provider model, direct sync actor)`` tuples for SCIM, Google
        Workspace and Microsoft Entra, in that order.
    """
    # Imports happen at call time rather than module import time
    # (presumably to avoid import cycles - TODO confirm).
    from authentik.enterprise.providers.google_workspace.models import GoogleWorkspaceProvider
    from authentik.enterprise.providers.google_workspace.tasks import google_workspace_sync_direct
    from authentik.enterprise.providers.microsoft_entra.models import MicrosoftEntraProvider
    from authentik.enterprise.providers.microsoft_entra.tasks import microsoft_entra_sync_direct
    from authentik.providers.scim.models import SCIMProvider
    from authentik.providers.scim.tasks import scim_sync_direct

    scim = (SCIMProvider, scim_sync_direct)
    google_workspace = (GoogleWorkspaceProvider, google_workspace_sync_direct)
    microsoft_entra = (MicrosoftEntraProvider, microsoft_entra_sync_direct)
    return (scim, google_workspace, microsoft_entra)
125
126
class AccountLockdownStageView(StageView):
    """Flow stage view that runs the configured lockdown actions on the target user."""

    def is_self_service(self, request: HttpRequest, user: User) -> bool:
        """Report whether the authenticated requester is the lockdown target itself."""
        requester = request.user
        return requester.is_authenticated and user.pk == requester.pk

    def get_reason(self) -> str:
        """Resolve the lockdown reason from the flow plan context.

        Lookup order:
        1. The ``PLAN_CONTEXT_LOCKDOWN_REASON`` entry of the prompt data
        2. The ``PLAN_CONTEXT_LOCKDOWN_REASON`` entry of the plan context itself
        3. An empty string
        """
        context = self.executor.plan.context
        prompt_values = context.get(PLAN_CONTEXT_PROMPT, {})
        if PLAN_CONTEXT_LOCKDOWN_REASON not in prompt_values:
            return context.get(PLAN_CONTEXT_LOCKDOWN_REASON, "")
        return prompt_values[PLAN_CONTEXT_LOCKDOWN_REASON]

    def _apply_lockdown_actions(self, stage: AccountLockdownStage, user: User) -> None:
        """Mutate the target user as configured on the stage and save it."""
        deactivate = stage.deactivate_user
        if deactivate:
            user.is_active = False
        if stage.set_unusable_password:
            user.set_unusable_password()
        if not deactivate:
            user.save()
            return
        # A deactivation is saved with outgoing sync dispatch inhibited;
        # _lockdown_user triggers the provider sync explicitly afterwards.
        with sync_outgoing_inhibit_dispatch():
            user.save()

    def _sync_deactivated_user_to_outgoing_providers(self, user: User) -> None:
        """Queue a direct user sync on every matching provider and wait for the results."""
        user_model_path = class_to_path(User)
        # Only providers attached to an application (directly or as backchannel).
        attached = Q(backchannel_application__isnull=False) | Q(application__isnull=False)
        queued = []
        total_time_limit = 0

        for provider_cls, direct_sync in get_outgoing_sync_tasks():
            for provider in provider_cls.objects.filter(attached):
                page_timeout = timedelta_from_string(provider.sync_page_timeout)
                # The task time limit is expressed in milliseconds.
                limit_ms = int(page_timeout.total_seconds() * 1000)
                message = direct_sync.message_with_options(
                    args=(user_model_path, user.pk, provider.pk),
                    rel_obj=provider,
                    time_limit=limit_ms,
                    uid=f"{provider.name}:user:{user.pk}:direct",
                )
                queued.append(message)
                total_time_limit += limit_ms

        if queued:
            try:
                # Wait at most the sum of all individual task time limits.
                group(queued).run().wait(timeout=total_time_limit)
            except ResultTimeout:
                self.logger.warning(
                    "Timed out waiting for outgoing sync tasks; tasks remain queued",
                    user=user.username,
                    timeout=total_time_limit,
                )

    def _get_lockdown_artifact_querysets(
        self, stage: AccountLockdownStage, user: User
    ) -> tuple[QuerySet, ...]:
        """Build the session and token querysets the stage is configured to remove."""
        targets: list[QuerySet] = []
        if stage.delete_sessions:
            targets.append(Session.objects.filter(authenticatedsession__user=user))
        if stage.revoke_tokens:
            for token_model in get_lockdown_token_models():
                targets.append(get_lockdown_token_queryset(token_model, user))
        return tuple(targets)

    def _delete_lockdown_artifacts(self, stage: AccountLockdownStage, user: User) -> None:
        """Delete every session and token row selected by the stage configuration."""
        for doomed in self._get_lockdown_artifact_querysets(stage, user):
            doomed.delete()

    def _has_lockdown_artifacts(self, stage: AccountLockdownStage, user: User) -> bool:
        """Report whether any targeted session or token row still exists."""
        for remaining in self._get_lockdown_artifact_querysets(stage, user):
            if remaining.exists():
                return True
        return False

    def _emit_lockdown_event(self, request: HttpRequest, user: User, reason: str) -> None:
        """Record the audit event for a finished lockdown without ever raising."""
        # This runs after the lockdown transaction has committed. A failure
        # here must not bubble up, otherwise dispatch() would report the
        # lockdown as failed although the account changes are already stored.
        try:
            event = Event.new(
                EventAction.USER_WRITE,
                action_id=LOCKDOWN_EVENT_ACTION_ID,
                reason=reason,
                affected_user=user.username,
            )
            event.from_http(request)
        except Exception as exc:  # noqa: BLE001
            # Audit logging is best effort; only warn about the failure.
            self.logger.warning(
                "Failed to emit account lockdown event",
                user=user.username,
                exc=exc,
            )

    def _lockdown_user(
        self,
        request: HttpRequest,
        stage: AccountLockdownStage,
        user: User,
        reason: str,
    ) -> None:
        """Run the full lockdown sequence for one user.

        Applies the account changes and deletes sessions/tokens in one
        transaction, keeps deleting until none are left, optionally syncs the
        deactivation to outgoing providers, and finally emits the audit event.
        """
        with atomic():
            # Work on a freshly loaded instance of the target user.
            user = User.objects.get(pk=user.pk)
            self._apply_lockdown_actions(stage, user)
            self._delete_lockdown_artifacts(stage, user)

        # Repeat the check/delete cycle to close a timing window in which a
        # compromised token is used to create new tokens while it is itself
        # being deleted.
        while self._has_lockdown_artifacts(stage, user):
            with atomic():
                self._delete_lockdown_artifacts(stage, user)

        if stage.deactivate_user:
            try:
                self._sync_deactivated_user_to_outgoing_providers(user)
            except Exception as exc:  # noqa: BLE001
                # The local lockdown is already committed; a provider sync
                # failure must neither reopen access nor fail the lockdown.
                self.logger.warning(
                    "Failed to sync account lockdown deactivation to outgoing providers",
                    user=user.username,
                    exc=exc,
                )
        self._emit_lockdown_event(request, user, reason)

    def dispatch(self, request: HttpRequest) -> HttpResponse:
        """Validate the lockdown request, execute it, and choose the stage response.

        The stage is reported invalid when no eligible target user can be
        resolved, when the requester may not lock that user, when a
        self-service lockdown would delete sessions without a completion flow
        being configured, or when the lockdown itself raises.
        """
        self.request = request
        stage: AccountLockdownStage = self.executor.current_stage

        pending = self.get_pending_user()
        if not pending.is_authenticated:
            self.logger.warning("No target user found for account lockdown")
            return self.executor.stage_invalid(TARGET_REQUIRED_MESSAGE)

        # Re-resolve the pending user through the eligibility queryset.
        target = get_lockdown_target_users().filter(pk=pending.pk).first()
        if target is None:
            self.logger.warning("Target user is not eligible for account lockdown")
            return self.executor.stage_invalid(TARGET_REQUIRED_MESSAGE)

        if not can_lock_user(request.user, target):
            self.logger.warning(
                "Permission denied for account lockdown",
                actor=getattr(request.user, "username", None),
                target=target.username,
            )
            return self.executor.stage_invalid(PERMISSION_DENIED_MESSAGE)

        reason = self.get_reason()
        self_service = self.is_self_service(request, target)
        # True when the requester is about to delete their own sessions.
        ends_own_sessions = self_service and stage.delete_sessions
        if ends_own_sessions and not stage.self_service_completion_flow:
            self.logger.warning("No completion flow configured for self-service account lockdown")
            return self.executor.stage_invalid(SELF_SERVICE_COMPLETION_FLOW_REQUIRED_MESSAGE)

        self.logger.info(
            "Executing account lockdown",
            user=target.username,
            reason=reason,
            self_service=self_service,
            deactivate_user=stage.deactivate_user,
            set_unusable_password=stage.set_unusable_password,
            delete_sessions=stage.delete_sessions,
            revoke_tokens=stage.revoke_tokens,
        )

        try:
            self._lockdown_user(request, stage, target, reason)
            self.logger.info("Account lockdown completed", user=target.username)
        except Exception as exc:  # noqa: BLE001
            # Turn unexpected lockdown errors into a stage failure instead of
            # letting the exception escape through the flow executor.
            self.logger.warning("Account lockdown failed", user=target.username, exc=exc)
            return self.executor.stage_invalid(ACCOUNT_LOCKDOWN_FAILED_MESSAGE)

        if ends_own_sessions:
            return self._self_service_completion_response(request)
        return self.executor.stage_ok()

    def _self_service_completion_response(self, request: HttpRequest) -> HttpResponse:
        """Send the user to the completion flow after a self-service lockdown.

        The user's sessions have been deleted, so they cannot continue in the
        current flow. They are redirected to the configured completion flow,
        which is expected to be usable without authentication and to show the
        lockdown message.

        A plain HTTP redirect is used instead of a challenge because the flow
        executor's challenge handling may try to access the session that was
        just deleted.
        """
        completion_flow = self.executor.current_stage.self_service_completion_flow
        if not completion_flow:
            return self.executor.stage_invalid(SELF_SERVICE_COMPLETION_FLOW_REQUIRED_MESSAGE)

        # Flush the request's session so Django's session middleware does not
        # try to save a session that no longer exists.
        if hasattr(request, "session"):
            request.session.flush()
        target_url = reverse(
            "authentik_core:if-flow",
            kwargs={"flow_slug": completion_flow.slug},
        )
        return HttpResponseRedirect(target_url)
PLAN_CONTEXT_LOCKDOWN_REASON = 'lockdown_reason'
LOCKDOWN_EVENT_ACTION_ID = 'account_lockdown'
TARGET_REQUIRED_MESSAGE = 'No target user specified for account lockdown'
PERMISSION_DENIED_MESSAGE = 'You do not have permission to lock down this account.'
ACCOUNT_LOCKDOWN_FAILED_MESSAGE = 'Account lockdown failed for this account.'
SELF_SERVICE_COMPLETION_FLOW_REQUIRED_MESSAGE = 'Self-service account lockdown requires a completion flow.'
def get_lockdown_target_users() -> django.db.models.query.QuerySet:
def get_lockdown_target_users() -> QuerySet[User]:
    """Build the queryset of users that account lockdown may target.

    Starts from ``User.objects.exclude_anonymous()`` and additionally drops
    users whose type is ``UserTypes.INTERNAL_SERVICE_ACCOUNT``.
    """
    candidates = User.objects.exclude_anonymous()
    return candidates.exclude(type=UserTypes.INTERNAL_SERVICE_ACCOUNT)

Return users that can be targeted by account lockdown.

def get_lockdown_token_models() -> tuple[type[django.db.models.base.Model], ...]:
def get_lockdown_token_models() -> tuple[type[Model], ...]:
    """Collect the token, grant, and provider session models that lockdown removes.

    A registered model qualifies when it is a concrete ``ExpiringModel``
    subclass and is either ``Token`` itself, linked to an authenticated
    session, or carries both a ``user`` and a ``provider`` field.
    """

    def _is_lockdown_artifact(candidate: type[Model]) -> bool:
        """Decide whether a single registered model is removed by lockdown."""
        if candidate._meta.abstract:
            return False
        if not issubclass(candidate, ExpiringModel):
            return False
        if candidate is Token:
            return True
        if _has_authenticated_session_field(candidate):
            return True
        return _has_user_field(candidate) and _has_provider_field(candidate)

    # filter() keeps the registry order returned by apps.get_models().
    return tuple(filter(_is_lockdown_artifact, apps.get_models()))

Return token, grant, and provider session models removed by account lockdown.

def get_lockdown_token_queryset( model: type[django.db.models.base.Model], user: authentik.core.models.User) -> django.db.models.query.QuerySet:
 95def get_lockdown_token_queryset(model: type[Model], user: User) -> QuerySet:
 96    """Return account lockdown artifacts for a model and user."""
 97    manager = model.objects.including_expired()
 98    if _has_user_field(model):
 99        return manager.filter(user=user)
100    return manager.filter(session__user=user)

Return account lockdown artifacts for a model and user.

def can_lock_user(actor, user: authentik.core.models.User) -> bool:
def can_lock_user(actor, user: User) -> bool:
    """Check whether the actor may lock the target user.

    Args:
        actor: The requesting principal (typically ``request.user``). It is
            expected to expose ``is_authenticated``, ``pk`` and ``has_perm``.
        user: The account that would be locked down.

    Returns:
        ``True`` when the actor targets their own account or holds the
        ``authentik_core.change_user`` permission on ``user``; ``False`` for
        unauthenticated actors, including ``None`` or any object without an
        ``is_authenticated`` attribute.
    """
    # Fail closed: a missing actor (or one without ``is_authenticated``) is
    # treated as unauthenticated instead of raising AttributeError.
    if not getattr(actor, "is_authenticated", False):
        return False
    # Locking down one's own account never needs an extra permission.
    if user.pk == actor.pk:
        return True
    return actor.has_perm("authentik_core.change_user", user)

Check whether the actor may lock the target user.

def get_outgoing_sync_tasks() -> tuple[tuple[type[authentik.lib.sync.outgoing.models.OutgoingSyncProvider], dramatiq.actor.Actor], ...]:
def get_outgoing_sync_tasks() -> tuple[tuple[type[OutgoingSyncProvider], Actor], ...]:
    """Pair every outgoing sync provider model with its direct sync task.

    Returns:
        ``(provider model, direct sync actor)`` tuples for SCIM, Google
        Workspace and Microsoft Entra, in that order.
    """
    # Imports happen at call time rather than module import time
    # (presumably to avoid import cycles - TODO confirm).
    from authentik.enterprise.providers.google_workspace.models import GoogleWorkspaceProvider
    from authentik.enterprise.providers.google_workspace.tasks import google_workspace_sync_direct
    from authentik.enterprise.providers.microsoft_entra.models import MicrosoftEntraProvider
    from authentik.enterprise.providers.microsoft_entra.tasks import microsoft_entra_sync_direct
    from authentik.providers.scim.models import SCIMProvider
    from authentik.providers.scim.tasks import scim_sync_direct

    scim = (SCIMProvider, scim_sync_direct)
    google_workspace = (GoogleWorkspaceProvider, google_workspace_sync_direct)
    microsoft_entra = (MicrosoftEntraProvider, microsoft_entra_sync_direct)
    return (scim, google_workspace, microsoft_entra)

Return outgoing sync provider types and their direct sync tasks.

class AccountLockdownStageView(authentik.flows.stage.StageView):
class AccountLockdownStageView(StageView):
    """Flow stage view that runs the configured lockdown actions on the target user."""

    def is_self_service(self, request: HttpRequest, user: User) -> bool:
        """Report whether the authenticated requester is the lockdown target itself."""
        requester = request.user
        return requester.is_authenticated and user.pk == requester.pk

    def get_reason(self) -> str:
        """Resolve the lockdown reason from the flow plan context.

        Lookup order:
        1. The ``PLAN_CONTEXT_LOCKDOWN_REASON`` entry of the prompt data
        2. The ``PLAN_CONTEXT_LOCKDOWN_REASON`` entry of the plan context itself
        3. An empty string
        """
        context = self.executor.plan.context
        prompt_values = context.get(PLAN_CONTEXT_PROMPT, {})
        if PLAN_CONTEXT_LOCKDOWN_REASON not in prompt_values:
            return context.get(PLAN_CONTEXT_LOCKDOWN_REASON, "")
        return prompt_values[PLAN_CONTEXT_LOCKDOWN_REASON]

    def _apply_lockdown_actions(self, stage: AccountLockdownStage, user: User) -> None:
        """Mutate the target user as configured on the stage and save it."""
        deactivate = stage.deactivate_user
        if deactivate:
            user.is_active = False
        if stage.set_unusable_password:
            user.set_unusable_password()
        if not deactivate:
            user.save()
            return
        # A deactivation is saved with outgoing sync dispatch inhibited;
        # _lockdown_user triggers the provider sync explicitly afterwards.
        with sync_outgoing_inhibit_dispatch():
            user.save()

    def _sync_deactivated_user_to_outgoing_providers(self, user: User) -> None:
        """Queue a direct user sync on every matching provider and wait for the results."""
        user_model_path = class_to_path(User)
        # Only providers attached to an application (directly or as backchannel).
        attached = Q(backchannel_application__isnull=False) | Q(application__isnull=False)
        queued = []
        total_time_limit = 0

        for provider_cls, direct_sync in get_outgoing_sync_tasks():
            for provider in provider_cls.objects.filter(attached):
                page_timeout = timedelta_from_string(provider.sync_page_timeout)
                # The task time limit is expressed in milliseconds.
                limit_ms = int(page_timeout.total_seconds() * 1000)
                message = direct_sync.message_with_options(
                    args=(user_model_path, user.pk, provider.pk),
                    rel_obj=provider,
                    time_limit=limit_ms,
                    uid=f"{provider.name}:user:{user.pk}:direct",
                )
                queued.append(message)
                total_time_limit += limit_ms

        if queued:
            try:
                # Wait at most the sum of all individual task time limits.
                group(queued).run().wait(timeout=total_time_limit)
            except ResultTimeout:
                self.logger.warning(
                    "Timed out waiting for outgoing sync tasks; tasks remain queued",
                    user=user.username,
                    timeout=total_time_limit,
                )

    def _get_lockdown_artifact_querysets(
        self, stage: AccountLockdownStage, user: User
    ) -> tuple[QuerySet, ...]:
        """Build the session and token querysets the stage is configured to remove."""
        targets: list[QuerySet] = []
        if stage.delete_sessions:
            targets.append(Session.objects.filter(authenticatedsession__user=user))
        if stage.revoke_tokens:
            for token_model in get_lockdown_token_models():
                targets.append(get_lockdown_token_queryset(token_model, user))
        return tuple(targets)

    def _delete_lockdown_artifacts(self, stage: AccountLockdownStage, user: User) -> None:
        """Delete every session and token row selected by the stage configuration."""
        for doomed in self._get_lockdown_artifact_querysets(stage, user):
            doomed.delete()

    def _has_lockdown_artifacts(self, stage: AccountLockdownStage, user: User) -> bool:
        """Report whether any targeted session or token row still exists."""
        for remaining in self._get_lockdown_artifact_querysets(stage, user):
            if remaining.exists():
                return True
        return False

    def _emit_lockdown_event(self, request: HttpRequest, user: User, reason: str) -> None:
        """Record the audit event for a finished lockdown without ever raising."""
        # This runs after the lockdown transaction has committed. A failure
        # here must not bubble up, otherwise dispatch() would report the
        # lockdown as failed although the account changes are already stored.
        try:
            event = Event.new(
                EventAction.USER_WRITE,
                action_id=LOCKDOWN_EVENT_ACTION_ID,
                reason=reason,
                affected_user=user.username,
            )
            event.from_http(request)
        except Exception as exc:  # noqa: BLE001
            # Audit logging is best effort; only warn about the failure.
            self.logger.warning(
                "Failed to emit account lockdown event",
                user=user.username,
                exc=exc,
            )

    def _lockdown_user(
        self,
        request: HttpRequest,
        stage: AccountLockdownStage,
        user: User,
        reason: str,
    ) -> None:
        """Run the full lockdown sequence for one user.

        Applies the account changes and deletes sessions/tokens in one
        transaction, keeps deleting until none are left, optionally syncs the
        deactivation to outgoing providers, and finally emits the audit event.
        """
        with atomic():
            # Work on a freshly loaded instance of the target user.
            user = User.objects.get(pk=user.pk)
            self._apply_lockdown_actions(stage, user)
            self._delete_lockdown_artifacts(stage, user)

        # Repeat the check/delete cycle to close a timing window in which a
        # compromised token is used to create new tokens while it is itself
        # being deleted.
        while self._has_lockdown_artifacts(stage, user):
            with atomic():
                self._delete_lockdown_artifacts(stage, user)

        if stage.deactivate_user:
            try:
                self._sync_deactivated_user_to_outgoing_providers(user)
            except Exception as exc:  # noqa: BLE001
                # The local lockdown is already committed; a provider sync
                # failure must neither reopen access nor fail the lockdown.
                self.logger.warning(
                    "Failed to sync account lockdown deactivation to outgoing providers",
                    user=user.username,
                    exc=exc,
                )
        self._emit_lockdown_event(request, user, reason)

    def dispatch(self, request: HttpRequest) -> HttpResponse:
        """Validate the lockdown request, execute it, and choose the stage response.

        The stage is reported invalid when no eligible target user can be
        resolved, when the requester may not lock that user, when a
        self-service lockdown would delete sessions without a completion flow
        being configured, or when the lockdown itself raises.
        """
        self.request = request
        stage: AccountLockdownStage = self.executor.current_stage

        pending = self.get_pending_user()
        if not pending.is_authenticated:
            self.logger.warning("No target user found for account lockdown")
            return self.executor.stage_invalid(TARGET_REQUIRED_MESSAGE)

        # Re-resolve the pending user through the eligibility queryset.
        target = get_lockdown_target_users().filter(pk=pending.pk).first()
        if target is None:
            self.logger.warning("Target user is not eligible for account lockdown")
            return self.executor.stage_invalid(TARGET_REQUIRED_MESSAGE)

        if not can_lock_user(request.user, target):
            self.logger.warning(
                "Permission denied for account lockdown",
                actor=getattr(request.user, "username", None),
                target=target.username,
            )
            return self.executor.stage_invalid(PERMISSION_DENIED_MESSAGE)

        reason = self.get_reason()
        self_service = self.is_self_service(request, target)
        # True when the requester is about to delete their own sessions.
        ends_own_sessions = self_service and stage.delete_sessions
        if ends_own_sessions and not stage.self_service_completion_flow:
            self.logger.warning("No completion flow configured for self-service account lockdown")
            return self.executor.stage_invalid(SELF_SERVICE_COMPLETION_FLOW_REQUIRED_MESSAGE)

        self.logger.info(
            "Executing account lockdown",
            user=target.username,
            reason=reason,
            self_service=self_service,
            deactivate_user=stage.deactivate_user,
            set_unusable_password=stage.set_unusable_password,
            delete_sessions=stage.delete_sessions,
            revoke_tokens=stage.revoke_tokens,
        )

        try:
            self._lockdown_user(request, stage, target, reason)
            self.logger.info("Account lockdown completed", user=target.username)
        except Exception as exc:  # noqa: BLE001
            # Turn unexpected lockdown errors into a stage failure instead of
            # letting the exception escape through the flow executor.
            self.logger.warning("Account lockdown failed", user=target.username, exc=exc)
            return self.executor.stage_invalid(ACCOUNT_LOCKDOWN_FAILED_MESSAGE)

        if ends_own_sessions:
            return self._self_service_completion_response(request)
        return self.executor.stage_ok()

    def _self_service_completion_response(self, request: HttpRequest) -> HttpResponse:
        """Send the user to the completion flow after a self-service lockdown.

        The user's sessions have been deleted, so they cannot continue in the
        current flow. They are redirected to the configured completion flow,
        which is expected to be usable without authentication and to show the
        lockdown message.

        A plain HTTP redirect is used instead of a challenge because the flow
        executor's challenge handling may try to access the session that was
        just deleted.
        """
        completion_flow = self.executor.current_stage.self_service_completion_flow
        if not completion_flow:
            return self.executor.stage_invalid(SELF_SERVICE_COMPLETION_FLOW_REQUIRED_MESSAGE)

        # Flush the request's session so Django's session middleware does not
        # try to save a session that no longer exists.
        if hasattr(request, "session"):
            request.session.flush()
        target_url = reverse(
            "authentik_core:if-flow",
            kwargs={"flow_slug": completion_flow.slug},
        )
        return HttpResponseRedirect(target_url)

Execute account lockdown actions on the target user.

def is_self_service( self, request: django.http.request.HttpRequest, user: authentik.core.models.User) -> bool:
131    def is_self_service(self, request: HttpRequest, user: User) -> bool:
132        """Check whether the currently authenticated user is locking their own account."""
133        return request.user.is_authenticated and user.pk == request.user.pk

Check whether the currently authenticated user is locking their own account.

def get_reason(self) -> str:
135    def get_reason(self) -> str:
136        """Get the lockdown reason from the plan context.
137
138        Priority:
139        1. prompt_data[PLAN_CONTEXT_LOCKDOWN_REASON]
140        2. PLAN_CONTEXT_LOCKDOWN_REASON (explicitly set)
141        3. Empty string as fallback
142        """
143        prompt_data = self.executor.plan.context.get(PLAN_CONTEXT_PROMPT, {})
144        if PLAN_CONTEXT_LOCKDOWN_REASON in prompt_data:
145            return prompt_data[PLAN_CONTEXT_LOCKDOWN_REASON]
146        return self.executor.plan.context.get(PLAN_CONTEXT_LOCKDOWN_REASON, "")

Get the lockdown reason from the plan context.

Priority:

  1. prompt_data[PLAN_CONTEXT_LOCKDOWN_REASON]
  2. PLAN_CONTEXT_LOCKDOWN_REASON (explicitly set)
  3. Empty string as fallback
def dispatch( self, request: django.http.request.HttpRequest) -> django.http.response.HttpResponse:
269    def dispatch(self, request: HttpRequest) -> HttpResponse:
270        """Execute account lockdown actions."""
271        self.request = request
272        stage: AccountLockdownStage = self.executor.current_stage
273
274        pending_user = self.get_pending_user()
275        if not pending_user.is_authenticated:
276            self.logger.warning("No target user found for account lockdown")
277            return self.executor.stage_invalid(TARGET_REQUIRED_MESSAGE)
278        user = get_lockdown_target_users().filter(pk=pending_user.pk).first()
279        if user is None:
280            self.logger.warning("Target user is not eligible for account lockdown")
281            return self.executor.stage_invalid(TARGET_REQUIRED_MESSAGE)
282        if not can_lock_user(request.user, user):
283            self.logger.warning(
284                "Permission denied for account lockdown",
285                actor=getattr(request.user, "username", None),
286                target=user.username,
287            )
288            return self.executor.stage_invalid(PERMISSION_DENIED_MESSAGE)
289
290        reason = self.get_reason()
291        self_service = self.is_self_service(request, user)
292        if self_service and stage.delete_sessions and not stage.self_service_completion_flow:
293            self.logger.warning("No completion flow configured for self-service account lockdown")
294            return self.executor.stage_invalid(SELF_SERVICE_COMPLETION_FLOW_REQUIRED_MESSAGE)
295
296        self.logger.info(
297            "Executing account lockdown",
298            user=user.username,
299            reason=reason,
300            self_service=self_service,
301            deactivate_user=stage.deactivate_user,
302            set_unusable_password=stage.set_unusable_password,
303            delete_sessions=stage.delete_sessions,
304            revoke_tokens=stage.revoke_tokens,
305        )
306
307        try:
308            self._lockdown_user(request, stage, user, reason)
309            self.logger.info("Account lockdown completed", user=user.username)
310        except Exception as exc:  # noqa: BLE001
311            # Convert unexpected lockdown errors to a flow-stage failure instead
312            # of leaking an exception through the flow executor.
313            self.logger.warning("Account lockdown failed", user=user.username, exc=exc)
314            return self.executor.stage_invalid(ACCOUNT_LOCKDOWN_FAILED_MESSAGE)
315
316        if self_service:
317            if stage.delete_sessions:
318                return self._self_service_completion_response(request)
319            return self.executor.stage_ok()
320
321        return self.executor.stage_ok()

Execute account lockdown actions.