authentik.flows.tests.test_executor
flow views tests
"""flow views tests"""

from unittest.mock import MagicMock, PropertyMock, patch
from urllib.parse import urlencode

from django.http import HttpRequest, HttpResponse
from django.test import override_settings
from django.test.client import RequestFactory
from django.urls import reverse
from rest_framework.exceptions import ParseError

from authentik.core.models import Group, User
from authentik.core.tests.utils import create_test_flow, create_test_user
from authentik.flows.markers import ReevaluateMarker, StageMarker
from authentik.flows.models import (
    FlowAuthenticationRequirement,
    FlowDeniedAction,
    FlowDesignation,
    FlowStageBinding,
    InvalidResponseAction,
)
from authentik.flows.planner import FlowPlan, FlowPlanner
from authentik.flows.stage import PLAN_CONTEXT_PENDING_USER_IDENTIFIER, StageView
from authentik.flows.tests import FlowTestCase
from authentik.flows.views.executor import (
    NEXT_ARG_NAME,
    QS_QUERY,
    SESSION_KEY_PLAN,
    FlowExecutorView,
)
from authentik.lib.generators import generate_id
from authentik.policies.dummy.models import DummyPolicy
from authentik.policies.models import PolicyBinding
from authentik.policies.reputation.models import ReputationPolicy
from authentik.policies.types import PolicyResult
from authentik.stages.deny.models import DenyStage
from authentik.stages.dummy.models import DummyStage
from authentik.stages.identification.models import IdentificationStage, UserFields
from authentik.stages.password.models import PasswordStage

# Patched over `PolicyEngine.result` (accessed as an attribute, hence a
# PropertyMock) to force a failing policy result with the message "foo".
POLICY_RETURN_FALSE = PropertyMock(return_value=PolicyResult(False, "foo"))
# Patched over `DummyPolicy.passes` (called as a method, hence a MagicMock)
# to force a passing policy result.
POLICY_RETURN_TRUE = MagicMock(return_value=PolicyResult(True))


def to_stage_response(request: HttpRequest, source: HttpResponse):
    """Pass-through stand-in for the executor's ``to_stage_response``.

    The response handed in as ``source`` is returned unchanged (``request`` is
    accepted only to keep the call signature), so tests can inspect the raw
    response object, its type and its attributes.
    """
    return source


# Wraps the pass-through above in a mock so it can be used as a patch target.
TO_STAGE_RESPONSE_MOCK = MagicMock(side_effect=to_stage_response)
52 53 54class TestFlowExecutor(FlowTestCase): 55 """Test executor""" 56 57 def setUp(self): 58 self.request_factory = RequestFactory() 59 60 @patch( 61 "authentik.flows.views.executor.to_stage_response", 62 TO_STAGE_RESPONSE_MOCK, 63 ) 64 def test_existing_plan_diff_flow(self): 65 """Check that a plan for a different flow cancels the current plan""" 66 flow = create_test_flow( 67 FlowDesignation.AUTHENTICATION, 68 ) 69 stage = DummyStage.objects.create(name=generate_id()) 70 binding = FlowStageBinding(target=flow, stage=stage, order=0) 71 plan = FlowPlan(flow_pk=flow.pk.hex + "a", bindings=[binding], markers=[StageMarker()]) 72 session = self.client.session 73 session[SESSION_KEY_PLAN] = plan 74 session.save() 75 76 cancel_mock = MagicMock() 77 with patch("authentik.flows.views.executor.FlowExecutorView.cancel", cancel_mock): 78 response = self.client.get( 79 reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}), 80 ) 81 self.assertEqual(response.status_code, 302) 82 self.assertEqual(cancel_mock.call_count, 2) 83 84 @patch( 85 "authentik.flows.views.executor.to_stage_response", 86 TO_STAGE_RESPONSE_MOCK, 87 ) 88 @patch( 89 "authentik.policies.engine.PolicyEngine.result", 90 POLICY_RETURN_FALSE, 91 ) 92 def test_invalid_non_applicable_flow(self): 93 """Tests that a non-applicable flow returns the correct error message""" 94 flow = create_test_flow( 95 FlowDesignation.AUTHENTICATION, 96 ) 97 98 response = self.client.get( 99 reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}), 100 ) 101 self.assertStageResponse( 102 response, 103 flow=flow, 104 error_message="foo", 105 component="ak-stage-access-denied", 106 ) 107 108 @patch( 109 "authentik.flows.views.executor.to_stage_response", 110 TO_STAGE_RESPONSE_MOCK, 111 ) 112 @patch( 113 "authentik.policies.engine.PolicyEngine.result", 114 POLICY_RETURN_FALSE, 115 ) 116 def test_invalid_non_applicable_flow_continue(self): 117 """Tests that a non-applicable flow that should redirect""" 
118 flow = create_test_flow( 119 FlowDesignation.AUTHENTICATION, 120 denied_action=FlowDeniedAction.CONTINUE, 121 ) 122 123 response = self.client.get( 124 reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}), 125 ) 126 self.assertEqual(response.status_code, 302) 127 self.assertEqual(response.url, reverse("authentik_core:root-redirect")) 128 129 @patch( 130 "authentik.flows.views.executor.to_stage_response", 131 TO_STAGE_RESPONSE_MOCK, 132 ) 133 def test_invalid_flow_redirect(self): 134 """Test invalid flow with valid redirect destination""" 135 flow = create_test_flow( 136 FlowDesignation.AUTHENTICATION, 137 ) 138 139 dest = "/unique-string" 140 url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}) 141 response = self.client.get(url + f"?{QS_QUERY}={urlencode({NEXT_ARG_NAME: dest})}") 142 self.assertEqual(response.status_code, 302) 143 self.assertEqual(response.url, "/unique-string") 144 145 @patch( 146 "authentik.flows.views.executor.to_stage_response", 147 TO_STAGE_RESPONSE_MOCK, 148 ) 149 def test_invalid_flow_invalid_redirect(self): 150 """Test invalid flow redirect with an invalid URL""" 151 flow = create_test_flow( 152 FlowDesignation.AUTHENTICATION, 153 ) 154 155 dest = "http://something.example.com/unique-string" 156 url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}) 157 158 response = self.client.get(url + f"?{QS_QUERY}={urlencode({NEXT_ARG_NAME: dest})}") 159 self.assertEqual(response.status_code, 200) 160 self.assertStageResponse( 161 response, 162 flow, 163 component="ak-stage-access-denied", 164 error_message="Invalid next URL", 165 ) 166 167 @patch( 168 "authentik.flows.views.executor.to_stage_response", 169 TO_STAGE_RESPONSE_MOCK, 170 ) 171 def test_valid_flow_redirect(self): 172 """Test valid flow with valid redirect destination""" 173 flow = create_test_flow() 174 175 dest = "/unique-string" 176 url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}) 177 
178 response = self.client.get(url + f"?{QS_QUERY}={urlencode({NEXT_ARG_NAME: dest})}") 179 self.assertEqual(response.status_code, 302) 180 self.assertEqual(response.url, "/unique-string") 181 182 @patch( 183 "authentik.flows.views.executor.to_stage_response", 184 TO_STAGE_RESPONSE_MOCK, 185 ) 186 def test_valid_flow_redirect_authenticated(self): 187 """Test valid flow with valid redirect destination, authenticated already""" 188 flow = create_test_flow() 189 flow.designation = FlowDesignation.AUTHENTICATION 190 flow.authentication = FlowAuthenticationRequirement.REQUIRE_UNAUTHENTICATED 191 flow.save() 192 self.client.force_login(create_test_user()) 193 194 dest = "/unique-string" 195 url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}) 196 197 response = self.client.get(url + f"?{QS_QUERY}={urlencode({NEXT_ARG_NAME: dest})}") 198 self.assertEqual(response.status_code, 302) 199 self.assertEqual(response.url, "/unique-string") 200 201 @patch( 202 "authentik.flows.views.executor.to_stage_response", 203 TO_STAGE_RESPONSE_MOCK, 204 ) 205 def test_valid_flow_invalid_redirect(self): 206 """Test valid flow redirect with an invalid URL""" 207 flow = create_test_flow() 208 209 dest = "http://something.example.com/unique-string" 210 url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}) 211 212 response = self.client.get(url + f"?{QS_QUERY}={urlencode({NEXT_ARG_NAME: dest})}") 213 self.assertEqual(response.status_code, 200) 214 self.assertStageResponse( 215 response, 216 flow, 217 component="ak-stage-access-denied", 218 error_message="Invalid next URL", 219 ) 220 221 @patch( 222 "authentik.flows.views.executor.to_stage_response", 223 TO_STAGE_RESPONSE_MOCK, 224 ) 225 def test_invalid_empty_flow(self): 226 """Tests that an empty flow returns the correct error message""" 227 flow = create_test_flow( 228 FlowDesignation.AUTHENTICATION, 229 ) 230 231 response = self.client.get( 232 reverse("authentik_api:flow-executor", 
kwargs={"flow_slug": flow.slug}), 233 ) 234 self.assertEqual(response.status_code, 302) 235 self.assertEqual(response.url, reverse("authentik_core:root-redirect")) 236 237 def test_multi_stage_flow(self): 238 """Test a full flow with multiple stages""" 239 flow = create_test_flow( 240 FlowDesignation.AUTHENTICATION, 241 ) 242 FlowStageBinding.objects.create( 243 target=flow, stage=DummyStage.objects.create(name=generate_id()), order=0 244 ) 245 FlowStageBinding.objects.create( 246 target=flow, stage=DummyStage.objects.create(name=generate_id()), order=1 247 ) 248 249 exec_url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}) 250 # First Request, start planning, renders form 251 response = self.client.get(exec_url) 252 self.assertEqual(response.status_code, 200) 253 # Check that two stages are in plan 254 session = self.client.session 255 plan: FlowPlan = session[SESSION_KEY_PLAN] 256 self.assertEqual(len(plan.bindings), 2) 257 # Second request, submit form, one stage left 258 response = self.client.post(exec_url) 259 # Second request redirects to the same URL 260 self.assertEqual(response.status_code, 302) 261 self.assertEqual(response.url, exec_url) 262 # Check that two stages are in plan 263 session = self.client.session 264 plan: FlowPlan = session[SESSION_KEY_PLAN] 265 self.assertEqual(len(plan.bindings), 1) 266 267 @patch( 268 "authentik.flows.views.executor.to_stage_response", 269 TO_STAGE_RESPONSE_MOCK, 270 ) 271 def test_reevaluate_remove_last(self): 272 """Test planner with re-evaluate (last stage is removed)""" 273 flow = create_test_flow( 274 FlowDesignation.AUTHENTICATION, 275 ) 276 false_policy = DummyPolicy.objects.create( 277 name=generate_id(), result=False, wait_min=1, wait_max=2 278 ) 279 280 binding = FlowStageBinding.objects.create( 281 target=flow, 282 stage=DummyStage.objects.create(name=generate_id()), 283 order=0, 284 evaluate_on_plan=True, 285 re_evaluate_policies=False, 286 ) 287 binding2 = 
FlowStageBinding.objects.create( 288 target=flow, 289 stage=DummyStage.objects.create(name=generate_id()), 290 order=1, 291 re_evaluate_policies=True, 292 ) 293 294 PolicyBinding.objects.create(policy=false_policy, target=binding2, order=0) 295 296 # Here we patch the dummy policy to evaluate to true so the stage is included 297 with patch("authentik.policies.dummy.models.DummyPolicy.passes", POLICY_RETURN_TRUE): 298 exec_url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}) 299 # First request, run the planner 300 response = self.client.get(exec_url) 301 self.assertEqual(response.status_code, 200) 302 303 plan: FlowPlan = self.client.session[SESSION_KEY_PLAN] 304 305 self.assertEqual(plan.bindings[0], binding) 306 self.assertEqual(plan.bindings[1], binding2) 307 308 self.assertEqual(plan.markers[0].__class__, StageMarker) 309 self.assertEqual(plan.markers[1].__class__, ReevaluateMarker) 310 311 # Second request, this passes the first dummy stage 312 response = self.client.post(exec_url) 313 self.assertEqual(response.status_code, 302) 314 315 # third request, this should trigger the re-evaluate 316 # We do this request without the patch, so the policy results in false 317 response = self.client.post(exec_url) 318 self.assertEqual(response.status_code, 302) 319 self.assertEqual(response.url, reverse("authentik_core:root-redirect")) 320 321 def test_reevaluate_remove_middle(self): 322 """Test planner with re-evaluate (middle stage is removed)""" 323 flow = create_test_flow( 324 FlowDesignation.AUTHENTICATION, 325 ) 326 false_policy = DummyPolicy.objects.create( 327 name=generate_id(), result=False, wait_min=1, wait_max=2 328 ) 329 330 binding = FlowStageBinding.objects.create( 331 target=flow, 332 stage=DummyStage.objects.create(name=generate_id()), 333 order=0, 334 evaluate_on_plan=True, 335 re_evaluate_policies=False, 336 ) 337 binding2 = FlowStageBinding.objects.create( 338 target=flow, 339 
stage=DummyStage.objects.create(name=generate_id()), 340 order=1, 341 re_evaluate_policies=True, 342 ) 343 binding3 = FlowStageBinding.objects.create( 344 target=flow, 345 stage=DummyStage.objects.create(name=generate_id()), 346 order=2, 347 evaluate_on_plan=True, 348 re_evaluate_policies=False, 349 ) 350 351 PolicyBinding.objects.create(policy=false_policy, target=binding2, order=0) 352 353 # Here we patch the dummy policy to evaluate to true so the stage is included 354 with patch("authentik.policies.dummy.models.DummyPolicy.passes", POLICY_RETURN_TRUE): 355 exec_url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}) 356 # First request, run the planner 357 response = self.client.get(exec_url) 358 359 self.assertEqual(response.status_code, 200) 360 plan: FlowPlan = self.client.session[SESSION_KEY_PLAN] 361 362 self.assertEqual(plan.bindings[0], binding) 363 self.assertEqual(plan.bindings[1], binding2) 364 self.assertEqual(plan.bindings[2], binding3) 365 366 self.assertEqual(plan.markers[0].__class__, StageMarker) 367 self.assertEqual(plan.markers[1].__class__, ReevaluateMarker) 368 self.assertEqual(plan.markers[2].__class__, StageMarker) 369 370 # Second request, this passes the first dummy stage 371 response = self.client.post(exec_url) 372 self.assertEqual(response.status_code, 302) 373 374 plan: FlowPlan = self.client.session[SESSION_KEY_PLAN] 375 376 self.assertEqual(plan.bindings[0], binding2) 377 self.assertEqual(plan.bindings[1], binding3) 378 379 self.assertEqual(plan.markers[0].__class__, ReevaluateMarker) 380 self.assertEqual(plan.markers[1].__class__, StageMarker) 381 382 # third request, this should trigger the re-evaluate 383 # We do this request without the patch, so the policy results in false 384 response = self.client.post(exec_url) 385 self.assertEqual(response.status_code, 200) 386 self.assertStageRedirects(response, reverse("authentik_core:root-redirect")) 387 388 def test_reevaluate_keep(self): 389 """Test planner with 
re-evaluate (everything is kept)""" 390 flow = create_test_flow( 391 FlowDesignation.AUTHENTICATION, 392 ) 393 true_policy = DummyPolicy.objects.create( 394 name=generate_id(), result=True, wait_min=1, wait_max=2 395 ) 396 397 binding = FlowStageBinding.objects.create( 398 target=flow, 399 stage=DummyStage.objects.create(name=generate_id()), 400 order=0, 401 evaluate_on_plan=True, 402 re_evaluate_policies=False, 403 ) 404 binding2 = FlowStageBinding.objects.create( 405 target=flow, 406 stage=DummyStage.objects.create(name=generate_id()), 407 order=1, 408 re_evaluate_policies=True, 409 ) 410 binding3 = FlowStageBinding.objects.create( 411 target=flow, 412 stage=DummyStage.objects.create(name=generate_id()), 413 order=2, 414 evaluate_on_plan=True, 415 re_evaluate_policies=False, 416 ) 417 418 PolicyBinding.objects.create(policy=true_policy, target=binding2, order=0) 419 420 # Here we patch the dummy policy to evaluate to true so the stage is included 421 with patch("authentik.policies.dummy.models.DummyPolicy.passes", POLICY_RETURN_TRUE): 422 exec_url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}) 423 # First request, run the planner 424 response = self.client.get(exec_url) 425 426 self.assertEqual(response.status_code, 200) 427 plan: FlowPlan = self.client.session[SESSION_KEY_PLAN] 428 429 self.assertEqual(plan.bindings[0], binding) 430 self.assertEqual(plan.bindings[1], binding2) 431 self.assertEqual(plan.bindings[2], binding3) 432 433 self.assertEqual(plan.markers[0].__class__, StageMarker) 434 self.assertEqual(plan.markers[1].__class__, ReevaluateMarker) 435 self.assertEqual(plan.markers[2].__class__, StageMarker) 436 437 # Second request, this passes the first dummy stage 438 response = self.client.post(exec_url) 439 self.assertEqual(response.status_code, 302) 440 441 plan: FlowPlan = self.client.session[SESSION_KEY_PLAN] 442 443 self.assertEqual(plan.bindings[0], binding2) 444 self.assertEqual(plan.bindings[1], binding3) 445 446 
self.assertEqual(plan.markers[0].__class__, ReevaluateMarker) 447 self.assertEqual(plan.markers[1].__class__, StageMarker) 448 449 # Third request, this passes the first dummy stage 450 response = self.client.post(exec_url) 451 self.assertEqual(response.status_code, 302) 452 453 plan: FlowPlan = self.client.session[SESSION_KEY_PLAN] 454 455 self.assertEqual(plan.bindings[0], binding3) 456 457 self.assertEqual(plan.markers[0].__class__, StageMarker) 458 459 # third request, this should trigger the re-evaluate 460 # We do this request without the patch, so the policy results in false 461 response = self.client.post(exec_url) 462 self.assertEqual(response.status_code, 200) 463 self.assertStageRedirects(response, reverse("authentik_core:root-redirect")) 464 465 def test_reevaluate_remove_consecutive(self): 466 """Test planner with re-evaluate (consecutive stages are removed)""" 467 flow = create_test_flow( 468 FlowDesignation.AUTHENTICATION, 469 ) 470 false_policy = DummyPolicy.objects.create( 471 name=generate_id(), result=False, wait_min=1, wait_max=2 472 ) 473 474 binding = FlowStageBinding.objects.create( 475 target=flow, 476 stage=DummyStage.objects.create(name=generate_id()), 477 order=0, 478 evaluate_on_plan=True, 479 re_evaluate_policies=False, 480 ) 481 binding2 = FlowStageBinding.objects.create( 482 target=flow, 483 stage=DummyStage.objects.create(name=generate_id()), 484 order=1, 485 re_evaluate_policies=True, 486 ) 487 binding3 = FlowStageBinding.objects.create( 488 target=flow, 489 stage=DummyStage.objects.create(name=generate_id()), 490 order=2, 491 re_evaluate_policies=True, 492 ) 493 binding4 = FlowStageBinding.objects.create( 494 target=flow, 495 stage=DummyStage.objects.create(name=generate_id()), 496 order=2, 497 evaluate_on_plan=True, 498 re_evaluate_policies=False, 499 ) 500 501 PolicyBinding.objects.create(policy=false_policy, target=binding2, order=0) 502 PolicyBinding.objects.create(policy=false_policy, target=binding3, order=0) 503 504 # Here 
we patch the dummy policy to evaluate to true so the stage is included 505 with patch("authentik.policies.dummy.models.DummyPolicy.passes", POLICY_RETURN_TRUE): 506 exec_url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}) 507 # First request, run the planner 508 response = self.client.get(exec_url) 509 self.assertEqual(response.status_code, 200) 510 self.assertStageResponse(response, flow, component="ak-stage-dummy") 511 512 plan: FlowPlan = self.client.session[SESSION_KEY_PLAN] 513 514 self.assertEqual(plan.bindings[0], binding) 515 self.assertEqual(plan.bindings[1], binding2) 516 self.assertEqual(plan.bindings[2], binding3) 517 self.assertEqual(plan.bindings[3], binding4) 518 519 self.assertEqual(plan.markers[0].__class__, StageMarker) 520 self.assertEqual(plan.markers[1].__class__, ReevaluateMarker) 521 self.assertEqual(plan.markers[2].__class__, ReevaluateMarker) 522 self.assertEqual(plan.markers[3].__class__, StageMarker) 523 524 # Second request, this passes the first dummy stage 525 response = self.client.post(exec_url) 526 self.assertEqual(response.status_code, 302) 527 528 # third request, this should trigger the re-evaluate 529 # A get request will evaluate the policies and this will return stage 4 530 # but it won't save it, hence we can't check the plan 531 response = self.client.get(exec_url) 532 self.assertStageResponse(response, flow, component="ak-stage-dummy") 533 534 # fourth request, this confirms the last stage (dummy4) 535 # We do this request without the patch, so the policy results in false 536 response = self.client.post(exec_url) 537 self.assertEqual(response.status_code, 200) 538 self.assertStageRedirects(response, reverse("authentik_core:root-redirect")) 539 540 def test_stageview_user_identifier(self): 541 """Test PLAN_CONTEXT_PENDING_USER_IDENTIFIER""" 542 flow = create_test_flow( 543 FlowDesignation.AUTHENTICATION, 544 ) 545 FlowStageBinding.objects.create( 546 target=flow, 
stage=DummyStage.objects.create(name=generate_id()), order=0 547 ) 548 549 ident = "test-identifier" 550 551 user = User.objects.create(username="test-user") 552 request = self.request_factory.get( 553 reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}), 554 ) 555 request.user = user 556 planner = FlowPlanner(flow) 557 plan = planner.plan(request, default_context={PLAN_CONTEXT_PENDING_USER_IDENTIFIER: ident}) 558 559 executor = FlowExecutorView() 560 executor.plan = plan 561 executor.flow = flow 562 563 stage_view = StageView(executor) 564 self.assertEqual(ident, stage_view.get_pending_user(for_display=True).username) 565 566 def test_invalid_restart(self): 567 """Test flow that restarts on invalid entry""" 568 flow = create_test_flow( 569 FlowDesignation.AUTHENTICATION, 570 ) 571 # Stage 0 is a deny stage that is added dynamically 572 # when the reputation policy says so 573 deny_stage = DenyStage.objects.create(name=generate_id()) 574 reputation_policy = ReputationPolicy.objects.create( 575 name=generate_id(), threshold=-1, check_ip=False 576 ) 577 deny_binding = FlowStageBinding.objects.create( 578 target=flow, 579 stage=deny_stage, 580 order=0, 581 evaluate_on_plan=False, 582 re_evaluate_policies=True, 583 ) 584 PolicyBinding.objects.create(policy=reputation_policy, target=deny_binding, order=0) 585 586 # Stage 1 is an identification stage 587 ident_stage = IdentificationStage.objects.create( 588 name=generate_id(), 589 user_fields=[UserFields.E_MAIL], 590 pretend_user_exists=False, 591 ) 592 FlowStageBinding.objects.create( 593 target=flow, 594 stage=ident_stage, 595 order=1, 596 invalid_response_action=InvalidResponseAction.RESTART_WITH_CONTEXT, 597 ) 598 exec_url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}) 599 # First request, run the planner 600 response = self.client.get(exec_url) 601 self.assertStageResponse( 602 response, 603 flow, 604 component="ak-stage-identification", 605 password_fields=False, 606 
primary_action="Log in", 607 sources=[], 608 show_source_labels=False, 609 user_fields=[UserFields.E_MAIL], 610 ) 611 response = self.client.post(exec_url, {"uid_field": "invalid-string"}, follow=True) 612 self.assertStageResponse(response, flow, component="ak-stage-access-denied") 613 614 def test_re_evaluate_group_binding(self): 615 """Test re-evaluate stage binding that has a policy binding to a group""" 616 flow = create_test_flow() 617 618 user_group_membership = create_test_user() 619 user_direct_binding = create_test_user() 620 user_other = create_test_user() 621 622 group_a = Group.objects.create(name=generate_id()) 623 user_group_membership.groups.add(group_a) 624 625 # Stage 0 is an identification stage 626 ident_stage = IdentificationStage.objects.create( 627 name=generate_id(), 628 user_fields=[UserFields.USERNAME], 629 pretend_user_exists=False, 630 ) 631 FlowStageBinding.objects.create( 632 target=flow, 633 stage=ident_stage, 634 order=0, 635 ) 636 637 # Stage 1 is a dummy stage that is only shown for users in group_a 638 dummy_stage = DummyStage.objects.create(name=generate_id()) 639 dummy_binding = FlowStageBinding.objects.create(target=flow, stage=dummy_stage, order=1) 640 PolicyBinding.objects.create(group=group_a, target=dummy_binding, order=0) 641 PolicyBinding.objects.create(user=user_direct_binding, target=dummy_binding, order=0) 642 643 # Stage 2 is a deny stage that (in this case) only user_b will see 644 deny_stage = DenyStage.objects.create(name=generate_id()) 645 FlowStageBinding.objects.create(target=flow, stage=deny_stage, order=2) 646 647 exec_url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}) 648 649 with self.subTest(f"Test user access through group: {user_group_membership}"): 650 self.client.logout() 651 # First request, run the planner 652 response = self.client.get(exec_url) 653 self.assertStageResponse(response, flow, component="ak-stage-identification") 654 response = self.client.post( 655 exec_url, 
{"uid_field": user_group_membership.username}, follow=True 656 ) 657 self.assertStageResponse(response, flow, component="ak-stage-dummy") 658 with self.subTest(f"Test user access through user: {user_direct_binding}"): 659 self.client.logout() 660 # First request, run the planner 661 response = self.client.get(exec_url) 662 self.assertStageResponse(response, flow, component="ak-stage-identification") 663 response = self.client.post( 664 exec_url, {"uid_field": user_direct_binding.username}, follow=True 665 ) 666 self.assertStageResponse(response, flow, component="ak-stage-dummy") 667 with self.subTest(f"Test user has no access: {user_other}"): 668 self.client.logout() 669 # First request, run the planner 670 response = self.client.get(exec_url) 671 self.assertStageResponse(response, flow, component="ak-stage-identification") 672 response = self.client.post(exec_url, {"uid_field": user_other.username}, follow=True) 673 self.assertStageResponse(response, flow, component="ak-stage-access-denied") 674 675 @patch( 676 "authentik.flows.views.executor.to_stage_response", 677 TO_STAGE_RESPONSE_MOCK, 678 ) 679 def test_invalid_json(self): 680 """Test invalid JSON body""" 681 flow = create_test_flow() 682 FlowStageBinding.objects.create( 683 target=flow, stage=DummyStage.objects.create(name=generate_id()), order=0 684 ) 685 url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}) 686 687 with override_settings(TEST=False, DEBUG=False): 688 self.client.logout() 689 response = self.client.post(url, data="{", content_type="application/json") 690 self.assertEqual(response.status_code, 200) 691 692 with self.assertRaises(ParseError): 693 self.client.logout() 694 response = self.client.post(url, data="{", content_type="application/json") 695 self.assertEqual(response.status_code, 200) 696 697 def test_cancel_next(self): 698 """Test cancel URL with ?next param set""" 699 flow = create_test_flow() 700 701 # Stage 0 is an identification stage 702 ident_stage = 
IdentificationStage.objects.create( 703 name=generate_id(), 704 user_fields=[UserFields.USERNAME], 705 ) 706 FlowStageBinding.objects.create( 707 target=flow, 708 stage=ident_stage, 709 order=0, 710 ) 711 712 # Stage 1 is a password stage 713 password_stage = PasswordStage.objects.create(name=generate_id(), backends=[]) 714 FlowStageBinding.objects.create( 715 target=flow, 716 stage=password_stage, 717 order=1, 718 ) 719 res = self.client.get( 720 reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}) 721 + f"?{urlencode({QS_QUERY: urlencode({NEXT_ARG_NAME: "/foo"})})}" 722 ) 723 self.assertStageResponse(res, flow, component="ak-stage-identification") 724 725 res = self.client.post( 726 reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}) 727 + f"?{urlencode({QS_QUERY: urlencode({NEXT_ARG_NAME: "/foo"})})}", 728 data={"component": "ak-stage-identification", "uid_field": generate_id()}, 729 follow=True, 730 ) 731 self.assertEqual(res.status_code, 200) 732 self.assertStageResponse( 733 res, 734 flow, 735 flow_info={ 736 "background": "/static/dist/assets/images/flow_background.jpg", 737 "background_themed_urls": None, 738 "cancel_url": "/flows/-/cancel/?next=%2Ffoo", 739 "layout": "stacked", 740 "title": flow.title, 741 }, 742 )
A mock intended to be used as a property, or other descriptor, on a class.
PropertyMock provides __get__ and __set__ methods so you can specify
a return value when it is fetched.
Fetching a PropertyMock instance from an object calls the mock, with
no args. Setting it calls the mock with the value being set.
def to_stage_response(request: HttpRequest, source: HttpResponse):
    """Pass-through stand-in for the executor's ``to_stage_response``.

    The response handed in as ``source`` is returned unchanged (``request`` is
    accepted only to keep the call signature), so tests can inspect the raw
    response object, its type and its attributes.
    """
    return source
Mock for to_stage_response that returns the original response, so we can check inheritance and member attributes
55class TestFlowExecutor(FlowTestCase): 56 """Test executor""" 57 58 def setUp(self): 59 self.request_factory = RequestFactory() 60 61 @patch( 62 "authentik.flows.views.executor.to_stage_response", 63 TO_STAGE_RESPONSE_MOCK, 64 ) 65 def test_existing_plan_diff_flow(self): 66 """Check that a plan for a different flow cancels the current plan""" 67 flow = create_test_flow( 68 FlowDesignation.AUTHENTICATION, 69 ) 70 stage = DummyStage.objects.create(name=generate_id()) 71 binding = FlowStageBinding(target=flow, stage=stage, order=0) 72 plan = FlowPlan(flow_pk=flow.pk.hex + "a", bindings=[binding], markers=[StageMarker()]) 73 session = self.client.session 74 session[SESSION_KEY_PLAN] = plan 75 session.save() 76 77 cancel_mock = MagicMock() 78 with patch("authentik.flows.views.executor.FlowExecutorView.cancel", cancel_mock): 79 response = self.client.get( 80 reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}), 81 ) 82 self.assertEqual(response.status_code, 302) 83 self.assertEqual(cancel_mock.call_count, 2) 84 85 @patch( 86 "authentik.flows.views.executor.to_stage_response", 87 TO_STAGE_RESPONSE_MOCK, 88 ) 89 @patch( 90 "authentik.policies.engine.PolicyEngine.result", 91 POLICY_RETURN_FALSE, 92 ) 93 def test_invalid_non_applicable_flow(self): 94 """Tests that a non-applicable flow returns the correct error message""" 95 flow = create_test_flow( 96 FlowDesignation.AUTHENTICATION, 97 ) 98 99 response = self.client.get( 100 reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}), 101 ) 102 self.assertStageResponse( 103 response, 104 flow=flow, 105 error_message="foo", 106 component="ak-stage-access-denied", 107 ) 108 109 @patch( 110 "authentik.flows.views.executor.to_stage_response", 111 TO_STAGE_RESPONSE_MOCK, 112 ) 113 @patch( 114 "authentik.policies.engine.PolicyEngine.result", 115 POLICY_RETURN_FALSE, 116 ) 117 def test_invalid_non_applicable_flow_continue(self): 118 """Tests that a non-applicable flow that should redirect""" 119 
flow = create_test_flow( 120 FlowDesignation.AUTHENTICATION, 121 denied_action=FlowDeniedAction.CONTINUE, 122 ) 123 124 response = self.client.get( 125 reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}), 126 ) 127 self.assertEqual(response.status_code, 302) 128 self.assertEqual(response.url, reverse("authentik_core:root-redirect")) 129 130 @patch( 131 "authentik.flows.views.executor.to_stage_response", 132 TO_STAGE_RESPONSE_MOCK, 133 ) 134 def test_invalid_flow_redirect(self): 135 """Test invalid flow with valid redirect destination""" 136 flow = create_test_flow( 137 FlowDesignation.AUTHENTICATION, 138 ) 139 140 dest = "/unique-string" 141 url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}) 142 response = self.client.get(url + f"?{QS_QUERY}={urlencode({NEXT_ARG_NAME: dest})}") 143 self.assertEqual(response.status_code, 302) 144 self.assertEqual(response.url, "/unique-string") 145 146 @patch( 147 "authentik.flows.views.executor.to_stage_response", 148 TO_STAGE_RESPONSE_MOCK, 149 ) 150 def test_invalid_flow_invalid_redirect(self): 151 """Test invalid flow redirect with an invalid URL""" 152 flow = create_test_flow( 153 FlowDesignation.AUTHENTICATION, 154 ) 155 156 dest = "http://something.example.com/unique-string" 157 url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}) 158 159 response = self.client.get(url + f"?{QS_QUERY}={urlencode({NEXT_ARG_NAME: dest})}") 160 self.assertEqual(response.status_code, 200) 161 self.assertStageResponse( 162 response, 163 flow, 164 component="ak-stage-access-denied", 165 error_message="Invalid next URL", 166 ) 167 168 @patch( 169 "authentik.flows.views.executor.to_stage_response", 170 TO_STAGE_RESPONSE_MOCK, 171 ) 172 def test_valid_flow_redirect(self): 173 """Test valid flow with valid redirect destination""" 174 flow = create_test_flow() 175 176 dest = "/unique-string" 177 url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}) 178 179 
response = self.client.get(url + f"?{QS_QUERY}={urlencode({NEXT_ARG_NAME: dest})}") 180 self.assertEqual(response.status_code, 302) 181 self.assertEqual(response.url, "/unique-string") 182 183 @patch( 184 "authentik.flows.views.executor.to_stage_response", 185 TO_STAGE_RESPONSE_MOCK, 186 ) 187 def test_valid_flow_redirect_authenticated(self): 188 """Test valid flow with valid redirect destination, authenticated already""" 189 flow = create_test_flow() 190 flow.designation = FlowDesignation.AUTHENTICATION 191 flow.authentication = FlowAuthenticationRequirement.REQUIRE_UNAUTHENTICATED 192 flow.save() 193 self.client.force_login(create_test_user()) 194 195 dest = "/unique-string" 196 url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}) 197 198 response = self.client.get(url + f"?{QS_QUERY}={urlencode({NEXT_ARG_NAME: dest})}") 199 self.assertEqual(response.status_code, 302) 200 self.assertEqual(response.url, "/unique-string") 201 202 @patch( 203 "authentik.flows.views.executor.to_stage_response", 204 TO_STAGE_RESPONSE_MOCK, 205 ) 206 def test_valid_flow_invalid_redirect(self): 207 """Test valid flow redirect with an invalid URL""" 208 flow = create_test_flow() 209 210 dest = "http://something.example.com/unique-string" 211 url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}) 212 213 response = self.client.get(url + f"?{QS_QUERY}={urlencode({NEXT_ARG_NAME: dest})}") 214 self.assertEqual(response.status_code, 200) 215 self.assertStageResponse( 216 response, 217 flow, 218 component="ak-stage-access-denied", 219 error_message="Invalid next URL", 220 ) 221 222 @patch( 223 "authentik.flows.views.executor.to_stage_response", 224 TO_STAGE_RESPONSE_MOCK, 225 ) 226 def test_invalid_empty_flow(self): 227 """Tests that an empty flow returns the correct error message""" 228 flow = create_test_flow( 229 FlowDesignation.AUTHENTICATION, 230 ) 231 232 response = self.client.get( 233 reverse("authentik_api:flow-executor", 
kwargs={"flow_slug": flow.slug}), 234 ) 235 self.assertEqual(response.status_code, 302) 236 self.assertEqual(response.url, reverse("authentik_core:root-redirect")) 237 238 def test_multi_stage_flow(self): 239 """Test a full flow with multiple stages""" 240 flow = create_test_flow( 241 FlowDesignation.AUTHENTICATION, 242 ) 243 FlowStageBinding.objects.create( 244 target=flow, stage=DummyStage.objects.create(name=generate_id()), order=0 245 ) 246 FlowStageBinding.objects.create( 247 target=flow, stage=DummyStage.objects.create(name=generate_id()), order=1 248 ) 249 250 exec_url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}) 251 # First Request, start planning, renders form 252 response = self.client.get(exec_url) 253 self.assertEqual(response.status_code, 200) 254 # Check that two stages are in plan 255 session = self.client.session 256 plan: FlowPlan = session[SESSION_KEY_PLAN] 257 self.assertEqual(len(plan.bindings), 2) 258 # Second request, submit form, one stage left 259 response = self.client.post(exec_url) 260 # Second request redirects to the same URL 261 self.assertEqual(response.status_code, 302) 262 self.assertEqual(response.url, exec_url) 263 # Check that two stages are in plan 264 session = self.client.session 265 plan: FlowPlan = session[SESSION_KEY_PLAN] 266 self.assertEqual(len(plan.bindings), 1) 267 268 @patch( 269 "authentik.flows.views.executor.to_stage_response", 270 TO_STAGE_RESPONSE_MOCK, 271 ) 272 def test_reevaluate_remove_last(self): 273 """Test planner with re-evaluate (last stage is removed)""" 274 flow = create_test_flow( 275 FlowDesignation.AUTHENTICATION, 276 ) 277 false_policy = DummyPolicy.objects.create( 278 name=generate_id(), result=False, wait_min=1, wait_max=2 279 ) 280 281 binding = FlowStageBinding.objects.create( 282 target=flow, 283 stage=DummyStage.objects.create(name=generate_id()), 284 order=0, 285 evaluate_on_plan=True, 286 re_evaluate_policies=False, 287 ) 288 binding2 = 
FlowStageBinding.objects.create( 289 target=flow, 290 stage=DummyStage.objects.create(name=generate_id()), 291 order=1, 292 re_evaluate_policies=True, 293 ) 294 295 PolicyBinding.objects.create(policy=false_policy, target=binding2, order=0) 296 297 # Here we patch the dummy policy to evaluate to true so the stage is included 298 with patch("authentik.policies.dummy.models.DummyPolicy.passes", POLICY_RETURN_TRUE): 299 exec_url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}) 300 # First request, run the planner 301 response = self.client.get(exec_url) 302 self.assertEqual(response.status_code, 200) 303 304 plan: FlowPlan = self.client.session[SESSION_KEY_PLAN] 305 306 self.assertEqual(plan.bindings[0], binding) 307 self.assertEqual(plan.bindings[1], binding2) 308 309 self.assertEqual(plan.markers[0].__class__, StageMarker) 310 self.assertEqual(plan.markers[1].__class__, ReevaluateMarker) 311 312 # Second request, this passes the first dummy stage 313 response = self.client.post(exec_url) 314 self.assertEqual(response.status_code, 302) 315 316 # third request, this should trigger the re-evaluate 317 # We do this request without the patch, so the policy results in false 318 response = self.client.post(exec_url) 319 self.assertEqual(response.status_code, 302) 320 self.assertEqual(response.url, reverse("authentik_core:root-redirect")) 321 322 def test_reevaluate_remove_middle(self): 323 """Test planner with re-evaluate (middle stage is removed)""" 324 flow = create_test_flow( 325 FlowDesignation.AUTHENTICATION, 326 ) 327 false_policy = DummyPolicy.objects.create( 328 name=generate_id(), result=False, wait_min=1, wait_max=2 329 ) 330 331 binding = FlowStageBinding.objects.create( 332 target=flow, 333 stage=DummyStage.objects.create(name=generate_id()), 334 order=0, 335 evaluate_on_plan=True, 336 re_evaluate_policies=False, 337 ) 338 binding2 = FlowStageBinding.objects.create( 339 target=flow, 340 
stage=DummyStage.objects.create(name=generate_id()), 341 order=1, 342 re_evaluate_policies=True, 343 ) 344 binding3 = FlowStageBinding.objects.create( 345 target=flow, 346 stage=DummyStage.objects.create(name=generate_id()), 347 order=2, 348 evaluate_on_plan=True, 349 re_evaluate_policies=False, 350 ) 351 352 PolicyBinding.objects.create(policy=false_policy, target=binding2, order=0) 353 354 # Here we patch the dummy policy to evaluate to true so the stage is included 355 with patch("authentik.policies.dummy.models.DummyPolicy.passes", POLICY_RETURN_TRUE): 356 exec_url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}) 357 # First request, run the planner 358 response = self.client.get(exec_url) 359 360 self.assertEqual(response.status_code, 200) 361 plan: FlowPlan = self.client.session[SESSION_KEY_PLAN] 362 363 self.assertEqual(plan.bindings[0], binding) 364 self.assertEqual(plan.bindings[1], binding2) 365 self.assertEqual(plan.bindings[2], binding3) 366 367 self.assertEqual(plan.markers[0].__class__, StageMarker) 368 self.assertEqual(plan.markers[1].__class__, ReevaluateMarker) 369 self.assertEqual(plan.markers[2].__class__, StageMarker) 370 371 # Second request, this passes the first dummy stage 372 response = self.client.post(exec_url) 373 self.assertEqual(response.status_code, 302) 374 375 plan: FlowPlan = self.client.session[SESSION_KEY_PLAN] 376 377 self.assertEqual(plan.bindings[0], binding2) 378 self.assertEqual(plan.bindings[1], binding3) 379 380 self.assertEqual(plan.markers[0].__class__, ReevaluateMarker) 381 self.assertEqual(plan.markers[1].__class__, StageMarker) 382 383 # third request, this should trigger the re-evaluate 384 # We do this request without the patch, so the policy results in false 385 response = self.client.post(exec_url) 386 self.assertEqual(response.status_code, 200) 387 self.assertStageRedirects(response, reverse("authentik_core:root-redirect")) 388 389 def test_reevaluate_keep(self): 390 """Test planner with 
re-evaluate (everything is kept)""" 391 flow = create_test_flow( 392 FlowDesignation.AUTHENTICATION, 393 ) 394 true_policy = DummyPolicy.objects.create( 395 name=generate_id(), result=True, wait_min=1, wait_max=2 396 ) 397 398 binding = FlowStageBinding.objects.create( 399 target=flow, 400 stage=DummyStage.objects.create(name=generate_id()), 401 order=0, 402 evaluate_on_plan=True, 403 re_evaluate_policies=False, 404 ) 405 binding2 = FlowStageBinding.objects.create( 406 target=flow, 407 stage=DummyStage.objects.create(name=generate_id()), 408 order=1, 409 re_evaluate_policies=True, 410 ) 411 binding3 = FlowStageBinding.objects.create( 412 target=flow, 413 stage=DummyStage.objects.create(name=generate_id()), 414 order=2, 415 evaluate_on_plan=True, 416 re_evaluate_policies=False, 417 ) 418 419 PolicyBinding.objects.create(policy=true_policy, target=binding2, order=0) 420 421 # Here we patch the dummy policy to evaluate to true so the stage is included 422 with patch("authentik.policies.dummy.models.DummyPolicy.passes", POLICY_RETURN_TRUE): 423 exec_url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}) 424 # First request, run the planner 425 response = self.client.get(exec_url) 426 427 self.assertEqual(response.status_code, 200) 428 plan: FlowPlan = self.client.session[SESSION_KEY_PLAN] 429 430 self.assertEqual(plan.bindings[0], binding) 431 self.assertEqual(plan.bindings[1], binding2) 432 self.assertEqual(plan.bindings[2], binding3) 433 434 self.assertEqual(plan.markers[0].__class__, StageMarker) 435 self.assertEqual(plan.markers[1].__class__, ReevaluateMarker) 436 self.assertEqual(plan.markers[2].__class__, StageMarker) 437 438 # Second request, this passes the first dummy stage 439 response = self.client.post(exec_url) 440 self.assertEqual(response.status_code, 302) 441 442 plan: FlowPlan = self.client.session[SESSION_KEY_PLAN] 443 444 self.assertEqual(plan.bindings[0], binding2) 445 self.assertEqual(plan.bindings[1], binding3) 446 447 
self.assertEqual(plan.markers[0].__class__, ReevaluateMarker) 448 self.assertEqual(plan.markers[1].__class__, StageMarker) 449 450 # Third request, this passes the first dummy stage 451 response = self.client.post(exec_url) 452 self.assertEqual(response.status_code, 302) 453 454 plan: FlowPlan = self.client.session[SESSION_KEY_PLAN] 455 456 self.assertEqual(plan.bindings[0], binding3) 457 458 self.assertEqual(plan.markers[0].__class__, StageMarker) 459 460 # third request, this should trigger the re-evaluate 461 # We do this request without the patch, so the policy results in false 462 response = self.client.post(exec_url) 463 self.assertEqual(response.status_code, 200) 464 self.assertStageRedirects(response, reverse("authentik_core:root-redirect")) 465 466 def test_reevaluate_remove_consecutive(self): 467 """Test planner with re-evaluate (consecutive stages are removed)""" 468 flow = create_test_flow( 469 FlowDesignation.AUTHENTICATION, 470 ) 471 false_policy = DummyPolicy.objects.create( 472 name=generate_id(), result=False, wait_min=1, wait_max=2 473 ) 474 475 binding = FlowStageBinding.objects.create( 476 target=flow, 477 stage=DummyStage.objects.create(name=generate_id()), 478 order=0, 479 evaluate_on_plan=True, 480 re_evaluate_policies=False, 481 ) 482 binding2 = FlowStageBinding.objects.create( 483 target=flow, 484 stage=DummyStage.objects.create(name=generate_id()), 485 order=1, 486 re_evaluate_policies=True, 487 ) 488 binding3 = FlowStageBinding.objects.create( 489 target=flow, 490 stage=DummyStage.objects.create(name=generate_id()), 491 order=2, 492 re_evaluate_policies=True, 493 ) 494 binding4 = FlowStageBinding.objects.create( 495 target=flow, 496 stage=DummyStage.objects.create(name=generate_id()), 497 order=2, 498 evaluate_on_plan=True, 499 re_evaluate_policies=False, 500 ) 501 502 PolicyBinding.objects.create(policy=false_policy, target=binding2, order=0) 503 PolicyBinding.objects.create(policy=false_policy, target=binding3, order=0) 504 505 # Here 
we patch the dummy policy to evaluate to true so the stage is included 506 with patch("authentik.policies.dummy.models.DummyPolicy.passes", POLICY_RETURN_TRUE): 507 exec_url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}) 508 # First request, run the planner 509 response = self.client.get(exec_url) 510 self.assertEqual(response.status_code, 200) 511 self.assertStageResponse(response, flow, component="ak-stage-dummy") 512 513 plan: FlowPlan = self.client.session[SESSION_KEY_PLAN] 514 515 self.assertEqual(plan.bindings[0], binding) 516 self.assertEqual(plan.bindings[1], binding2) 517 self.assertEqual(plan.bindings[2], binding3) 518 self.assertEqual(plan.bindings[3], binding4) 519 520 self.assertEqual(plan.markers[0].__class__, StageMarker) 521 self.assertEqual(plan.markers[1].__class__, ReevaluateMarker) 522 self.assertEqual(plan.markers[2].__class__, ReevaluateMarker) 523 self.assertEqual(plan.markers[3].__class__, StageMarker) 524 525 # Second request, this passes the first dummy stage 526 response = self.client.post(exec_url) 527 self.assertEqual(response.status_code, 302) 528 529 # third request, this should trigger the re-evaluate 530 # A get request will evaluate the policies and this will return stage 4 531 # but it won't save it, hence we can't check the plan 532 response = self.client.get(exec_url) 533 self.assertStageResponse(response, flow, component="ak-stage-dummy") 534 535 # fourth request, this confirms the last stage (dummy4) 536 # We do this request without the patch, so the policy results in false 537 response = self.client.post(exec_url) 538 self.assertEqual(response.status_code, 200) 539 self.assertStageRedirects(response, reverse("authentik_core:root-redirect")) 540 541 def test_stageview_user_identifier(self): 542 """Test PLAN_CONTEXT_PENDING_USER_IDENTIFIER""" 543 flow = create_test_flow( 544 FlowDesignation.AUTHENTICATION, 545 ) 546 FlowStageBinding.objects.create( 547 target=flow, 
stage=DummyStage.objects.create(name=generate_id()), order=0 548 ) 549 550 ident = "test-identifier" 551 552 user = User.objects.create(username="test-user") 553 request = self.request_factory.get( 554 reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}), 555 ) 556 request.user = user 557 planner = FlowPlanner(flow) 558 plan = planner.plan(request, default_context={PLAN_CONTEXT_PENDING_USER_IDENTIFIER: ident}) 559 560 executor = FlowExecutorView() 561 executor.plan = plan 562 executor.flow = flow 563 564 stage_view = StageView(executor) 565 self.assertEqual(ident, stage_view.get_pending_user(for_display=True).username) 566 567 def test_invalid_restart(self): 568 """Test flow that restarts on invalid entry""" 569 flow = create_test_flow( 570 FlowDesignation.AUTHENTICATION, 571 ) 572 # Stage 0 is a deny stage that is added dynamically 573 # when the reputation policy says so 574 deny_stage = DenyStage.objects.create(name=generate_id()) 575 reputation_policy = ReputationPolicy.objects.create( 576 name=generate_id(), threshold=-1, check_ip=False 577 ) 578 deny_binding = FlowStageBinding.objects.create( 579 target=flow, 580 stage=deny_stage, 581 order=0, 582 evaluate_on_plan=False, 583 re_evaluate_policies=True, 584 ) 585 PolicyBinding.objects.create(policy=reputation_policy, target=deny_binding, order=0) 586 587 # Stage 1 is an identification stage 588 ident_stage = IdentificationStage.objects.create( 589 name=generate_id(), 590 user_fields=[UserFields.E_MAIL], 591 pretend_user_exists=False, 592 ) 593 FlowStageBinding.objects.create( 594 target=flow, 595 stage=ident_stage, 596 order=1, 597 invalid_response_action=InvalidResponseAction.RESTART_WITH_CONTEXT, 598 ) 599 exec_url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}) 600 # First request, run the planner 601 response = self.client.get(exec_url) 602 self.assertStageResponse( 603 response, 604 flow, 605 component="ak-stage-identification", 606 password_fields=False, 607 
primary_action="Log in", 608 sources=[], 609 show_source_labels=False, 610 user_fields=[UserFields.E_MAIL], 611 ) 612 response = self.client.post(exec_url, {"uid_field": "invalid-string"}, follow=True) 613 self.assertStageResponse(response, flow, component="ak-stage-access-denied") 614 615 def test_re_evaluate_group_binding(self): 616 """Test re-evaluate stage binding that has a policy binding to a group""" 617 flow = create_test_flow() 618 619 user_group_membership = create_test_user() 620 user_direct_binding = create_test_user() 621 user_other = create_test_user() 622 623 group_a = Group.objects.create(name=generate_id()) 624 user_group_membership.groups.add(group_a) 625 626 # Stage 0 is an identification stage 627 ident_stage = IdentificationStage.objects.create( 628 name=generate_id(), 629 user_fields=[UserFields.USERNAME], 630 pretend_user_exists=False, 631 ) 632 FlowStageBinding.objects.create( 633 target=flow, 634 stage=ident_stage, 635 order=0, 636 ) 637 638 # Stage 1 is a dummy stage that is only shown for users in group_a 639 dummy_stage = DummyStage.objects.create(name=generate_id()) 640 dummy_binding = FlowStageBinding.objects.create(target=flow, stage=dummy_stage, order=1) 641 PolicyBinding.objects.create(group=group_a, target=dummy_binding, order=0) 642 PolicyBinding.objects.create(user=user_direct_binding, target=dummy_binding, order=0) 643 644 # Stage 2 is a deny stage that (in this case) only user_b will see 645 deny_stage = DenyStage.objects.create(name=generate_id()) 646 FlowStageBinding.objects.create(target=flow, stage=deny_stage, order=2) 647 648 exec_url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}) 649 650 with self.subTest(f"Test user access through group: {user_group_membership}"): 651 self.client.logout() 652 # First request, run the planner 653 response = self.client.get(exec_url) 654 self.assertStageResponse(response, flow, component="ak-stage-identification") 655 response = self.client.post( 656 exec_url, 
{"uid_field": user_group_membership.username}, follow=True 657 ) 658 self.assertStageResponse(response, flow, component="ak-stage-dummy") 659 with self.subTest(f"Test user access through user: {user_direct_binding}"): 660 self.client.logout() 661 # First request, run the planner 662 response = self.client.get(exec_url) 663 self.assertStageResponse(response, flow, component="ak-stage-identification") 664 response = self.client.post( 665 exec_url, {"uid_field": user_direct_binding.username}, follow=True 666 ) 667 self.assertStageResponse(response, flow, component="ak-stage-dummy") 668 with self.subTest(f"Test user has no access: {user_other}"): 669 self.client.logout() 670 # First request, run the planner 671 response = self.client.get(exec_url) 672 self.assertStageResponse(response, flow, component="ak-stage-identification") 673 response = self.client.post(exec_url, {"uid_field": user_other.username}, follow=True) 674 self.assertStageResponse(response, flow, component="ak-stage-access-denied") 675 676 @patch( 677 "authentik.flows.views.executor.to_stage_response", 678 TO_STAGE_RESPONSE_MOCK, 679 ) 680 def test_invalid_json(self): 681 """Test invalid JSON body""" 682 flow = create_test_flow() 683 FlowStageBinding.objects.create( 684 target=flow, stage=DummyStage.objects.create(name=generate_id()), order=0 685 ) 686 url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}) 687 688 with override_settings(TEST=False, DEBUG=False): 689 self.client.logout() 690 response = self.client.post(url, data="{", content_type="application/json") 691 self.assertEqual(response.status_code, 200) 692 693 with self.assertRaises(ParseError): 694 self.client.logout() 695 response = self.client.post(url, data="{", content_type="application/json") 696 self.assertEqual(response.status_code, 200) 697 698 def test_cancel_next(self): 699 """Test cancel URL with ?next param set""" 700 flow = create_test_flow() 701 702 # Stage 0 is an identification stage 703 ident_stage = 
IdentificationStage.objects.create( 704 name=generate_id(), 705 user_fields=[UserFields.USERNAME], 706 ) 707 FlowStageBinding.objects.create( 708 target=flow, 709 stage=ident_stage, 710 order=0, 711 ) 712 713 # Stage 1 is a password stage 714 password_stage = PasswordStage.objects.create(name=generate_id(), backends=[]) 715 FlowStageBinding.objects.create( 716 target=flow, 717 stage=password_stage, 718 order=1, 719 ) 720 res = self.client.get( 721 reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}) 722 + f"?{urlencode({QS_QUERY: urlencode({NEXT_ARG_NAME: "/foo"})})}" 723 ) 724 self.assertStageResponse(res, flow, component="ak-stage-identification") 725 726 res = self.client.post( 727 reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}) 728 + f"?{urlencode({QS_QUERY: urlencode({NEXT_ARG_NAME: "/foo"})})}", 729 data={"component": "ak-stage-identification", "uid_field": generate_id()}, 730 follow=True, 731 ) 732 self.assertEqual(res.status_code, 200) 733 self.assertStageResponse( 734 res, 735 flow, 736 flow_info={ 737 "background": "/static/dist/assets/images/flow_background.jpg", 738 "background_themed_urls": None, 739 "cancel_url": "/flows/-/cancel/?next=%2Ffoo", 740 "layout": "stacked", 741 "title": flow.title, 742 }, 743 )
Test executor
@patch(
    "authentik.flows.views.executor.to_stage_response",
    TO_STAGE_RESPONSE_MOCK,
)
def test_existing_plan_diff_flow(self):
    """Check that a plan for a different flow cancels the current plan

    A plan whose ``flow_pk`` does not match the requested flow is placed in
    the session; the executor is then requested while
    ``FlowExecutorView.cancel`` is replaced by a mock so its calls can be
    counted.
    """
    flow = create_test_flow(FlowDesignation.AUTHENTICATION)
    # The binding is deliberately left unsaved, it only has to live inside the plan
    stale_binding = FlowStageBinding(
        target=flow,
        stage=DummyStage.objects.create(name=generate_id()),
        order=0,
    )
    # Appending a character makes the stored flow_pk differ from flow.pk.hex
    stale_plan = FlowPlan(
        flow_pk=f"{flow.pk.hex}a",
        bindings=[stale_binding],
        markers=[StageMarker()],
    )
    session = self.client.session
    session[SESSION_KEY_PLAN] = stale_plan
    session.save()

    executor_url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})
    cancel_mock = MagicMock()
    with patch("authentik.flows.views.executor.FlowExecutorView.cancel", cancel_mock):
        response = self.client.get(executor_url)

    self.assertEqual(response.status_code, 302)
    self.assertEqual(cancel_mock.call_count, 2)
Check that a plan for a different flow cancels the current plan
@patch(
    "authentik.flows.views.executor.to_stage_response",
    TO_STAGE_RESPONSE_MOCK,
)
@patch(
    "authentik.policies.engine.PolicyEngine.result",
    POLICY_RETURN_FALSE,
)
def test_invalid_non_applicable_flow(self):
    """Tests that a non-applicable flow returns the correct error message

    ``PolicyEngine.result`` is patched to a failing result carrying the
    message "foo"; the access-denied stage is expected to show that message.
    """
    flow = create_test_flow(FlowDesignation.AUTHENTICATION)
    executor_url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})

    response = self.client.get(executor_url)

    self.assertStageResponse(
        response,
        flow=flow,
        error_message="foo",
        component="ak-stage-access-denied",
    )
Tests that a non-applicable flow returns the correct error message
@patch(
    "authentik.flows.views.executor.to_stage_response",
    TO_STAGE_RESPONSE_MOCK,
)
@patch(
    "authentik.policies.engine.PolicyEngine.result",
    POLICY_RETURN_FALSE,
)
def test_invalid_non_applicable_flow_continue(self):
    """Tests that a non-applicable flow with the CONTINUE denied action redirects

    ``PolicyEngine.result`` is patched to a failing result; with
    ``denied_action=FlowDeniedAction.CONTINUE`` the response is expected to be
    a 302 to the root redirect.
    """
    flow = create_test_flow(
        FlowDesignation.AUTHENTICATION,
        denied_action=FlowDeniedAction.CONTINUE,
    )
    executor_url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})

    response = self.client.get(executor_url)

    self.assertEqual(response.status_code, 302)
    self.assertEqual(response.url, reverse("authentik_core:root-redirect"))
Tests that a non-applicable flow with the CONTINUE denied action redirects
@patch(
    "authentik.flows.views.executor.to_stage_response",
    TO_STAGE_RESPONSE_MOCK,
)
def test_invalid_flow_redirect(self):
    """Test invalid flow with valid redirect destination

    No stage bindings are added to the flow in this test. A relative ``next``
    destination is passed in the executor's query parameter and the response
    is expected to be a 302 to that destination.
    """
    flow = create_test_flow(FlowDesignation.AUTHENTICATION)
    destination = "/unique-string"
    next_query = urlencode({NEXT_ARG_NAME: destination})
    executor_url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})

    response = self.client.get(f"{executor_url}?{QS_QUERY}={next_query}")

    self.assertEqual(response.status_code, 302)
    self.assertEqual(response.url, destination)
Test invalid flow with valid redirect destination
@patch(
    "authentik.flows.views.executor.to_stage_response",
    TO_STAGE_RESPONSE_MOCK,
)
def test_invalid_flow_invalid_redirect(self):
    """Test invalid flow redirect with an invalid URL

    The ``next`` destination is an absolute URL on another host; instead of a
    redirect, the access-denied stage with "Invalid next URL" is expected.
    """
    flow = create_test_flow(FlowDesignation.AUTHENTICATION)
    destination = "http://something.example.com/unique-string"
    next_query = urlencode({NEXT_ARG_NAME: destination})
    executor_url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})

    response = self.client.get(f"{executor_url}?{QS_QUERY}={next_query}")

    self.assertEqual(response.status_code, 200)
    self.assertStageResponse(
        response,
        flow,
        component="ak-stage-access-denied",
        error_message="Invalid next URL",
    )
Test invalid flow redirect with an invalid URL
@patch(
    "authentik.flows.views.executor.to_stage_response",
    TO_STAGE_RESPONSE_MOCK,
)
def test_valid_flow_redirect(self):
    """Test valid flow with valid redirect destination

    The flow is created with ``create_test_flow()`` defaults and a relative
    ``next`` destination; the response is expected to be a 302 to it.
    """
    flow = create_test_flow()
    destination = "/unique-string"
    next_query = urlencode({NEXT_ARG_NAME: destination})
    executor_url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})

    response = self.client.get(f"{executor_url}?{QS_QUERY}={next_query}")

    self.assertEqual(response.status_code, 302)
    self.assertEqual(response.url, destination)
Test valid flow with valid redirect destination
@patch(
    "authentik.flows.views.executor.to_stage_response",
    TO_STAGE_RESPONSE_MOCK,
)
def test_valid_flow_redirect_authenticated(self):
    """Test valid flow with valid redirect destination, authenticated already

    The flow is switched to the authentication designation with
    ``REQUIRE_UNAUTHENTICATED`` and the client is logged in before the
    request; a 302 to the relative ``next`` destination is expected.
    """
    flow = create_test_flow()
    flow.designation = FlowDesignation.AUTHENTICATION
    flow.authentication = FlowAuthenticationRequirement.REQUIRE_UNAUTHENTICATED
    flow.save()
    logged_in_user = create_test_user()
    self.client.force_login(logged_in_user)

    destination = "/unique-string"
    next_query = urlencode({NEXT_ARG_NAME: destination})
    executor_url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})

    response = self.client.get(f"{executor_url}?{QS_QUERY}={next_query}")

    self.assertEqual(response.status_code, 302)
    self.assertEqual(response.url, destination)
Test valid flow with valid redirect destination, authenticated already
@patch(
    "authentik.flows.views.executor.to_stage_response",
    TO_STAGE_RESPONSE_MOCK,
)
def test_valid_flow_invalid_redirect(self):
    """Test valid flow redirect with an invalid URL

    The ``next`` destination is an absolute URL on another host; the
    access-denied stage with "Invalid next URL" is expected instead of a
    redirect.
    """
    flow = create_test_flow()
    destination = "http://something.example.com/unique-string"
    next_query = urlencode({NEXT_ARG_NAME: destination})
    executor_url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})

    response = self.client.get(f"{executor_url}?{QS_QUERY}={next_query}")

    self.assertEqual(response.status_code, 200)
    self.assertStageResponse(
        response,
        flow,
        component="ak-stage-access-denied",
        error_message="Invalid next URL",
    )
Test valid flow redirect with an invalid URL
@patch(
    "authentik.flows.views.executor.to_stage_response",
    TO_STAGE_RESPONSE_MOCK,
)
def test_invalid_empty_flow(self):
    """Tests that an empty flow redirects to the root redirect

    No stage bindings are added to the flow in this test; the executor
    response is expected to be a 302 to ``authentik_core:root-redirect``.
    """
    flow = create_test_flow(FlowDesignation.AUTHENTICATION)
    executor_url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})

    response = self.client.get(executor_url)

    self.assertEqual(response.status_code, 302)
    self.assertEqual(response.url, reverse("authentik_core:root-redirect"))
Tests that an empty flow redirects to the root redirect
def test_multi_stage_flow(self):
    """Test a full flow with multiple stages

    Two dummy stages are bound to the flow. The initial GET is expected to
    store a plan with both bindings in the session; the following POST is
    expected to redirect back to the executor with one binding left.
    """
    flow = create_test_flow(FlowDesignation.AUTHENTICATION)
    for stage_order in range(2):
        FlowStageBinding.objects.create(
            target=flow,
            stage=DummyStage.objects.create(name=generate_id()),
            order=stage_order,
        )
    executor_url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})

    # First request, start planning, renders form
    first_response = self.client.get(executor_url)
    self.assertEqual(first_response.status_code, 200)
    # Check that two stages are in plan
    initial_plan: FlowPlan = self.client.session[SESSION_KEY_PLAN]
    self.assertEqual(len(initial_plan.bindings), 2)

    # Second request, submit form, one stage left
    second_response = self.client.post(executor_url)
    # Second request redirects to the same URL
    self.assertEqual(second_response.status_code, 302)
    self.assertEqual(second_response.url, executor_url)
    # Check that only one stage is left in the plan
    remaining_plan: FlowPlan = self.client.session[SESSION_KEY_PLAN]
    self.assertEqual(len(remaining_plan.bindings), 1)
Test a full flow with multiple stages
@patch(
    "authentik.flows.views.executor.to_stage_response",
    TO_STAGE_RESPONSE_MOCK,
)
def test_reevaluate_remove_last(self):
    """Test planner with re-evaluate (last stage is removed)

    Two dummy stages are bound; the second binding has
    ``re_evaluate_policies=True`` and a dummy policy created with
    ``result=False``. The plan is built while ``DummyPolicy.passes`` is
    patched to pass, so both bindings are expected in the plan. The final
    request is made without the patch and is expected to redirect to the
    root redirect.
    """
    flow = create_test_flow(
        FlowDesignation.AUTHENTICATION,
    )
    false_policy = DummyPolicy.objects.create(
        name=generate_id(), result=False, wait_min=1, wait_max=2
    )

    binding = FlowStageBinding.objects.create(
        target=flow,
        stage=DummyStage.objects.create(name=generate_id()),
        order=0,
        evaluate_on_plan=True,
        re_evaluate_policies=False,
    )
    binding2 = FlowStageBinding.objects.create(
        target=flow,
        stage=DummyStage.objects.create(name=generate_id()),
        order=1,
        re_evaluate_policies=True,
    )

    PolicyBinding.objects.create(policy=false_policy, target=binding2, order=0)

    # Here we patch the dummy policy to evaluate to true so the stage is included
    # NOTE(review): the extent of this `with` block is reconstructed from the
    # "without the patch" comment below; confirm against the original file.
    with patch("authentik.policies.dummy.models.DummyPolicy.passes", POLICY_RETURN_TRUE):
        exec_url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})
        # First request, run the planner
        response = self.client.get(exec_url)
        self.assertEqual(response.status_code, 200)

        plan: FlowPlan = self.client.session[SESSION_KEY_PLAN]

        # Both bindings are planned, in binding order
        self.assertEqual(plan.bindings[0], binding)
        self.assertEqual(plan.bindings[1], binding2)

        # The re-evaluated binding is expected to carry a ReevaluateMarker
        self.assertEqual(plan.markers[0].__class__, StageMarker)
        self.assertEqual(plan.markers[1].__class__, ReevaluateMarker)

        # Second request, this passes the first dummy stage
        response = self.client.post(exec_url)
        self.assertEqual(response.status_code, 302)

    # third request, this should trigger the re-evaluate
    # We do this request without the patch, so the policy results in false
    response = self.client.post(exec_url)
    self.assertEqual(response.status_code, 302)
    self.assertEqual(response.url, reverse("authentik_core:root-redirect"))
Test planner with re-evaluate (last stage is removed)
def test_reevaluate_remove_middle(self):
    """Test planner with re-evaluate (middle stage is removed)

    Three dummy stages are bound; the middle binding has
    ``re_evaluate_policies=True`` and a dummy policy created with
    ``result=False``. The plan is built while ``DummyPolicy.passes`` is
    patched to pass, so all three bindings are expected in the plan. The
    final request is made without the patch and is expected to produce a
    stage redirect to the root redirect.
    """
    flow = create_test_flow(
        FlowDesignation.AUTHENTICATION,
    )
    false_policy = DummyPolicy.objects.create(
        name=generate_id(), result=False, wait_min=1, wait_max=2
    )

    binding = FlowStageBinding.objects.create(
        target=flow,
        stage=DummyStage.objects.create(name=generate_id()),
        order=0,
        evaluate_on_plan=True,
        re_evaluate_policies=False,
    )
    binding2 = FlowStageBinding.objects.create(
        target=flow,
        stage=DummyStage.objects.create(name=generate_id()),
        order=1,
        re_evaluate_policies=True,
    )
    binding3 = FlowStageBinding.objects.create(
        target=flow,
        stage=DummyStage.objects.create(name=generate_id()),
        order=2,
        evaluate_on_plan=True,
        re_evaluate_policies=False,
    )

    PolicyBinding.objects.create(policy=false_policy, target=binding2, order=0)

    # Here we patch the dummy policy to evaluate to true so the stage is included
    # NOTE(review): the extent of this `with` block is reconstructed from the
    # "without the patch" comment below; confirm against the original file.
    with patch("authentik.policies.dummy.models.DummyPolicy.passes", POLICY_RETURN_TRUE):
        exec_url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})
        # First request, run the planner
        response = self.client.get(exec_url)

        self.assertEqual(response.status_code, 200)
        plan: FlowPlan = self.client.session[SESSION_KEY_PLAN]

        # All three bindings are planned, in binding order
        self.assertEqual(plan.bindings[0], binding)
        self.assertEqual(plan.bindings[1], binding2)
        self.assertEqual(plan.bindings[2], binding3)

        # Only the re-evaluated binding is expected to carry a ReevaluateMarker
        self.assertEqual(plan.markers[0].__class__, StageMarker)
        self.assertEqual(plan.markers[1].__class__, ReevaluateMarker)
        self.assertEqual(plan.markers[2].__class__, StageMarker)

        # Second request, this passes the first dummy stage
        response = self.client.post(exec_url)
        self.assertEqual(response.status_code, 302)

        plan: FlowPlan = self.client.session[SESSION_KEY_PLAN]

        # The first binding is gone from the stored plan
        self.assertEqual(plan.bindings[0], binding2)
        self.assertEqual(plan.bindings[1], binding3)

        self.assertEqual(plan.markers[0].__class__, ReevaluateMarker)
        self.assertEqual(plan.markers[1].__class__, StageMarker)

    # third request, this should trigger the re-evaluate
    # We do this request without the patch, so the policy results in false
    response = self.client.post(exec_url)
    self.assertEqual(response.status_code, 200)
    self.assertStageRedirects(response, reverse("authentik_core:root-redirect"))
Test planner with re-evaluate (middle stage is removed)
def test_reevaluate_keep(self):
    """Test planner with re-evaluate (everything is kept)

    Three dummy stages are bound; the middle binding has
    ``re_evaluate_policies=True`` and a dummy policy created with
    ``result=True``. Each POST is expected to remove exactly one binding
    from the stored plan, and the last POST is expected to produce a stage
    redirect to the root redirect.
    """
    flow = create_test_flow(
        FlowDesignation.AUTHENTICATION,
    )
    true_policy = DummyPolicy.objects.create(
        name=generate_id(), result=True, wait_min=1, wait_max=2
    )

    binding = FlowStageBinding.objects.create(
        target=flow,
        stage=DummyStage.objects.create(name=generate_id()),
        order=0,
        evaluate_on_plan=True,
        re_evaluate_policies=False,
    )
    binding2 = FlowStageBinding.objects.create(
        target=flow,
        stage=DummyStage.objects.create(name=generate_id()),
        order=1,
        re_evaluate_policies=True,
    )
    binding3 = FlowStageBinding.objects.create(
        target=flow,
        stage=DummyStage.objects.create(name=generate_id()),
        order=2,
        evaluate_on_plan=True,
        re_evaluate_policies=False,
    )

    PolicyBinding.objects.create(policy=true_policy, target=binding2, order=0)

    # Here we patch the dummy policy to evaluate to true so the stage is included
    # NOTE(review): the extent of this `with` block is reconstructed (the
    # source indentation was lost); confirm against the original file.
    with patch("authentik.policies.dummy.models.DummyPolicy.passes", POLICY_RETURN_TRUE):
        exec_url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})
        # First request, run the planner
        response = self.client.get(exec_url)

        self.assertEqual(response.status_code, 200)
        plan: FlowPlan = self.client.session[SESSION_KEY_PLAN]

        # All three bindings are planned, in binding order
        self.assertEqual(plan.bindings[0], binding)
        self.assertEqual(plan.bindings[1], binding2)
        self.assertEqual(plan.bindings[2], binding3)

        # Only the re-evaluated binding is expected to carry a ReevaluateMarker
        self.assertEqual(plan.markers[0].__class__, StageMarker)
        self.assertEqual(plan.markers[1].__class__, ReevaluateMarker)
        self.assertEqual(plan.markers[2].__class__, StageMarker)

        # Second request, this passes the first dummy stage
        response = self.client.post(exec_url)
        self.assertEqual(response.status_code, 302)

        plan: FlowPlan = self.client.session[SESSION_KEY_PLAN]

        self.assertEqual(plan.bindings[0], binding2)
        self.assertEqual(plan.bindings[1], binding3)

        self.assertEqual(plan.markers[0].__class__, ReevaluateMarker)
        self.assertEqual(plan.markers[1].__class__, StageMarker)

        # Third request, this passes the second dummy stage, which is kept
        response = self.client.post(exec_url)
        self.assertEqual(response.status_code, 302)

        plan: FlowPlan = self.client.session[SESSION_KEY_PLAN]

        # Only the last binding remains in the stored plan
        self.assertEqual(plan.bindings[0], binding3)

        self.assertEqual(plan.markers[0].__class__, StageMarker)

    # Fourth request, this submits the last remaining stage; the response is
    # expected to be a stage redirect to the root redirect
    response = self.client.post(exec_url)
    self.assertEqual(response.status_code, 200)
    self.assertStageRedirects(response, reverse("authentik_core:root-redirect"))
Test planner with re-evaluate (everything is kept)
def test_reevaluate_remove_consecutive(self):
    """Test planner with re-evaluate (consecutive stages are removed)

    Four dummy stages are bound to an authentication flow. The two middle
    bindings have ``re_evaluate_policies`` enabled and share a dummy policy
    created with ``result=False``. The planner runs while the policy is
    patched to pass, so all four bindings end up in the initial plan; the
    later requests run unpatched.
    """
    flow = create_test_flow(
        FlowDesignation.AUTHENTICATION,
    )
    false_policy = DummyPolicy.objects.create(
        name=generate_id(), result=False, wait_min=1, wait_max=2
    )

    binding = FlowStageBinding.objects.create(
        target=flow,
        stage=DummyStage.objects.create(name=generate_id()),
        order=0,
        evaluate_on_plan=True,
        re_evaluate_policies=False,
    )
    binding2 = FlowStageBinding.objects.create(
        target=flow,
        stage=DummyStage.objects.create(name=generate_id()),
        order=1,
        re_evaluate_policies=True,
    )
    binding3 = FlowStageBinding.objects.create(
        target=flow,
        stage=DummyStage.objects.create(name=generate_id()),
        order=2,
        re_evaluate_policies=True,
    )
    # Every binding gets its own order value. The assertions below rely on
    # binding4 being planned after binding3, which a shared order value
    # would leave up to the database's tie-breaking.
    binding4 = FlowStageBinding.objects.create(
        target=flow,
        stage=DummyStage.objects.create(name=generate_id()),
        order=3,
        evaluate_on_plan=True,
        re_evaluate_policies=False,
    )

    PolicyBinding.objects.create(policy=false_policy, target=binding2, order=0)
    PolicyBinding.objects.create(policy=false_policy, target=binding3, order=0)

    exec_url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})

    # Here we patch the dummy policy to evaluate to true so the stages are included
    with patch("authentik.policies.dummy.models.DummyPolicy.passes", POLICY_RETURN_TRUE):
        # First request, run the planner
        response = self.client.get(exec_url)
        self.assertEqual(response.status_code, 200)
        self.assertStageResponse(response, flow, component="ak-stage-dummy")

        plan: FlowPlan = self.client.session[SESSION_KEY_PLAN]

        expected_bindings = (binding, binding2, binding3, binding4)
        for index, expected in enumerate(expected_bindings):
            self.assertEqual(plan.bindings[index], expected)

        expected_markers = (StageMarker, ReevaluateMarker, ReevaluateMarker, StageMarker)
        for index, marker_class in enumerate(expected_markers):
            self.assertEqual(plan.markers[index].__class__, marker_class)

    # Second request, this passes the first dummy stage
    response = self.client.post(exec_url)
    self.assertEqual(response.status_code, 302)

    # Third request, this should trigger the re-evaluate
    # A get request will evaluate the policies and this will return stage 4
    # but it won't save it, hence we can't check the plan
    response = self.client.get(exec_url)
    self.assertStageResponse(response, flow, component="ak-stage-dummy")

    # Fourth request, this confirms the last stage (dummy4)
    # We do this request without the patch, so the policy results in false
    response = self.client.post(exec_url)
    self.assertEqual(response.status_code, 200)
    self.assertStageRedirects(response, reverse("authentik_core:root-redirect"))
Test planner with re-evaluate (consecutive stages are removed)
def test_stageview_user_identifier(self):
    """Test PLAN_CONTEXT_PENDING_USER_IDENTIFIER

    A plan is built with only an identifier string in its default context;
    the user ``StageView.get_pending_user(for_display=True)`` returns is
    expected to carry that identifier as its username.
    """
    expected_identifier = "test-identifier"

    flow = create_test_flow(FlowDesignation.AUTHENTICATION)
    dummy_stage = DummyStage.objects.create(name=generate_id())
    FlowStageBinding.objects.create(target=flow, stage=dummy_stage, order=0)

    # The planner is handed a request that has a user attached to it
    requesting_user = User.objects.create(username="test-user")
    exec_url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})
    request = self.request_factory.get(exec_url)
    request.user = requesting_user

    plan = FlowPlanner(flow).plan(
        request,
        default_context={PLAN_CONTEXT_PENDING_USER_IDENTIFIER: expected_identifier},
    )

    # Wire up an executor by hand instead of going through a request cycle
    executor = FlowExecutorView()
    executor.plan = plan
    executor.flow = flow

    pending_user = StageView(executor).get_pending_user(for_display=True)
    self.assertEqual(expected_identifier, pending_user.username)
Test PLAN_CONTEXT_PENDING_USER_IDENTIFIER
def test_invalid_restart(self):
    """Test flow that restarts on invalid entry

    The identification binding uses ``RESTART_WITH_CONTEXT`` as its invalid
    response action. Submitting an unknown identifier and following the
    redirects is expected to end on the access-denied stage.
    """
    flow = create_test_flow(FlowDesignation.AUTHENTICATION)
    exec_url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})

    # Order 0: deny stage guarded by a reputation policy. It is not evaluated
    # at plan time, only re-evaluated when the stage is reached.
    denied = DenyStage.objects.create(name=generate_id())
    reputation = ReputationPolicy.objects.create(
        name=generate_id(), threshold=-1, check_ip=False
    )
    denied_binding = FlowStageBinding.objects.create(
        target=flow,
        stage=denied,
        order=0,
        evaluate_on_plan=False,
        re_evaluate_policies=True,
    )
    PolicyBinding.objects.create(policy=reputation, target=denied_binding, order=0)

    # Order 1: identification stage that restarts the flow on invalid input
    identification = IdentificationStage.objects.create(
        name=generate_id(),
        user_fields=[UserFields.E_MAIL],
        pretend_user_exists=False,
    )
    FlowStageBinding.objects.create(
        target=flow,
        stage=identification,
        order=1,
        invalid_response_action=InvalidResponseAction.RESTART_WITH_CONTEXT,
    )

    # First request runs the planner; the identification stage is shown
    expected_challenge = {
        "component": "ak-stage-identification",
        "password_fields": False,
        "primary_action": "Log in",
        "sources": [],
        "show_source_labels": False,
        "user_fields": [UserFields.E_MAIL],
    }
    response = self.client.get(exec_url)
    self.assertStageResponse(response, flow, **expected_challenge)

    # Second request submits an identifier that matches no user
    invalid_payload = {"uid_field": "invalid-string"}
    response = self.client.post(exec_url, invalid_payload, follow=True)
    self.assertStageResponse(response, flow, component="ak-stage-access-denied")
Test flow that restarts on invalid entry
def test_re_evaluate_group_binding(self):
    """Test re-evaluate stage binding that has a policy binding to a group

    The dummy stage is bound both to a group and to a single user. Users
    matching either binding are expected to reach the dummy stage; any
    other user is expected to end on the access-denied stage.
    """
    flow = create_test_flow()
    exec_url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})

    member = create_test_user()
    directly_bound = create_test_user()
    outsider = create_test_user()

    group_a = Group.objects.create(name=generate_id())
    member.groups.add(group_a)

    # Order 0: identification stage
    identification = IdentificationStage.objects.create(
        name=generate_id(),
        user_fields=[UserFields.USERNAME],
        pretend_user_exists=False,
    )
    FlowStageBinding.objects.create(target=flow, stage=identification, order=0)

    # Order 1: dummy stage, bound to group_a and to one user directly
    gated_stage = DummyStage.objects.create(name=generate_id())
    gated_binding = FlowStageBinding.objects.create(target=flow, stage=gated_stage, order=1)
    PolicyBinding.objects.create(group=group_a, target=gated_binding, order=0)
    PolicyBinding.objects.create(user=directly_bound, target=gated_binding, order=0)

    # Order 2: deny stage; in this test only the user without any binding
    # is expected to end up here
    denied = DenyStage.objects.create(name=generate_id())
    FlowStageBinding.objects.create(target=flow, stage=denied, order=2)

    # (subTest message, user to identify as, component expected afterwards)
    scenarios = (
        (f"Test user access through group: {member}", member, "ak-stage-dummy"),
        (f"Test user access through user: {directly_bound}", directly_bound, "ak-stage-dummy"),
        (f"Test user has no access: {outsider}", outsider, "ak-stage-access-denied"),
    )
    for message, candidate, expected_component in scenarios:
        with self.subTest(message):
            self.client.logout()
            # First request, run the planner
            response = self.client.get(exec_url)
            self.assertStageResponse(response, flow, component="ak-stage-identification")
            # Identify as the candidate and follow the redirects
            response = self.client.post(
                exec_url, {"uid_field": candidate.username}, follow=True
            )
            self.assertStageResponse(response, flow, component=expected_component)
Test re-evaluate stage binding that has a policy binding to a group
@patch(
    "authentik.flows.views.executor.to_stage_response",
    TO_STAGE_RESPONSE_MOCK,
)
def test_invalid_json(self):
    """Test invalid JSON body

    The same malformed JSON body (``{``) is posted twice: once with the
    ``TEST`` and ``DEBUG`` settings overridden to ``False``, where a 200
    response is expected, and once with the unmodified test settings, where
    a ``ParseError`` is expected to propagate out of the test client.
    """
    flow = create_test_flow()
    FlowStageBinding.objects.create(
        target=flow, stage=DummyStage.objects.create(name=generate_id()), order=0
    )
    url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})

    with override_settings(TEST=False, DEBUG=False):
        self.client.logout()
        response = self.client.post(url, data="{", content_type="application/json")
        self.assertEqual(response.status_code, 200)

    self.client.logout()
    # Only the call expected to raise lives inside assertRaises: anything
    # placed after it in the block would never execute.
    with self.assertRaises(ParseError):
        self.client.post(url, data="{", content_type="application/json")
Test invalid JSON body
def test_cancel_next(self):
    """Test cancel URL with ?next param set

    The executor is called with a ``next`` argument wrapped inside the
    query parameter; the ``cancel_url`` reported in the flow info is
    expected to carry that ``next`` value.
    """
    flow = create_test_flow()

    # Order 0: identification stage
    identification = IdentificationStage.objects.create(
        name=generate_id(),
        user_fields=[UserFields.USERNAME],
    )
    FlowStageBinding.objects.create(target=flow, stage=identification, order=0)

    # Order 1: password stage
    password = PasswordStage.objects.create(name=generate_id(), backends=[])
    FlowStageBinding.objects.create(target=flow, stage=password, order=1)

    # Both requests target the same URL: the executor endpoint with the
    # url-encoded ``next`` argument nested inside the query parameter.
    inner_query = urlencode({NEXT_ARG_NAME: "/foo"})
    outer_query = urlencode({QS_QUERY: inner_query})
    base_url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})
    exec_url = f"{base_url}?{outer_query}"

    res = self.client.get(exec_url)
    self.assertStageResponse(res, flow, component="ak-stage-identification")

    payload = {"component": "ak-stage-identification", "uid_field": generate_id()}
    res = self.client.post(exec_url, data=payload, follow=True)
    self.assertEqual(res.status_code, 200)

    expected_flow_info = {
        "background": "/static/dist/assets/images/flow_background.jpg",
        "background_themed_urls": None,
        "cancel_url": "/flows/-/cancel/?next=%2Ffoo",
        "layout": "stacked",
        "title": flow.title,
    }
    self.assertStageResponse(res, flow, flow_info=expected_flow_info)
Test cancel URL with ?next param set