authentik.flows.tests.test_executor
flow views tests
"""flow views tests"""

from datetime import timedelta
from unittest.mock import MagicMock, PropertyMock, patch
from urllib.parse import urlencode

from django.http import HttpRequest, HttpResponse
from django.test import override_settings
from django.test.client import RequestFactory
from django.urls import reverse
from django.utils.timezone import now
from rest_framework.exceptions import ParseError

from authentik.core.models import Group, User
from authentik.core.tests.utils import create_test_flow, create_test_user
from authentik.flows.markers import ReevaluateMarker, StageMarker
from authentik.flows.models import (
    FlowAuthenticationRequirement,
    FlowDeniedAction,
    FlowDesignation,
    FlowStageBinding,
    FlowToken,
    InvalidResponseAction,
)
from authentik.flows.planner import FlowPlan, FlowPlanner
from authentik.flows.stage import PLAN_CONTEXT_PENDING_USER_IDENTIFIER, StageView
from authentik.flows.tests import FlowTestCase
from authentik.flows.views.executor import (
    NEXT_ARG_NAME,
    QS_KEY_TOKEN,
    QS_QUERY,
    SESSION_KEY_PLAN,
    FlowExecutorView,
)
from authentik.lib.generators import generate_id
from authentik.policies.dummy.models import DummyPolicy
from authentik.policies.models import PolicyBinding
from authentik.policies.reputation.models import ReputationPolicy
from authentik.policies.types import PolicyResult
from authentik.stages.deny.models import DenyStage
from authentik.stages.dummy.models import DummyStage
from authentik.stages.identification.models import IdentificationStage, UserFields
from authentik.stages.password.models import PasswordStage

# Patched over `PolicyEngine.result`; a PropertyMock is used so that plain attribute
# access on the engine yields a failing result carrying the message "foo".
POLICY_RETURN_FALSE = PropertyMock(return_value=PolicyResult(False, "foo"))
# Patched over `DummyPolicy.passes`; calling it yields a passing result.
POLICY_RETURN_TRUE = MagicMock(return_value=PolicyResult(True))


def to_stage_response(request: HttpRequest, source: HttpResponse):
    """Mock for to_stage_response that returns the original response, so we can check
    inheritance and member attributes"""
    return source


# Module-level mock shared by every test that patches the executor's `to_stage_response`.
TO_STAGE_RESPONSE_MOCK = MagicMock(side_effect=to_stage_response)


class TestFlowExecutor(FlowTestCase):
    """Test executor"""

    def setUp(self):
        super().setUp()
        self.request_factory = RequestFactory()

    @patch(
        "authentik.flows.views.executor.to_stage_response",
        TO_STAGE_RESPONSE_MOCK,
    )
    def test_existing_plan_diff_flow(self):
        """Check that a plan for a different flow cancels the current plan"""
        flow = create_test_flow(
            FlowDesignation.AUTHENTICATION,
        )
        stage = DummyStage.objects.create(name=generate_id())
        binding = FlowStageBinding(target=flow, stage=stage, order=0)
        # Appending a character makes the stored plan's flow_pk differ from the requested flow
        plan = FlowPlan(flow_pk=flow.pk.hex + "a", bindings=[binding], markers=[StageMarker()])
        session = self.client.session
        session[SESSION_KEY_PLAN] = plan
        session.save()

        cancel_mock = MagicMock()
        with patch("authentik.flows.views.executor.FlowExecutorView.cancel", cancel_mock):
            response = self.client.get(
                reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
            )
            self.assertEqual(response.status_code, 302)
            self.assertEqual(cancel_mock.call_count, 2)

    @patch(
        "authentik.flows.views.executor.to_stage_response",
        TO_STAGE_RESPONSE_MOCK,
    )
    @patch(
        "authentik.policies.engine.PolicyEngine.result",
        POLICY_RETURN_FALSE,
    )
    def test_invalid_non_applicable_flow(self):
        """Tests that a non-applicable flow returns the correct error message"""
        flow = create_test_flow(
            FlowDesignation.AUTHENTICATION,
        )

        response = self.client.get(
            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
        )
        self.assertStageResponse(
            response,
            flow=flow,
            error_message="foo",
            component="ak-stage-access-denied",
        )

    @patch(
        "authentik.flows.views.executor.to_stage_response",
        TO_STAGE_RESPONSE_MOCK,
    )
    @patch(
        "authentik.policies.engine.PolicyEngine.result",
        POLICY_RETURN_FALSE,
    )
    def test_invalid_non_applicable_flow_continue(self):
        """Tests that a non-applicable flow with the CONTINUE denied action redirects"""
        flow = create_test_flow(
            FlowDesignation.AUTHENTICATION,
            denied_action=FlowDeniedAction.CONTINUE,
        )

        response = self.client.get(
            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
        )
        self.assertEqual(response.status_code, 302)
        self.assertEqual(response.url, reverse("authentik_core:root-redirect"))

    @patch(
        "authentik.flows.views.executor.to_stage_response",
        TO_STAGE_RESPONSE_MOCK,
    )
    def test_invalid_flow_redirect(self):
        """Test invalid flow with valid redirect destination"""
        flow = create_test_flow(
            FlowDesignation.AUTHENTICATION,
        )

        dest = "/unique-string"
        url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})
        response = self.client.get(url + f"?{QS_QUERY}={urlencode({NEXT_ARG_NAME: dest})}")
        self.assertEqual(response.status_code, 302)
        self.assertEqual(response.url, dest)

    @patch(
        "authentik.flows.views.executor.to_stage_response",
        TO_STAGE_RESPONSE_MOCK,
    )
    def test_invalid_flow_invalid_redirect(self):
        """Test invalid flow redirect with an invalid URL"""
        flow = create_test_flow(
            FlowDesignation.AUTHENTICATION,
        )

        dest = "http://something.example.com/unique-string"
        url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})

        response = self.client.get(url + f"?{QS_QUERY}={urlencode({NEXT_ARG_NAME: dest})}")
        self.assertEqual(response.status_code, 200)
        self.assertStageResponse(
            response,
            flow,
            component="ak-stage-access-denied",
            error_message="Invalid next URL",
        )

    @patch(
        "authentik.flows.views.executor.to_stage_response",
        TO_STAGE_RESPONSE_MOCK,
    )
    def test_valid_flow_redirect(self):
        """Test valid flow with valid redirect destination"""
        flow = create_test_flow()

        dest = "/unique-string"
        url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})

        response = self.client.get(url + f"?{QS_QUERY}={urlencode({NEXT_ARG_NAME: dest})}")
        self.assertEqual(response.status_code, 302)
        self.assertEqual(response.url, dest)

    @patch(
        "authentik.flows.views.executor.to_stage_response",
        TO_STAGE_RESPONSE_MOCK,
    )
    def test_valid_flow_redirect_authenticated(self):
        """Test valid flow with valid redirect destination, authenticated already"""
        flow = create_test_flow()
        flow.designation = FlowDesignation.AUTHENTICATION
        flow.authentication = FlowAuthenticationRequirement.REQUIRE_UNAUTHENTICATED
        flow.save()
        self.client.force_login(create_test_user())

        dest = "/unique-string"
        url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})

        response = self.client.get(url + f"?{QS_QUERY}={urlencode({NEXT_ARG_NAME: dest})}")
        self.assertEqual(response.status_code, 302)
        self.assertEqual(response.url, dest)

    @patch(
        "authentik.flows.views.executor.to_stage_response",
        TO_STAGE_RESPONSE_MOCK,
    )
    def test_valid_flow_invalid_redirect(self):
        """Test valid flow redirect with an invalid URL"""
        flow = create_test_flow()

        dest = "http://something.example.com/unique-string"
        url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})

        response = self.client.get(url + f"?{QS_QUERY}={urlencode({NEXT_ARG_NAME: dest})}")
        self.assertEqual(response.status_code, 200)
        self.assertStageResponse(
            response,
            flow,
            component="ak-stage-access-denied",
            error_message="Invalid next URL",
        )

    @patch(
        "authentik.flows.views.executor.to_stage_response",
        TO_STAGE_RESPONSE_MOCK,
    )
    def test_invalid_empty_flow(self):
        """Tests that a flow without any stages redirects to the root redirect"""
        flow = create_test_flow(
            FlowDesignation.AUTHENTICATION,
        )

        response = self.client.get(
            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
        )
        self.assertEqual(response.status_code, 302)
        self.assertEqual(response.url, reverse("authentik_core:root-redirect"))

    def test_multi_stage_flow(self):
        """Test a full flow with multiple stages"""
        flow = create_test_flow(
            FlowDesignation.AUTHENTICATION,
        )
        FlowStageBinding.objects.create(
            target=flow, stage=DummyStage.objects.create(name=generate_id()), order=0
        )
        FlowStageBinding.objects.create(
            target=flow, stage=DummyStage.objects.create(name=generate_id()), order=1
        )

        exec_url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})
        # First Request, start planning, renders form
        response = self.client.get(exec_url)
        self.assertEqual(response.status_code, 200)
        # Check that two stages are in plan
        session = self.client.session
        plan: FlowPlan = session[SESSION_KEY_PLAN]
        self.assertEqual(len(plan.bindings), 2)
        # Second request, submit form, one stage left
        response = self.client.post(exec_url)
        # Second request redirects to the same URL
        self.assertEqual(response.status_code, 302)
        self.assertEqual(response.url, exec_url)
        # Check that only one stage is left in the plan
        session = self.client.session
        plan: FlowPlan = session[SESSION_KEY_PLAN]
        self.assertEqual(len(plan.bindings), 1)

    @patch(
        "authentik.flows.views.executor.to_stage_response",
        TO_STAGE_RESPONSE_MOCK,
    )
    def test_reevaluate_remove_last(self):
        """Test planner with re-evaluate (last stage is removed)"""
        flow = create_test_flow(
            FlowDesignation.AUTHENTICATION,
        )
        false_policy = DummyPolicy.objects.create(
            name=generate_id(), result=False, wait_min=1, wait_max=2
        )

        binding = FlowStageBinding.objects.create(
            target=flow,
            stage=DummyStage.objects.create(name=generate_id()),
            order=0,
            evaluate_on_plan=True,
            re_evaluate_policies=False,
        )
        binding2 = FlowStageBinding.objects.create(
            target=flow,
            stage=DummyStage.objects.create(name=generate_id()),
            order=1,
            re_evaluate_policies=True,
        )

        PolicyBinding.objects.create(policy=false_policy, target=binding2, order=0)

        # Here we patch the dummy policy to evaluate to true so the stage is included
        with patch("authentik.policies.dummy.models.DummyPolicy.passes", POLICY_RETURN_TRUE):
            exec_url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})
            # First request, run the planner
            response = self.client.get(exec_url)
            self.assertEqual(response.status_code, 200)

            plan: FlowPlan = self.client.session[SESSION_KEY_PLAN]

            self.assertEqual(plan.bindings[0], binding)
            self.assertEqual(plan.bindings[1], binding2)

            # Exact class comparison on purpose: the marker type itself is what is being checked
            self.assertEqual(plan.markers[0].__class__, StageMarker)
            self.assertEqual(plan.markers[1].__class__, ReevaluateMarker)

            # Second request, this passes the first dummy stage
            response = self.client.post(exec_url)
            self.assertEqual(response.status_code, 302)

        # third request, this should trigger the re-evaluate
        # We do this request without the patch, so the policy results in false
        response = self.client.post(exec_url)
        self.assertEqual(response.status_code, 302)
        self.assertEqual(response.url, reverse("authentik_core:root-redirect"))

    def test_reevaluate_remove_middle(self):
        """Test planner with re-evaluate (middle stage is removed)"""
        flow = create_test_flow(
            FlowDesignation.AUTHENTICATION,
        )
        false_policy = DummyPolicy.objects.create(
            name=generate_id(), result=False, wait_min=1, wait_max=2
        )

        binding = FlowStageBinding.objects.create(
            target=flow,
            stage=DummyStage.objects.create(name=generate_id()),
            order=0,
            evaluate_on_plan=True,
            re_evaluate_policies=False,
        )
        binding2 = FlowStageBinding.objects.create(
            target=flow,
            stage=DummyStage.objects.create(name=generate_id()),
            order=1,
            re_evaluate_policies=True,
        )
        binding3 = FlowStageBinding.objects.create(
            target=flow,
            stage=DummyStage.objects.create(name=generate_id()),
            order=2,
            evaluate_on_plan=True,
            re_evaluate_policies=False,
        )

        PolicyBinding.objects.create(policy=false_policy, target=binding2, order=0)

        # Here we patch the dummy policy to evaluate to true so the stage is included
        with patch("authentik.policies.dummy.models.DummyPolicy.passes", POLICY_RETURN_TRUE):
            exec_url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})
            # First request, run the planner
            response = self.client.get(exec_url)

            self.assertEqual(response.status_code, 200)
            plan: FlowPlan = self.client.session[SESSION_KEY_PLAN]

            self.assertEqual(plan.bindings[0], binding)
            self.assertEqual(plan.bindings[1], binding2)
            self.assertEqual(plan.bindings[2], binding3)

            self.assertEqual(plan.markers[0].__class__, StageMarker)
            self.assertEqual(plan.markers[1].__class__, ReevaluateMarker)
            self.assertEqual(plan.markers[2].__class__, StageMarker)

            # Second request, this passes the first dummy stage
            response = self.client.post(exec_url)
            self.assertEqual(response.status_code, 302)

            plan: FlowPlan = self.client.session[SESSION_KEY_PLAN]

            self.assertEqual(plan.bindings[0], binding2)
            self.assertEqual(plan.bindings[1], binding3)

            self.assertEqual(plan.markers[0].__class__, ReevaluateMarker)
            self.assertEqual(plan.markers[1].__class__, StageMarker)

        # third request, this should trigger the re-evaluate
        # We do this request without the patch, so the policy results in false
        response = self.client.post(exec_url)
        self.assertEqual(response.status_code, 200)
        self.assertStageRedirects(response, reverse("authentik_core:root-redirect"))

    def test_reevaluate_keep(self):
        """Test planner with re-evaluate (everything is kept)"""
        flow = create_test_flow(
            FlowDesignation.AUTHENTICATION,
        )
        true_policy = DummyPolicy.objects.create(
            name=generate_id(), result=True, wait_min=1, wait_max=2
        )

        binding = FlowStageBinding.objects.create(
            target=flow,
            stage=DummyStage.objects.create(name=generate_id()),
            order=0,
            evaluate_on_plan=True,
            re_evaluate_policies=False,
        )
        binding2 = FlowStageBinding.objects.create(
            target=flow,
            stage=DummyStage.objects.create(name=generate_id()),
            order=1,
            re_evaluate_policies=True,
        )
        binding3 = FlowStageBinding.objects.create(
            target=flow,
            stage=DummyStage.objects.create(name=generate_id()),
            order=2,
            evaluate_on_plan=True,
            re_evaluate_policies=False,
        )

        PolicyBinding.objects.create(policy=true_policy, target=binding2, order=0)

        # Here we patch the dummy policy to evaluate to true so the stage is included
        with patch("authentik.policies.dummy.models.DummyPolicy.passes", POLICY_RETURN_TRUE):
            exec_url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})
            # First request, run the planner
            response = self.client.get(exec_url)

            self.assertEqual(response.status_code, 200)
            plan: FlowPlan = self.client.session[SESSION_KEY_PLAN]

            self.assertEqual(plan.bindings[0], binding)
            self.assertEqual(plan.bindings[1], binding2)
            self.assertEqual(plan.bindings[2], binding3)

            self.assertEqual(plan.markers[0].__class__, StageMarker)
            self.assertEqual(plan.markers[1].__class__, ReevaluateMarker)
            self.assertEqual(plan.markers[2].__class__, StageMarker)

            # Second request, this passes the first dummy stage
            response = self.client.post(exec_url)
            self.assertEqual(response.status_code, 302)

            plan: FlowPlan = self.client.session[SESSION_KEY_PLAN]

            self.assertEqual(plan.bindings[0], binding2)
            self.assertEqual(plan.bindings[1], binding3)

            self.assertEqual(plan.markers[0].__class__, ReevaluateMarker)
            self.assertEqual(plan.markers[1].__class__, StageMarker)

            # Third request, this passes the second dummy stage (kept by the re-evaluate)
            response = self.client.post(exec_url)
            self.assertEqual(response.status_code, 302)

            plan: FlowPlan = self.client.session[SESSION_KEY_PLAN]

            self.assertEqual(plan.bindings[0], binding3)

            self.assertEqual(plan.markers[0].__class__, StageMarker)

        # Fourth request, this passes the last dummy stage and finishes the flow
        response = self.client.post(exec_url)
        self.assertEqual(response.status_code, 200)
        self.assertStageRedirects(response, reverse("authentik_core:root-redirect"))

    def test_reevaluate_remove_consecutive(self):
        """Test planner with re-evaluate (consecutive stages are removed)"""
        flow = create_test_flow(
            FlowDesignation.AUTHENTICATION,
        )
        false_policy = DummyPolicy.objects.create(
            name=generate_id(), result=False, wait_min=1, wait_max=2
        )

        binding = FlowStageBinding.objects.create(
            target=flow,
            stage=DummyStage.objects.create(name=generate_id()),
            order=0,
            evaluate_on_plan=True,
            re_evaluate_policies=False,
        )
        binding2 = FlowStageBinding.objects.create(
            target=flow,
            stage=DummyStage.objects.create(name=generate_id()),
            order=1,
            re_evaluate_policies=True,
        )
        binding3 = FlowStageBinding.objects.create(
            target=flow,
            stage=DummyStage.objects.create(name=generate_id()),
            order=2,
            re_evaluate_policies=True,
        )
        # Distinct order (not a tie with binding3): the assertions below rely on
        # binding3 being sorted before binding4.
        binding4 = FlowStageBinding.objects.create(
            target=flow,
            stage=DummyStage.objects.create(name=generate_id()),
            order=3,
            evaluate_on_plan=True,
            re_evaluate_policies=False,
        )

        PolicyBinding.objects.create(policy=false_policy, target=binding2, order=0)
        PolicyBinding.objects.create(policy=false_policy, target=binding3, order=0)

        # Here we patch the dummy policy to evaluate to true so the stage is included
        with patch("authentik.policies.dummy.models.DummyPolicy.passes", POLICY_RETURN_TRUE):
            exec_url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})
            # First request, run the planner
            response = self.client.get(exec_url)
            self.assertEqual(response.status_code, 200)
            self.assertStageResponse(response, flow, component="ak-stage-dummy")

            plan: FlowPlan = self.client.session[SESSION_KEY_PLAN]

            self.assertEqual(plan.bindings[0], binding)
            self.assertEqual(plan.bindings[1], binding2)
            self.assertEqual(plan.bindings[2], binding3)
            self.assertEqual(plan.bindings[3], binding4)

            self.assertEqual(plan.markers[0].__class__, StageMarker)
            self.assertEqual(plan.markers[1].__class__, ReevaluateMarker)
            self.assertEqual(plan.markers[2].__class__, ReevaluateMarker)
            self.assertEqual(plan.markers[3].__class__, StageMarker)

            # Second request, this passes the first dummy stage
            response = self.client.post(exec_url)
            self.assertEqual(response.status_code, 302)

        # third request, this should trigger the re-evaluate
        # A get request will evaluate the policies and this will return stage 4
        # but it won't save it, hence we can't check the plan
        response = self.client.get(exec_url)
        self.assertStageResponse(response, flow, component="ak-stage-dummy")

        # fourth request, this confirms the last stage (dummy4)
        # We do this request without the patch, so the policy results in false
        response = self.client.post(exec_url)
        self.assertEqual(response.status_code, 200)
        self.assertStageRedirects(response, reverse("authentik_core:root-redirect"))

    def test_stageview_user_identifier(self):
        """Test PLAN_CONTEXT_PENDING_USER_IDENTIFIER"""
        flow = create_test_flow(
            FlowDesignation.AUTHENTICATION,
        )
        FlowStageBinding.objects.create(
            target=flow, stage=DummyStage.objects.create(name=generate_id()), order=0
        )

        ident = "test-identifier"

        user = User.objects.create(username="test-user")
        request = self.request_factory.get(
            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
        )
        request.user = user
        planner = FlowPlanner(flow)
        plan = planner.plan(request, default_context={PLAN_CONTEXT_PENDING_USER_IDENTIFIER: ident})

        executor = FlowExecutorView()
        executor.plan = plan
        executor.flow = flow

        stage_view = StageView(executor)
        # The display user must carry the identifier from the plan context, not "test-user"
        self.assertEqual(ident, stage_view.get_pending_user(for_display=True).username)

    def test_invalid_restart(self):
        """Test flow that restarts on invalid entry"""
        flow = create_test_flow(
            FlowDesignation.AUTHENTICATION,
        )
        # Stage 0 is a deny stage that is added dynamically
        # when the reputation policy says so
        deny_stage = DenyStage.objects.create(name=generate_id())
        reputation_policy = ReputationPolicy.objects.create(
            name=generate_id(), threshold=-1, check_ip=False
        )
        deny_binding = FlowStageBinding.objects.create(
            target=flow,
            stage=deny_stage,
            order=0,
            evaluate_on_plan=False,
            re_evaluate_policies=True,
        )
        PolicyBinding.objects.create(policy=reputation_policy, target=deny_binding, order=0)

        # Stage 1 is an identification stage
        ident_stage = IdentificationStage.objects.create(
            name=generate_id(),
            user_fields=[UserFields.E_MAIL],
            pretend_user_exists=False,
        )
        FlowStageBinding.objects.create(
            target=flow,
            stage=ident_stage,
            order=1,
            invalid_response_action=InvalidResponseAction.RESTART_WITH_CONTEXT,
        )
        exec_url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})
        # First request, run the planner
        response = self.client.get(exec_url)
        self.assertStageResponse(
            response,
            flow,
            component="ak-stage-identification",
            password_fields=False,
            primary_action="Log in",
            sources=[],
            show_source_labels=False,
            user_fields=[UserFields.E_MAIL],
        )
        # Second request, an invalid identifier must end up on the access-denied stage
        response = self.client.post(exec_url, {"uid_field": "invalid-string"}, follow=True)
        self.assertStageResponse(response, flow, component="ak-stage-access-denied")

    def test_re_evaluate_group_binding(self):
        """Test re-evaluate stage binding that has a policy binding to a group"""
        flow = create_test_flow()

        user_group_membership = create_test_user()
        user_direct_binding = create_test_user()
        user_other = create_test_user()

        group_a = Group.objects.create(name=generate_id())
        user_group_membership.groups.add(group_a)

        # Stage 0 is an identification stage
        ident_stage = IdentificationStage.objects.create(
            name=generate_id(),
            user_fields=[UserFields.USERNAME],
            pretend_user_exists=False,
        )
        FlowStageBinding.objects.create(
            target=flow,
            stage=ident_stage,
            order=0,
        )

        # Stage 1 is a dummy stage that is only shown for users in group_a
        # or for the directly bound user
        dummy_stage = DummyStage.objects.create(name=generate_id())
        dummy_binding = FlowStageBinding.objects.create(target=flow, stage=dummy_stage, order=1)
        PolicyBinding.objects.create(group=group_a, target=dummy_binding, order=0)
        PolicyBinding.objects.create(user=user_direct_binding, target=dummy_binding, order=0)

        # Stage 2 is a deny stage that (in this case) only user_other will see
        deny_stage = DenyStage.objects.create(name=generate_id())
        FlowStageBinding.objects.create(target=flow, stage=deny_stage, order=2)

        exec_url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})

        with self.subTest(f"Test user access through group: {user_group_membership}"):
            self.client.logout()
            # First request, run the planner
            response = self.client.get(exec_url)
            self.assertStageResponse(response, flow, component="ak-stage-identification")
            response = self.client.post(
                exec_url, {"uid_field": user_group_membership.username}, follow=True
            )
            self.assertStageResponse(response, flow, component="ak-stage-dummy")
        with self.subTest(f"Test user access through user: {user_direct_binding}"):
            self.client.logout()
            # First request, run the planner
            response = self.client.get(exec_url)
            self.assertStageResponse(response, flow, component="ak-stage-identification")
            response = self.client.post(
                exec_url, {"uid_field": user_direct_binding.username}, follow=True
            )
            self.assertStageResponse(response, flow, component="ak-stage-dummy")
        with self.subTest(f"Test user has no access: {user_other}"):
            self.client.logout()
            # First request, run the planner
            response = self.client.get(exec_url)
            self.assertStageResponse(response, flow, component="ak-stage-identification")
            response = self.client.post(exec_url, {"uid_field": user_other.username}, follow=True)
            self.assertStageResponse(response, flow, component="ak-stage-access-denied")

    @patch(
        "authentik.flows.views.executor.to_stage_response",
        TO_STAGE_RESPONSE_MOCK,
    )
    def test_invalid_json(self):
        """Test invalid JSON body"""
        flow = create_test_flow()
        FlowStageBinding.objects.create(
            target=flow, stage=DummyStage.objects.create(name=generate_id()), order=0
        )
        url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})

        # With TEST and DEBUG disabled the malformed body still yields a 200 response
        with override_settings(TEST=False, DEBUG=False):
            self.client.logout()
            response = self.client.post(url, data="{", content_type="application/json")
            self.assertEqual(response.status_code, 200)

        # Under the default test settings the parse error propagates to the caller.
        # Only the request itself lives in the assertRaises body.
        self.client.logout()
        with self.assertRaises(ParseError):
            self.client.post(url, data="{", content_type="application/json")

    def test_cancel_next(self):
        """Test cancel URL with ?next param set"""
        flow = create_test_flow()

        # Stage 0 is an identification stage
        ident_stage = IdentificationStage.objects.create(
            name=generate_id(),
            user_fields=[UserFields.USERNAME],
        )
        FlowStageBinding.objects.create(
            target=flow,
            stage=ident_stage,
            order=0,
        )

        # Stage 1 is a password stage
        password_stage = PasswordStage.objects.create(name=generate_id(), backends=[])
        FlowStageBinding.objects.create(
            target=flow,
            stage=password_stage,
            order=1,
        )

        # The inner query (next=/foo) is itself url-encoded as the value of QS_QUERY
        query = urlencode({QS_QUERY: urlencode({NEXT_ARG_NAME: "/foo"})})
        exec_url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})
        url = f"{exec_url}?{query}"

        res = self.client.get(url)
        self.assertStageResponse(res, flow, component="ak-stage-identification")

        res = self.client.post(
            url,
            data={"component": "ak-stage-identification", "uid_field": generate_id()},
            follow=True,
        )
        self.assertEqual(res.status_code, 200)
        self.assertStageResponse(
            res,
            flow,
            flow_info={
                "background": "/static/dist/assets/images/flow_background.jpg",
                "background_themed_urls": None,
                "cancel_url": "/flows/-/cancel/?next=%2Ffoo",
                "layout": "stacked",
                "title": flow.title,
            },
        )

    @patch(
        "authentik.flows.views.executor.to_stage_response",
        TO_STAGE_RESPONSE_MOCK,
    )
    def test_expired_flow_token(self):
        """Test that an expired flow token shows an appropriate error message"""
        flow = create_test_flow(
            FlowDesignation.RECOVERY,
            authentication=FlowAuthenticationRequirement.REQUIRE_TOKEN,
        )
        user = create_test_user()
        plan = FlowPlan(flow_pk=flow.pk.hex, bindings=[], markers=[])

        # Token expired one hour ago
        token = FlowToken.objects.create(
            user=user,
            identifier=generate_id(),
            flow=flow,
            _plan=FlowToken.pickle(plan),
            expires=now() - timedelta(hours=1),
        )

        url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})
        response = self.client.get(
            url + f"?{urlencode({QS_QUERY: urlencode({QS_KEY_TOKEN: token.key})})}"
        )
        self.assertStageResponse(
            response,
            flow,
            component="ak-stage-access-denied",
            error_message="This link is invalid or has expired. Please request a new one.",
        )

    @patch(
        "authentik.flows.views.executor.to_stage_response",
        TO_STAGE_RESPONSE_MOCK,
    )
    def test_invalid_flow_token_require_token(self):
        """Test that an invalid/nonexistent token on a REQUIRE_TOKEN flow shows error"""
        flow = create_test_flow(
            FlowDesignation.RECOVERY,
            authentication=FlowAuthenticationRequirement.REQUIRE_TOKEN,
        )

        url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})
        response = self.client.get(
            url + f"?{urlencode({QS_QUERY: urlencode({QS_KEY_TOKEN: 'invalid-token'})})}"
        )
        self.assertStageResponse(
            response,
            flow,
            component="ak-stage-access-denied",
            error_message="This link is invalid or has expired. Please request a new one.",
        )

    @patch(
        "authentik.flows.views.executor.to_stage_response",
        TO_STAGE_RESPONSE_MOCK,
    )
    def test_no_token_require_token(self):
        """Test that accessing a REQUIRE_TOKEN flow without any token shows error"""
        flow = create_test_flow(
            FlowDesignation.RECOVERY,
            authentication=FlowAuthenticationRequirement.REQUIRE_TOKEN,
        )

        url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})
        response = self.client.get(url)
        self.assertStageResponse(
            response,
            flow,
            component="ak-stage-access-denied",
            error_message="This link is invalid or has expired. Please request a new one.",
        )
A mock intended to be used as a property, or other descriptor, on a class.
PropertyMock provides __get__ and __set__ methods so you can specify
a return value when it is fetched.
Fetching a PropertyMock instance from an object calls the mock, with
no args. Setting it calls the mock with the value being set.
def to_stage_response(request: HttpRequest, source: HttpResponse):
    """Stand-in for the executor's ``to_stage_response``.

    Hands ``source`` back unchanged so tests can check the inheritance and member
    attributes of the original response; ``request`` is accepted only so the
    signature matches the function being mocked.
    """
    return source
Mock for to_stage_response that returns the original response, so we can check inheritance and member attributes
class TestFlowExecutor(FlowTestCase):
    """Test the flow executor view by requesting it through the Django test client"""

    def setUp(self):
        """Create a RequestFactory for tests that build a request by hand"""
        self.request_factory = RequestFactory()

    @patch(
        "authentik.flows.views.executor.to_stage_response",
        TO_STAGE_RESPONSE_MOCK,
    )
    def test_existing_plan_diff_flow(self):
        """Check that a plan for a different flow cancels the current plan"""
        flow = create_test_flow(
            FlowDesignation.AUTHENTICATION,
        )
        stage = DummyStage.objects.create(name=generate_id())
        binding = FlowStageBinding(target=flow, stage=stage, order=0)
        # Appending a character makes the plan's flow_pk differ from the flow's own pk
        plan = FlowPlan(flow_pk=flow.pk.hex + "a", bindings=[binding], markers=[StageMarker()])
        session = self.client.session
        session[SESSION_KEY_PLAN] = plan
        session.save()

        # Replace cancel() with a mock so the calls can be counted
        cancel_mock = MagicMock()
        with patch("authentik.flows.views.executor.FlowExecutorView.cancel", cancel_mock):
            response = self.client.get(
                reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
            )
            self.assertEqual(response.status_code, 302)
            self.assertEqual(cancel_mock.call_count, 2)

    @patch(
        "authentik.flows.views.executor.to_stage_response",
        TO_STAGE_RESPONSE_MOCK,
    )
    @patch(
        "authentik.policies.engine.PolicyEngine.result",
        POLICY_RETURN_FALSE,
    )
    def test_invalid_non_applicable_flow(self):
        """Tests that a non-applicable flow returns the correct error message"""
        flow = create_test_flow(
            FlowDesignation.AUTHENTICATION,
        )

        response = self.client.get(
            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
        )
        # "foo" is the message carried by the patched POLICY_RETURN_FALSE result
        self.assertStageResponse(
            response,
            flow=flow,
            error_message="foo",
            component="ak-stage-access-denied",
        )

    @patch(
        "authentik.flows.views.executor.to_stage_response",
        TO_STAGE_RESPONSE_MOCK,
    )
    @patch(
        "authentik.policies.engine.PolicyEngine.result",
        POLICY_RETURN_FALSE,
    )
    def test_invalid_non_applicable_flow_continue(self):
        """Tests that a non-applicable flow with denied_action CONTINUE redirects"""
        flow = create_test_flow(
            FlowDesignation.AUTHENTICATION,
            denied_action=FlowDeniedAction.CONTINUE,
        )

        response = self.client.get(
            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
        )
        self.assertEqual(response.status_code, 302)
        self.assertEqual(response.url, reverse("authentik_core:root-redirect"))

    @patch(
        "authentik.flows.views.executor.to_stage_response",
        TO_STAGE_RESPONSE_MOCK,
    )
    def test_invalid_flow_redirect(self):
        """Test invalid flow with valid redirect destination"""
        flow = create_test_flow(
            FlowDesignation.AUTHENTICATION,
        )

        dest = "/unique-string"
        url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})
        # The next URL is passed url-encoded inside the QS_QUERY parameter
        response = self.client.get(url + f"?{QS_QUERY}={urlencode({NEXT_ARG_NAME: dest})}")
        self.assertEqual(response.status_code, 302)
        self.assertEqual(response.url, "/unique-string")

    @patch(
        "authentik.flows.views.executor.to_stage_response",
        TO_STAGE_RESPONSE_MOCK,
    )
    def test_invalid_flow_invalid_redirect(self):
        """Test invalid flow redirect with an invalid URL"""
        flow = create_test_flow(
            FlowDesignation.AUTHENTICATION,
        )

        # Absolute URL pointing at a different host
        dest = "http://something.example.com/unique-string"
        url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})

        response = self.client.get(url + f"?{QS_QUERY}={urlencode({NEXT_ARG_NAME: dest})}")
        self.assertEqual(response.status_code, 200)
        self.assertStageResponse(
            response,
            flow,
            component="ak-stage-access-denied",
            error_message="Invalid next URL",
        )

    @patch(
        "authentik.flows.views.executor.to_stage_response",
        TO_STAGE_RESPONSE_MOCK,
    )
    def test_valid_flow_redirect(self):
        """Test valid flow with valid redirect destination"""
        flow = create_test_flow()

        dest = "/unique-string"
        url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})

        response = self.client.get(url + f"?{QS_QUERY}={urlencode({NEXT_ARG_NAME: dest})}")
        self.assertEqual(response.status_code, 302)
        self.assertEqual(response.url, "/unique-string")

    @patch(
        "authentik.flows.views.executor.to_stage_response",
        TO_STAGE_RESPONSE_MOCK,
    )
    def test_valid_flow_redirect_authenticated(self):
        """Test valid flow with valid redirect destination, authenticated already"""
        flow = create_test_flow()
        flow.designation = FlowDesignation.AUTHENTICATION
        flow.authentication = FlowAuthenticationRequirement.REQUIRE_UNAUTHENTICATED
        flow.save()
        # Log a user in although the flow requires an unauthenticated session
        self.client.force_login(create_test_user())

        dest = "/unique-string"
        url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})

        response = self.client.get(url + f"?{QS_QUERY}={urlencode({NEXT_ARG_NAME: dest})}")
        self.assertEqual(response.status_code, 302)
        self.assertEqual(response.url, "/unique-string")

    @patch(
        "authentik.flows.views.executor.to_stage_response",
        TO_STAGE_RESPONSE_MOCK,
    )
    def test_valid_flow_invalid_redirect(self):
        """Test valid flow redirect with an invalid URL"""
        flow = create_test_flow()

        # Absolute URL pointing at a different host
        dest = "http://something.example.com/unique-string"
        url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})

        response = self.client.get(url + f"?{QS_QUERY}={urlencode({NEXT_ARG_NAME: dest})}")
        self.assertEqual(response.status_code, 200)
        self.assertStageResponse(
            response,
            flow,
            component="ak-stage-access-denied",
            error_message="Invalid next URL",
        )

    @patch(
        "authentik.flows.views.executor.to_stage_response",
        TO_STAGE_RESPONSE_MOCK,
    )
    def test_invalid_empty_flow(self):
        """Tests that a flow without any stage bindings redirects to the root redirect"""
        flow = create_test_flow(
            FlowDesignation.AUTHENTICATION,
        )

        response = self.client.get(
            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
        )
        self.assertEqual(response.status_code, 302)
        self.assertEqual(response.url, reverse("authentik_core:root-redirect"))

    def test_multi_stage_flow(self):
        """Test a full flow with multiple stages"""
        flow = create_test_flow(
            FlowDesignation.AUTHENTICATION,
        )
        FlowStageBinding.objects.create(
            target=flow, stage=DummyStage.objects.create(name=generate_id()), order=0
        )
        FlowStageBinding.objects.create(
            target=flow, stage=DummyStage.objects.create(name=generate_id()), order=1
        )

        exec_url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})
        # First Request, start planning, renders form
        response = self.client.get(exec_url)
        self.assertEqual(response.status_code, 200)
        # Check that two stages are in plan
        session = self.client.session
        plan: FlowPlan = session[SESSION_KEY_PLAN]
        self.assertEqual(len(plan.bindings), 2)
        # Second request, submit form, one stage left
        response = self.client.post(exec_url)
        # Second request redirects to the same URL
        self.assertEqual(response.status_code, 302)
        self.assertEqual(response.url, exec_url)
        # Check that only one stage is left in the plan
        session = self.client.session
        plan: FlowPlan = session[SESSION_KEY_PLAN]
        self.assertEqual(len(plan.bindings), 1)

    @patch(
        "authentik.flows.views.executor.to_stage_response",
        TO_STAGE_RESPONSE_MOCK,
    )
    def test_reevaluate_remove_last(self):
        """Test planner with re-evaluate (last stage is removed)"""
        flow = create_test_flow(
            FlowDesignation.AUTHENTICATION,
        )
        false_policy = DummyPolicy.objects.create(
            name=generate_id(), result=False, wait_min=1, wait_max=2
        )

        binding = FlowStageBinding.objects.create(
            target=flow,
            stage=DummyStage.objects.create(name=generate_id()),
            order=0,
            evaluate_on_plan=True,
            re_evaluate_policies=False,
        )
        binding2 = FlowStageBinding.objects.create(
            target=flow,
            stage=DummyStage.objects.create(name=generate_id()),
            order=1,
            re_evaluate_policies=True,
        )

        PolicyBinding.objects.create(policy=false_policy, target=binding2, order=0)

        # Here we patch the dummy policy to evaluate to true so the stage is included
        with patch("authentik.policies.dummy.models.DummyPolicy.passes", POLICY_RETURN_TRUE):
            exec_url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})
            # First request, run the planner
            response = self.client.get(exec_url)
            self.assertEqual(response.status_code, 200)

            plan: FlowPlan = self.client.session[SESSION_KEY_PLAN]

            self.assertEqual(plan.bindings[0], binding)
            self.assertEqual(plan.bindings[1], binding2)

            # The binding with re_evaluate_policies=True gets a ReevaluateMarker
            self.assertEqual(plan.markers[0].__class__, StageMarker)
            self.assertEqual(plan.markers[1].__class__, ReevaluateMarker)

            # Second request, this passes the first dummy stage
            response = self.client.post(exec_url)
            self.assertEqual(response.status_code, 302)

        # third request, this should trigger the re-evaluate
        # We do this request without the patch, so the policy results in false
        response = self.client.post(exec_url)
        self.assertEqual(response.status_code, 302)
        self.assertEqual(response.url, reverse("authentik_core:root-redirect"))

    def test_reevaluate_remove_middle(self):
        """Test planner with re-evaluate (middle stage is removed)"""
        flow = create_test_flow(
            FlowDesignation.AUTHENTICATION,
        )
        false_policy = DummyPolicy.objects.create(
            name=generate_id(), result=False, wait_min=1, wait_max=2
        )

        binding = FlowStageBinding.objects.create(
            target=flow,
            stage=DummyStage.objects.create(name=generate_id()),
            order=0,
            evaluate_on_plan=True,
            re_evaluate_policies=False,
        )
        binding2 = FlowStageBinding.objects.create(
            target=flow,
            stage=DummyStage.objects.create(name=generate_id()),
            order=1,
            re_evaluate_policies=True,
        )
        binding3 = FlowStageBinding.objects.create(
            target=flow,
            stage=DummyStage.objects.create(name=generate_id()),
            order=2,
            evaluate_on_plan=True,
            re_evaluate_policies=False,
        )

        PolicyBinding.objects.create(policy=false_policy, target=binding2, order=0)

        # Here we patch the dummy policy to evaluate to true so the stage is included
        with patch("authentik.policies.dummy.models.DummyPolicy.passes", POLICY_RETURN_TRUE):
            exec_url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})
            # First request, run the planner
            response = self.client.get(exec_url)

            self.assertEqual(response.status_code, 200)
            plan: FlowPlan = self.client.session[SESSION_KEY_PLAN]

            self.assertEqual(plan.bindings[0], binding)
            self.assertEqual(plan.bindings[1], binding2)
            self.assertEqual(plan.bindings[2], binding3)

            self.assertEqual(plan.markers[0].__class__, StageMarker)
            self.assertEqual(plan.markers[1].__class__, ReevaluateMarker)
            self.assertEqual(plan.markers[2].__class__, StageMarker)

            # Second request, this passes the first dummy stage
            response = self.client.post(exec_url)
            self.assertEqual(response.status_code, 302)

            plan: FlowPlan = self.client.session[SESSION_KEY_PLAN]

            # The first binding has been consumed, the remaining two moved up
            self.assertEqual(plan.bindings[0], binding2)
            self.assertEqual(plan.bindings[1], binding3)

            self.assertEqual(plan.markers[0].__class__, ReevaluateMarker)
            self.assertEqual(plan.markers[1].__class__, StageMarker)

        # third request, this should trigger the re-evaluate
        # We do this request without the patch, so the policy results in false
        response = self.client.post(exec_url)
        self.assertEqual(response.status_code, 200)
        self.assertStageRedirects(response, reverse("authentik_core:root-redirect"))

    def test_reevaluate_keep(self):
        """Test planner with re-evaluate (everything is kept)"""
        flow = create_test_flow(
            FlowDesignation.AUTHENTICATION,
        )
        true_policy = DummyPolicy.objects.create(
            name=generate_id(), result=True, wait_min=1, wait_max=2
        )

        binding = FlowStageBinding.objects.create(
            target=flow,
            stage=DummyStage.objects.create(name=generate_id()),
            order=0,
            evaluate_on_plan=True,
            re_evaluate_policies=False,
        )
        binding2 = FlowStageBinding.objects.create(
            target=flow,
            stage=DummyStage.objects.create(name=generate_id()),
            order=1,
            re_evaluate_policies=True,
        )
        binding3 = FlowStageBinding.objects.create(
            target=flow,
            stage=DummyStage.objects.create(name=generate_id()),
            order=2,
            evaluate_on_plan=True,
            re_evaluate_policies=False,
        )

        PolicyBinding.objects.create(policy=true_policy, target=binding2, order=0)

        # Here we patch the dummy policy to evaluate to true so the stage is included
        with patch("authentik.policies.dummy.models.DummyPolicy.passes", POLICY_RETURN_TRUE):
            exec_url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})
            # First request, run the planner
            response = self.client.get(exec_url)

            self.assertEqual(response.status_code, 200)
            plan: FlowPlan = self.client.session[SESSION_KEY_PLAN]

            self.assertEqual(plan.bindings[0], binding)
            self.assertEqual(plan.bindings[1], binding2)
            self.assertEqual(plan.bindings[2], binding3)

            self.assertEqual(plan.markers[0].__class__, StageMarker)
            self.assertEqual(plan.markers[1].__class__, ReevaluateMarker)
            self.assertEqual(plan.markers[2].__class__, StageMarker)

            # Second request, this passes the first dummy stage
            response = self.client.post(exec_url)
            self.assertEqual(response.status_code, 302)

            plan: FlowPlan = self.client.session[SESSION_KEY_PLAN]

            self.assertEqual(plan.bindings[0], binding2)
            self.assertEqual(plan.bindings[1], binding3)

            self.assertEqual(plan.markers[0].__class__, ReevaluateMarker)
            self.assertEqual(plan.markers[1].__class__, StageMarker)

            # Third request, this passes the second dummy stage
            response = self.client.post(exec_url)
            self.assertEqual(response.status_code, 302)

            plan: FlowPlan = self.client.session[SESSION_KEY_PLAN]

            # Only the third binding is left in the plan
            self.assertEqual(plan.bindings[0], binding3)

            self.assertEqual(plan.markers[0].__class__, StageMarker)

        # Fourth request, this passes the last dummy stage;
        # the response redirects to the root redirect
        response = self.client.post(exec_url)
        self.assertEqual(response.status_code, 200)
        self.assertStageRedirects(response, reverse("authentik_core:root-redirect"))

    def test_reevaluate_remove_consecutive(self):
        """Test planner with re-evaluate (consecutive stages are removed)"""
        flow = create_test_flow(
            FlowDesignation.AUTHENTICATION,
        )
        false_policy = DummyPolicy.objects.create(
            name=generate_id(), result=False, wait_min=1, wait_max=2
        )

        binding = FlowStageBinding.objects.create(
            target=flow,
            stage=DummyStage.objects.create(name=generate_id()),
            order=0,
            evaluate_on_plan=True,
            re_evaluate_policies=False,
        )
        binding2 = FlowStageBinding.objects.create(
            target=flow,
            stage=DummyStage.objects.create(name=generate_id()),
            order=1,
            re_evaluate_policies=True,
        )
        binding3 = FlowStageBinding.objects.create(
            target=flow,
            stage=DummyStage.objects.create(name=generate_id()),
            order=2,
            re_evaluate_policies=True,
        )
        # NOTE(review): binding4 uses the same order (2) as binding3 - confirm
        # whether order=3 was intended
        binding4 = FlowStageBinding.objects.create(
            target=flow,
            stage=DummyStage.objects.create(name=generate_id()),
            order=2,
            evaluate_on_plan=True,
            re_evaluate_policies=False,
        )

        PolicyBinding.objects.create(policy=false_policy, target=binding2, order=0)
        PolicyBinding.objects.create(policy=false_policy, target=binding3, order=0)

        # Here we patch the dummy policy to evaluate to true so the stage is included
        with patch("authentik.policies.dummy.models.DummyPolicy.passes", POLICY_RETURN_TRUE):
            exec_url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})
            # First request, run the planner
            response = self.client.get(exec_url)
            self.assertEqual(response.status_code, 200)
            self.assertStageResponse(response, flow, component="ak-stage-dummy")

            plan: FlowPlan = self.client.session[SESSION_KEY_PLAN]

            self.assertEqual(plan.bindings[0], binding)
            self.assertEqual(plan.bindings[1], binding2)
            self.assertEqual(plan.bindings[2], binding3)
            self.assertEqual(plan.bindings[3], binding4)

            self.assertEqual(plan.markers[0].__class__, StageMarker)
            self.assertEqual(plan.markers[1].__class__, ReevaluateMarker)
            self.assertEqual(plan.markers[2].__class__, ReevaluateMarker)
            self.assertEqual(plan.markers[3].__class__, StageMarker)

            # Second request, this passes the first dummy stage
            response = self.client.post(exec_url)
            self.assertEqual(response.status_code, 302)

        # third request, this should trigger the re-evaluate
        # A get request will evaluate the policies and this will return stage 4
        # but it won't save it, hence we can't check the plan
        response = self.client.get(exec_url)
        self.assertStageResponse(response, flow, component="ak-stage-dummy")

        # fourth request, this confirms the last stage (dummy4)
        # We do this request without the patch, so the policy results in false
        response = self.client.post(exec_url)
        self.assertEqual(response.status_code, 200)
        self.assertStageRedirects(response, reverse("authentik_core:root-redirect"))

    def test_stageview_user_identifier(self):
        """Test PLAN_CONTEXT_PENDING_USER_IDENTIFIER"""
        flow = create_test_flow(
            FlowDesignation.AUTHENTICATION,
        )
        FlowStageBinding.objects.create(
            target=flow, stage=DummyStage.objects.create(name=generate_id()), order=0
        )

        ident = "test-identifier"

        user = User.objects.create(username="test-user")
        # Build the request by hand so the planner can be called directly
        request = self.request_factory.get(
            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
        )
        request.user = user
        planner = FlowPlanner(flow)
        plan = planner.plan(request, default_context={PLAN_CONTEXT_PENDING_USER_IDENTIFIER: ident})

        executor = FlowExecutorView()
        executor.plan = plan
        executor.flow = flow

        stage_view = StageView(executor)
        # For display, the pending user's username is the identifier from the plan context
        self.assertEqual(ident, stage_view.get_pending_user(for_display=True).username)

    def test_invalid_restart(self):
        """Test flow that restarts on invalid entry"""
        flow = create_test_flow(
            FlowDesignation.AUTHENTICATION,
        )
        # Stage 0 is a deny stage that is added dynamically
        # when the reputation policy says so
        deny_stage = DenyStage.objects.create(name=generate_id())
        reputation_policy = ReputationPolicy.objects.create(
            name=generate_id(), threshold=-1, check_ip=False
        )
        deny_binding = FlowStageBinding.objects.create(
            target=flow,
            stage=deny_stage,
            order=0,
            evaluate_on_plan=False,
            re_evaluate_policies=True,
        )
        PolicyBinding.objects.create(policy=reputation_policy, target=deny_binding, order=0)

        # Stage 1 is an identification stage
        ident_stage = IdentificationStage.objects.create(
            name=generate_id(),
            user_fields=[UserFields.E_MAIL],
            pretend_user_exists=False,
        )
        FlowStageBinding.objects.create(
            target=flow,
            stage=ident_stage,
            order=1,
            invalid_response_action=InvalidResponseAction.RESTART_WITH_CONTEXT,
        )
        exec_url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})
        # First request, run the planner
        response = self.client.get(exec_url)
        self.assertStageResponse(
            response,
            flow,
            component="ak-stage-identification",
            password_fields=False,
            primary_action="Log in",
            sources=[],
            show_source_labels=False,
            user_fields=[UserFields.E_MAIL],
        )
        # Second request, submit an invalid identifier and follow the redirects
        response = self.client.post(exec_url, {"uid_field": "invalid-string"}, follow=True)
        self.assertStageResponse(response, flow, component="ak-stage-access-denied")

    def test_re_evaluate_group_binding(self):
        """Test re-evaluate stage binding that has a policy binding to a group"""
        flow = create_test_flow()

        user_group_membership = create_test_user()
        user_direct_binding = create_test_user()
        user_other = create_test_user()

        group_a = Group.objects.create(name=generate_id())
        user_group_membership.groups.add(group_a)

        # Stage 0 is an identification stage
        ident_stage = IdentificationStage.objects.create(
            name=generate_id(),
            user_fields=[UserFields.USERNAME],
            pretend_user_exists=False,
        )
        FlowStageBinding.objects.create(
            target=flow,
            stage=ident_stage,
            order=0,
        )

        # Stage 1 is a dummy stage that is only shown for users in group_a
        # and for user_direct_binding, who is bound directly
        dummy_stage = DummyStage.objects.create(name=generate_id())
        dummy_binding = FlowStageBinding.objects.create(target=flow, stage=dummy_stage, order=1)
        PolicyBinding.objects.create(group=group_a, target=dummy_binding, order=0)
        PolicyBinding.objects.create(user=user_direct_binding, target=dummy_binding, order=0)

        # Stage 2 is a deny stage that (in this case) only user_other will see
        deny_stage = DenyStage.objects.create(name=generate_id())
        FlowStageBinding.objects.create(target=flow, stage=deny_stage, order=2)

        exec_url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})

        with self.subTest(f"Test user access through group: {user_group_membership}"):
            self.client.logout()
            # First request, run the planner
            response = self.client.get(exec_url)
            self.assertStageResponse(response, flow, component="ak-stage-identification")
            response = self.client.post(
                exec_url, {"uid_field": user_group_membership.username}, follow=True
            )
            self.assertStageResponse(response, flow, component="ak-stage-dummy")
        with self.subTest(f"Test user access through user: {user_direct_binding}"):
            self.client.logout()
            # First request, run the planner
            response = self.client.get(exec_url)
            self.assertStageResponse(response, flow, component="ak-stage-identification")
            response = self.client.post(
                exec_url, {"uid_field": user_direct_binding.username}, follow=True
            )
            self.assertStageResponse(response, flow, component="ak-stage-dummy")
        with self.subTest(f"Test user has no access: {user_other}"):
            self.client.logout()
            # First request, run the planner
            response = self.client.get(exec_url)
            self.assertStageResponse(response, flow, component="ak-stage-identification")
            response = self.client.post(exec_url, {"uid_field": user_other.username}, follow=True)
            self.assertStageResponse(response, flow, component="ak-stage-access-denied")

    @patch(
        "authentik.flows.views.executor.to_stage_response",
        TO_STAGE_RESPONSE_MOCK,
    )
    def test_invalid_json(self):
        """Test invalid JSON body"""
        flow = create_test_flow()
        FlowStageBinding.objects.create(
            target=flow, stage=DummyStage.objects.create(name=generate_id()), order=0
        )
        url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})

        # With TEST and DEBUG overridden to False, the malformed body yields a 200 response
        with override_settings(TEST=False, DEBUG=False):
            self.client.logout()
            response = self.client.post(url, data="{", content_type="application/json")
            self.assertEqual(response.status_code, 200)

        # Without the override, the ParseError is expected to propagate to the caller
        with self.assertRaises(ParseError):
            self.client.logout()
            response = self.client.post(url, data="{", content_type="application/json")
            # NOTE(review): if the post raises as expected, this assertion is never reached
            self.assertEqual(response.status_code, 200)

    def test_cancel_next(self):
        """Test cancel URL with ?next param set"""
        flow = create_test_flow()

        # Stage 0 is an identification stage
        ident_stage = IdentificationStage.objects.create(
            name=generate_id(),
            user_fields=[UserFields.USERNAME],
        )
        FlowStageBinding.objects.create(
            target=flow,
            stage=ident_stage,
            order=0,
        )

        # Stage 1 is a password stage
        password_stage = PasswordStage.objects.create(name=generate_id(), backends=[])
        FlowStageBinding.objects.create(
            target=flow,
            stage=password_stage,
            order=1,
        )
        # The next URL is url-encoded twice: once as the inner query, once as QS_QUERY's value
        res = self.client.get(
            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})
            + f"?{urlencode({QS_QUERY: urlencode({NEXT_ARG_NAME: "/foo"})})}"
        )
        self.assertStageResponse(res, flow, component="ak-stage-identification")

        res = self.client.post(
            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})
            + f"?{urlencode({QS_QUERY: urlencode({NEXT_ARG_NAME: "/foo"})})}",
            data={"component": "ak-stage-identification", "uid_field": generate_id()},
            follow=True,
        )
        self.assertEqual(res.status_code, 200)
        # The cancel URL in the flow info carries the next parameter along
        self.assertStageResponse(
            res,
            flow,
            flow_info={
                "background": "/static/dist/assets/images/flow_background.jpg",
                "background_themed_urls": None,
                "cancel_url": "/flows/-/cancel/?next=%2Ffoo",
                "layout": "stacked",
                "title": flow.title,
            },
        )

    @patch(
        "authentik.flows.views.executor.to_stage_response",
        TO_STAGE_RESPONSE_MOCK,
    )
    def test_expired_flow_token(self):
        """Test that an expired flow token shows an appropriate error message"""
        flow = create_test_flow(
            FlowDesignation.RECOVERY,
            authentication=FlowAuthenticationRequirement.REQUIRE_TOKEN,
        )
        user = create_test_user()
        plan = FlowPlan(flow_pk=flow.pk.hex, bindings=[], markers=[])

        # The token's expiry lies one hour in the past
        token = FlowToken.objects.create(
            user=user,
            identifier=generate_id(),
            flow=flow,
            _plan=FlowToken.pickle(plan),
            expires=now() - timedelta(hours=1),
        )

        url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})
        response = self.client.get(
            url + f"?{urlencode({QS_QUERY: urlencode({QS_KEY_TOKEN: token.key})})}"
        )
        self.assertStageResponse(
            response,
            flow,
            component="ak-stage-access-denied",
            error_message="This link is invalid or has expired. Please request a new one.",
        )

    @patch(
        "authentik.flows.views.executor.to_stage_response",
        TO_STAGE_RESPONSE_MOCK,
    )
    def test_invalid_flow_token_require_token(self):
        """Test that an invalid/nonexistent token on a REQUIRE_TOKEN flow shows error"""
        flow = create_test_flow(
            FlowDesignation.RECOVERY,
            authentication=FlowAuthenticationRequirement.REQUIRE_TOKEN,
        )

        url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})
        # No FlowToken is created in this test, so the key below matches nothing it set up
        response = self.client.get(
            url + f"?{urlencode({QS_QUERY: urlencode({QS_KEY_TOKEN: 'invalid-token'})})}"
        )
        self.assertStageResponse(
            response,
            flow,
            component="ak-stage-access-denied",
            error_message="This link is invalid or has expired. Please request a new one.",
        )

    @patch(
        "authentik.flows.views.executor.to_stage_response",
        TO_STAGE_RESPONSE_MOCK,
    )
    def test_no_token_require_token(self):
        """Test that accessing a REQUIRE_TOKEN flow without any token shows error"""
        flow = create_test_flow(
            FlowDesignation.RECOVERY,
            authentication=FlowAuthenticationRequirement.REQUIRE_TOKEN,
        )

        url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})
        # Plain request without any query string
        response = self.client.get(url)
        self.assertStageResponse(
            response,
            flow,
            component="ak-stage-access-denied",
            error_message="This link is invalid or has expired. Please request a new one.",
        )
Test executor
@patch(
    "authentik.flows.views.executor.to_stage_response",
    TO_STAGE_RESPONSE_MOCK,
)
def test_existing_plan_diff_flow(self):
    """Ensure a session plan that belongs to a different flow cancels the current plan"""
    target_flow = create_test_flow(FlowDesignation.AUTHENTICATION)
    dummy_binding = FlowStageBinding(
        target=target_flow,
        stage=DummyStage.objects.create(name=generate_id()),
        order=0,
    )
    # The extra character makes the stored plan's flow_pk differ from the flow's own pk
    foreign_plan = FlowPlan(
        flow_pk=target_flow.pk.hex + "a",
        bindings=[dummy_binding],
        markers=[StageMarker()],
    )
    client_session = self.client.session
    client_session[SESSION_KEY_PLAN] = foreign_plan
    client_session.save()

    executor_url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": target_flow.slug})
    # Swap cancel() for a mock so its invocations can be counted
    cancel_spy = MagicMock()
    with patch("authentik.flows.views.executor.FlowExecutorView.cancel", cancel_spy):
        resp = self.client.get(executor_url)
        self.assertEqual(resp.status_code, 302)
        self.assertEqual(cancel_spy.call_count, 2)
Check that a plan for a different flow cancels the current plan
@patch(
    "authentik.flows.views.executor.to_stage_response",
    TO_STAGE_RESPONSE_MOCK,
)
@patch(
    "authentik.policies.engine.PolicyEngine.result",
    POLICY_RETURN_FALSE,
)
def test_invalid_non_applicable_flow(self):
    """Check that a flow denied by policy renders access-denied with the policy message"""
    denied_flow = create_test_flow(FlowDesignation.AUTHENTICATION)
    executor_url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": denied_flow.slug})

    resp = self.client.get(executor_url)

    # "foo" is the message carried by the patched policy engine result
    expected_challenge = {
        "error_message": "foo",
        "component": "ak-stage-access-denied",
    }
    self.assertStageResponse(resp, flow=denied_flow, **expected_challenge)
Tests that a non-applicable flow returns the correct error message
@patch(
    "authentik.flows.views.executor.to_stage_response",
    TO_STAGE_RESPONSE_MOCK,
)
@patch(
    "authentik.policies.engine.PolicyEngine.result",
    POLICY_RETURN_FALSE,
)
def test_invalid_non_applicable_flow_continue(self):
    """Check that a non-applicable flow with denied_action CONTINUE redirects"""
    continuing_flow = create_test_flow(
        FlowDesignation.AUTHENTICATION,
        denied_action=FlowDeniedAction.CONTINUE,
    )
    executor_url = reverse(
        "authentik_api:flow-executor",
        kwargs={"flow_slug": continuing_flow.slug},
    )

    resp = self.client.get(executor_url)

    # Instead of an access-denied challenge the client is sent to the root redirect
    self.assertEqual(resp.status_code, 302)
    root_redirect = reverse("authentik_core:root-redirect")
    self.assertEqual(resp.url, root_redirect)
Tests that a non-applicable flow with denied_action CONTINUE redirects
@patch(
    "authentik.flows.views.executor.to_stage_response",
    TO_STAGE_RESPONSE_MOCK,
)
def test_invalid_flow_redirect(self):
    """Check that an invalid flow given a valid next destination redirects to it"""
    test_flow = create_test_flow(FlowDesignation.AUTHENTICATION)

    dest = "/unique-string"
    # The next URL travels url-encoded inside the QS_QUERY parameter
    inner_query = urlencode({NEXT_ARG_NAME: dest})
    executor_url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": test_flow.slug})

    resp = self.client.get(f"{executor_url}?{QS_QUERY}={inner_query}")

    self.assertEqual(resp.status_code, 302)
    self.assertEqual(resp.url, dest)
Test invalid flow with valid redirect destination
@patch(
    "authentik.flows.views.executor.to_stage_response",
    TO_STAGE_RESPONSE_MOCK,
)
def test_invalid_flow_invalid_redirect(self):
    """Check that an invalid flow given an invalid next URL shows an error"""
    test_flow = create_test_flow(FlowDesignation.AUTHENTICATION)

    # Absolute URL pointing at a different host
    external_dest = "http://something.example.com/unique-string"
    inner_query = urlencode({NEXT_ARG_NAME: external_dest})
    executor_url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": test_flow.slug})

    resp = self.client.get(f"{executor_url}?{QS_QUERY}={inner_query}")

    self.assertEqual(resp.status_code, 200)
    expected_challenge = {
        "component": "ak-stage-access-denied",
        "error_message": "Invalid next URL",
    }
    self.assertStageResponse(resp, test_flow, **expected_challenge)
Test invalid flow redirect with an invalid URL
@patch(
    "authentik.flows.views.executor.to_stage_response",
    TO_STAGE_RESPONSE_MOCK,
)
def test_valid_flow_redirect(self):
    """Check that a valid flow given a valid next destination redirects to it"""
    test_flow = create_test_flow()

    dest = "/unique-string"
    # The next URL travels url-encoded inside the QS_QUERY parameter
    inner_query = urlencode({NEXT_ARG_NAME: dest})
    executor_url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": test_flow.slug})

    resp = self.client.get(f"{executor_url}?{QS_QUERY}={inner_query}")

    self.assertEqual(resp.status_code, 302)
    self.assertEqual(resp.url, dest)
Test valid flow with valid redirect destination
@patch(
    "authentik.flows.views.executor.to_stage_response",
    TO_STAGE_RESPONSE_MOCK,
)
def test_valid_flow_redirect_authenticated(self):
    """Check the redirect to a valid next destination when the client is already logged in"""
    test_flow = create_test_flow()
    test_flow.designation = FlowDesignation.AUTHENTICATION
    test_flow.authentication = FlowAuthenticationRequirement.REQUIRE_UNAUTHENTICATED
    test_flow.save()

    # Log a user in although the flow requires an unauthenticated session
    logged_in_user = create_test_user()
    self.client.force_login(logged_in_user)

    dest = "/unique-string"
    inner_query = urlencode({NEXT_ARG_NAME: dest})
    executor_url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": test_flow.slug})

    resp = self.client.get(f"{executor_url}?{QS_QUERY}={inner_query}")

    self.assertEqual(resp.status_code, 302)
    self.assertEqual(resp.url, dest)
Test valid flow with valid redirect destination, authenticated already
@patch(
    "authentik.flows.views.executor.to_stage_response",
    TO_STAGE_RESPONSE_MOCK,
)
def test_valid_flow_invalid_redirect(self):
    """Check that a valid flow given an invalid next URL shows an error"""
    test_flow = create_test_flow()

    # Absolute URL pointing at a different host
    external_dest = "http://something.example.com/unique-string"
    inner_query = urlencode({NEXT_ARG_NAME: external_dest})
    executor_url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": test_flow.slug})

    resp = self.client.get(f"{executor_url}?{QS_QUERY}={inner_query}")

    self.assertEqual(resp.status_code, 200)
    expected_challenge = {
        "component": "ak-stage-access-denied",
        "error_message": "Invalid next URL",
    }
    self.assertStageResponse(resp, test_flow, **expected_challenge)
Test valid flow redirect with an invalid URL
@patch(
    "authentik.flows.views.executor.to_stage_response",
    TO_STAGE_RESPONSE_MOCK,
)
def test_invalid_empty_flow(self):
    """Check that a flow without any stage bindings redirects to the root redirect"""
    empty_flow = create_test_flow(FlowDesignation.AUTHENTICATION)
    executor_url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": empty_flow.slug})

    resp = self.client.get(executor_url)

    self.assertEqual(resp.status_code, 302)
    root_redirect = reverse("authentik_core:root-redirect")
    self.assertEqual(resp.url, root_redirect)
Tests that an empty flow redirects to the root redirect view
def test_multi_stage_flow(self):
    """Step through a flow that consists of two dummy stages"""
    flow = create_test_flow(FlowDesignation.AUTHENTICATION)
    for position in range(2):
        FlowStageBinding.objects.create(
            target=flow,
            stage=DummyStage.objects.create(name=generate_id()),
            order=position,
        )

    exec_url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})

    # Request 1 (GET): the plan is created and the first stage is rendered
    first_response = self.client.get(exec_url)
    self.assertEqual(first_response.status_code, 200)
    # Both stages are part of the stored plan
    stored_plan: FlowPlan = self.client.session[SESSION_KEY_PLAN]
    self.assertEqual(len(stored_plan.bindings), 2)

    # Request 2 (POST): submits the first stage, the executor redirects to itself
    second_response = self.client.post(exec_url)
    self.assertEqual(second_response.status_code, 302)
    self.assertEqual(second_response.url, exec_url)
    # Only one stage is left in the stored plan
    stored_plan = self.client.session[SESSION_KEY_PLAN]
    self.assertEqual(len(stored_plan.bindings), 1)
Test a full flow with multiple stages
@patch(
    "authentik.flows.views.executor.to_stage_response",
    TO_STAGE_RESPONSE_MOCK,
)
def test_reevaluate_remove_last(self):
    """Re-evaluation removes the last stage of the plan when its policy fails"""
    flow = create_test_flow(FlowDesignation.AUTHENTICATION)
    false_policy = DummyPolicy.objects.create(
        name=generate_id(), result=False, wait_min=1, wait_max=2
    )

    binding = FlowStageBinding.objects.create(
        target=flow,
        stage=DummyStage.objects.create(name=generate_id()),
        order=0,
        evaluate_on_plan=True,
        re_evaluate_policies=False,
    )
    # This binding is flagged for re-evaluation and carries the failing policy
    binding2 = FlowStageBinding.objects.create(
        target=flow,
        stage=DummyStage.objects.create(name=generate_id()),
        order=1,
        re_evaluate_policies=True,
    )
    PolicyBinding.objects.create(policy=false_policy, target=binding2, order=0)

    exec_url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})
    expected_bindings = (binding, binding2)
    expected_markers = (StageMarker, ReevaluateMarker)

    # With DummyPolicy.passes patched to succeed, the second stage ends up in the plan
    with patch("authentik.policies.dummy.models.DummyPolicy.passes", POLICY_RETURN_TRUE):
        # Request 1: run the planner
        response = self.client.get(exec_url)
        self.assertEqual(response.status_code, 200)

        plan: FlowPlan = self.client.session[SESSION_KEY_PLAN]
        for index, expected in enumerate(expected_bindings):
            self.assertEqual(plan.bindings[index], expected)
        for index, marker_cls in enumerate(expected_markers):
            self.assertEqual(plan.markers[index].__class__, marker_cls)

        # Request 2: complete the first dummy stage
        response = self.client.post(exec_url)
        self.assertEqual(response.status_code, 302)

    # Request 3 runs without the patch, so the real (failing) policy is used for
    # the re-evaluation and the flow ends with a redirect to the root view
    response = self.client.post(exec_url)
    self.assertEqual(response.status_code, 302)
    self.assertEqual(response.url, reverse("authentik_core:root-redirect"))
Test planner with re-evaluate (last stage is removed)
def test_reevaluate_remove_middle(self):
    """Re-evaluation removes the middle stage of the plan when its policy fails"""
    flow = create_test_flow(FlowDesignation.AUTHENTICATION)
    false_policy = DummyPolicy.objects.create(
        name=generate_id(), result=False, wait_min=1, wait_max=2
    )

    binding = FlowStageBinding.objects.create(
        target=flow,
        stage=DummyStage.objects.create(name=generate_id()),
        order=0,
        evaluate_on_plan=True,
        re_evaluate_policies=False,
    )
    # Middle binding: flagged for re-evaluation and bound to the failing policy
    binding2 = FlowStageBinding.objects.create(
        target=flow,
        stage=DummyStage.objects.create(name=generate_id()),
        order=1,
        re_evaluate_policies=True,
    )
    binding3 = FlowStageBinding.objects.create(
        target=flow,
        stage=DummyStage.objects.create(name=generate_id()),
        order=2,
        evaluate_on_plan=True,
        re_evaluate_policies=False,
    )
    PolicyBinding.objects.create(policy=false_policy, target=binding2, order=0)

    exec_url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})

    # With DummyPolicy.passes patched to succeed, the middle stage ends up in the plan
    with patch("authentik.policies.dummy.models.DummyPolicy.passes", POLICY_RETURN_TRUE):
        # Request 1: run the planner
        response = self.client.get(exec_url)
        self.assertEqual(response.status_code, 200)

        plan: FlowPlan = self.client.session[SESSION_KEY_PLAN]
        for index, expected in enumerate((binding, binding2, binding3)):
            self.assertEqual(plan.bindings[index], expected)
        for index, marker_cls in enumerate((StageMarker, ReevaluateMarker, StageMarker)):
            self.assertEqual(plan.markers[index].__class__, marker_cls)

        # Request 2: complete the first dummy stage
        response = self.client.post(exec_url)
        self.assertEqual(response.status_code, 302)

        # The first binding has been consumed, the remaining two moved up
        plan = self.client.session[SESSION_KEY_PLAN]
        for index, expected in enumerate((binding2, binding3)):
            self.assertEqual(plan.bindings[index], expected)
        for index, marker_cls in enumerate((ReevaluateMarker, StageMarker)):
            self.assertEqual(plan.markers[index].__class__, marker_cls)

    # Request 3 runs without the patch, so the real (failing) policy is used for
    # the re-evaluation
    response = self.client.post(exec_url)
    self.assertEqual(response.status_code, 200)
    self.assertStageRedirects(response, reverse("authentik_core:root-redirect"))
Test planner with re-evaluate (middle stage is removed)
def test_reevaluate_keep(self):
    """Re-evaluation keeps every stage when the bound policy passes"""
    flow = create_test_flow(FlowDesignation.AUTHENTICATION)
    true_policy = DummyPolicy.objects.create(
        name=generate_id(), result=True, wait_min=1, wait_max=2
    )

    binding = FlowStageBinding.objects.create(
        target=flow,
        stage=DummyStage.objects.create(name=generate_id()),
        order=0,
        evaluate_on_plan=True,
        re_evaluate_policies=False,
    )
    # Middle binding: flagged for re-evaluation and bound to the passing policy
    binding2 = FlowStageBinding.objects.create(
        target=flow,
        stage=DummyStage.objects.create(name=generate_id()),
        order=1,
        re_evaluate_policies=True,
    )
    binding3 = FlowStageBinding.objects.create(
        target=flow,
        stage=DummyStage.objects.create(name=generate_id()),
        order=2,
        evaluate_on_plan=True,
        re_evaluate_policies=False,
    )
    PolicyBinding.objects.create(policy=true_policy, target=binding2, order=0)

    exec_url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})

    # DummyPolicy.passes is patched to succeed for the first three requests
    with patch("authentik.policies.dummy.models.DummyPolicy.passes", POLICY_RETURN_TRUE):
        # Request 1: run the planner
        response = self.client.get(exec_url)
        self.assertEqual(response.status_code, 200)

        plan: FlowPlan = self.client.session[SESSION_KEY_PLAN]
        for index, expected in enumerate((binding, binding2, binding3)):
            self.assertEqual(plan.bindings[index], expected)
        for index, marker_cls in enumerate((StageMarker, ReevaluateMarker, StageMarker)):
            self.assertEqual(plan.markers[index].__class__, marker_cls)

        # Request 2: complete the first dummy stage
        response = self.client.post(exec_url)
        self.assertEqual(response.status_code, 302)

        plan = self.client.session[SESSION_KEY_PLAN]
        for index, expected in enumerate((binding2, binding3)):
            self.assertEqual(plan.bindings[index], expected)
        for index, marker_cls in enumerate((ReevaluateMarker, StageMarker)):
            self.assertEqual(plan.markers[index].__class__, marker_cls)

        # Request 3: complete the second dummy stage, which was kept
        response = self.client.post(exec_url)
        self.assertEqual(response.status_code, 302)

        plan = self.client.session[SESSION_KEY_PLAN]
        self.assertEqual(plan.bindings[0], binding3)
        self.assertEqual(plan.markers[0].__class__, StageMarker)

    # Request 4 runs without the patch and completes the last remaining stage
    response = self.client.post(exec_url)
    self.assertEqual(response.status_code, 200)
    self.assertStageRedirects(response, reverse("authentik_core:root-redirect"))
Test planner with re-evaluate (everything is kept)
def test_reevaluate_remove_consecutive(self):
    """Test planner with re-evaluate (consecutive stages are removed)"""
    flow = create_test_flow(
        FlowDesignation.AUTHENTICATION,
    )
    false_policy = DummyPolicy.objects.create(
        name=generate_id(), result=False, wait_min=1, wait_max=2
    )

    binding = FlowStageBinding.objects.create(
        target=flow,
        stage=DummyStage.objects.create(name=generate_id()),
        order=0,
        evaluate_on_plan=True,
        re_evaluate_policies=False,
    )
    # binding2 and binding3 are both flagged for re-evaluation and bound to the
    # failing policy, so they form two consecutive removable stages
    binding2 = FlowStageBinding.objects.create(
        target=flow,
        stage=DummyStage.objects.create(name=generate_id()),
        order=1,
        re_evaluate_policies=True,
    )
    binding3 = FlowStageBinding.objects.create(
        target=flow,
        stage=DummyStage.objects.create(name=generate_id()),
        order=2,
        re_evaluate_policies=True,
    )
    # Use a distinct order for the last binding: the assertions below require
    # binding3 to come before binding4, which is not guaranteed when both
    # share the same order value
    binding4 = FlowStageBinding.objects.create(
        target=flow,
        stage=DummyStage.objects.create(name=generate_id()),
        order=3,
        evaluate_on_plan=True,
        re_evaluate_policies=False,
    )

    PolicyBinding.objects.create(policy=false_policy, target=binding2, order=0)
    PolicyBinding.objects.create(policy=false_policy, target=binding3, order=0)

    # Here we patch the dummy policy to evaluate to true so the stages are included
    with patch("authentik.policies.dummy.models.DummyPolicy.passes", POLICY_RETURN_TRUE):
        exec_url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})
        # First request, run the planner
        response = self.client.get(exec_url)
        self.assertEqual(response.status_code, 200)
        self.assertStageResponse(response, flow, component="ak-stage-dummy")

        plan: FlowPlan = self.client.session[SESSION_KEY_PLAN]

        self.assertEqual(plan.bindings[0], binding)
        self.assertEqual(plan.bindings[1], binding2)
        self.assertEqual(plan.bindings[2], binding3)
        self.assertEqual(plan.bindings[3], binding4)

        self.assertEqual(plan.markers[0].__class__, StageMarker)
        self.assertEqual(plan.markers[1].__class__, ReevaluateMarker)
        self.assertEqual(plan.markers[2].__class__, ReevaluateMarker)
        self.assertEqual(plan.markers[3].__class__, StageMarker)

        # Second request, this passes the first dummy stage
        response = self.client.post(exec_url)
        self.assertEqual(response.status_code, 302)

    # Third request, this should trigger the re-evaluate
    # A get request will evaluate the policies and this will return stage 4
    # but it won't save it, hence we can't check the plan
    response = self.client.get(exec_url)
    self.assertStageResponse(response, flow, component="ak-stage-dummy")

    # Fourth request, this confirms the last stage (dummy4)
    # We do this request without the patch, so the policy results in false
    response = self.client.post(exec_url)
    self.assertEqual(response.status_code, 200)
    self.assertStageRedirects(response, reverse("authentik_core:root-redirect"))
Test planner with re-evaluate (consecutive stages are removed)
def test_stageview_user_identifier(self):
    """PLAN_CONTEXT_PENDING_USER_IDENTIFIER is returned as the display username"""
    flow = create_test_flow(FlowDesignation.AUTHENTICATION)
    FlowStageBinding.objects.create(
        target=flow,
        stage=DummyStage.objects.create(name=generate_id()),
        order=0,
    )

    ident = "test-identifier"
    user = User.objects.create(username="test-user")

    # Plan the flow for a request made by `user`, seeding the identifier into the context
    executor_url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})
    request = self.request_factory.get(executor_url)
    request.user = user
    plan = FlowPlanner(flow).plan(
        request,
        default_context={PLAN_CONTEXT_PENDING_USER_IDENTIFIER: ident},
    )

    # Build the executor by hand, providing only the attributes set below
    executor = FlowExecutorView()
    executor.plan = plan
    executor.flow = flow

    pending_user = StageView(executor).get_pending_user(for_display=True)
    self.assertEqual(ident, pending_user.username)
Test PLAN_CONTEXT_PENDING_USER_IDENTIFIER
def test_invalid_restart(self):
    """An invalid identification entry restarts the flow and ends on access-denied"""
    flow = create_test_flow(FlowDesignation.AUTHENTICATION)
    exec_url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})

    # Stage 0: deny stage, not evaluated at plan time but re-evaluated later,
    # guarded by a reputation policy
    reputation_policy = ReputationPolicy.objects.create(
        name=generate_id(), threshold=-1, check_ip=False
    )
    deny_binding = FlowStageBinding.objects.create(
        target=flow,
        stage=DenyStage.objects.create(name=generate_id()),
        order=0,
        evaluate_on_plan=False,
        re_evaluate_policies=True,
    )
    PolicyBinding.objects.create(policy=reputation_policy, target=deny_binding, order=0)

    # Stage 1: identification stage whose invalid responses restart the flow
    FlowStageBinding.objects.create(
        target=flow,
        stage=IdentificationStage.objects.create(
            name=generate_id(),
            user_fields=[UserFields.E_MAIL],
            pretend_user_exists=False,
        ),
        order=1,
        invalid_response_action=InvalidResponseAction.RESTART_WITH_CONTEXT,
    )

    # Request 1: the planner runs and the identification stage is shown
    response = self.client.get(exec_url)
    self.assertStageResponse(
        response,
        flow,
        component="ak-stage-identification",
        password_fields=False,
        primary_action="Log in",
        sources=[],
        show_source_labels=False,
        user_fields=[UserFields.E_MAIL],
    )

    # Request 2: submit an unknown identifier and follow the resulting redirects
    response = self.client.post(exec_url, {"uid_field": "invalid-string"}, follow=True)
    self.assertStageResponse(response, flow, component="ak-stage-access-denied")
Test flow that restarts on invalid entry
def test_re_evaluate_group_binding(self):
    """Stage binding with group and user policy bindings is evaluated per identified user"""
    flow = create_test_flow()

    user_group_membership = create_test_user()
    user_direct_binding = create_test_user()
    user_other = create_test_user()

    group_a = Group.objects.create(name=generate_id())
    user_group_membership.groups.add(group_a)

    # Stage 0: identification by username
    ident_stage = IdentificationStage.objects.create(
        name=generate_id(),
        user_fields=[UserFields.USERNAME],
        pretend_user_exists=False,
    )
    FlowStageBinding.objects.create(target=flow, stage=ident_stage, order=0)

    # Stage 1: dummy stage, bound to group_a and directly to user_direct_binding
    dummy_stage = DummyStage.objects.create(name=generate_id())
    dummy_binding = FlowStageBinding.objects.create(target=flow, stage=dummy_stage, order=1)
    PolicyBinding.objects.create(group=group_a, target=dummy_binding, order=0)
    PolicyBinding.objects.create(user=user_direct_binding, target=dummy_binding, order=0)

    # Stage 2: deny stage; in this test only user_other is expected to end up here
    deny_stage = DenyStage.objects.create(name=generate_id())
    FlowStageBinding.objects.create(target=flow, stage=deny_stage, order=2)

    exec_url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})

    # (subtest label, user to identify as, component expected after identification)
    scenarios = (
        (
            f"Test user access through group: {user_group_membership}",
            user_group_membership,
            "ak-stage-dummy",
        ),
        (
            f"Test user access through user: {user_direct_binding}",
            user_direct_binding,
            "ak-stage-dummy",
        ),
        (
            f"Test user has no access: {user_other}",
            user_other,
            "ak-stage-access-denied",
        ),
    )
    for label, candidate, expected_component in scenarios:
        with self.subTest(label):
            self.client.logout()
            # First request, run the planner
            response = self.client.get(exec_url)
            self.assertStageResponse(response, flow, component="ak-stage-identification")
            # Identify as the candidate and follow the redirects to the next stage
            response = self.client.post(
                exec_url, {"uid_field": candidate.username}, follow=True
            )
            self.assertStageResponse(response, flow, component=expected_component)
Test re-evaluate stage binding that has a policy binding to a group
@patch(
    "authentik.flows.views.executor.to_stage_response",
    TO_STAGE_RESPONSE_MOCK,
)
def test_invalid_json(self):
    """Test invalid JSON body

    With TEST and DEBUG disabled the executor is expected to answer with a
    200 response; with the default test settings the ParseError propagates
    out of the test client."""
    flow = create_test_flow()
    FlowStageBinding.objects.create(
        target=flow, stage=DummyStage.objects.create(name=generate_id()), order=0
    )
    url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})

    with override_settings(TEST=False, DEBUG=False):
        self.client.logout()
        response = self.client.post(url, data="{", content_type="application/json")
        self.assertEqual(response.status_code, 200)

    self.client.logout()
    # Only the call expected to raise lives inside the context manager; any
    # statement placed after it in the block would never be executed
    with self.assertRaises(ParseError):
        self.client.post(url, data="{", content_type="application/json")
Test invalid JSON body
def test_cancel_next(self):
    """The cancel URL in the flow info carries over the `next` parameter"""
    flow = create_test_flow()

    # Stage 0: identification by username
    FlowStageBinding.objects.create(
        target=flow,
        stage=IdentificationStage.objects.create(
            name=generate_id(),
            user_fields=[UserFields.USERNAME],
        ),
        order=0,
    )
    # Stage 1: password stage without any backends
    FlowStageBinding.objects.create(
        target=flow,
        stage=PasswordStage.objects.create(name=generate_id(), backends=[]),
        order=1,
    )

    # Executor URL with the `next` destination nested inside the query argument
    inner_query = urlencode({NEXT_ARG_NAME: "/foo"})
    outer_query = urlencode({QS_QUERY: inner_query})
    exec_url = (
        reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})
        + f"?{outer_query}"
    )

    res = self.client.get(exec_url)
    self.assertStageResponse(res, flow, component="ak-stage-identification")

    res = self.client.post(
        exec_url,
        data={"component": "ak-stage-identification", "uid_field": generate_id()},
        follow=True,
    )
    self.assertEqual(res.status_code, 200)
    expected_flow_info = {
        "background": "/static/dist/assets/images/flow_background.jpg",
        "background_themed_urls": None,
        "cancel_url": "/flows/-/cancel/?next=%2Ffoo",
        "layout": "stacked",
        "title": flow.title,
    }
    self.assertStageResponse(res, flow, flow_info=expected_flow_info)
Test cancel URL with ?next param set
@patch(
    "authentik.flows.views.executor.to_stage_response",
    TO_STAGE_RESPONSE_MOCK,
)
def test_expired_flow_token(self):
    """A flow token that expired an hour ago results in an access-denied stage"""
    flow = create_test_flow(
        FlowDesignation.RECOVERY,
        authentication=FlowAuthenticationRequirement.REQUIRE_TOKEN,
    )
    user = create_test_user()

    # Token carrying an empty plan, with an expiry in the past
    empty_plan = FlowPlan(flow_pk=flow.pk.hex, bindings=[], markers=[])
    token = FlowToken.objects.create(
        user=user,
        identifier=generate_id(),
        flow=flow,
        _plan=FlowToken.pickle(empty_plan),
        expires=now() - timedelta(hours=1),
    )

    token_query = urlencode({QS_KEY_TOKEN: token.key})
    outer_query = urlencode({QS_QUERY: token_query})
    executor_url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})

    response = self.client.get(f"{executor_url}?{outer_query}")
    self.assertStageResponse(
        response,
        flow,
        component="ak-stage-access-denied",
        error_message="This link is invalid or has expired. Please request a new one.",
    )
Test that an expired flow token shows an appropriate error message
@patch(
    "authentik.flows.views.executor.to_stage_response",
    TO_STAGE_RESPONSE_MOCK,
)
def test_invalid_flow_token_require_token(self):
    """An unknown token on a REQUIRE_TOKEN flow results in an access-denied stage"""
    flow = create_test_flow(
        FlowDesignation.RECOVERY,
        authentication=FlowAuthenticationRequirement.REQUIRE_TOKEN,
    )

    # No FlowToken is created in this test, so the key below matches nothing
    token_query = urlencode({QS_KEY_TOKEN: "invalid-token"})
    outer_query = urlencode({QS_QUERY: token_query})
    executor_url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})

    response = self.client.get(f"{executor_url}?{outer_query}")
    self.assertStageResponse(
        response,
        flow,
        component="ak-stage-access-denied",
        error_message="This link is invalid or has expired. Please request a new one.",
    )
Test that an invalid/nonexistent token on a REQUIRE_TOKEN flow shows error
@patch(
    "authentik.flows.views.executor.to_stage_response",
    TO_STAGE_RESPONSE_MOCK,
)
def test_no_token_require_token(self):
    """A REQUIRE_TOKEN flow requested without a token results in an access-denied stage"""
    flow = create_test_flow(
        FlowDesignation.RECOVERY,
        authentication=FlowAuthenticationRequirement.REQUIRE_TOKEN,
    )
    executor_url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})

    # Plain request, no query string at all
    response = self.client.get(executor_url)

    self.assertStageResponse(
        response,
        flow,
        component="ak-stage-access-denied",
        error_message="This link is invalid or has expired. Please request a new one.",
    )
Test that accessing a REQUIRE_TOKEN flow without any token shows error