authentik.enterprise.stages.account_lockdown.tests.test_stage
Account lockdown stage tests
"""Account lockdown stage tests"""

import json
from dataclasses import asdict
from datetime import timedelta
from threading import Event as ThreadEvent
from threading import Thread
from types import SimpleNamespace
from unittest.mock import MagicMock, patch

from django.db import connection
from django.http import HttpResponse
from django.test import TransactionTestCase
from django.urls import reverse
from django.utils import timezone
from dramatiq.results.errors import ResultTimeout

from authentik.core.models import AuthenticatedSession, Session, Token, TokenIntents
from authentik.core.tests.utils import (
    RequestFactory,
    create_test_admin_user,
    create_test_cert,
    create_test_flow,
    create_test_user,
)
from authentik.enterprise.stages.account_lockdown.models import AccountLockdownStage
from authentik.enterprise.stages.account_lockdown.stage import (
    LOCKDOWN_EVENT_ACTION_ID,
    PLAN_CONTEXT_LOCKDOWN_REASON,
    AccountLockdownStageView,
    can_lock_user,
)
from authentik.events.models import Event, EventAction
from authentik.flows.markers import StageMarker
from authentik.flows.models import FlowDesignation, FlowStageBinding
from authentik.flows.planner import PLAN_CONTEXT_PENDING_USER, FlowPlan
from authentik.flows.tests import FlowTestCase
from authentik.lib.generators import generate_id
from authentik.lib.utils.reflection import class_to_path
from authentik.providers.oauth2.id_token import IDToken
from authentik.providers.oauth2.models import (
    AccessToken,
    AuthorizationCode,
    DeviceToken,
    OAuth2Provider,
    RedirectURI,
    RedirectURIMatchingMode,
    RefreshToken,
)
from authentik.providers.saml.models import SAMLProvider, SAMLSession
from authentik.stages.prompt.stage import PLAN_CONTEXT_PROMPT

patch_enterprise_enabled = patch(
    "authentik.enterprise.apps.AuthentikEnterpriseConfig.check_enabled",
    return_value=True,
)


class AccountLockdownStageTestMixin:
    """Shared setup helpers for account lockdown stage tests."""

    @classmethod
    def setUpClass(cls):
        """Start the class-wide patches and guarantee they are stopped again.

        The patchers are registered with ``addClassCleanup`` right after they are
        started, so they are undone even when ``super().setUpClass()`` raises (in
        which case ``tearDownClass`` would never run).
        """
        cls.patch_enterprise_enabled = patch_enterprise_enabled.start()
        cls.addClassCleanup(patch_enterprise_enabled.stop)
        cls.patch_event_dispatch = patch("authentik.events.tasks.event_trigger_dispatch.send")
        cls.patch_event_dispatch.start()
        cls.addClassCleanup(cls.patch_event_dispatch.stop)
        super().setUpClass()

    def setUp(self):
        """Create the acting admin, the target user and a flow with the lockdown stage bound."""
        super().setUp()
        self.user = create_test_admin_user()
        self.target_user = create_test_admin_user()
        self.flow = create_test_flow(FlowDesignation.STAGE_CONFIGURATION)
        self.stage = AccountLockdownStage.objects.create(
            name="lockdown",
        )
        self.binding = FlowStageBinding.objects.create(target=self.flow, stage=self.stage, order=0)
        self.request_factory = RequestFactory()

    def make_stage_view(self, plan: FlowPlan):
        """Return a stage view wired to a stub executor.

        The stub answers ``stage_ok`` with HTTP 204 and ``stage_invalid`` with
        HTTP 400, so tests can tell the two outcomes apart by status code.
        """

        def _stage_ok():
            return HttpResponse(status=204)

        def _stage_invalid(_error_message=None):
            return HttpResponse(status=400)

        return AccountLockdownStageView(
            SimpleNamespace(
                plan=plan,
                current_stage=self.stage,
                current_binding=self.binding,
                flow=self.flow,
                stage_ok=_stage_ok,
                stage_invalid=_stage_invalid,
            )
        )

    def make_request(self, *, user=None, query=None):
        """Build a POST request to this flow's executor URL for the given user."""
        return self.request_factory.post(
            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
            query_params=query or {},
            user=user,
        )

    def get_lockdown_event(self):
        """Return the account-lockdown user-write event."""
        return Event.objects.filter(
            action=EventAction.USER_WRITE,
            context__action_id=LOCKDOWN_EVENT_ACTION_ID,
        ).first()

    def patch_lockdown_event_failure(self):
        """Return a patcher that makes creating the lockdown event raise.

        Only ``USER_WRITE`` events carrying the lockdown ``action_id`` fail; every
        other call is forwarded to the real ``Event.new``.
        """
        # Capture the real implementation before the patcher is started.
        original_event_new = Event.new

        def _event_new_side_effect(action, *args, **kwargs):
            if (
                action == EventAction.USER_WRITE
                and kwargs.get("action_id") == LOCKDOWN_EVENT_ACTION_ID
            ):
                raise RuntimeError("simulated event failure")
            return original_event_new(action, *args, **kwargs)

        return patch(
            "authentik.enterprise.stages.account_lockdown.stage.Event.new",
            side_effect=_event_new_side_effect,
        )


class TestAccountLockdownStage(AccountLockdownStageTestMixin, FlowTestCase):
    """Account lockdown stage tests"""

    def test_lockdown_no_target(self):
        """Test lockdown stage with no pending user fails"""
        plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()])
        view = self.make_stage_view(plan)

        response = view.dispatch(self.make_request())

        self.assertEqual(response.status_code, 400)

    def test_lockdown_with_pending_user(self):
        """Test lockdown stage with a pending target user."""
        self.target_user.is_active = True
        self.target_user.save()

        plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()])
        plan.context[PLAN_CONTEXT_LOCKDOWN_REASON] = "Security incident"
        plan.context[PLAN_CONTEXT_PENDING_USER] = self.target_user
        view = self.make_stage_view(plan)
        request = self.make_request(user=self.user)

        self.assertTrue(can_lock_user(request.user, self.target_user))
        response = view.dispatch(request)

        self.target_user.refresh_from_db()
        self.assertFalse(self.target_user.is_active)
        self.assertFalse(self.target_user.has_usable_password())
        self.assertEqual(response.status_code, 204)

        # Check event was created
        event = self.get_lockdown_event()
        self.assertIsNotNone(event)
        self.assertEqual(event.context["action_id"], LOCKDOWN_EVENT_ACTION_ID)
        self.assertEqual(event.context["reason"], "Security incident")
        self.assertEqual(event.context["affected_user"], self.target_user.username)

    def test_lockdown_with_pending_user_reason(self):
        """Test lockdown stage with a pending target and explicit reason."""
        self.target_user.is_active = True
        self.target_user.save()

        plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()])
        plan.context[PLAN_CONTEXT_LOCKDOWN_REASON] = "Compromised account"
        plan.context[PLAN_CONTEXT_PENDING_USER] = self.target_user
        view = self.make_stage_view(plan)
        request = self.make_request(user=self.user)

        self.assertTrue(can_lock_user(request.user, self.target_user))
        response = view.dispatch(request)

        self.target_user.refresh_from_db()
        self.assertFalse(self.target_user.is_active)
        self.assertEqual(response.status_code, 204)

        # The explicit reason from the plan context must end up on the event
        event = self.get_lockdown_event()
        self.assertIsNotNone(event)
        self.assertEqual(event.context["reason"], "Compromised account")

    def test_lockdown_reason_from_prompt(self):
        """Test lockdown stage reads the reason from prompt data."""
        self.target_user.is_active = True
        self.target_user.save()

        plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()])
        plan.context[PLAN_CONTEXT_PROMPT] = {
            PLAN_CONTEXT_LOCKDOWN_REASON: "User requested lockdown",
        }
        view = self.make_stage_view(plan)
        request = self.make_request(user=self.user)
        view._lockdown_user(request, self.stage, self.target_user, view.get_reason())

        event = self.get_lockdown_event()
        self.assertIsNotNone(event)
        self.assertEqual(event.context["reason"], "User requested lockdown")

    def test_lockdown_event_failure_does_not_fail_self_service(self):
        """Test lockdown still succeeds when event emission fails."""
        self.stage.delete_sessions = False
        self.stage.save()

        self.target_user.is_active = True
        self.target_user.save()

        plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()])
        plan.context[PLAN_CONTEXT_PENDING_USER] = self.target_user
        view = self.make_stage_view(plan)
        request = self.make_request(user=self.target_user)

        with self.patch_lockdown_event_failure():
            view._lockdown_user(request, self.stage, self.target_user, view.get_reason())

        self.target_user.refresh_from_db()
        self.assertFalse(self.target_user.is_active)

    def test_dispatch_records_success_when_event_emission_fails(self):
        """Test dispatch still completes if event emission fails."""
        self.stage.delete_sessions = False
        self.stage.save()

        self.target_user.is_active = True
        self.target_user.save()

        plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()])
        plan.context[PLAN_CONTEXT_PENDING_USER] = self.target_user
        view = self.make_stage_view(plan)
        request = self.make_request(
            user=self.target_user,
        )

        with self.patch_lockdown_event_failure():
            response = view.dispatch(request)

        self.target_user.refresh_from_db()
        self.assertFalse(self.target_user.is_active)
        self.assertEqual(response.status_code, 204)

    def test_lockdown_self_service_redirects_to_completion_flow(self):
        """Test self-service lockdown redirects to completion flow when sessions are deleted."""
        completion_flow = create_test_flow(FlowDesignation.STAGE_CONFIGURATION)
        self.stage.self_service_completion_flow = completion_flow
        self.stage.save()

        self.target_user.is_active = True
        self.target_user.save()

        plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()])
        view = self.make_stage_view(plan)
        request = self.make_request(user=self.target_user)
        view._lockdown_user(request, self.stage, self.target_user, view.get_reason())
        response = view._self_service_completion_response(request)

        self.assertEqual(response.status_code, 302)
        self.assertEqual(
            response.url,
            reverse("authentik_core:if-flow", kwargs={"flow_slug": completion_flow.slug}),
        )

    def test_lockdown_self_service_requires_completion_flow(self):
        """Test self-service lockdown fails before deleting sessions without a completion flow."""
        self.stage.self_service_completion_flow = None
        self.stage.save()

        self.target_user.is_active = True
        self.target_user.save()

        plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()])
        plan.context[PLAN_CONTEXT_PENDING_USER] = self.target_user
        view = self.make_stage_view(plan)
        request = self.make_request(user=self.target_user)

        response = view.dispatch(request)

        self.assertEqual(response.status_code, 400)
        self.target_user.refresh_from_db()
        self.assertTrue(self.target_user.is_active)

    def test_lockdown_denies_other_user_without_permission(self):
        """Test lockdown stage rejects non-self requests without change_user permission."""
        actor = create_test_user()

        plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()])
        plan.context[PLAN_CONTEXT_PENDING_USER] = self.target_user
        view = self.make_stage_view(plan)
        request = self.make_request(user=actor)

        self.assertFalse(can_lock_user(request.user, self.target_user))
        response = view.dispatch(request)
        self.assertEqual(response.status_code, 400)

    def test_lockdown_revokes_tokens(self):
        """Test lockdown stage revokes tokens"""
        Token.objects.create(
            user=self.target_user,
            identifier="test-token",
            intent=TokenIntents.INTENT_API,
            key=generate_id(),
            expiring=False,
        )
        self.assertEqual(Token.objects.filter(user=self.target_user).count(), 1)

        plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()])
        view = self.make_stage_view(plan)
        view._lockdown_user(self.make_request(user=self.user), self.stage, self.target_user, "")

        self.assertEqual(Token.objects.filter(user=self.target_user).count(), 0)

    def test_lockdown_revokes_provider_tokens(self):
        """Test lockdown stage revokes provider tokens and sessions."""
        oauth_provider = OAuth2Provider.objects.create(
            name=generate_id(),
            authorization_flow=create_test_flow(),
            redirect_uris=[
                RedirectURI(RedirectURIMatchingMode.STRICT, "http://testserver/callback")
            ],
            signing_key=create_test_cert(),
        )
        saml_provider = SAMLProvider.objects.create(
            name=generate_id(),
            authorization_flow=create_test_flow(),
            acs_url="https://sp.example.com/acs",
            issuer_override="https://idp.example.com",
        )
        session = Session.objects.create(
            session_key=generate_id(),
            expires=timezone.now() + timedelta(hours=1),
            last_ip="127.0.0.1",
        )
        auth_session = AuthenticatedSession.objects.create(
            session=session,
            user=self.target_user,
        )
        grant_kwargs = {
            "provider": oauth_provider,
            "user": self.target_user,
            "auth_time": timezone.now(),
            "_scope": "openid profile",
            "expiring": False,
        }
        token_kwargs = grant_kwargs | {"_id_token": json.dumps(asdict(IDToken("foo", "bar")))}
        AuthorizationCode.objects.create(
            code=generate_id(),
            session=auth_session,
            **grant_kwargs,
        )
        AccessToken.objects.create(
            token=generate_id(),
            session=auth_session,
            **token_kwargs,
        )
        RefreshToken.objects.create(
            token=generate_id(),
            session=auth_session,
            **token_kwargs,
        )
        DeviceToken.objects.create(
            provider=oauth_provider,
            user=self.target_user,
            session=auth_session,
            _scope="openid profile",
            expiring=False,
        )
        SAMLSession.objects.create(
            provider=saml_provider,
            user=self.target_user,
            session=auth_session,
            session_index=generate_id(),
            name_id=self.target_user.email,
            expires=timezone.now() + timedelta(hours=1),
            expiring=True,
        )

        plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()])
        view = self.make_stage_view(plan)
        view._lockdown_user(self.make_request(user=self.user), self.stage, self.target_user, "")

        self.assertEqual(AuthorizationCode.objects.filter(user=self.target_user).count(), 0)
        self.assertEqual(AccessToken.objects.filter(user=self.target_user).count(), 0)
        self.assertEqual(RefreshToken.objects.filter(user=self.target_user).count(), 0)
        self.assertEqual(DeviceToken.objects.filter(user=self.target_user).count(), 0)
        self.assertEqual(SAMLSession.objects.filter(user=self.target_user).count(), 0)

    def test_lockdown_selective_actions(self):
        """Test lockdown stage with selective actions"""
        self.stage.deactivate_user = True
        self.stage.set_unusable_password = False
        self.stage.delete_sessions = False
        self.stage.revoke_tokens = False
        self.stage.save()

        self.target_user.is_active = True
        self.target_user.set_password("testpassword")
        self.target_user.save()

        Token.objects.create(
            user=self.target_user,
            identifier="test-token",
            intent=TokenIntents.INTENT_API,
            key=generate_id(),
            expiring=False,
        )

        plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()])
        view = self.make_stage_view(plan)
        view._lockdown_user(self.make_request(user=self.user), self.stage, self.target_user, "")

        self.target_user.refresh_from_db()
        # User should be deactivated
        self.assertFalse(self.target_user.is_active)
        # Password should still be usable
        self.assertTrue(self.target_user.has_usable_password())
        # Token should still exist
        self.assertEqual(Token.objects.filter(user=self.target_user).count(), 1)

    def test_lockdown_no_actions(self):
        """Test lockdown stage with all actions disabled"""
        self.stage.deactivate_user = False
        self.stage.set_unusable_password = False
        self.stage.delete_sessions = False
        self.stage.revoke_tokens = False
        self.stage.save()

        self.target_user.is_active = True
        self.target_user.set_password("testpassword")
        self.target_user.save()

        plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()])
        view = self.make_stage_view(plan)
        view._lockdown_user(self.make_request(user=self.user), self.stage, self.target_user, "")

        self.target_user.refresh_from_db()
        # User should still be active
        self.assertTrue(self.target_user.is_active)
        # Password should still be usable
        self.assertTrue(self.target_user.has_usable_password())
        # Event should still be created
        event = self.get_lockdown_event()
        self.assertIsNotNone(event)

    def test_lockdown_deactivation_inhibits_signal_dispatch_until_after_commit(self):
        """Test lockdown queues explicit outgoing syncs after the deactivation transaction."""
        plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()])
        view = self.make_stage_view(plan)

        with (
            patch(
                "authentik.enterprise.stages.account_lockdown.stage.sync_outgoing_inhibit_dispatch"
            ) as inhibit,
            patch.object(view, "_sync_deactivated_user_to_outgoing_providers") as sync_outgoing,
        ):
            view._lockdown_user(self.make_request(user=self.user), self.stage, self.target_user, "")

        inhibit.assert_called_once()
        sync_outgoing.assert_called_once()
        synced_user = sync_outgoing.call_args.args[0]
        self.assertEqual(synced_user.pk, self.target_user.pk)
        self.assertFalse(synced_user.is_active)

    def test_lockdown_waits_for_direct_outgoing_provider_syncs(self):
        """Test direct outgoing sync tasks are enqueued and waited on."""
        plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()])
        view = self.make_stage_view(plan)
        provider = SimpleNamespace(name="outgoing", pk=1, sync_page_timeout="seconds=5")
        task_sync_direct = MagicMock()
        task_sync_direct.message_with_options.return_value = "direct-message"
        provider_model = SimpleNamespace(
            objects=SimpleNamespace(filter=MagicMock(return_value=[provider]))
        )
        task_group = MagicMock()

        with (
            patch(
                "authentik.enterprise.stages.account_lockdown.stage.get_outgoing_sync_tasks",
                return_value=((provider_model, task_sync_direct),),
            ),
            patch(
                "authentik.enterprise.stages.account_lockdown.stage.group",
                return_value=task_group,
            ) as task_group_cls,
        ):
            view._sync_deactivated_user_to_outgoing_providers(self.target_user)

        task_sync_direct.message_with_options.assert_called_once_with(
            args=(class_to_path(type(self.target_user)), self.target_user.pk, provider.pk),
            rel_obj=provider,
            time_limit=5000,
            uid=f"{provider.name}:user:{self.target_user.pk}:direct",
        )
        task_group_cls.assert_called_once_with(["direct-message"])
        task_group.run.return_value.wait.assert_called_once_with(timeout=5000)

    def test_lockdown_outgoing_provider_sync_timeout_leaves_tasks_running(self):
        """Test timeout while waiting for direct outgoing syncs does not fail lockdown."""
        plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()])
        view = self.make_stage_view(plan)
        provider = SimpleNamespace(name="outgoing", pk=1, sync_page_timeout="seconds=5")
        task_sync_direct = MagicMock()
        task_sync_direct.message_with_options.return_value = "direct-message"
        provider_model = SimpleNamespace(
            objects=SimpleNamespace(filter=MagicMock(return_value=[provider]))
        )
        task_group = MagicMock()
        task_group.run.return_value.wait.side_effect = ResultTimeout("timed out")

        with (
            patch(
                "authentik.enterprise.stages.account_lockdown.stage.get_outgoing_sync_tasks",
                return_value=((provider_model, task_sync_direct),),
            ),
            patch(
                "authentik.enterprise.stages.account_lockdown.stage.group",
                return_value=task_group,
            ),
        ):
            view._sync_deactivated_user_to_outgoing_providers(self.target_user)

        task_group.run.assert_called_once_with()
        task_group.run.return_value.wait.assert_called_once_with(timeout=5000)

    def test_lockdown_outgoing_provider_sync_failure_does_not_fail_lockdown(self):
        """Test completed local lockdown still emits an event if outgoing sync fails."""
        plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()])
        view = self.make_stage_view(plan)

        with patch.object(
            view,
            "_sync_deactivated_user_to_outgoing_providers",
            side_effect=ValueError("sync failed"),
        ):
            view._lockdown_user(self.make_request(user=self.user), self.stage, self.target_user, "")

        self.target_user.refresh_from_db()
        self.assertFalse(self.target_user.is_active)
        event = self.get_lockdown_event()
        self.assertIsNotNone(event)


class TestAccountLockdownStageConcurrency(AccountLockdownStageTestMixin, TransactionTestCase):
    """Account lockdown concurrency tests."""

    def test_lockdown_retries_when_another_transaction_recreates_a_token(self):
        """Lockdown should remove a token recreated before the retry check runs."""
        Token.objects.create(
            user=self.target_user,
            identifier=f"initial-token-{generate_id()}",
            intent=TokenIntents.INTENT_API,
            key=generate_id(),
            expiring=False,
        )

        plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()])
        view = self.make_stage_view(plan)
        original_has_artifacts = view._has_lockdown_artifacts
        target_user = self.target_user
        thread_ready = ThreadEvent()
        start_create = ThreadEvent()
        thread_done = ThreadEvent()
        thread_errors = []

        class TokenCreatorThread(Thread):
            # Keep test collectors from treating this helper as a test class
            __test__ = False

            def run(self):
                try:
                    thread_ready.set()
                    if not start_create.wait(timeout=5):
                        thread_errors.append("timed out waiting to recreate token")
                        return
                    Token.objects.create(
                        user=target_user,
                        identifier=f"concurrent-token-{generate_id()}",
                        intent=TokenIntents.INTENT_API,
                        key=generate_id(),
                        expiring=False,
                    )
                except Exception as exc:  # noqa: BLE001
                    thread_errors.append(exc)
                finally:
                    thread_done.set()
                    # Each thread gets its own DB connection; close it explicitly
                    connection.close()

        def has_artifacts_after_concurrent_create(stage, user):
            # On the first artifact check, release the creator thread and wait for
            # it, so the recreated token exists before the real check runs.
            if not start_create.is_set():
                start_create.set()
                self.assertTrue(
                    thread_done.wait(timeout=30),
                    (
                        "Concurrent token creation did not complete "
                        f"before retry check: {thread_errors}"
                    ),
                )
            return original_has_artifacts(stage, user)

        creator = TokenCreatorThread()
        with patch.object(
            view, "_has_lockdown_artifacts", side_effect=has_artifacts_after_concurrent_create
        ):
            creator.start()
            try:
                self.assertTrue(
                    thread_ready.wait(timeout=5),
                    "Concurrent token creation thread did not start",
                )
                view._lockdown_user(
                    self.make_request(user=self.user), self.stage, self.target_user, ""
                )
            finally:
                # Always reap the worker, even when the lockdown call raises
                creator.join()

        self.assertEqual(thread_errors, [])
        self.assertEqual(Token.objects.filter(user=self.target_user).count(), 0)
class AccountLockdownStageTestMixin:
    """Shared setup helpers for account lockdown stage tests."""

    @classmethod
    def setUpClass(cls):
        """Start the class-wide patches and guarantee they are stopped again.

        The patchers are registered with ``addClassCleanup`` right after they are
        started, so they are undone even when ``super().setUpClass()`` raises (in
        which case ``tearDownClass`` would never run).
        """
        cls.patch_enterprise_enabled = patch_enterprise_enabled.start()
        cls.addClassCleanup(patch_enterprise_enabled.stop)
        cls.patch_event_dispatch = patch("authentik.events.tasks.event_trigger_dispatch.send")
        cls.patch_event_dispatch.start()
        cls.addClassCleanup(cls.patch_event_dispatch.stop)
        super().setUpClass()

    def setUp(self):
        """Create the acting admin, the target user and a flow with the lockdown stage bound."""
        super().setUp()
        self.user = create_test_admin_user()
        self.target_user = create_test_admin_user()
        self.flow = create_test_flow(FlowDesignation.STAGE_CONFIGURATION)
        self.stage = AccountLockdownStage.objects.create(
            name="lockdown",
        )
        self.binding = FlowStageBinding.objects.create(target=self.flow, stage=self.stage, order=0)
        self.request_factory = RequestFactory()

    def make_stage_view(self, plan: FlowPlan):
        """Return a stage view wired to a stub executor.

        The stub answers ``stage_ok`` with HTTP 204 and ``stage_invalid`` with
        HTTP 400, so tests can tell the two outcomes apart by status code.
        """

        def _stage_ok():
            return HttpResponse(status=204)

        def _stage_invalid(_error_message=None):
            return HttpResponse(status=400)

        return AccountLockdownStageView(
            SimpleNamespace(
                plan=plan,
                current_stage=self.stage,
                current_binding=self.binding,
                flow=self.flow,
                stage_ok=_stage_ok,
                stage_invalid=_stage_invalid,
            )
        )

    def make_request(self, *, user=None, query=None):
        """Build a POST request to this flow's executor URL for the given user."""
        return self.request_factory.post(
            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
            query_params=query or {},
            user=user,
        )

    def get_lockdown_event(self):
        """Return the account-lockdown user-write event."""
        return Event.objects.filter(
            action=EventAction.USER_WRITE,
            context__action_id=LOCKDOWN_EVENT_ACTION_ID,
        ).first()
Shared setup helpers for account lockdown stage tests.
def setUp(self):
    """Prepare per-test fixtures.

    Creates two admin users (the acting user and the lockdown target), a
    stage-configuration flow, the lockdown stage bound to that flow at order 0,
    and a request factory.
    """
    super().setUp()
    actor, target = create_test_admin_user(), create_test_admin_user()
    self.user = actor
    self.target_user = target
    flow = create_test_flow(FlowDesignation.STAGE_CONFIGURATION)
    lockdown_stage = AccountLockdownStage.objects.create(name="lockdown")
    self.flow, self.stage = flow, lockdown_stage
    self.binding = FlowStageBinding.objects.create(
        target=flow,
        stage=lockdown_stage,
        order=0,
    )
    self.request_factory = RequestFactory()
def make_stage_view(self, plan: FlowPlan):
    """Build an ``AccountLockdownStageView`` around a stub executor.

    The stub exposes the given plan plus this test's flow, stage and binding.
    Its ``stage_ok`` callback answers with HTTP 204 and ``stage_invalid`` with
    HTTP 400.
    """

    def _respond_ok():
        return HttpResponse(status=204)

    def _respond_invalid(_error_message=None):
        return HttpResponse(status=400)

    stub_executor = SimpleNamespace(
        flow=self.flow,
        plan=plan,
        current_stage=self.stage,
        current_binding=self.binding,
        stage_ok=_respond_ok,
        stage_invalid=_respond_invalid,
    )
    return AccountLockdownStageView(stub_executor)
119class TestAccountLockdownStage(AccountLockdownStageTestMixin, FlowTestCase): 120 """Account lockdown stage tests""" 121 122 def test_lockdown_no_target(self): 123 """Test lockdown stage with no pending user fails""" 124 plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()]) 125 view = self.make_stage_view(plan) 126 127 response = view.dispatch(self.make_request()) 128 129 self.assertEqual(response.status_code, 400) 130 131 def test_lockdown_with_pending_user(self): 132 """Test lockdown stage with a pending target user.""" 133 self.target_user.is_active = True 134 self.target_user.save() 135 136 plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()]) 137 plan.context[PLAN_CONTEXT_LOCKDOWN_REASON] = "Security incident" 138 plan.context[PLAN_CONTEXT_PENDING_USER] = self.target_user 139 view = self.make_stage_view(plan) 140 request = self.make_request(user=self.user) 141 142 self.assertTrue(can_lock_user(request.user, self.target_user)) 143 response = view.dispatch(request) 144 145 self.target_user.refresh_from_db() 146 self.assertFalse(self.target_user.is_active) 147 self.assertFalse(self.target_user.has_usable_password()) 148 self.assertEqual(response.status_code, 204) 149 150 # Check event was created 151 event = self.get_lockdown_event() 152 self.assertIsNotNone(event) 153 self.assertEqual(event.context["action_id"], LOCKDOWN_EVENT_ACTION_ID) 154 self.assertEqual(event.context["reason"], "Security incident") 155 self.assertEqual(event.context["affected_user"], self.target_user.username) 156 157 def test_lockdown_with_pending_user_reason(self): 158 """Test lockdown stage with a pending target and explicit reason.""" 159 self.target_user.is_active = True 160 self.target_user.save() 161 162 plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()]) 163 plan.context[PLAN_CONTEXT_LOCKDOWN_REASON] = "Compromised account" 164 plan.context[PLAN_CONTEXT_PENDING_USER] 
= self.target_user 165 view = self.make_stage_view(plan) 166 request = self.make_request(user=self.user) 167 168 self.assertTrue(can_lock_user(request.user, self.target_user)) 169 response = view.dispatch(request) 170 171 self.target_user.refresh_from_db() 172 self.assertFalse(self.target_user.is_active) 173 self.assertEqual(response.status_code, 204) 174 175 def test_lockdown_reason_from_prompt(self): 176 """Test lockdown stage reads the reason from prompt data.""" 177 self.target_user.is_active = True 178 self.target_user.save() 179 180 plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()]) 181 plan.context[PLAN_CONTEXT_PROMPT] = { 182 PLAN_CONTEXT_LOCKDOWN_REASON: "User requested lockdown", 183 } 184 view = self.make_stage_view(plan) 185 request = self.make_request(user=self.user) 186 view._lockdown_user(request, self.stage, self.target_user, view.get_reason()) 187 188 event = self.get_lockdown_event() 189 self.assertIsNotNone(event) 190 self.assertEqual(event.context["reason"], "User requested lockdown") 191 192 def test_lockdown_event_failure_does_not_fail_self_service(self): 193 """Test lockdown still succeeds when event emission fails.""" 194 self.stage.delete_sessions = False 195 self.stage.save() 196 197 self.target_user.is_active = True 198 self.target_user.save() 199 200 plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()]) 201 plan.context[PLAN_CONTEXT_PENDING_USER] = self.target_user 202 view = self.make_stage_view(plan) 203 request = self.make_request(user=self.target_user) 204 205 original_event_new = Event.new 206 207 def _event_new_side_effect(action, *args, **kwargs): 208 if ( 209 action == EventAction.USER_WRITE 210 and kwargs.get("action_id") == LOCKDOWN_EVENT_ACTION_ID 211 ): 212 raise RuntimeError("simulated event failure") 213 return original_event_new(action, *args, **kwargs) 214 215 with patch( 216 "authentik.enterprise.stages.account_lockdown.stage.Event.new", 217 
side_effect=_event_new_side_effect, 218 ): 219 view._lockdown_user(request, self.stage, self.target_user, view.get_reason()) 220 221 self.target_user.refresh_from_db() 222 self.assertFalse(self.target_user.is_active) 223 224 def test_dispatch_records_success_when_event_emission_fails(self): 225 """Test dispatch still completes if event emission fails.""" 226 self.stage.delete_sessions = False 227 self.stage.save() 228 229 self.target_user.is_active = True 230 self.target_user.save() 231 232 plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()]) 233 plan.context[PLAN_CONTEXT_PENDING_USER] = self.target_user 234 view = self.make_stage_view(plan) 235 request = self.make_request( 236 user=self.target_user, 237 ) 238 239 original_event_new = Event.new 240 241 def _event_new_side_effect(action, *args, **kwargs): 242 if ( 243 action == EventAction.USER_WRITE 244 and kwargs.get("action_id") == LOCKDOWN_EVENT_ACTION_ID 245 ): 246 raise RuntimeError("simulated event failure") 247 return original_event_new(action, *args, **kwargs) 248 249 with patch( 250 "authentik.enterprise.stages.account_lockdown.stage.Event.new", 251 side_effect=_event_new_side_effect, 252 ): 253 response = view.dispatch(request) 254 255 self.target_user.refresh_from_db() 256 self.assertFalse(self.target_user.is_active) 257 self.assertEqual(response.status_code, 204) 258 259 def test_lockdown_self_service_redirects_to_completion_flow(self): 260 """Test self-service lockdown redirects to completion flow when sessions are deleted.""" 261 completion_flow = create_test_flow(FlowDesignation.STAGE_CONFIGURATION) 262 self.stage.self_service_completion_flow = completion_flow 263 self.stage.save() 264 265 self.target_user.is_active = True 266 self.target_user.save() 267 268 plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()]) 269 view = self.make_stage_view(plan) 270 request = self.make_request(user=self.target_user) 271 
view._lockdown_user(request, self.stage, self.target_user, view.get_reason()) 272 response = view._self_service_completion_response(request) 273 274 self.assertEqual(response.status_code, 302) 275 self.assertEqual( 276 response.url, 277 reverse("authentik_core:if-flow", kwargs={"flow_slug": completion_flow.slug}), 278 ) 279 280 def test_lockdown_self_service_requires_completion_flow(self): 281 """Test self-service lockdown fails before deleting sessions without a completion flow.""" 282 self.stage.self_service_completion_flow = None 283 self.stage.save() 284 285 self.target_user.is_active = True 286 self.target_user.save() 287 288 plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()]) 289 plan.context[PLAN_CONTEXT_PENDING_USER] = self.target_user 290 view = self.make_stage_view(plan) 291 request = self.make_request(user=self.target_user) 292 293 response = view.dispatch(request) 294 295 self.assertEqual(response.status_code, 400) 296 self.target_user.refresh_from_db() 297 self.assertTrue(self.target_user.is_active) 298 299 def test_lockdown_denies_other_user_without_permission(self): 300 """Test lockdown stage rejects non-self requests without change_user permission.""" 301 actor = create_test_user() 302 303 plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()]) 304 plan.context[PLAN_CONTEXT_PENDING_USER] = self.target_user 305 view = self.make_stage_view(plan) 306 request = self.make_request(user=actor) 307 308 self.assertFalse(can_lock_user(request.user, self.target_user)) 309 response = view.dispatch(request) 310 self.assertEqual(response.status_code, 400) 311 312 def test_lockdown_revokes_tokens(self): 313 """Test lockdown stage revokes tokens""" 314 Token.objects.create( 315 user=self.target_user, 316 identifier="test-token", 317 intent=TokenIntents.INTENT_API, 318 key=generate_id(), 319 expiring=False, 320 ) 321 self.assertEqual(Token.objects.filter(user=self.target_user).count(), 1) 
322 323 plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()]) 324 view = self.make_stage_view(plan) 325 view._lockdown_user(self.make_request(user=self.user), self.stage, self.target_user, "") 326 327 self.assertEqual(Token.objects.filter(user=self.target_user).count(), 0) 328 329 def test_lockdown_revokes_provider_tokens(self): 330 """Test lockdown stage revokes provider tokens and sessions.""" 331 oauth_provider = OAuth2Provider.objects.create( 332 name=generate_id(), 333 authorization_flow=create_test_flow(), 334 redirect_uris=[ 335 RedirectURI(RedirectURIMatchingMode.STRICT, "http://testserver/callback") 336 ], 337 signing_key=create_test_cert(), 338 ) 339 saml_provider = SAMLProvider.objects.create( 340 name=generate_id(), 341 authorization_flow=create_test_flow(), 342 acs_url="https://sp.example.com/acs", 343 issuer_override="https://idp.example.com", 344 ) 345 session = Session.objects.create( 346 session_key=generate_id(), 347 expires=timezone.now() + timezone.timedelta(hours=1), 348 last_ip="127.0.0.1", 349 ) 350 auth_session = AuthenticatedSession.objects.create( 351 session=session, 352 user=self.target_user, 353 ) 354 grant_kwargs = { 355 "provider": oauth_provider, 356 "user": self.target_user, 357 "auth_time": timezone.now(), 358 "_scope": "openid profile", 359 "expiring": False, 360 } 361 token_kwargs = grant_kwargs | {"_id_token": json.dumps(asdict(IDToken("foo", "bar")))} 362 AuthorizationCode.objects.create( 363 code=generate_id(), 364 session=auth_session, 365 **grant_kwargs, 366 ) 367 AccessToken.objects.create( 368 token=generate_id(), 369 session=auth_session, 370 **token_kwargs, 371 ) 372 RefreshToken.objects.create( 373 token=generate_id(), 374 session=auth_session, 375 **token_kwargs, 376 ) 377 DeviceToken.objects.create( 378 provider=oauth_provider, 379 user=self.target_user, 380 session=auth_session, 381 _scope="openid profile", 382 expiring=False, 383 ) 384 SAMLSession.objects.create( 385 
provider=saml_provider, 386 user=self.target_user, 387 session=auth_session, 388 session_index=generate_id(), 389 name_id=self.target_user.email, 390 expires=timezone.now() + timezone.timedelta(hours=1), 391 expiring=True, 392 ) 393 394 plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()]) 395 view = self.make_stage_view(plan) 396 view._lockdown_user(self.make_request(user=self.user), self.stage, self.target_user, "") 397 398 self.assertEqual(AuthorizationCode.objects.filter(user=self.target_user).count(), 0) 399 self.assertEqual(AccessToken.objects.filter(user=self.target_user).count(), 0) 400 self.assertEqual(RefreshToken.objects.filter(user=self.target_user).count(), 0) 401 self.assertEqual(DeviceToken.objects.filter(user=self.target_user).count(), 0) 402 self.assertEqual(SAMLSession.objects.filter(user=self.target_user).count(), 0) 403 404 def test_lockdown_selective_actions(self): 405 """Test lockdown stage with selective actions""" 406 self.stage.deactivate_user = True 407 self.stage.set_unusable_password = False 408 self.stage.delete_sessions = False 409 self.stage.revoke_tokens = False 410 self.stage.save() 411 412 self.target_user.is_active = True 413 self.target_user.set_password("testpassword") 414 self.target_user.save() 415 416 Token.objects.create( 417 user=self.target_user, 418 identifier="test-token", 419 intent=TokenIntents.INTENT_API, 420 key=generate_id(), 421 expiring=False, 422 ) 423 424 plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()]) 425 view = self.make_stage_view(plan) 426 view._lockdown_user(self.make_request(user=self.user), self.stage, self.target_user, "") 427 428 self.target_user.refresh_from_db() 429 # User should be deactivated 430 self.assertFalse(self.target_user.is_active) 431 # Password should still be usable 432 self.assertTrue(self.target_user.has_usable_password()) 433 # Token should still exist 434 
self.assertEqual(Token.objects.filter(user=self.target_user).count(), 1) 435 436 def test_lockdown_no_actions(self): 437 """Test lockdown stage with all actions disabled""" 438 self.stage.deactivate_user = False 439 self.stage.set_unusable_password = False 440 self.stage.delete_sessions = False 441 self.stage.revoke_tokens = False 442 self.stage.save() 443 444 self.target_user.is_active = True 445 self.target_user.set_password("testpassword") 446 self.target_user.save() 447 448 plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()]) 449 view = self.make_stage_view(plan) 450 view._lockdown_user(self.make_request(user=self.user), self.stage, self.target_user, "") 451 452 self.target_user.refresh_from_db() 453 # User should still be active 454 self.assertTrue(self.target_user.is_active) 455 # Password should still be usable 456 self.assertTrue(self.target_user.has_usable_password()) 457 # Event should still be created 458 event = self.get_lockdown_event() 459 self.assertIsNotNone(event) 460 461 def test_lockdown_deactivation_inhibits_signal_dispatch_until_after_commit(self): 462 """Test lockdown queues explicit outgoing syncs after the deactivation transaction.""" 463 plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()]) 464 view = self.make_stage_view(plan) 465 466 with ( 467 patch( 468 "authentik.enterprise.stages.account_lockdown.stage.sync_outgoing_inhibit_dispatch" 469 ) as inhibit, 470 patch.object(view, "_sync_deactivated_user_to_outgoing_providers") as sync_outgoing, 471 ): 472 view._lockdown_user(self.make_request(user=self.user), self.stage, self.target_user, "") 473 474 inhibit.assert_called_once() 475 sync_outgoing.assert_called_once() 476 synced_user = sync_outgoing.call_args.args[0] 477 self.assertEqual(synced_user.pk, self.target_user.pk) 478 self.assertFalse(synced_user.is_active) 479 480 def test_lockdown_waits_for_direct_outgoing_provider_syncs(self): 481 """Test direct 
outgoing sync tasks are enqueued and waited on.""" 482 plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()]) 483 view = self.make_stage_view(plan) 484 provider = SimpleNamespace(name="outgoing", pk=1, sync_page_timeout="seconds=5") 485 task_sync_direct = MagicMock() 486 task_sync_direct.message_with_options.return_value = "direct-message" 487 provider_model = SimpleNamespace( 488 objects=SimpleNamespace(filter=MagicMock(return_value=[provider])) 489 ) 490 task_group = MagicMock() 491 492 with ( 493 patch( 494 "authentik.enterprise.stages.account_lockdown.stage.get_outgoing_sync_tasks", 495 return_value=((provider_model, task_sync_direct),), 496 ), 497 patch( 498 "authentik.enterprise.stages.account_lockdown.stage.group", 499 return_value=task_group, 500 ) as task_group_cls, 501 ): 502 view._sync_deactivated_user_to_outgoing_providers(self.target_user) 503 504 task_sync_direct.message_with_options.assert_called_once_with( 505 args=(class_to_path(type(self.target_user)), self.target_user.pk, provider.pk), 506 rel_obj=provider, 507 time_limit=5000, 508 uid=f"{provider.name}:user:{self.target_user.pk}:direct", 509 ) 510 task_group_cls.assert_called_once_with(["direct-message"]) 511 task_group.run.return_value.wait.assert_called_once_with(timeout=5000) 512 513 def test_lockdown_outgoing_provider_sync_timeout_leaves_tasks_running(self): 514 """Test timeout while waiting for direct outgoing syncs does not fail lockdown.""" 515 plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()]) 516 view = self.make_stage_view(plan) 517 provider = SimpleNamespace(name="outgoing", pk=1, sync_page_timeout="seconds=5") 518 task_sync_direct = MagicMock() 519 task_sync_direct.message_with_options.return_value = "direct-message" 520 provider_model = SimpleNamespace( 521 objects=SimpleNamespace(filter=MagicMock(return_value=[provider])) 522 ) 523 task_group = MagicMock() 524 task_group.run.return_value.wait.side_effect 
= ResultTimeout("timed out") 525 526 with ( 527 patch( 528 "authentik.enterprise.stages.account_lockdown.stage.get_outgoing_sync_tasks", 529 return_value=((provider_model, task_sync_direct),), 530 ), 531 patch( 532 "authentik.enterprise.stages.account_lockdown.stage.group", 533 return_value=task_group, 534 ), 535 ): 536 view._sync_deactivated_user_to_outgoing_providers(self.target_user) 537 538 task_group.run.assert_called_once_with() 539 task_group.run.return_value.wait.assert_called_once_with(timeout=5000) 540 541 def test_lockdown_outgoing_provider_sync_failure_does_not_fail_lockdown(self): 542 """Test completed local lockdown still emits an event if outgoing sync fails.""" 543 plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()]) 544 view = self.make_stage_view(plan) 545 546 with patch.object( 547 view, 548 "_sync_deactivated_user_to_outgoing_providers", 549 side_effect=ValueError("sync failed"), 550 ): 551 view._lockdown_user(self.make_request(user=self.user), self.stage, self.target_user, "") 552 553 self.target_user.refresh_from_db() 554 self.assertFalse(self.target_user.is_active) 555 event = self.get_lockdown_event() 556 self.assertIsNotNone(event)
Account lockdown stage tests
def test_lockdown_no_target(self):
    """Dispatching a plan that has no pending user in its context returns 400."""
    flow_plan = FlowPlan(
        flow_pk=self.flow.pk.hex,
        bindings=[self.binding],
        markers=[StageMarker()],
    )
    stage_view = self.make_stage_view(flow_plan)
    request = self.make_request()

    result = stage_view.dispatch(request)

    self.assertEqual(result.status_code, 400)
Test lockdown stage with no pending user fails
def test_lockdown_with_pending_user(self):
    """Dispatch with a pending target user locks the account and records an event.

    Verifies that the target ends up inactive with an unusable password, that
    the response status is 204, and that the lockdown event context carries
    the action id, the reason and the affected username.
    """
    self.target_user.is_active = True
    self.target_user.save()

    flow_plan = FlowPlan(
        flow_pk=self.flow.pk.hex,
        bindings=[self.binding],
        markers=[StageMarker()],
    )
    flow_plan.context[PLAN_CONTEXT_LOCKDOWN_REASON] = "Security incident"
    flow_plan.context[PLAN_CONTEXT_PENDING_USER] = self.target_user
    stage_view = self.make_stage_view(flow_plan)
    actor_request = self.make_request(user=self.user)

    # The acting user must be permitted to lock the target before dispatching.
    self.assertTrue(can_lock_user(actor_request.user, self.target_user))
    result = stage_view.dispatch(actor_request)

    self.target_user.refresh_from_db()
    self.assertFalse(self.target_user.is_active)
    self.assertFalse(self.target_user.has_usable_password())
    self.assertEqual(result.status_code, 204)

    # The lockdown event must exist and describe this lockdown.
    lockdown_event = self.get_lockdown_event()
    self.assertIsNotNone(lockdown_event)
    event_context = lockdown_event.context
    self.assertEqual(event_context["action_id"], LOCKDOWN_EVENT_ACTION_ID)
    self.assertEqual(event_context["reason"], "Security incident")
    self.assertEqual(event_context["affected_user"], self.target_user.username)
Test lockdown stage with a pending target user.
def test_lockdown_with_pending_user_reason(self):
    """Dispatch with a pending target and an explicit reason records that reason.

    Besides checking that the target is deactivated and the response status
    is 204, this verifies that the reason placed in the plan context is the
    one stored on the lockdown event.
    """
    self.target_user.is_active = True
    self.target_user.save()

    plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()])
    plan.context[PLAN_CONTEXT_LOCKDOWN_REASON] = "Compromised account"
    plan.context[PLAN_CONTEXT_PENDING_USER] = self.target_user
    view = self.make_stage_view(plan)
    request = self.make_request(user=self.user)

    self.assertTrue(can_lock_user(request.user, self.target_user))
    response = view.dispatch(request)

    self.target_user.refresh_from_db()
    self.assertFalse(self.target_user.is_active)
    self.assertEqual(response.status_code, 204)

    # Assert on the reason itself; without this the test would not exercise
    # the "explicit reason" behavior it is named for.
    event = self.get_lockdown_event()
    self.assertIsNotNone(event)
    self.assertEqual(event.context["reason"], "Compromised account")
Test lockdown stage with a pending target and explicit reason.
def test_lockdown_reason_from_prompt(self):
    """The reason stored under the prompt context ends up on the lockdown event."""
    self.target_user.is_active = True
    self.target_user.save()

    flow_plan = FlowPlan(
        flow_pk=self.flow.pk.hex,
        bindings=[self.binding],
        markers=[StageMarker()],
    )
    prompt_data = {PLAN_CONTEXT_LOCKDOWN_REASON: "User requested lockdown"}
    flow_plan.context[PLAN_CONTEXT_PROMPT] = prompt_data
    stage_view = self.make_stage_view(flow_plan)
    actor_request = self.make_request(user=self.user)

    # The reason is resolved by the view itself rather than passed literally.
    resolved_reason = stage_view.get_reason()
    stage_view._lockdown_user(actor_request, self.stage, self.target_user, resolved_reason)

    lockdown_event = self.get_lockdown_event()
    self.assertIsNotNone(lockdown_event)
    self.assertEqual(lockdown_event.context["reason"], "User requested lockdown")
Test lockdown stage reads the reason from prompt data.
def test_lockdown_event_failure_does_not_fail_self_service(self):
    """The user is still deactivated when creating the lockdown event raises."""
    self.stage.delete_sessions = False
    self.stage.save()

    self.target_user.is_active = True
    self.target_user.save()

    flow_plan = FlowPlan(
        flow_pk=self.flow.pk.hex,
        bindings=[self.binding],
        markers=[StageMarker()],
    )
    flow_plan.context[PLAN_CONTEXT_PENDING_USER] = self.target_user
    stage_view = self.make_stage_view(flow_plan)
    self_request = self.make_request(user=self.target_user)

    # Keep a handle on the real factory so unrelated events pass through.
    real_event_new = Event.new

    def _fail_only_lockdown_event(action, *args, **kwargs):
        is_lockdown_event = (
            action == EventAction.USER_WRITE
            and kwargs.get("action_id") == LOCKDOWN_EVENT_ACTION_ID
        )
        if not is_lockdown_event:
            return real_event_new(action, *args, **kwargs)
        raise RuntimeError("simulated event failure")

    event_patch = patch(
        "authentik.enterprise.stages.account_lockdown.stage.Event.new",
        side_effect=_fail_only_lockdown_event,
    )
    with event_patch:
        reason = stage_view.get_reason()
        stage_view._lockdown_user(self_request, self.stage, self.target_user, reason)

    self.target_user.refresh_from_db()
    self.assertFalse(self.target_user.is_active)
Test lockdown still succeeds when event emission fails.
def test_dispatch_records_success_when_event_emission_fails(self):
    """Dispatch returns 204 and deactivates the user even if the lockdown event raises."""
    self.stage.delete_sessions = False
    self.stage.save()

    self.target_user.is_active = True
    self.target_user.save()

    flow_plan = FlowPlan(
        flow_pk=self.flow.pk.hex,
        bindings=[self.binding],
        markers=[StageMarker()],
    )
    flow_plan.context[PLAN_CONTEXT_PENDING_USER] = self.target_user
    stage_view = self.make_stage_view(flow_plan)
    self_request = self.make_request(user=self.target_user)

    # Keep a handle on the real factory so unrelated events pass through.
    real_event_new = Event.new

    def _fail_only_lockdown_event(action, *args, **kwargs):
        is_lockdown_event = (
            action == EventAction.USER_WRITE
            and kwargs.get("action_id") == LOCKDOWN_EVENT_ACTION_ID
        )
        if not is_lockdown_event:
            return real_event_new(action, *args, **kwargs)
        raise RuntimeError("simulated event failure")

    event_patch = patch(
        "authentik.enterprise.stages.account_lockdown.stage.Event.new",
        side_effect=_fail_only_lockdown_event,
    )
    with event_patch:
        result = stage_view.dispatch(self_request)

    self.target_user.refresh_from_db()
    self.assertFalse(self.target_user.is_active)
    self.assertEqual(result.status_code, 204)
Test dispatch still completes if event emission fails.
def test_lockdown_self_service_redirects_to_completion_flow(self):
    """After a self-service lockdown the completion response redirects to the configured flow."""
    completion_flow = create_test_flow(FlowDesignation.STAGE_CONFIGURATION)
    self.stage.self_service_completion_flow = completion_flow
    self.stage.save()

    self.target_user.is_active = True
    self.target_user.save()

    flow_plan = FlowPlan(
        flow_pk=self.flow.pk.hex,
        bindings=[self.binding],
        markers=[StageMarker()],
    )
    stage_view = self.make_stage_view(flow_plan)
    self_request = self.make_request(user=self.target_user)

    reason = stage_view.get_reason()
    stage_view._lockdown_user(self_request, self.stage, self.target_user, reason)
    result = stage_view._self_service_completion_response(self_request)

    self.assertEqual(result.status_code, 302)
    expected_url = reverse(
        "authentik_core:if-flow",
        kwargs={"flow_slug": completion_flow.slug},
    )
    self.assertEqual(result.url, expected_url)
Test self-service lockdown redirects to completion flow when sessions are deleted.
def test_lockdown_self_service_requires_completion_flow(self):
    """Self-service dispatch without a completion flow returns 400 and leaves the user active."""
    self.stage.self_service_completion_flow = None
    self.stage.save()

    self.target_user.is_active = True
    self.target_user.save()

    flow_plan = FlowPlan(
        flow_pk=self.flow.pk.hex,
        bindings=[self.binding],
        markers=[StageMarker()],
    )
    flow_plan.context[PLAN_CONTEXT_PENDING_USER] = self.target_user
    stage_view = self.make_stage_view(flow_plan)
    self_request = self.make_request(user=self.target_user)

    result = stage_view.dispatch(self_request)

    self.assertEqual(result.status_code, 400)
    # The rejected request must not have touched the account.
    self.target_user.refresh_from_db()
    self.assertTrue(self.target_user.is_active)
Test self-service lockdown fails before deleting sessions without a completion flow.
def test_lockdown_denies_other_user_without_permission(self):
    """Dispatch by an actor for whom can_lock_user is false returns 400."""
    unprivileged_actor = create_test_user()

    flow_plan = FlowPlan(
        flow_pk=self.flow.pk.hex,
        bindings=[self.binding],
        markers=[StageMarker()],
    )
    flow_plan.context[PLAN_CONTEXT_PENDING_USER] = self.target_user
    stage_view = self.make_stage_view(flow_plan)
    actor_request = self.make_request(user=unprivileged_actor)

    self.assertFalse(can_lock_user(actor_request.user, self.target_user))
    result = stage_view.dispatch(actor_request)
    self.assertEqual(result.status_code, 400)
Test lockdown stage rejects non-self requests without change_user permission.
def test_lockdown_revokes_tokens(self):
    """A token owned by the target user no longer exists after lockdown."""
    Token.objects.create(
        user=self.target_user,
        identifier="test-token",
        intent=TokenIntents.INTENT_API,
        key=generate_id(),
        expiring=False,
    )
    tokens_before = Token.objects.filter(user=self.target_user).count()
    self.assertEqual(tokens_before, 1)

    flow_plan = FlowPlan(
        flow_pk=self.flow.pk.hex,
        bindings=[self.binding],
        markers=[StageMarker()],
    )
    stage_view = self.make_stage_view(flow_plan)
    actor_request = self.make_request(user=self.user)
    stage_view._lockdown_user(actor_request, self.stage, self.target_user, "")

    tokens_after = Token.objects.filter(user=self.target_user).count()
    self.assertEqual(tokens_after, 0)
Test lockdown stage revokes tokens
def test_lockdown_revokes_provider_tokens(self):
    """Test lockdown stage revokes provider tokens and sessions.

    Creates an OAuth2 authorization code, access token, refresh token and
    device token plus a SAML session for the target user, all bound to one
    authenticated session, then asserts that none of them remain for that
    user after lockdown.
    """
    # Import timedelta from its documented home: `django.utils.timezone`
    # only exposes it as a side effect of its own internal imports.
    from datetime import timedelta

    expires_at = timezone.now() + timedelta(hours=1)

    oauth_provider = OAuth2Provider.objects.create(
        name=generate_id(),
        authorization_flow=create_test_flow(),
        redirect_uris=[
            RedirectURI(RedirectURIMatchingMode.STRICT, "http://testserver/callback")
        ],
        signing_key=create_test_cert(),
    )
    saml_provider = SAMLProvider.objects.create(
        name=generate_id(),
        authorization_flow=create_test_flow(),
        acs_url="https://sp.example.com/acs",
        issuer_override="https://idp.example.com",
    )
    session = Session.objects.create(
        session_key=generate_id(),
        expires=expires_at,
        last_ip="127.0.0.1",
    )
    auth_session = AuthenticatedSession.objects.create(
        session=session,
        user=self.target_user,
    )
    # Fields shared by every OAuth2 grant created below.
    grant_kwargs = {
        "provider": oauth_provider,
        "user": self.target_user,
        "auth_time": timezone.now(),
        "_scope": "openid profile",
        "expiring": False,
    }
    # Access and refresh tokens additionally carry a serialized ID token.
    token_kwargs = grant_kwargs | {"_id_token": json.dumps(asdict(IDToken("foo", "bar")))}
    AuthorizationCode.objects.create(
        code=generate_id(),
        session=auth_session,
        **grant_kwargs,
    )
    AccessToken.objects.create(
        token=generate_id(),
        session=auth_session,
        **token_kwargs,
    )
    RefreshToken.objects.create(
        token=generate_id(),
        session=auth_session,
        **token_kwargs,
    )
    DeviceToken.objects.create(
        provider=oauth_provider,
        user=self.target_user,
        session=auth_session,
        _scope="openid profile",
        expiring=False,
    )
    SAMLSession.objects.create(
        provider=saml_provider,
        user=self.target_user,
        session=auth_session,
        session_index=generate_id(),
        name_id=self.target_user.email,
        expires=expires_at,
        expiring=True,
    )

    plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()])
    view = self.make_stage_view(plan)
    view._lockdown_user(self.make_request(user=self.user), self.stage, self.target_user, "")

    self.assertEqual(AuthorizationCode.objects.filter(user=self.target_user).count(), 0)
    self.assertEqual(AccessToken.objects.filter(user=self.target_user).count(), 0)
    self.assertEqual(RefreshToken.objects.filter(user=self.target_user).count(), 0)
    self.assertEqual(DeviceToken.objects.filter(user=self.target_user).count(), 0)
    self.assertEqual(SAMLSession.objects.filter(user=self.target_user).count(), 0)
Test lockdown stage revokes provider tokens and sessions.
def test_lockdown_selective_actions(self):
    """With only deactivation enabled, the password and the API token are left untouched."""
    stage = self.stage
    stage.deactivate_user = True
    stage.set_unusable_password = False
    stage.delete_sessions = False
    stage.revoke_tokens = False
    stage.save()

    self.target_user.is_active = True
    self.target_user.set_password("testpassword")
    self.target_user.save()

    Token.objects.create(
        user=self.target_user,
        identifier="test-token",
        intent=TokenIntents.INTENT_API,
        key=generate_id(),
        expiring=False,
    )

    flow_plan = FlowPlan(
        flow_pk=self.flow.pk.hex,
        bindings=[self.binding],
        markers=[StageMarker()],
    )
    stage_view = self.make_stage_view(flow_plan)
    actor_request = self.make_request(user=self.user)
    stage_view._lockdown_user(actor_request, self.stage, self.target_user, "")

    self.target_user.refresh_from_db()
    # Deactivation is the only enabled action, so it must have been applied.
    self.assertFalse(self.target_user.is_active)
    # The password was not invalidated.
    self.assertTrue(self.target_user.has_usable_password())
    # The token was not revoked.
    remaining_tokens = Token.objects.filter(user=self.target_user).count()
    self.assertEqual(remaining_tokens, 1)
Test lockdown stage with selective actions
def test_lockdown_no_actions(self):
    """With every action disabled the user is unchanged but an event is still recorded."""
    stage = self.stage
    stage.deactivate_user = False
    stage.set_unusable_password = False
    stage.delete_sessions = False
    stage.revoke_tokens = False
    stage.save()

    self.target_user.is_active = True
    self.target_user.set_password("testpassword")
    self.target_user.save()

    flow_plan = FlowPlan(
        flow_pk=self.flow.pk.hex,
        bindings=[self.binding],
        markers=[StageMarker()],
    )
    stage_view = self.make_stage_view(flow_plan)
    actor_request = self.make_request(user=self.user)
    stage_view._lockdown_user(actor_request, self.stage, self.target_user, "")

    self.target_user.refresh_from_db()
    # Nothing was enabled, so the account stays active...
    self.assertTrue(self.target_user.is_active)
    # ...and keeps its usable password.
    self.assertTrue(self.target_user.has_usable_password())
    # The lockdown itself is still recorded as an event.
    lockdown_event = self.get_lockdown_event()
    self.assertIsNotNone(lockdown_event)
Test lockdown stage with all actions disabled
def test_lockdown_deactivation_inhibits_signal_dispatch_until_after_commit(self):
    """Lockdown calls the dispatch inhibitor once and syncs the deactivated user once."""
    flow_plan = FlowPlan(
        flow_pk=self.flow.pk.hex,
        bindings=[self.binding],
        markers=[StageMarker()],
    )
    stage_view = self.make_stage_view(flow_plan)

    inhibit_patch = patch(
        "authentik.enterprise.stages.account_lockdown.stage.sync_outgoing_inhibit_dispatch"
    )
    with inhibit_patch as inhibit_mock:
        with patch.object(
            stage_view, "_sync_deactivated_user_to_outgoing_providers"
        ) as sync_mock:
            actor_request = self.make_request(user=self.user)
            stage_view._lockdown_user(actor_request, self.stage, self.target_user, "")

    inhibit_mock.assert_called_once()
    sync_mock.assert_called_once()
    # The user handed to the explicit sync must be the target, already inactive.
    user_passed_to_sync = sync_mock.call_args.args[0]
    self.assertEqual(user_passed_to_sync.pk, self.target_user.pk)
    self.assertFalse(user_passed_to_sync.is_active)
Test lockdown queues explicit outgoing syncs after the deactivation transaction.
def test_lockdown_waits_for_direct_outgoing_provider_syncs(self):
    """A direct sync message is built per provider, grouped, run and waited on."""
    flow_plan = FlowPlan(
        flow_pk=self.flow.pk.hex,
        bindings=[self.binding],
        markers=[StageMarker()],
    )
    stage_view = self.make_stage_view(flow_plan)

    # Stand-ins for an outgoing provider, its model manager and its sync task.
    fake_provider = SimpleNamespace(name="outgoing", pk=1, sync_page_timeout="seconds=5")
    direct_task = MagicMock()
    direct_task.message_with_options.return_value = "direct-message"
    fake_manager = SimpleNamespace(filter=MagicMock(return_value=[fake_provider]))
    fake_provider_model = SimpleNamespace(objects=fake_manager)
    group_instance = MagicMock()

    tasks_patch = patch(
        "authentik.enterprise.stages.account_lockdown.stage.get_outgoing_sync_tasks",
        return_value=((fake_provider_model, direct_task),),
    )
    group_patch = patch(
        "authentik.enterprise.stages.account_lockdown.stage.group",
        return_value=group_instance,
    )
    with tasks_patch:
        with group_patch as group_factory:
            stage_view._sync_deactivated_user_to_outgoing_providers(self.target_user)

    expected_args = (
        class_to_path(type(self.target_user)),
        self.target_user.pk,
        fake_provider.pk,
    )
    direct_task.message_with_options.assert_called_once_with(
        args=expected_args,
        rel_obj=fake_provider,
        time_limit=5000,
        uid=f"{fake_provider.name}:user:{self.target_user.pk}:direct",
    )
    group_factory.assert_called_once_with(["direct-message"])
    group_instance.run.return_value.wait.assert_called_once_with(timeout=5000)
Test direct outgoing sync tasks are enqueued and waited on.
def test_lockdown_outgoing_provider_sync_timeout_leaves_tasks_running(self):
    """A ResultTimeout raised while waiting on the task group does not propagate."""
    flow_plan = FlowPlan(
        flow_pk=self.flow.pk.hex,
        bindings=[self.binding],
        markers=[StageMarker()],
    )
    stage_view = self.make_stage_view(flow_plan)

    # Stand-ins for an outgoing provider, its model manager and its sync task.
    fake_provider = SimpleNamespace(name="outgoing", pk=1, sync_page_timeout="seconds=5")
    direct_task = MagicMock()
    direct_task.message_with_options.return_value = "direct-message"
    fake_manager = SimpleNamespace(filter=MagicMock(return_value=[fake_provider]))
    fake_provider_model = SimpleNamespace(objects=fake_manager)
    group_instance = MagicMock()
    # Waiting on the started group times out.
    group_instance.run.return_value.wait.side_effect = ResultTimeout("timed out")

    tasks_patch = patch(
        "authentik.enterprise.stages.account_lockdown.stage.get_outgoing_sync_tasks",
        return_value=((fake_provider_model, direct_task),),
    )
    group_patch = patch(
        "authentik.enterprise.stages.account_lockdown.stage.group",
        return_value=group_instance,
    )
    with tasks_patch:
        with group_patch:
            stage_view._sync_deactivated_user_to_outgoing_providers(self.target_user)

    group_instance.run.assert_called_once_with()
    group_instance.run.return_value.wait.assert_called_once_with(timeout=5000)
Test timeout while waiting for direct outgoing syncs does not fail lockdown.
def test_lockdown_outgoing_provider_sync_failure_does_not_fail_lockdown(self):
    """The user is deactivated and an event recorded even when the outgoing sync raises."""
    flow_plan = FlowPlan(
        flow_pk=self.flow.pk.hex,
        bindings=[self.binding],
        markers=[StageMarker()],
    )
    stage_view = self.make_stage_view(flow_plan)

    failing_sync = patch.object(
        stage_view,
        "_sync_deactivated_user_to_outgoing_providers",
        side_effect=ValueError("sync failed"),
    )
    with failing_sync:
        actor_request = self.make_request(user=self.user)
        stage_view._lockdown_user(actor_request, self.stage, self.target_user, "")

    self.target_user.refresh_from_db()
    self.assertFalse(self.target_user.is_active)
    lockdown_event = self.get_lockdown_event()
    self.assertIsNotNone(lockdown_event)
Test completed local lockdown still emits an event if outgoing sync fails.
class TestAccountLockdownStageConcurrency(AccountLockdownStageTestMixin, TransactionTestCase):
    """Concurrency tests for the account lockdown stage."""

    def test_lockdown_retries_when_another_transaction_recreates_a_token(self):
        """A token created by another thread before the artifact check is removed too."""
        Token.objects.create(
            user=self.target_user,
            identifier=f"initial-token-{generate_id()}",
            intent=TokenIntents.INTENT_API,
            key=generate_id(),
            expiring=False,
        )

        flow_plan = FlowPlan(
            flow_pk=self.flow.pk.hex,
            bindings=[self.binding],
            markers=[StageMarker()],
        )
        view = self.make_stage_view(flow_plan)
        # Grab the real check before it is patched below.
        real_has_artifacts = view._has_lockdown_artifacts
        target_user = self.target_user
        # Signals coordinating the creator thread with the test thread.
        thread_ready = ThreadEvent()
        start_create = ThreadEvent()
        thread_done = ThreadEvent()
        thread_errors = []

        class TokenCreatorThread(Thread):
            """Thread that creates a token for the target user once signalled."""

            __test__ = False

            def run(self):
                try:
                    thread_ready.set()
                    released = start_create.wait(timeout=5)
                    if not released:
                        thread_errors.append("timed out waiting to recreate token")
                        return
                    Token.objects.create(
                        user=target_user,
                        identifier=f"concurrent-token-{generate_id()}",
                        intent=TokenIntents.INTENT_API,
                        key=generate_id(),
                        expiring=False,
                    )
                except Exception as exc:  # noqa: BLE001
                    thread_errors.append(exc)
                finally:
                    thread_done.set()
                    connection.close()

        def has_artifacts_after_concurrent_create(stage, user):
            # On the first check, release the creator thread and wait for it
            # to finish before delegating to the real implementation.
            if not start_create.is_set():
                start_create.set()
                creator_finished = thread_done.wait(timeout=30)
                self.assertTrue(
                    creator_finished,
                    (
                        "Concurrent token creation did not complete "
                        f"before retry check: {thread_errors}"
                    ),
                )
            return real_has_artifacts(stage, user)

        creator = TokenCreatorThread()
        artifacts_patch = patch.object(
            view, "_has_lockdown_artifacts", side_effect=has_artifacts_after_concurrent_create
        )
        with artifacts_patch:
            creator.start()
            self.assertTrue(
                thread_ready.wait(timeout=5),
                "Concurrent token creation thread did not start",
            )
            actor_request = self.make_request(user=self.user)
            view._lockdown_user(actor_request, self.stage, self.target_user, "")
            creator.join()

        self.assertEqual(thread_errors, [])
        self.assertEqual(Token.objects.filter(user=self.target_user).count(), 0)
Account lockdown concurrency tests.
def test_lockdown_retries_when_another_transaction_recreates_a_token(self):
    """A token created by another thread before the artifact check is removed too."""
    Token.objects.create(
        user=self.target_user,
        identifier=f"initial-token-{generate_id()}",
        intent=TokenIntents.INTENT_API,
        key=generate_id(),
        expiring=False,
    )

    flow_plan = FlowPlan(
        flow_pk=self.flow.pk.hex,
        bindings=[self.binding],
        markers=[StageMarker()],
    )
    view = self.make_stage_view(flow_plan)
    # Grab the real check before it is patched below.
    real_has_artifacts = view._has_lockdown_artifacts
    target_user = self.target_user
    # Signals coordinating the creator thread with the test thread.
    thread_ready = ThreadEvent()
    start_create = ThreadEvent()
    thread_done = ThreadEvent()
    thread_errors = []

    class TokenCreatorThread(Thread):
        """Thread that creates a token for the target user once signalled."""

        __test__ = False

        def run(self):
            try:
                thread_ready.set()
                released = start_create.wait(timeout=5)
                if not released:
                    thread_errors.append("timed out waiting to recreate token")
                    return
                Token.objects.create(
                    user=target_user,
                    identifier=f"concurrent-token-{generate_id()}",
                    intent=TokenIntents.INTENT_API,
                    key=generate_id(),
                    expiring=False,
                )
            except Exception as exc:  # noqa: BLE001
                thread_errors.append(exc)
            finally:
                thread_done.set()
                connection.close()

    def has_artifacts_after_concurrent_create(stage, user):
        # On the first check, release the creator thread and wait for it
        # to finish before delegating to the real implementation.
        if not start_create.is_set():
            start_create.set()
            creator_finished = thread_done.wait(timeout=30)
            self.assertTrue(
                creator_finished,
                (
                    "Concurrent token creation did not complete "
                    f"before retry check: {thread_errors}"
                ),
            )
        return real_has_artifacts(stage, user)

    creator = TokenCreatorThread()
    artifacts_patch = patch.object(
        view, "_has_lockdown_artifacts", side_effect=has_artifacts_after_concurrent_create
    )
    with artifacts_patch:
        creator.start()
        self.assertTrue(
            thread_ready.wait(timeout=5),
            "Concurrent token creation thread did not start",
        )
        actor_request = self.make_request(user=self.user)
        view._lockdown_user(actor_request, self.stage, self.target_user, "")
        creator.join()

    self.assertEqual(thread_errors, [])
    self.assertEqual(Token.objects.filter(user=self.target_user).count(), 0)
Lockdown should remove a token recreated before the retry check runs.