authentik.enterprise.stages.account_lockdown.tests.test_stage

Account lockdown stage tests

  1"""Account lockdown stage tests"""
  2
  3import json
  4from dataclasses import asdict
  5from threading import Event as ThreadEvent
  6from threading import Thread
  7from types import SimpleNamespace
  8from unittest.mock import MagicMock, patch
  9
 10from django.db import connection
 11from django.http import HttpResponse
 12from django.test import TransactionTestCase
 13from django.urls import reverse
 14from django.utils import timezone
 15from dramatiq.results.errors import ResultTimeout
 16
 17from authentik.core.models import AuthenticatedSession, Session, Token, TokenIntents
 18from authentik.core.tests.utils import (
 19    RequestFactory,
 20    create_test_admin_user,
 21    create_test_cert,
 22    create_test_flow,
 23    create_test_user,
 24)
 25from authentik.enterprise.stages.account_lockdown.models import AccountLockdownStage
 26from authentik.enterprise.stages.account_lockdown.stage import (
 27    LOCKDOWN_EVENT_ACTION_ID,
 28    PLAN_CONTEXT_LOCKDOWN_REASON,
 29    AccountLockdownStageView,
 30    can_lock_user,
 31)
 32from authentik.events.models import Event, EventAction
 33from authentik.flows.markers import StageMarker
 34from authentik.flows.models import FlowDesignation, FlowStageBinding
 35from authentik.flows.planner import PLAN_CONTEXT_PENDING_USER, FlowPlan
 36from authentik.flows.tests import FlowTestCase
 37from authentik.lib.generators import generate_id
 38from authentik.lib.utils.reflection import class_to_path
 39from authentik.providers.oauth2.id_token import IDToken
 40from authentik.providers.oauth2.models import (
 41    AccessToken,
 42    AuthorizationCode,
 43    DeviceToken,
 44    OAuth2Provider,
 45    RedirectURI,
 46    RedirectURIMatchingMode,
 47    RefreshToken,
 48)
 49from authentik.providers.saml.models import SAMLProvider, SAMLSession
 50from authentik.stages.prompt.stage import PLAN_CONTEXT_PROMPT
 51
 52patch_enterprise_enabled = patch(
 53    "authentik.enterprise.apps.AuthentikEnterpriseConfig.check_enabled",
 54    return_value=True,
 55)
 56
 57
 58class AccountLockdownStageTestMixin:
 59    """Shared setup helpers for account lockdown stage tests."""
 60
 61    @classmethod
 62    def setUpClass(cls):
 63        cls.patch_enterprise_enabled = patch_enterprise_enabled.start()
 64        cls.patch_event_dispatch = patch("authentik.events.tasks.event_trigger_dispatch.send")
 65        cls.patch_event_dispatch.start()
 66        super().setUpClass()
 67
 68    @classmethod
 69    def tearDownClass(cls):
 70        cls.patch_event_dispatch.stop()
 71        patch_enterprise_enabled.stop()
 72        super().tearDownClass()
 73
 74    def setUp(self):
 75        super().setUp()
 76        self.user = create_test_admin_user()
 77        self.target_user = create_test_admin_user()
 78        self.flow = create_test_flow(FlowDesignation.STAGE_CONFIGURATION)
 79        self.stage = AccountLockdownStage.objects.create(
 80            name="lockdown",
 81        )
 82        self.binding = FlowStageBinding.objects.create(target=self.flow, stage=self.stage, order=0)
 83        self.request_factory = RequestFactory()
 84
 85    def make_stage_view(self, plan: FlowPlan):
 86        def _stage_ok():
 87            return HttpResponse(status=204)
 88
 89        def _stage_invalid(_error_message=None):
 90            return HttpResponse(status=400)
 91
 92        return AccountLockdownStageView(
 93            SimpleNamespace(
 94                plan=plan,
 95                current_stage=self.stage,
 96                current_binding=self.binding,
 97                flow=self.flow,
 98                stage_ok=_stage_ok,
 99                stage_invalid=_stage_invalid,
100            )
101        )
102
103    def make_request(self, *, user=None, query=None):
104        return self.request_factory.post(
105            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
106            query_params=query or {},
107            user=user,
108        )
109
110    def get_lockdown_event(self):
111        """Return the account-lockdown user-write event."""
112        return Event.objects.filter(
113            action=EventAction.USER_WRITE,
114            context__action_id=LOCKDOWN_EVENT_ACTION_ID,
115        ).first()
116
117
class TestAccountLockdownStage(AccountLockdownStageTestMixin, FlowTestCase):
    """Account lockdown stage tests"""

    # Dotted path of the module under test; the patch targets below are built from it.
    _STAGE_MODULE = "authentik.enterprise.stages.account_lockdown.stage"

    # -- helpers ---------------------------------------------------------------------

    def _new_plan(self, context=None) -> FlowPlan:
        """Return a plan for the test flow that holds only the lockdown binding.

        Entries of ``context``, when given, are copied into the plan context.
        """
        plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()])
        for key, value in (context or {}).items():
            plan.context[key] = value
        return plan

    def _new_view(self, context=None):
        """Return a stage view bound to a fresh plan seeded with ``context``."""
        return self.make_stage_view(self._new_plan(context))

    def _configure_stage(self, **options):
        """Set the given attributes on the stage under test and save it."""
        for name, value in options.items():
            setattr(self.stage, name, value)
        self.stage.save()

    def _activate_target_user(self, password=None):
        """Mark the target user active, optionally setting ``password``, and save it."""
        self.target_user.is_active = True
        if password is not None:
            self.target_user.set_password(password)
        self.target_user.save()

    def _create_api_token(self):
        """Create a non-expiring API token owned by the target user."""
        return Token.objects.create(
            user=self.target_user,
            identifier="test-token",
            intent=TokenIntents.INTENT_API,
            key=generate_id(),
            expiring=False,
        )

    def _lock_target_as_admin(self, view):
        """Lock the target user through ``view`` as ``self.user``, with an empty reason."""
        view._lockdown_user(self.make_request(user=self.user), self.stage, self.target_user, "")

    def _patch_lockdown_event_failure(self):
        """Return a patcher that makes creating the lockdown event raise.

        Every other ``Event.new`` call is forwarded to the real implementation, which
        is captured here, before the patcher is entered.
        """
        original_event_new = Event.new

        def _event_new_side_effect(action, *args, **kwargs):
            is_lockdown_event = (
                action == EventAction.USER_WRITE
                and kwargs.get("action_id") == LOCKDOWN_EVENT_ACTION_ID
            )
            if is_lockdown_event:
                raise RuntimeError("simulated event failure")
            return original_event_new(action, *args, **kwargs)

        return patch(f"{self._STAGE_MODULE}.Event.new", side_effect=_event_new_side_effect)

    def _outgoing_sync_doubles(self):
        """Return ``(provider, provider_model, task_sync_direct)`` test doubles.

        ``provider_model.objects.filter`` yields the single fake provider, and
        ``task_sync_direct.message_with_options`` returns ``"direct-message"``.
        """
        provider = SimpleNamespace(name="outgoing", pk=1, sync_page_timeout="seconds=5")
        task_sync_direct = MagicMock()
        task_sync_direct.message_with_options.return_value = "direct-message"
        provider_model = SimpleNamespace(
            objects=SimpleNamespace(filter=MagicMock(return_value=[provider]))
        )
        return provider, provider_model, task_sync_direct

    # -- dispatch --------------------------------------------------------------------

    def test_lockdown_no_target(self):
        """Test lockdown stage with no pending user fails"""
        view = self._new_view()

        response = view.dispatch(self.make_request())

        self.assertEqual(response.status_code, 400)

    def test_lockdown_with_pending_user(self):
        """Test lockdown stage with a pending target user."""
        self._activate_target_user()
        view = self._new_view(
            {
                PLAN_CONTEXT_LOCKDOWN_REASON: "Security incident",
                PLAN_CONTEXT_PENDING_USER: self.target_user,
            }
        )
        request = self.make_request(user=self.user)

        self.assertTrue(can_lock_user(request.user, self.target_user))
        response = view.dispatch(request)

        self.target_user.refresh_from_db()
        self.assertFalse(self.target_user.is_active)
        self.assertFalse(self.target_user.has_usable_password())
        self.assertEqual(response.status_code, 204)

        # Check event was created
        event = self.get_lockdown_event()
        self.assertIsNotNone(event)
        self.assertEqual(event.context["action_id"], LOCKDOWN_EVENT_ACTION_ID)
        self.assertEqual(event.context["reason"], "Security incident")
        self.assertEqual(event.context["affected_user"], self.target_user.username)

    def test_lockdown_with_pending_user_reason(self):
        """Test lockdown stage with a pending target and explicit reason."""
        self._activate_target_user()
        view = self._new_view(
            {
                PLAN_CONTEXT_LOCKDOWN_REASON: "Compromised account",
                PLAN_CONTEXT_PENDING_USER: self.target_user,
            }
        )
        request = self.make_request(user=self.user)

        self.assertTrue(can_lock_user(request.user, self.target_user))
        response = view.dispatch(request)

        self.target_user.refresh_from_db()
        self.assertFalse(self.target_user.is_active)
        self.assertEqual(response.status_code, 204)

        # The explicit reason is the point of this test, so check it reached the event.
        event = self.get_lockdown_event()
        self.assertIsNotNone(event)
        self.assertEqual(event.context["reason"], "Compromised account")

    def test_lockdown_reason_from_prompt(self):
        """Test lockdown stage reads the reason from prompt data."""
        self._activate_target_user()
        view = self._new_view(
            {PLAN_CONTEXT_PROMPT: {PLAN_CONTEXT_LOCKDOWN_REASON: "User requested lockdown"}}
        )
        request = self.make_request(user=self.user)

        view._lockdown_user(request, self.stage, self.target_user, view.get_reason())

        event = self.get_lockdown_event()
        self.assertIsNotNone(event)
        self.assertEqual(event.context["reason"], "User requested lockdown")

    # -- event emission failures -----------------------------------------------------

    def test_lockdown_event_failure_does_not_fail_self_service(self):
        """Test lockdown still succeeds when event emission fails."""
        self._configure_stage(delete_sessions=False)
        self._activate_target_user()
        view = self._new_view({PLAN_CONTEXT_PENDING_USER: self.target_user})
        request = self.make_request(user=self.target_user)

        with self._patch_lockdown_event_failure():
            view._lockdown_user(request, self.stage, self.target_user, view.get_reason())

        self.target_user.refresh_from_db()
        self.assertFalse(self.target_user.is_active)

    def test_dispatch_records_success_when_event_emission_fails(self):
        """Test dispatch still completes if event emission fails."""
        self._configure_stage(delete_sessions=False)
        self._activate_target_user()
        view = self._new_view({PLAN_CONTEXT_PENDING_USER: self.target_user})
        request = self.make_request(user=self.target_user)

        with self._patch_lockdown_event_failure():
            response = view.dispatch(request)

        self.target_user.refresh_from_db()
        self.assertFalse(self.target_user.is_active)
        self.assertEqual(response.status_code, 204)

    # -- self-service ----------------------------------------------------------------

    def test_lockdown_self_service_redirects_to_completion_flow(self):
        """Test self-service lockdown redirects to completion flow when sessions are deleted."""
        completion_flow = create_test_flow(FlowDesignation.STAGE_CONFIGURATION)
        self._configure_stage(self_service_completion_flow=completion_flow)
        self._activate_target_user()
        view = self._new_view()
        request = self.make_request(user=self.target_user)

        view._lockdown_user(request, self.stage, self.target_user, view.get_reason())
        response = view._self_service_completion_response(request)

        self.assertEqual(response.status_code, 302)
        self.assertEqual(
            response.url,
            reverse("authentik_core:if-flow", kwargs={"flow_slug": completion_flow.slug}),
        )

    def test_lockdown_self_service_requires_completion_flow(self):
        """Test self-service lockdown fails before deleting sessions without a completion flow."""
        self._configure_stage(self_service_completion_flow=None)
        self._activate_target_user()
        view = self._new_view({PLAN_CONTEXT_PENDING_USER: self.target_user})
        request = self.make_request(user=self.target_user)

        response = view.dispatch(request)

        self.assertEqual(response.status_code, 400)
        self.target_user.refresh_from_db()
        self.assertTrue(self.target_user.is_active)

    def test_lockdown_denies_other_user_without_permission(self):
        """Test lockdown stage rejects non-self requests without change_user permission."""
        actor = create_test_user()
        view = self._new_view({PLAN_CONTEXT_PENDING_USER: self.target_user})
        request = self.make_request(user=actor)

        self.assertFalse(can_lock_user(request.user, self.target_user))
        response = view.dispatch(request)
        self.assertEqual(response.status_code, 400)

    # -- revocation ------------------------------------------------------------------

    def test_lockdown_revokes_tokens(self):
        """Test lockdown stage revokes tokens"""
        self._create_api_token()
        self.assertEqual(Token.objects.filter(user=self.target_user).count(), 1)

        self._lock_target_as_admin(self._new_view())

        self.assertEqual(Token.objects.filter(user=self.target_user).count(), 0)

    def test_lockdown_revokes_provider_tokens(self):
        """Test lockdown stage revokes provider tokens and sessions."""
        oauth_provider = OAuth2Provider.objects.create(
            name=generate_id(),
            authorization_flow=create_test_flow(),
            redirect_uris=[
                RedirectURI(RedirectURIMatchingMode.STRICT, "http://testserver/callback")
            ],
            signing_key=create_test_cert(),
        )
        saml_provider = SAMLProvider.objects.create(
            name=generate_id(),
            authorization_flow=create_test_flow(),
            acs_url="https://sp.example.com/acs",
            issuer_override="https://idp.example.com",
        )
        session = Session.objects.create(
            session_key=generate_id(),
            expires=timezone.now() + timezone.timedelta(hours=1),
            last_ip="127.0.0.1",
        )
        auth_session = AuthenticatedSession.objects.create(
            session=session,
            user=self.target_user,
        )
        # Fields shared by every OAuth2 grant created below; the access and refresh
        # tokens additionally carry a serialized ID token.
        grant_kwargs = {
            "provider": oauth_provider,
            "user": self.target_user,
            "auth_time": timezone.now(),
            "_scope": "openid profile",
            "expiring": False,
        }
        token_kwargs = grant_kwargs | {"_id_token": json.dumps(asdict(IDToken("foo", "bar")))}
        AuthorizationCode.objects.create(
            code=generate_id(),
            session=auth_session,
            **grant_kwargs,
        )
        AccessToken.objects.create(
            token=generate_id(),
            session=auth_session,
            **token_kwargs,
        )
        RefreshToken.objects.create(
            token=generate_id(),
            session=auth_session,
            **token_kwargs,
        )
        DeviceToken.objects.create(
            provider=oauth_provider,
            user=self.target_user,
            session=auth_session,
            _scope="openid profile",
            expiring=False,
        )
        SAMLSession.objects.create(
            provider=saml_provider,
            user=self.target_user,
            session=auth_session,
            session_index=generate_id(),
            name_id=self.target_user.email,
            expires=timezone.now() + timezone.timedelta(hours=1),
            expiring=True,
        )

        self._lock_target_as_admin(self._new_view())

        revoked_models = (AuthorizationCode, AccessToken, RefreshToken, DeviceToken, SAMLSession)
        for model in revoked_models:
            with self.subTest(model=model.__name__):
                self.assertEqual(model.objects.filter(user=self.target_user).count(), 0)

    # -- configurable actions --------------------------------------------------------

    def test_lockdown_selective_actions(self):
        """Test lockdown stage with selective actions"""
        self._configure_stage(
            deactivate_user=True,
            set_unusable_password=False,
            delete_sessions=False,
            revoke_tokens=False,
        )
        self._activate_target_user(password="testpassword")
        self._create_api_token()

        self._lock_target_as_admin(self._new_view())

        self.target_user.refresh_from_db()
        # User should be deactivated
        self.assertFalse(self.target_user.is_active)
        # Password should still be usable
        self.assertTrue(self.target_user.has_usable_password())
        # Token should still exist
        self.assertEqual(Token.objects.filter(user=self.target_user).count(), 1)

    def test_lockdown_no_actions(self):
        """Test lockdown stage with all actions disabled"""
        self._configure_stage(
            deactivate_user=False,
            set_unusable_password=False,
            delete_sessions=False,
            revoke_tokens=False,
        )
        self._activate_target_user(password="testpassword")

        self._lock_target_as_admin(self._new_view())

        self.target_user.refresh_from_db()
        # User should still be active
        self.assertTrue(self.target_user.is_active)
        # Password should still be usable
        self.assertTrue(self.target_user.has_usable_password())
        # Event should still be created
        event = self.get_lockdown_event()
        self.assertIsNotNone(event)

    # -- outgoing provider sync ------------------------------------------------------

    def test_lockdown_deactivation_inhibits_signal_dispatch_until_after_commit(self):
        """Test lockdown queues explicit outgoing syncs after the deactivation transaction."""
        view = self._new_view()

        with (
            patch(f"{self._STAGE_MODULE}.sync_outgoing_inhibit_dispatch") as inhibit,
            patch.object(view, "_sync_deactivated_user_to_outgoing_providers") as sync_outgoing,
        ):
            self._lock_target_as_admin(view)

        inhibit.assert_called_once()
        sync_outgoing.assert_called_once()
        synced_user = sync_outgoing.call_args.args[0]
        self.assertEqual(synced_user.pk, self.target_user.pk)
        self.assertFalse(synced_user.is_active)

    def test_lockdown_waits_for_direct_outgoing_provider_syncs(self):
        """Test direct outgoing sync tasks are enqueued and waited on."""
        view = self._new_view()
        provider, provider_model, task_sync_direct = self._outgoing_sync_doubles()
        task_group = MagicMock()

        with (
            patch(
                f"{self._STAGE_MODULE}.get_outgoing_sync_tasks",
                return_value=((provider_model, task_sync_direct),),
            ),
            patch(f"{self._STAGE_MODULE}.group", return_value=task_group) as task_group_cls,
        ):
            view._sync_deactivated_user_to_outgoing_providers(self.target_user)

        task_sync_direct.message_with_options.assert_called_once_with(
            args=(class_to_path(type(self.target_user)), self.target_user.pk, provider.pk),
            rel_obj=provider,
            time_limit=5000,
            uid=f"{provider.name}:user:{self.target_user.pk}:direct",
        )
        task_group_cls.assert_called_once_with(["direct-message"])
        task_group.run.return_value.wait.assert_called_once_with(timeout=5000)

    def test_lockdown_outgoing_provider_sync_timeout_leaves_tasks_running(self):
        """Test timeout while waiting for direct outgoing syncs does not fail lockdown."""
        view = self._new_view()
        _provider, provider_model, task_sync_direct = self._outgoing_sync_doubles()
        task_group = MagicMock()
        task_group.run.return_value.wait.side_effect = ResultTimeout("timed out")

        with (
            patch(
                f"{self._STAGE_MODULE}.get_outgoing_sync_tasks",
                return_value=((provider_model, task_sync_direct),),
            ),
            patch(f"{self._STAGE_MODULE}.group", return_value=task_group),
        ):
            view._sync_deactivated_user_to_outgoing_providers(self.target_user)

        task_group.run.assert_called_once_with()
        task_group.run.return_value.wait.assert_called_once_with(timeout=5000)

    def test_lockdown_outgoing_provider_sync_failure_does_not_fail_lockdown(self):
        """Test completed local lockdown still emits an event if outgoing sync fails."""
        view = self._new_view()

        with patch.object(
            view,
            "_sync_deactivated_user_to_outgoing_providers",
            side_effect=ValueError("sync failed"),
        ):
            self._lock_target_as_admin(view)

        self.target_user.refresh_from_db()
        self.assertFalse(self.target_user.is_active)
        event = self.get_lockdown_event()
        self.assertIsNotNone(event)
556
557
class TestAccountLockdownStageConcurrency(AccountLockdownStageTestMixin, TransactionTestCase):
    """Account lockdown concurrency tests."""

    def test_lockdown_retries_when_another_transaction_recreates_a_token(self):
        """Lockdown should remove a token recreated before the retry check runs."""
        Token.objects.create(
            user=self.target_user,
            identifier=f"initial-token-{generate_id()}",
            intent=TokenIntents.INTENT_API,
            key=generate_id(),
            expiring=False,
        )

        plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()])
        view = self.make_stage_view(plan)
        original_has_artifacts = view._has_lockdown_artifacts
        target_user = self.target_user
        # Handshake between the test thread and the worker:
        #   thread_ready - the worker has started and is waiting for its cue
        #   start_create - cue for the worker to create the competing token
        #   thread_done  - the worker has finished (successfully or not)
        thread_ready = ThreadEvent()
        start_create = ThreadEvent()
        thread_done = ThreadEvent()
        thread_errors = []

        class TokenCreatorThread(Thread):
            """Worker that creates one extra token for the target user when cued."""

            # Keep test collectors from treating this helper class as a test.
            __test__ = False

            def run(self):
                try:
                    thread_ready.set()
                    if not start_create.wait(timeout=5):
                        thread_errors.append("timed out waiting to recreate token")
                        return
                    Token.objects.create(
                        user=target_user,
                        identifier=f"concurrent-token-{generate_id()}",
                        intent=TokenIntents.INTENT_API,
                        key=generate_id(),
                        expiring=False,
                    )
                except Exception as exc:  # noqa: BLE001
                    # Recorded rather than raised so the test thread can report it.
                    thread_errors.append(exc)
                finally:
                    thread_done.set()
                    # Close the connection this thread used before it exits.
                    connection.close()

        def has_artifacts_after_concurrent_create(stage, user):
            """Let the worker create its token on the first check, then run the real check."""
            if not start_create.is_set():
                start_create.set()
                self.assertTrue(
                    thread_done.wait(timeout=30),
                    (
                        "Concurrent token creation did not complete "
                        f"before retry check: {thread_errors}"
                    ),
                )
            return original_has_artifacts(stage, user)

        creator = TokenCreatorThread()
        with patch.object(
            view, "_has_lockdown_artifacts", side_effect=has_artifacts_after_concurrent_create
        ):
            creator.start()
            try:
                self.assertTrue(
                    thread_ready.wait(timeout=5),
                    "Concurrent token creation thread did not start",
                )
                view._lockdown_user(
                    self.make_request(user=self.user), self.stage, self.target_user, ""
                )
            finally:
                # Release and reap the worker even when the lockdown call fails, so it
                # never outlives this test. On the success path the cue is already set
                # and this is a plain, bounded join.
                start_create.set()
                creator.join(timeout=30)

        self.assertFalse(creator.is_alive(), "Concurrent token creation thread did not exit")
        self.assertEqual(thread_errors, [])
        self.assertEqual(Token.objects.filter(user=self.target_user).count(), 0)
patch_enterprise_enabled = <unittest.mock._patch object>
class AccountLockdownStageTestMixin:
 59class AccountLockdownStageTestMixin:
 60    """Shared setup helpers for account lockdown stage tests."""
 61
 62    @classmethod
 63    def setUpClass(cls):
 64        cls.patch_enterprise_enabled = patch_enterprise_enabled.start()
 65        cls.patch_event_dispatch = patch("authentik.events.tasks.event_trigger_dispatch.send")
 66        cls.patch_event_dispatch.start()
 67        super().setUpClass()
 68
 69    @classmethod
 70    def tearDownClass(cls):
 71        cls.patch_event_dispatch.stop()
 72        patch_enterprise_enabled.stop()
 73        super().tearDownClass()
 74
 75    def setUp(self):
 76        super().setUp()
 77        self.user = create_test_admin_user()
 78        self.target_user = create_test_admin_user()
 79        self.flow = create_test_flow(FlowDesignation.STAGE_CONFIGURATION)
 80        self.stage = AccountLockdownStage.objects.create(
 81            name="lockdown",
 82        )
 83        self.binding = FlowStageBinding.objects.create(target=self.flow, stage=self.stage, order=0)
 84        self.request_factory = RequestFactory()
 85
 86    def make_stage_view(self, plan: FlowPlan):
 87        def _stage_ok():
 88            return HttpResponse(status=204)
 89
 90        def _stage_invalid(_error_message=None):
 91            return HttpResponse(status=400)
 92
 93        return AccountLockdownStageView(
 94            SimpleNamespace(
 95                plan=plan,
 96                current_stage=self.stage,
 97                current_binding=self.binding,
 98                flow=self.flow,
 99                stage_ok=_stage_ok,
100                stage_invalid=_stage_invalid,
101            )
102        )
103
104    def make_request(self, *, user=None, query=None):
105        return self.request_factory.post(
106            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
107            query_params=query or {},
108            user=user,
109        )
110
111    def get_lockdown_event(self):
112        """Return the account-lockdown user-write event."""
113        return Event.objects.filter(
114            action=EventAction.USER_WRITE,
115            context__action_id=LOCKDOWN_EVENT_ACTION_ID,
116        ).first()

Shared setup helpers for account lockdown stage tests.

@classmethod
def setUpClass(cls):
62    @classmethod
63    def setUpClass(cls):
64        cls.patch_enterprise_enabled = patch_enterprise_enabled.start()
65        cls.patch_event_dispatch = patch("authentik.events.tasks.event_trigger_dispatch.send")
66        cls.patch_event_dispatch.start()
67        super().setUpClass()
@classmethod
def tearDownClass(cls):
69    @classmethod
70    def tearDownClass(cls):
71        cls.patch_event_dispatch.stop()
72        patch_enterprise_enabled.stop()
73        super().tearDownClass()
def setUp(self):
75    def setUp(self):
76        super().setUp()
77        self.user = create_test_admin_user()
78        self.target_user = create_test_admin_user()
79        self.flow = create_test_flow(FlowDesignation.STAGE_CONFIGURATION)
80        self.stage = AccountLockdownStage.objects.create(
81            name="lockdown",
82        )
83        self.binding = FlowStageBinding.objects.create(target=self.flow, stage=self.stage, order=0)
84        self.request_factory = RequestFactory()
def make_stage_view(self, plan: authentik.flows.planner.FlowPlan):
 86    def make_stage_view(self, plan: FlowPlan):
 87        def _stage_ok():
 88            return HttpResponse(status=204)
 89
 90        def _stage_invalid(_error_message=None):
 91            return HttpResponse(status=400)
 92
 93        return AccountLockdownStageView(
 94            SimpleNamespace(
 95                plan=plan,
 96                current_stage=self.stage,
 97                current_binding=self.binding,
 98                flow=self.flow,
 99                stage_ok=_stage_ok,
100                stage_invalid=_stage_invalid,
101            )
102        )
def make_request(self, *, user=None, query=None):
104    def make_request(self, *, user=None, query=None):
105        return self.request_factory.post(
106            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
107            query_params=query or {},
108            user=user,
109        )
def get_lockdown_event(self):
111    def get_lockdown_event(self):
112        """Return the account-lockdown user-write event."""
113        return Event.objects.filter(
114            action=EventAction.USER_WRITE,
115            context__action_id=LOCKDOWN_EVENT_ACTION_ID,
116        ).first()

Return the account-lockdown user-write event.

class TestAccountLockdownStage(AccountLockdownStageTestMixin, authentik.flows.tests.FlowTestCase):
class TestAccountLockdownStage(AccountLockdownStageTestMixin, FlowTestCase):
    """Account lockdown stage tests"""

    def _make_plan(self, context=None) -> FlowPlan:
        """Build a plan holding only the lockdown binding.

        Entries of ``context``, when given, are copied into the plan context.
        """
        plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()])
        plan.context.update(context or {})
        return plan

    def _activate_target_user(self, password=None):
        """Mark the target user active (optionally setting a password) and save it."""
        self.target_user.is_active = True
        if password is not None:
            self.target_user.set_password(password)
        self.target_user.save()

    @staticmethod
    def _failing_lockdown_event():
        """Return a patcher that makes ``Event.new`` raise for the lockdown event only.

        Calls for any other event are forwarded to the real ``Event.new``.
        """
        real_event_new = Event.new

        def _event_new_side_effect(action, *args, **kwargs):
            is_lockdown_event = (
                action == EventAction.USER_WRITE
                and kwargs.get("action_id") == LOCKDOWN_EVENT_ACTION_ID
            )
            if is_lockdown_event:
                raise RuntimeError("simulated event failure")
            return real_event_new(action, *args, **kwargs)

        return patch(
            "authentik.enterprise.stages.account_lockdown.stage.Event.new",
            side_effect=_event_new_side_effect,
        )

    @staticmethod
    def _fake_outgoing_provider():
        """Build a stand-in outgoing provider, its model, and its direct-sync task mock.

        Returns a ``(provider, provider_model, task_sync_direct)`` tuple where
        ``provider_model.objects.filter(...)`` yields just ``provider``.
        """
        provider = SimpleNamespace(name="outgoing", pk=1, sync_page_timeout="seconds=5")
        task_sync_direct = MagicMock()
        task_sync_direct.message_with_options.return_value = "direct-message"
        provider_model = SimpleNamespace(
            objects=SimpleNamespace(filter=MagicMock(return_value=[provider]))
        )
        return provider, provider_model, task_sync_direct

    def test_lockdown_no_target(self):
        """Test lockdown stage with no pending user fails"""
        view = self.make_stage_view(self._make_plan())

        response = view.dispatch(self.make_request())

        self.assertEqual(response.status_code, 400)

    def test_lockdown_with_pending_user(self):
        """Test lockdown stage with a pending target user."""
        self._activate_target_user()
        plan = self._make_plan(
            {
                PLAN_CONTEXT_LOCKDOWN_REASON: "Security incident",
                PLAN_CONTEXT_PENDING_USER: self.target_user,
            }
        )
        view = self.make_stage_view(plan)
        request = self.make_request(user=self.user)

        self.assertTrue(can_lock_user(request.user, self.target_user))
        response = view.dispatch(request)

        self.target_user.refresh_from_db()
        self.assertFalse(self.target_user.is_active)
        self.assertFalse(self.target_user.has_usable_password())
        self.assertEqual(response.status_code, 204)

        # Check event was created
        event = self.get_lockdown_event()
        self.assertIsNotNone(event)
        self.assertEqual(event.context["action_id"], LOCKDOWN_EVENT_ACTION_ID)
        self.assertEqual(event.context["reason"], "Security incident")
        self.assertEqual(event.context["affected_user"], self.target_user.username)

    def test_lockdown_with_pending_user_reason(self):
        """Test lockdown stage with a pending target and explicit reason."""
        self._activate_target_user()
        plan = self._make_plan(
            {
                PLAN_CONTEXT_LOCKDOWN_REASON: "Compromised account",
                PLAN_CONTEXT_PENDING_USER: self.target_user,
            }
        )
        view = self.make_stage_view(plan)
        request = self.make_request(user=self.user)

        self.assertTrue(can_lock_user(request.user, self.target_user))
        response = view.dispatch(request)

        self.target_user.refresh_from_db()
        self.assertFalse(self.target_user.is_active)
        self.assertEqual(response.status_code, 204)

        # The explicit reason must end up on the recorded lockdown event
        event = self.get_lockdown_event()
        self.assertIsNotNone(event)
        self.assertEqual(event.context["reason"], "Compromised account")

    def test_lockdown_reason_from_prompt(self):
        """Test lockdown stage reads the reason from prompt data."""
        self._activate_target_user()
        plan = self._make_plan(
            {PLAN_CONTEXT_PROMPT: {PLAN_CONTEXT_LOCKDOWN_REASON: "User requested lockdown"}}
        )
        view = self.make_stage_view(plan)
        request = self.make_request(user=self.user)
        view._lockdown_user(request, self.stage, self.target_user, view.get_reason())

        event = self.get_lockdown_event()
        self.assertIsNotNone(event)
        self.assertEqual(event.context["reason"], "User requested lockdown")

    def test_lockdown_event_failure_does_not_fail_self_service(self):
        """Test lockdown still succeeds when event emission fails."""
        self.stage.delete_sessions = False
        self.stage.save()
        self._activate_target_user()

        view = self.make_stage_view(
            self._make_plan({PLAN_CONTEXT_PENDING_USER: self.target_user})
        )
        request = self.make_request(user=self.target_user)

        with self._failing_lockdown_event():
            view._lockdown_user(request, self.stage, self.target_user, view.get_reason())

        self.target_user.refresh_from_db()
        self.assertFalse(self.target_user.is_active)

    def test_dispatch_records_success_when_event_emission_fails(self):
        """Test dispatch still completes if event emission fails."""
        self.stage.delete_sessions = False
        self.stage.save()
        self._activate_target_user()

        view = self.make_stage_view(
            self._make_plan({PLAN_CONTEXT_PENDING_USER: self.target_user})
        )
        request = self.make_request(user=self.target_user)

        with self._failing_lockdown_event():
            response = view.dispatch(request)

        self.target_user.refresh_from_db()
        self.assertFalse(self.target_user.is_active)
        self.assertEqual(response.status_code, 204)

    def test_lockdown_self_service_redirects_to_completion_flow(self):
        """Test self-service lockdown redirects to completion flow when sessions are deleted."""
        completion_flow = create_test_flow(FlowDesignation.STAGE_CONFIGURATION)
        self.stage.self_service_completion_flow = completion_flow
        self.stage.save()
        self._activate_target_user()

        view = self.make_stage_view(self._make_plan())
        request = self.make_request(user=self.target_user)
        view._lockdown_user(request, self.stage, self.target_user, view.get_reason())
        response = view._self_service_completion_response(request)

        self.assertEqual(response.status_code, 302)
        self.assertEqual(
            response.url,
            reverse("authentik_core:if-flow", kwargs={"flow_slug": completion_flow.slug}),
        )

    def test_lockdown_self_service_requires_completion_flow(self):
        """Test self-service lockdown fails before deleting sessions without a completion flow."""
        self.stage.self_service_completion_flow = None
        self.stage.save()
        self._activate_target_user()

        view = self.make_stage_view(
            self._make_plan({PLAN_CONTEXT_PENDING_USER: self.target_user})
        )
        request = self.make_request(user=self.target_user)

        response = view.dispatch(request)

        self.assertEqual(response.status_code, 400)
        self.target_user.refresh_from_db()
        self.assertTrue(self.target_user.is_active)

    def test_lockdown_denies_other_user_without_permission(self):
        """Test lockdown stage rejects non-self requests without change_user permission."""
        actor = create_test_user()

        view = self.make_stage_view(
            self._make_plan({PLAN_CONTEXT_PENDING_USER: self.target_user})
        )
        request = self.make_request(user=actor)

        self.assertFalse(can_lock_user(request.user, self.target_user))
        response = view.dispatch(request)
        self.assertEqual(response.status_code, 400)

    def test_lockdown_revokes_tokens(self):
        """Test lockdown stage revokes tokens"""
        Token.objects.create(
            user=self.target_user,
            identifier="test-token",
            intent=TokenIntents.INTENT_API,
            key=generate_id(),
            expiring=False,
        )
        self.assertEqual(Token.objects.filter(user=self.target_user).count(), 1)

        view = self.make_stage_view(self._make_plan())
        view._lockdown_user(self.make_request(user=self.user), self.stage, self.target_user, "")

        self.assertEqual(Token.objects.filter(user=self.target_user).count(), 0)

    def test_lockdown_revokes_provider_tokens(self):
        """Test lockdown stage revokes provider tokens and sessions."""
        oauth_provider = OAuth2Provider.objects.create(
            name=generate_id(),
            authorization_flow=create_test_flow(),
            redirect_uris=[
                RedirectURI(RedirectURIMatchingMode.STRICT, "http://testserver/callback")
            ],
            signing_key=create_test_cert(),
        )
        saml_provider = SAMLProvider.objects.create(
            name=generate_id(),
            authorization_flow=create_test_flow(),
            acs_url="https://sp.example.com/acs",
            issuer_override="https://idp.example.com",
        )
        session = Session.objects.create(
            session_key=generate_id(),
            expires=timezone.now() + timezone.timedelta(hours=1),
            last_ip="127.0.0.1",
        )
        auth_session = AuthenticatedSession.objects.create(
            session=session,
            user=self.target_user,
        )
        grant_kwargs = {
            "provider": oauth_provider,
            "user": self.target_user,
            "auth_time": timezone.now(),
            "_scope": "openid profile",
            "expiring": False,
        }
        token_kwargs = grant_kwargs | {"_id_token": json.dumps(asdict(IDToken("foo", "bar")))}
        AuthorizationCode.objects.create(
            code=generate_id(),
            session=auth_session,
            **grant_kwargs,
        )
        # Access and refresh tokens take the same keyword arguments
        for token_model in (AccessToken, RefreshToken):
            token_model.objects.create(
                token=generate_id(),
                session=auth_session,
                **token_kwargs,
            )
        DeviceToken.objects.create(
            provider=oauth_provider,
            user=self.target_user,
            session=auth_session,
            _scope="openid profile",
            expiring=False,
        )
        SAMLSession.objects.create(
            provider=saml_provider,
            user=self.target_user,
            session=auth_session,
            session_index=generate_id(),
            name_id=self.target_user.email,
            expires=timezone.now() + timezone.timedelta(hours=1),
            expiring=True,
        )

        view = self.make_stage_view(self._make_plan())
        view._lockdown_user(self.make_request(user=self.user), self.stage, self.target_user, "")

        # One sub-test per model so a single surviving row does not mask the others
        revoked_models = (AuthorizationCode, AccessToken, RefreshToken, DeviceToken, SAMLSession)
        for model in revoked_models:
            with self.subTest(model=model.__name__):
                self.assertEqual(model.objects.filter(user=self.target_user).count(), 0)

    def test_lockdown_selective_actions(self):
        """Test lockdown stage with selective actions"""
        self.stage.deactivate_user = True
        self.stage.set_unusable_password = False
        self.stage.delete_sessions = False
        self.stage.revoke_tokens = False
        self.stage.save()

        self._activate_target_user(password="testpassword")

        Token.objects.create(
            user=self.target_user,
            identifier="test-token",
            intent=TokenIntents.INTENT_API,
            key=generate_id(),
            expiring=False,
        )

        view = self.make_stage_view(self._make_plan())
        view._lockdown_user(self.make_request(user=self.user), self.stage, self.target_user, "")

        self.target_user.refresh_from_db()
        # User should be deactivated
        self.assertFalse(self.target_user.is_active)
        # Password should still be usable
        self.assertTrue(self.target_user.has_usable_password())
        # Token should still exist
        self.assertEqual(Token.objects.filter(user=self.target_user).count(), 1)

    def test_lockdown_no_actions(self):
        """Test lockdown stage with all actions disabled"""
        self.stage.deactivate_user = False
        self.stage.set_unusable_password = False
        self.stage.delete_sessions = False
        self.stage.revoke_tokens = False
        self.stage.save()

        self._activate_target_user(password="testpassword")

        view = self.make_stage_view(self._make_plan())
        view._lockdown_user(self.make_request(user=self.user), self.stage, self.target_user, "")

        self.target_user.refresh_from_db()
        # User should still be active
        self.assertTrue(self.target_user.is_active)
        # Password should still be usable
        self.assertTrue(self.target_user.has_usable_password())
        # Event should still be created
        event = self.get_lockdown_event()
        self.assertIsNotNone(event)

    def test_lockdown_deactivation_inhibits_signal_dispatch_until_after_commit(self):
        """Test lockdown queues explicit outgoing syncs after the deactivation transaction."""
        view = self.make_stage_view(self._make_plan())

        with (
            patch(
                "authentik.enterprise.stages.account_lockdown.stage.sync_outgoing_inhibit_dispatch"
            ) as inhibit,
            patch.object(view, "_sync_deactivated_user_to_outgoing_providers") as sync_outgoing,
        ):
            view._lockdown_user(self.make_request(user=self.user), self.stage, self.target_user, "")

        inhibit.assert_called_once()
        sync_outgoing.assert_called_once()
        synced_user = sync_outgoing.call_args.args[0]
        self.assertEqual(synced_user.pk, self.target_user.pk)
        self.assertFalse(synced_user.is_active)

    def test_lockdown_waits_for_direct_outgoing_provider_syncs(self):
        """Test direct outgoing sync tasks are enqueued and waited on."""
        view = self.make_stage_view(self._make_plan())
        provider, provider_model, task_sync_direct = self._fake_outgoing_provider()
        task_group = MagicMock()

        with (
            patch(
                "authentik.enterprise.stages.account_lockdown.stage.get_outgoing_sync_tasks",
                return_value=((provider_model, task_sync_direct),),
            ),
            patch(
                "authentik.enterprise.stages.account_lockdown.stage.group",
                return_value=task_group,
            ) as task_group_cls,
        ):
            view._sync_deactivated_user_to_outgoing_providers(self.target_user)

        task_sync_direct.message_with_options.assert_called_once_with(
            args=(class_to_path(type(self.target_user)), self.target_user.pk, provider.pk),
            rel_obj=provider,
            time_limit=5000,
            uid=f"{provider.name}:user:{self.target_user.pk}:direct",
        )
        task_group_cls.assert_called_once_with(["direct-message"])
        task_group.run.return_value.wait.assert_called_once_with(timeout=5000)

    def test_lockdown_outgoing_provider_sync_timeout_leaves_tasks_running(self):
        """Test timeout while waiting for direct outgoing syncs does not fail lockdown."""
        view = self.make_stage_view(self._make_plan())
        _provider, provider_model, task_sync_direct = self._fake_outgoing_provider()
        task_group = MagicMock()
        task_group.run.return_value.wait.side_effect = ResultTimeout("timed out")

        with (
            patch(
                "authentik.enterprise.stages.account_lockdown.stage.get_outgoing_sync_tasks",
                return_value=((provider_model, task_sync_direct),),
            ),
            patch(
                "authentik.enterprise.stages.account_lockdown.stage.group",
                return_value=task_group,
            ),
        ):
            view._sync_deactivated_user_to_outgoing_providers(self.target_user)

        task_group.run.assert_called_once_with()
        task_group.run.return_value.wait.assert_called_once_with(timeout=5000)

    def test_lockdown_outgoing_provider_sync_failure_does_not_fail_lockdown(self):
        """Test completed local lockdown still emits an event if outgoing sync fails."""
        view = self.make_stage_view(self._make_plan())

        with patch.object(
            view,
            "_sync_deactivated_user_to_outgoing_providers",
            side_effect=ValueError("sync failed"),
        ):
            view._lockdown_user(self.make_request(user=self.user), self.stage, self.target_user, "")

        self.target_user.refresh_from_db()
        self.assertFalse(self.target_user.is_active)
        event = self.get_lockdown_event()
        self.assertIsNotNone(event)
Account lockdown stage tests

def test_lockdown_no_target(self):
122    def test_lockdown_no_target(self):
123        """Test lockdown stage with no pending user fails"""
124        plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()])
125        view = self.make_stage_view(plan)
126
127        response = view.dispatch(self.make_request())
128
129        self.assertEqual(response.status_code, 400)

Test lockdown stage with no pending user fails

def test_lockdown_with_pending_user(self):
131    def test_lockdown_with_pending_user(self):
132        """Test lockdown stage with a pending target user."""
133        self.target_user.is_active = True
134        self.target_user.save()
135
136        plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()])
137        plan.context[PLAN_CONTEXT_LOCKDOWN_REASON] = "Security incident"
138        plan.context[PLAN_CONTEXT_PENDING_USER] = self.target_user
139        view = self.make_stage_view(plan)
140        request = self.make_request(user=self.user)
141
142        self.assertTrue(can_lock_user(request.user, self.target_user))
143        response = view.dispatch(request)
144
145        self.target_user.refresh_from_db()
146        self.assertFalse(self.target_user.is_active)
147        self.assertFalse(self.target_user.has_usable_password())
148        self.assertEqual(response.status_code, 204)
149
150        # Check event was created
151        event = self.get_lockdown_event()
152        self.assertIsNotNone(event)
153        self.assertEqual(event.context["action_id"], LOCKDOWN_EVENT_ACTION_ID)
154        self.assertEqual(event.context["reason"], "Security incident")
155        self.assertEqual(event.context["affected_user"], self.target_user.username)

Test lockdown stage with a pending target user.

def test_lockdown_with_pending_user_reason(self):
157    def test_lockdown_with_pending_user_reason(self):
158        """Test lockdown stage with a pending target and explicit reason."""
159        self.target_user.is_active = True
160        self.target_user.save()
161
162        plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()])
163        plan.context[PLAN_CONTEXT_LOCKDOWN_REASON] = "Compromised account"
164        plan.context[PLAN_CONTEXT_PENDING_USER] = self.target_user
165        view = self.make_stage_view(plan)
166        request = self.make_request(user=self.user)
167
168        self.assertTrue(can_lock_user(request.user, self.target_user))
169        response = view.dispatch(request)
170
171        self.target_user.refresh_from_db()
172        self.assertFalse(self.target_user.is_active)
173        self.assertEqual(response.status_code, 204)

Test lockdown stage with a pending target and explicit reason.

def test_lockdown_reason_from_prompt(self):
175    def test_lockdown_reason_from_prompt(self):
176        """Test lockdown stage reads the reason from prompt data."""
177        self.target_user.is_active = True
178        self.target_user.save()
179
180        plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()])
181        plan.context[PLAN_CONTEXT_PROMPT] = {
182            PLAN_CONTEXT_LOCKDOWN_REASON: "User requested lockdown",
183        }
184        view = self.make_stage_view(plan)
185        request = self.make_request(user=self.user)
186        view._lockdown_user(request, self.stage, self.target_user, view.get_reason())
187
188        event = self.get_lockdown_event()
189        self.assertIsNotNone(event)
190        self.assertEqual(event.context["reason"], "User requested lockdown")

Test lockdown stage reads the reason from prompt data.

def test_lockdown_event_failure_does_not_fail_self_service(self):
192    def test_lockdown_event_failure_does_not_fail_self_service(self):
193        """Test lockdown still succeeds when event emission fails."""
194        self.stage.delete_sessions = False
195        self.stage.save()
196
197        self.target_user.is_active = True
198        self.target_user.save()
199
200        plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()])
201        plan.context[PLAN_CONTEXT_PENDING_USER] = self.target_user
202        view = self.make_stage_view(plan)
203        request = self.make_request(user=self.target_user)
204
205        original_event_new = Event.new
206
207        def _event_new_side_effect(action, *args, **kwargs):
208            if (
209                action == EventAction.USER_WRITE
210                and kwargs.get("action_id") == LOCKDOWN_EVENT_ACTION_ID
211            ):
212                raise RuntimeError("simulated event failure")
213            return original_event_new(action, *args, **kwargs)
214
215        with patch(
216            "authentik.enterprise.stages.account_lockdown.stage.Event.new",
217            side_effect=_event_new_side_effect,
218        ):
219            view._lockdown_user(request, self.stage, self.target_user, view.get_reason())
220
221        self.target_user.refresh_from_db()
222        self.assertFalse(self.target_user.is_active)

Test lockdown still succeeds when event emission fails.

def test_dispatch_records_success_when_event_emission_fails(self):
224    def test_dispatch_records_success_when_event_emission_fails(self):
225        """Test dispatch still completes if event emission fails."""
226        self.stage.delete_sessions = False
227        self.stage.save()
228
229        self.target_user.is_active = True
230        self.target_user.save()
231
232        plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()])
233        plan.context[PLAN_CONTEXT_PENDING_USER] = self.target_user
234        view = self.make_stage_view(plan)
235        request = self.make_request(
236            user=self.target_user,
237        )
238
239        original_event_new = Event.new
240
241        def _event_new_side_effect(action, *args, **kwargs):
242            if (
243                action == EventAction.USER_WRITE
244                and kwargs.get("action_id") == LOCKDOWN_EVENT_ACTION_ID
245            ):
246                raise RuntimeError("simulated event failure")
247            return original_event_new(action, *args, **kwargs)
248
249        with patch(
250            "authentik.enterprise.stages.account_lockdown.stage.Event.new",
251            side_effect=_event_new_side_effect,
252        ):
253            response = view.dispatch(request)
254
255        self.target_user.refresh_from_db()
256        self.assertFalse(self.target_user.is_active)
257        self.assertEqual(response.status_code, 204)

Test dispatch still completes if event emission fails.

def test_lockdown_self_service_redirects_to_completion_flow(self):
259    def test_lockdown_self_service_redirects_to_completion_flow(self):
260        """Test self-service lockdown redirects to completion flow when sessions are deleted."""
261        completion_flow = create_test_flow(FlowDesignation.STAGE_CONFIGURATION)
262        self.stage.self_service_completion_flow = completion_flow
263        self.stage.save()
264
265        self.target_user.is_active = True
266        self.target_user.save()
267
268        plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()])
269        view = self.make_stage_view(plan)
270        request = self.make_request(user=self.target_user)
271        view._lockdown_user(request, self.stage, self.target_user, view.get_reason())
272        response = view._self_service_completion_response(request)
273
274        self.assertEqual(response.status_code, 302)
275        self.assertEqual(
276            response.url,
277            reverse("authentik_core:if-flow", kwargs={"flow_slug": completion_flow.slug}),
278        )

Test self-service lockdown redirects to completion flow when sessions are deleted.

def test_lockdown_self_service_requires_completion_flow(self):
280    def test_lockdown_self_service_requires_completion_flow(self):
281        """Test self-service lockdown fails before deleting sessions without a completion flow."""
282        self.stage.self_service_completion_flow = None
283        self.stage.save()
284
285        self.target_user.is_active = True
286        self.target_user.save()
287
288        plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()])
289        plan.context[PLAN_CONTEXT_PENDING_USER] = self.target_user
290        view = self.make_stage_view(plan)
291        request = self.make_request(user=self.target_user)
292
293        response = view.dispatch(request)
294
295        self.assertEqual(response.status_code, 400)
296        self.target_user.refresh_from_db()
297        self.assertTrue(self.target_user.is_active)

Test self-service lockdown fails before deleting sessions without a completion flow.

def test_lockdown_denies_other_user_without_permission(self):
299    def test_lockdown_denies_other_user_without_permission(self):
300        """Test lockdown stage rejects non-self requests without change_user permission."""
301        actor = create_test_user()
302
303        plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()])
304        plan.context[PLAN_CONTEXT_PENDING_USER] = self.target_user
305        view = self.make_stage_view(plan)
306        request = self.make_request(user=actor)
307
308        self.assertFalse(can_lock_user(request.user, self.target_user))
309        response = view.dispatch(request)
310        self.assertEqual(response.status_code, 400)

Test lockdown stage rejects non-self requests without change_user permission.

def test_lockdown_revokes_tokens(self):
312    def test_lockdown_revokes_tokens(self):
313        """Test lockdown stage revokes tokens"""
314        Token.objects.create(
315            user=self.target_user,
316            identifier="test-token",
317            intent=TokenIntents.INTENT_API,
318            key=generate_id(),
319            expiring=False,
320        )
321        self.assertEqual(Token.objects.filter(user=self.target_user).count(), 1)
322
323        plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()])
324        view = self.make_stage_view(plan)
325        view._lockdown_user(self.make_request(user=self.user), self.stage, self.target_user, "")
326
327        self.assertEqual(Token.objects.filter(user=self.target_user).count(), 0)

Test lockdown stage revokes tokens

def test_lockdown_revokes_provider_tokens(self):
329    def test_lockdown_revokes_provider_tokens(self):
330        """Test lockdown stage revokes provider tokens and sessions."""
331        oauth_provider = OAuth2Provider.objects.create(
332            name=generate_id(),
333            authorization_flow=create_test_flow(),
334            redirect_uris=[
335                RedirectURI(RedirectURIMatchingMode.STRICT, "http://testserver/callback")
336            ],
337            signing_key=create_test_cert(),
338        )
339        saml_provider = SAMLProvider.objects.create(
340            name=generate_id(),
341            authorization_flow=create_test_flow(),
342            acs_url="https://sp.example.com/acs",
343            issuer_override="https://idp.example.com",
344        )
345        session = Session.objects.create(
346            session_key=generate_id(),
347            expires=timezone.now() + timezone.timedelta(hours=1),
348            last_ip="127.0.0.1",
349        )
350        auth_session = AuthenticatedSession.objects.create(
351            session=session,
352            user=self.target_user,
353        )
354        grant_kwargs = {
355            "provider": oauth_provider,
356            "user": self.target_user,
357            "auth_time": timezone.now(),
358            "_scope": "openid profile",
359            "expiring": False,
360        }
361        token_kwargs = grant_kwargs | {"_id_token": json.dumps(asdict(IDToken("foo", "bar")))}
362        AuthorizationCode.objects.create(
363            code=generate_id(),
364            session=auth_session,
365            **grant_kwargs,
366        )
367        AccessToken.objects.create(
368            token=generate_id(),
369            session=auth_session,
370            **token_kwargs,
371        )
372        RefreshToken.objects.create(
373            token=generate_id(),
374            session=auth_session,
375            **token_kwargs,
376        )
377        DeviceToken.objects.create(
378            provider=oauth_provider,
379            user=self.target_user,
380            session=auth_session,
381            _scope="openid profile",
382            expiring=False,
383        )
384        SAMLSession.objects.create(
385            provider=saml_provider,
386            user=self.target_user,
387            session=auth_session,
388            session_index=generate_id(),
389            name_id=self.target_user.email,
390            expires=timezone.now() + timezone.timedelta(hours=1),
391            expiring=True,
392        )
393
394        plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()])
395        view = self.make_stage_view(plan)
396        view._lockdown_user(self.make_request(user=self.user), self.stage, self.target_user, "")
397
398        self.assertEqual(AuthorizationCode.objects.filter(user=self.target_user).count(), 0)
399        self.assertEqual(AccessToken.objects.filter(user=self.target_user).count(), 0)
400        self.assertEqual(RefreshToken.objects.filter(user=self.target_user).count(), 0)
401        self.assertEqual(DeviceToken.objects.filter(user=self.target_user).count(), 0)
402        self.assertEqual(SAMLSession.objects.filter(user=self.target_user).count(), 0)

Test lockdown stage revokes provider tokens and sessions.

def test_lockdown_selective_actions(self):
404    def test_lockdown_selective_actions(self):
405        """Test lockdown stage with selective actions"""
406        self.stage.deactivate_user = True
407        self.stage.set_unusable_password = False
408        self.stage.delete_sessions = False
409        self.stage.revoke_tokens = False
410        self.stage.save()
411
412        self.target_user.is_active = True
413        self.target_user.set_password("testpassword")
414        self.target_user.save()
415
416        Token.objects.create(
417            user=self.target_user,
418            identifier="test-token",
419            intent=TokenIntents.INTENT_API,
420            key=generate_id(),
421            expiring=False,
422        )
423
424        plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()])
425        view = self.make_stage_view(plan)
426        view._lockdown_user(self.make_request(user=self.user), self.stage, self.target_user, "")
427
428        self.target_user.refresh_from_db()
429        # User should be deactivated
430        self.assertFalse(self.target_user.is_active)
431        # Password should still be usable
432        self.assertTrue(self.target_user.has_usable_password())
433        # Token should still exist
434        self.assertEqual(Token.objects.filter(user=self.target_user).count(), 1)

Test lockdown stage with selective actions

def test_lockdown_no_actions(self):
436    def test_lockdown_no_actions(self):
437        """Test lockdown stage with all actions disabled"""
438        self.stage.deactivate_user = False
439        self.stage.set_unusable_password = False
440        self.stage.delete_sessions = False
441        self.stage.revoke_tokens = False
442        self.stage.save()
443
444        self.target_user.is_active = True
445        self.target_user.set_password("testpassword")
446        self.target_user.save()
447
448        plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()])
449        view = self.make_stage_view(plan)
450        view._lockdown_user(self.make_request(user=self.user), self.stage, self.target_user, "")
451
452        self.target_user.refresh_from_db()
453        # User should still be active
454        self.assertTrue(self.target_user.is_active)
455        # Password should still be usable
456        self.assertTrue(self.target_user.has_usable_password())
457        # Event should still be created
458        event = self.get_lockdown_event()
459        self.assertIsNotNone(event)

Test lockdown stage with all actions disabled

def test_lockdown_deactivation_inhibits_signal_dispatch_until_after_commit(self):
461    def test_lockdown_deactivation_inhibits_signal_dispatch_until_after_commit(self):
462        """Test lockdown queues explicit outgoing syncs after the deactivation transaction."""
463        plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()])
464        view = self.make_stage_view(plan)
465
466        with (
467            patch(
468                "authentik.enterprise.stages.account_lockdown.stage.sync_outgoing_inhibit_dispatch"
469            ) as inhibit,
470            patch.object(view, "_sync_deactivated_user_to_outgoing_providers") as sync_outgoing,
471        ):
472            view._lockdown_user(self.make_request(user=self.user), self.stage, self.target_user, "")
473
474        inhibit.assert_called_once()
475        sync_outgoing.assert_called_once()
476        synced_user = sync_outgoing.call_args.args[0]
477        self.assertEqual(synced_user.pk, self.target_user.pk)
478        self.assertFalse(synced_user.is_active)

Test lockdown queues explicit outgoing syncs after the deactivation transaction.

def test_lockdown_waits_for_direct_outgoing_provider_syncs(self):
480    def test_lockdown_waits_for_direct_outgoing_provider_syncs(self):
481        """Test direct outgoing sync tasks are enqueued and waited on."""
482        plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()])
483        view = self.make_stage_view(plan)
484        provider = SimpleNamespace(name="outgoing", pk=1, sync_page_timeout="seconds=5")
485        task_sync_direct = MagicMock()
486        task_sync_direct.message_with_options.return_value = "direct-message"
487        provider_model = SimpleNamespace(
488            objects=SimpleNamespace(filter=MagicMock(return_value=[provider]))
489        )
490        task_group = MagicMock()
491
492        with (
493            patch(
494                "authentik.enterprise.stages.account_lockdown.stage.get_outgoing_sync_tasks",
495                return_value=((provider_model, task_sync_direct),),
496            ),
497            patch(
498                "authentik.enterprise.stages.account_lockdown.stage.group",
499                return_value=task_group,
500            ) as task_group_cls,
501        ):
502            view._sync_deactivated_user_to_outgoing_providers(self.target_user)
503
504        task_sync_direct.message_with_options.assert_called_once_with(
505            args=(class_to_path(type(self.target_user)), self.target_user.pk, provider.pk),
506            rel_obj=provider,
507            time_limit=5000,
508            uid=f"{provider.name}:user:{self.target_user.pk}:direct",
509        )
510        task_group_cls.assert_called_once_with(["direct-message"])
511        task_group.run.return_value.wait.assert_called_once_with(timeout=5000)

Test direct outgoing sync tasks are enqueued and waited on.

def test_lockdown_outgoing_provider_sync_timeout_leaves_tasks_running(self):
513    def test_lockdown_outgoing_provider_sync_timeout_leaves_tasks_running(self):
514        """Test timeout while waiting for direct outgoing syncs does not fail lockdown."""
515        plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()])
516        view = self.make_stage_view(plan)
517        provider = SimpleNamespace(name="outgoing", pk=1, sync_page_timeout="seconds=5")
518        task_sync_direct = MagicMock()
519        task_sync_direct.message_with_options.return_value = "direct-message"
520        provider_model = SimpleNamespace(
521            objects=SimpleNamespace(filter=MagicMock(return_value=[provider]))
522        )
523        task_group = MagicMock()
524        task_group.run.return_value.wait.side_effect = ResultTimeout("timed out")
525
526        with (
527            patch(
528                "authentik.enterprise.stages.account_lockdown.stage.get_outgoing_sync_tasks",
529                return_value=((provider_model, task_sync_direct),),
530            ),
531            patch(
532                "authentik.enterprise.stages.account_lockdown.stage.group",
533                return_value=task_group,
534            ),
535        ):
536            view._sync_deactivated_user_to_outgoing_providers(self.target_user)
537
538        task_group.run.assert_called_once_with()
539        task_group.run.return_value.wait.assert_called_once_with(timeout=5000)

Test timeout while waiting for direct outgoing syncs does not fail lockdown.

def test_lockdown_outgoing_provider_sync_failure_does_not_fail_lockdown(self):
541    def test_lockdown_outgoing_provider_sync_failure_does_not_fail_lockdown(self):
542        """Test completed local lockdown still emits an event if outgoing sync fails."""
543        plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()])
544        view = self.make_stage_view(plan)
545
546        with patch.object(
547            view,
548            "_sync_deactivated_user_to_outgoing_providers",
549            side_effect=ValueError("sync failed"),
550        ):
551            view._lockdown_user(self.make_request(user=self.user), self.stage, self.target_user, "")
552
553        self.target_user.refresh_from_db()
554        self.assertFalse(self.target_user.is_active)
555        event = self.get_lockdown_event()
556        self.assertIsNotNone(event)

Test completed local lockdown still emits an event if outgoing sync fails.

class TestAccountLockdownStageConcurrency(AccountLockdownStageTestMixin, django.test.testcases.TransactionTestCase):
class TestAccountLockdownStageConcurrency(AccountLockdownStageTestMixin, TransactionTestCase):
    """Account lockdown concurrency tests.

    NOTE(review): based on ``TransactionTestCase`` presumably so that rows
    written by the helper thread are visible to the test thread - confirm.
    """

    def test_lockdown_retries_when_another_transaction_recreates_a_token(self):
        """Lockdown should remove a token recreated before the retry check runs.

        A helper thread creates a second token for the target user at the
        moment the view first calls ``_has_lockdown_artifacts``. The wrapped
        check only delegates to the real implementation once that thread has
        finished, and the test passes when no token is left for the user.

        The helper thread is always joined, even when the lockdown call or an
        assertion fails, so it cannot keep writing after the test has ended.
        """
        Token.objects.create(
            user=self.target_user,
            identifier=f"initial-token-{generate_id()}",
            intent=TokenIntents.INTENT_API,
            key=generate_id(),
            expiring=False,
        )

        plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()])
        view = self.make_stage_view(plan)
        # Keep the real check so the wrapper below can delegate to it.
        original_has_artifacts = view._has_lockdown_artifacts
        target_user = self.target_user
        thread_ready = ThreadEvent()  # helper thread has started
        start_create = ThreadEvent()  # helper thread may create its token
        thread_done = ThreadEvent()  # helper thread finished (success or error)
        thread_errors = []

        class TokenCreatorThread(Thread):
            """Create one extra token for the target user once signalled."""

            __test__ = False

            def run(self):
                try:
                    thread_ready.set()
                    if not start_create.wait(timeout=5):
                        thread_errors.append("timed out waiting to recreate token")
                        return
                    Token.objects.create(
                        user=target_user,
                        identifier=f"concurrent-token-{generate_id()}",
                        intent=TokenIntents.INTENT_API,
                        key=generate_id(),
                        expiring=False,
                    )
                except Exception as exc:  # noqa: BLE001
                    # Collected here and asserted on by the test thread.
                    thread_errors.append(exc)
                finally:
                    thread_done.set()
                    connection.close()

        def has_artifacts_after_concurrent_create(stage, user):
            """Let the helper thread finish once, then run the real check."""
            if not start_create.is_set():
                start_create.set()
                self.assertTrue(
                    thread_done.wait(timeout=30),
                    (
                        "Concurrent token creation did not complete "
                        f"before retry check: {thread_errors}"
                    ),
                )
            return original_has_artifacts(stage, user)

        creator = TokenCreatorThread()
        with patch.object(
            view, "_has_lockdown_artifacts", side_effect=has_artifacts_after_concurrent_create
        ):
            creator.start()
            try:
                self.assertTrue(
                    thread_ready.wait(timeout=5),
                    "Concurrent token creation thread did not start",
                )
                view._lockdown_user(
                    self.make_request(user=self.user), self.stage, self.target_user, ""
                )
            finally:
                # Reap the helper on every path; the bound keeps a stuck thread
                # from hanging the whole test run.
                creator.join(timeout=30)

        self.assertFalse(creator.is_alive(), "Concurrent token creation thread did not exit")
        self.assertEqual(thread_errors, [])
        self.assertEqual(Token.objects.filter(user=self.target_user).count(), 0)

Account lockdown concurrency tests.

def test_lockdown_retries_when_another_transaction_recreates_a_token(self):
562    def test_lockdown_retries_when_another_transaction_recreates_a_token(self):
563        """Lockdown should remove a token recreated before the retry check runs."""
564        Token.objects.create(
565            user=self.target_user,
566            identifier=f"initial-token-{generate_id()}",
567            intent=TokenIntents.INTENT_API,
568            key=generate_id(),
569            expiring=False,
570        )
571
572        plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()])
573        view = self.make_stage_view(plan)
574        original_has_artifacts = view._has_lockdown_artifacts
575        target_user = self.target_user
576        thread_ready = ThreadEvent()
577        start_create = ThreadEvent()
578        thread_done = ThreadEvent()
579        thread_errors = []
580
581        class TokenCreatorThread(Thread):
582            __test__ = False
583
584            def run(self):
585                try:
586                    thread_ready.set()
587                    if not start_create.wait(timeout=5):
588                        thread_errors.append("timed out waiting to recreate token")
589                        return
590                    Token.objects.create(
591                        user=target_user,
592                        identifier=f"concurrent-token-{generate_id()}",
593                        intent=TokenIntents.INTENT_API,
594                        key=generate_id(),
595                        expiring=False,
596                    )
597                except Exception as exc:  # noqa: BLE001
598                    thread_errors.append(exc)
599                finally:
600                    thread_done.set()
601                    connection.close()
602
603        def has_artifacts_after_concurrent_create(stage, user):
604            if not start_create.is_set():
605                start_create.set()
606                self.assertTrue(
607                    thread_done.wait(timeout=30),
608                    (
609                        "Concurrent token creation did not complete "
610                        f"before retry check: {thread_errors}"
611                    ),
612                )
613            return original_has_artifacts(stage, user)
614
615        creator = TokenCreatorThread()
616        with patch.object(
617            view, "_has_lockdown_artifacts", side_effect=has_artifacts_after_concurrent_create
618        ):
619            creator.start()
620            self.assertTrue(
621                thread_ready.wait(timeout=5),
622                "Concurrent token creation thread did not start",
623            )
624            view._lockdown_user(self.make_request(user=self.user), self.stage, self.target_user, "")
625            creator.join()
626
627        self.assertEqual(thread_errors, [])
628        self.assertEqual(Token.objects.filter(user=self.target_user).count(), 0)

Lockdown should remove a token recreated before the retry check runs.