authentik.flows.tests.test_executor

flow views tests

  1"""flow views tests"""
  2
  3from unittest.mock import MagicMock, PropertyMock, patch
  4from urllib.parse import urlencode
  5
  6from django.http import HttpRequest, HttpResponse
  7from django.test import override_settings
  8from django.test.client import RequestFactory
  9from django.urls import reverse
 10from rest_framework.exceptions import ParseError
 11
 12from authentik.core.models import Group, User
 13from authentik.core.tests.utils import create_test_flow, create_test_user
 14from authentik.flows.markers import ReevaluateMarker, StageMarker
 15from authentik.flows.models import (
 16    FlowAuthenticationRequirement,
 17    FlowDeniedAction,
 18    FlowDesignation,
 19    FlowStageBinding,
 20    InvalidResponseAction,
 21)
 22from authentik.flows.planner import FlowPlan, FlowPlanner
 23from authentik.flows.stage import PLAN_CONTEXT_PENDING_USER_IDENTIFIER, StageView
 24from authentik.flows.tests import FlowTestCase
 25from authentik.flows.views.executor import (
 26    NEXT_ARG_NAME,
 27    QS_QUERY,
 28    SESSION_KEY_PLAN,
 29    FlowExecutorView,
 30)
 31from authentik.lib.generators import generate_id
 32from authentik.policies.dummy.models import DummyPolicy
 33from authentik.policies.models import PolicyBinding
 34from authentik.policies.reputation.models import ReputationPolicy
 35from authentik.policies.types import PolicyResult
 36from authentik.stages.deny.models import DenyStage
 37from authentik.stages.dummy.models import DummyStage
 38from authentik.stages.identification.models import IdentificationStage, UserFields
 39from authentik.stages.password.models import PasswordStage
 40
# Used below as the replacement for ``PolicyEngine.result``; a PropertyMock, so
# reading the patched attribute yields ``PolicyResult(False, "foo")``.
POLICY_RETURN_FALSE = PropertyMock(return_value=PolicyResult(False, "foo"))
# Used below as the replacement for ``DummyPolicy.passes``; calling it yields
# ``PolicyResult(True)``.
POLICY_RETURN_TRUE = MagicMock(return_value=PolicyResult(True))
 43
 44
 45def to_stage_response(request: HttpRequest, source: HttpResponse):
 46    """Mock for to_stage_response that returns the original response, so we can check
 47    inheritance and member attributes"""
 48    return source
 49
 50
# Mock that delegates every call to the pass-through ``to_stage_response`` above;
# the tests patch it over ``authentik.flows.views.executor.to_stage_response``.
TO_STAGE_RESPONSE_MOCK = MagicMock(side_effect=to_stage_response)
 52
 53
 54class TestFlowExecutor(FlowTestCase):
 55    """Test executor"""
 56
 57    def setUp(self):
 58        self.request_factory = RequestFactory()
 59
 60    @patch(
 61        "authentik.flows.views.executor.to_stage_response",
 62        TO_STAGE_RESPONSE_MOCK,
 63    )
 64    def test_existing_plan_diff_flow(self):
 65        """Check that a plan for a different flow cancels the current plan"""
 66        flow = create_test_flow(
 67            FlowDesignation.AUTHENTICATION,
 68        )
 69        stage = DummyStage.objects.create(name=generate_id())
 70        binding = FlowStageBinding(target=flow, stage=stage, order=0)
 71        plan = FlowPlan(flow_pk=flow.pk.hex + "a", bindings=[binding], markers=[StageMarker()])
 72        session = self.client.session
 73        session[SESSION_KEY_PLAN] = plan
 74        session.save()
 75
 76        cancel_mock = MagicMock()
 77        with patch("authentik.flows.views.executor.FlowExecutorView.cancel", cancel_mock):
 78            response = self.client.get(
 79                reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
 80            )
 81            self.assertEqual(response.status_code, 302)
 82            self.assertEqual(cancel_mock.call_count, 2)
 83
 84    @patch(
 85        "authentik.flows.views.executor.to_stage_response",
 86        TO_STAGE_RESPONSE_MOCK,
 87    )
 88    @patch(
 89        "authentik.policies.engine.PolicyEngine.result",
 90        POLICY_RETURN_FALSE,
 91    )
 92    def test_invalid_non_applicable_flow(self):
 93        """Tests that a non-applicable flow returns the correct error message"""
 94        flow = create_test_flow(
 95            FlowDesignation.AUTHENTICATION,
 96        )
 97
 98        response = self.client.get(
 99            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
100        )
101        self.assertStageResponse(
102            response,
103            flow=flow,
104            error_message="foo",
105            component="ak-stage-access-denied",
106        )
107
108    @patch(
109        "authentik.flows.views.executor.to_stage_response",
110        TO_STAGE_RESPONSE_MOCK,
111    )
112    @patch(
113        "authentik.policies.engine.PolicyEngine.result",
114        POLICY_RETURN_FALSE,
115    )
116    def test_invalid_non_applicable_flow_continue(self):
117        """Tests that a non-applicable flow that should redirect"""
118        flow = create_test_flow(
119            FlowDesignation.AUTHENTICATION,
120            denied_action=FlowDeniedAction.CONTINUE,
121        )
122
123        response = self.client.get(
124            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
125        )
126        self.assertEqual(response.status_code, 302)
127        self.assertEqual(response.url, reverse("authentik_core:root-redirect"))
128
129    @patch(
130        "authentik.flows.views.executor.to_stage_response",
131        TO_STAGE_RESPONSE_MOCK,
132    )
133    def test_invalid_flow_redirect(self):
134        """Test invalid flow with valid redirect destination"""
135        flow = create_test_flow(
136            FlowDesignation.AUTHENTICATION,
137        )
138
139        dest = "/unique-string"
140        url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})
141        response = self.client.get(url + f"?{QS_QUERY}={urlencode({NEXT_ARG_NAME: dest})}")
142        self.assertEqual(response.status_code, 302)
143        self.assertEqual(response.url, "/unique-string")
144
145    @patch(
146        "authentik.flows.views.executor.to_stage_response",
147        TO_STAGE_RESPONSE_MOCK,
148    )
149    def test_invalid_flow_invalid_redirect(self):
150        """Test invalid flow redirect with an invalid URL"""
151        flow = create_test_flow(
152            FlowDesignation.AUTHENTICATION,
153        )
154
155        dest = "http://something.example.com/unique-string"
156        url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})
157
158        response = self.client.get(url + f"?{QS_QUERY}={urlencode({NEXT_ARG_NAME: dest})}")
159        self.assertEqual(response.status_code, 200)
160        self.assertStageResponse(
161            response,
162            flow,
163            component="ak-stage-access-denied",
164            error_message="Invalid next URL",
165        )
166
167    @patch(
168        "authentik.flows.views.executor.to_stage_response",
169        TO_STAGE_RESPONSE_MOCK,
170    )
171    def test_valid_flow_redirect(self):
172        """Test valid flow with valid redirect destination"""
173        flow = create_test_flow()
174
175        dest = "/unique-string"
176        url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})
177
178        response = self.client.get(url + f"?{QS_QUERY}={urlencode({NEXT_ARG_NAME: dest})}")
179        self.assertEqual(response.status_code, 302)
180        self.assertEqual(response.url, "/unique-string")
181
182    @patch(
183        "authentik.flows.views.executor.to_stage_response",
184        TO_STAGE_RESPONSE_MOCK,
185    )
186    def test_valid_flow_redirect_authenticated(self):
187        """Test valid flow with valid redirect destination, authenticated already"""
188        flow = create_test_flow()
189        flow.designation = FlowDesignation.AUTHENTICATION
190        flow.authentication = FlowAuthenticationRequirement.REQUIRE_UNAUTHENTICATED
191        flow.save()
192        self.client.force_login(create_test_user())
193
194        dest = "/unique-string"
195        url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})
196
197        response = self.client.get(url + f"?{QS_QUERY}={urlencode({NEXT_ARG_NAME: dest})}")
198        self.assertEqual(response.status_code, 302)
199        self.assertEqual(response.url, "/unique-string")
200
201    @patch(
202        "authentik.flows.views.executor.to_stage_response",
203        TO_STAGE_RESPONSE_MOCK,
204    )
205    def test_valid_flow_invalid_redirect(self):
206        """Test valid flow redirect with an invalid URL"""
207        flow = create_test_flow()
208
209        dest = "http://something.example.com/unique-string"
210        url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})
211
212        response = self.client.get(url + f"?{QS_QUERY}={urlencode({NEXT_ARG_NAME: dest})}")
213        self.assertEqual(response.status_code, 200)
214        self.assertStageResponse(
215            response,
216            flow,
217            component="ak-stage-access-denied",
218            error_message="Invalid next URL",
219        )
220
221    @patch(
222        "authentik.flows.views.executor.to_stage_response",
223        TO_STAGE_RESPONSE_MOCK,
224    )
225    def test_invalid_empty_flow(self):
226        """Tests that an empty flow returns the correct error message"""
227        flow = create_test_flow(
228            FlowDesignation.AUTHENTICATION,
229        )
230
231        response = self.client.get(
232            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
233        )
234        self.assertEqual(response.status_code, 302)
235        self.assertEqual(response.url, reverse("authentik_core:root-redirect"))
236
237    def test_multi_stage_flow(self):
238        """Test a full flow with multiple stages"""
239        flow = create_test_flow(
240            FlowDesignation.AUTHENTICATION,
241        )
242        FlowStageBinding.objects.create(
243            target=flow, stage=DummyStage.objects.create(name=generate_id()), order=0
244        )
245        FlowStageBinding.objects.create(
246            target=flow, stage=DummyStage.objects.create(name=generate_id()), order=1
247        )
248
249        exec_url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})
250        # First Request, start planning, renders form
251        response = self.client.get(exec_url)
252        self.assertEqual(response.status_code, 200)
253        # Check that two stages are in plan
254        session = self.client.session
255        plan: FlowPlan = session[SESSION_KEY_PLAN]
256        self.assertEqual(len(plan.bindings), 2)
257        # Second request, submit form, one stage left
258        response = self.client.post(exec_url)
259        # Second request redirects to the same URL
260        self.assertEqual(response.status_code, 302)
261        self.assertEqual(response.url, exec_url)
262        # Check that two stages are in plan
263        session = self.client.session
264        plan: FlowPlan = session[SESSION_KEY_PLAN]
265        self.assertEqual(len(plan.bindings), 1)
266
267    @patch(
268        "authentik.flows.views.executor.to_stage_response",
269        TO_STAGE_RESPONSE_MOCK,
270    )
271    def test_reevaluate_remove_last(self):
272        """Test planner with re-evaluate (last stage is removed)"""
273        flow = create_test_flow(
274            FlowDesignation.AUTHENTICATION,
275        )
276        false_policy = DummyPolicy.objects.create(
277            name=generate_id(), result=False, wait_min=1, wait_max=2
278        )
279
280        binding = FlowStageBinding.objects.create(
281            target=flow,
282            stage=DummyStage.objects.create(name=generate_id()),
283            order=0,
284            evaluate_on_plan=True,
285            re_evaluate_policies=False,
286        )
287        binding2 = FlowStageBinding.objects.create(
288            target=flow,
289            stage=DummyStage.objects.create(name=generate_id()),
290            order=1,
291            re_evaluate_policies=True,
292        )
293
294        PolicyBinding.objects.create(policy=false_policy, target=binding2, order=0)
295
296        # Here we patch the dummy policy to evaluate to true so the stage is included
297        with patch("authentik.policies.dummy.models.DummyPolicy.passes", POLICY_RETURN_TRUE):
298            exec_url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})
299            # First request, run the planner
300            response = self.client.get(exec_url)
301            self.assertEqual(response.status_code, 200)
302
303            plan: FlowPlan = self.client.session[SESSION_KEY_PLAN]
304
305            self.assertEqual(plan.bindings[0], binding)
306            self.assertEqual(plan.bindings[1], binding2)
307
308            self.assertEqual(plan.markers[0].__class__, StageMarker)
309            self.assertEqual(plan.markers[1].__class__, ReevaluateMarker)
310
311            # Second request, this passes the first dummy stage
312            response = self.client.post(exec_url)
313            self.assertEqual(response.status_code, 302)
314
315        # third request, this should trigger the re-evaluate
316        # We do this request without the patch, so the policy results in false
317        response = self.client.post(exec_url)
318        self.assertEqual(response.status_code, 302)
319        self.assertEqual(response.url, reverse("authentik_core:root-redirect"))
320
321    def test_reevaluate_remove_middle(self):
322        """Test planner with re-evaluate (middle stage is removed)"""
323        flow = create_test_flow(
324            FlowDesignation.AUTHENTICATION,
325        )
326        false_policy = DummyPolicy.objects.create(
327            name=generate_id(), result=False, wait_min=1, wait_max=2
328        )
329
330        binding = FlowStageBinding.objects.create(
331            target=flow,
332            stage=DummyStage.objects.create(name=generate_id()),
333            order=0,
334            evaluate_on_plan=True,
335            re_evaluate_policies=False,
336        )
337        binding2 = FlowStageBinding.objects.create(
338            target=flow,
339            stage=DummyStage.objects.create(name=generate_id()),
340            order=1,
341            re_evaluate_policies=True,
342        )
343        binding3 = FlowStageBinding.objects.create(
344            target=flow,
345            stage=DummyStage.objects.create(name=generate_id()),
346            order=2,
347            evaluate_on_plan=True,
348            re_evaluate_policies=False,
349        )
350
351        PolicyBinding.objects.create(policy=false_policy, target=binding2, order=0)
352
353        # Here we patch the dummy policy to evaluate to true so the stage is included
354        with patch("authentik.policies.dummy.models.DummyPolicy.passes", POLICY_RETURN_TRUE):
355            exec_url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})
356            # First request, run the planner
357            response = self.client.get(exec_url)
358
359            self.assertEqual(response.status_code, 200)
360            plan: FlowPlan = self.client.session[SESSION_KEY_PLAN]
361
362            self.assertEqual(plan.bindings[0], binding)
363            self.assertEqual(plan.bindings[1], binding2)
364            self.assertEqual(plan.bindings[2], binding3)
365
366            self.assertEqual(plan.markers[0].__class__, StageMarker)
367            self.assertEqual(plan.markers[1].__class__, ReevaluateMarker)
368            self.assertEqual(plan.markers[2].__class__, StageMarker)
369
370            # Second request, this passes the first dummy stage
371            response = self.client.post(exec_url)
372            self.assertEqual(response.status_code, 302)
373
374            plan: FlowPlan = self.client.session[SESSION_KEY_PLAN]
375
376            self.assertEqual(plan.bindings[0], binding2)
377            self.assertEqual(plan.bindings[1], binding3)
378
379            self.assertEqual(plan.markers[0].__class__, ReevaluateMarker)
380            self.assertEqual(plan.markers[1].__class__, StageMarker)
381
382        # third request, this should trigger the re-evaluate
383        # We do this request without the patch, so the policy results in false
384        response = self.client.post(exec_url)
385        self.assertEqual(response.status_code, 200)
386        self.assertStageRedirects(response, reverse("authentik_core:root-redirect"))
387
388    def test_reevaluate_keep(self):
389        """Test planner with re-evaluate (everything is kept)"""
390        flow = create_test_flow(
391            FlowDesignation.AUTHENTICATION,
392        )
393        true_policy = DummyPolicy.objects.create(
394            name=generate_id(), result=True, wait_min=1, wait_max=2
395        )
396
397        binding = FlowStageBinding.objects.create(
398            target=flow,
399            stage=DummyStage.objects.create(name=generate_id()),
400            order=0,
401            evaluate_on_plan=True,
402            re_evaluate_policies=False,
403        )
404        binding2 = FlowStageBinding.objects.create(
405            target=flow,
406            stage=DummyStage.objects.create(name=generate_id()),
407            order=1,
408            re_evaluate_policies=True,
409        )
410        binding3 = FlowStageBinding.objects.create(
411            target=flow,
412            stage=DummyStage.objects.create(name=generate_id()),
413            order=2,
414            evaluate_on_plan=True,
415            re_evaluate_policies=False,
416        )
417
418        PolicyBinding.objects.create(policy=true_policy, target=binding2, order=0)
419
420        # Here we patch the dummy policy to evaluate to true so the stage is included
421        with patch("authentik.policies.dummy.models.DummyPolicy.passes", POLICY_RETURN_TRUE):
422            exec_url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})
423            # First request, run the planner
424            response = self.client.get(exec_url)
425
426            self.assertEqual(response.status_code, 200)
427            plan: FlowPlan = self.client.session[SESSION_KEY_PLAN]
428
429            self.assertEqual(plan.bindings[0], binding)
430            self.assertEqual(plan.bindings[1], binding2)
431            self.assertEqual(plan.bindings[2], binding3)
432
433            self.assertEqual(plan.markers[0].__class__, StageMarker)
434            self.assertEqual(plan.markers[1].__class__, ReevaluateMarker)
435            self.assertEqual(plan.markers[2].__class__, StageMarker)
436
437            # Second request, this passes the first dummy stage
438            response = self.client.post(exec_url)
439            self.assertEqual(response.status_code, 302)
440
441            plan: FlowPlan = self.client.session[SESSION_KEY_PLAN]
442
443            self.assertEqual(plan.bindings[0], binding2)
444            self.assertEqual(plan.bindings[1], binding3)
445
446            self.assertEqual(plan.markers[0].__class__, ReevaluateMarker)
447            self.assertEqual(plan.markers[1].__class__, StageMarker)
448
449            # Third request, this passes the first dummy stage
450            response = self.client.post(exec_url)
451            self.assertEqual(response.status_code, 302)
452
453            plan: FlowPlan = self.client.session[SESSION_KEY_PLAN]
454
455            self.assertEqual(plan.bindings[0], binding3)
456
457            self.assertEqual(plan.markers[0].__class__, StageMarker)
458
459        # third request, this should trigger the re-evaluate
460        # We do this request without the patch, so the policy results in false
461        response = self.client.post(exec_url)
462        self.assertEqual(response.status_code, 200)
463        self.assertStageRedirects(response, reverse("authentik_core:root-redirect"))
464
465    def test_reevaluate_remove_consecutive(self):
466        """Test planner with re-evaluate (consecutive stages are removed)"""
467        flow = create_test_flow(
468            FlowDesignation.AUTHENTICATION,
469        )
470        false_policy = DummyPolicy.objects.create(
471            name=generate_id(), result=False, wait_min=1, wait_max=2
472        )
473
474        binding = FlowStageBinding.objects.create(
475            target=flow,
476            stage=DummyStage.objects.create(name=generate_id()),
477            order=0,
478            evaluate_on_plan=True,
479            re_evaluate_policies=False,
480        )
481        binding2 = FlowStageBinding.objects.create(
482            target=flow,
483            stage=DummyStage.objects.create(name=generate_id()),
484            order=1,
485            re_evaluate_policies=True,
486        )
487        binding3 = FlowStageBinding.objects.create(
488            target=flow,
489            stage=DummyStage.objects.create(name=generate_id()),
490            order=2,
491            re_evaluate_policies=True,
492        )
493        binding4 = FlowStageBinding.objects.create(
494            target=flow,
495            stage=DummyStage.objects.create(name=generate_id()),
496            order=2,
497            evaluate_on_plan=True,
498            re_evaluate_policies=False,
499        )
500
501        PolicyBinding.objects.create(policy=false_policy, target=binding2, order=0)
502        PolicyBinding.objects.create(policy=false_policy, target=binding3, order=0)
503
504        # Here we patch the dummy policy to evaluate to true so the stage is included
505        with patch("authentik.policies.dummy.models.DummyPolicy.passes", POLICY_RETURN_TRUE):
506            exec_url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})
507            # First request, run the planner
508            response = self.client.get(exec_url)
509            self.assertEqual(response.status_code, 200)
510            self.assertStageResponse(response, flow, component="ak-stage-dummy")
511
512            plan: FlowPlan = self.client.session[SESSION_KEY_PLAN]
513
514            self.assertEqual(plan.bindings[0], binding)
515            self.assertEqual(plan.bindings[1], binding2)
516            self.assertEqual(plan.bindings[2], binding3)
517            self.assertEqual(plan.bindings[3], binding4)
518
519            self.assertEqual(plan.markers[0].__class__, StageMarker)
520            self.assertEqual(plan.markers[1].__class__, ReevaluateMarker)
521            self.assertEqual(plan.markers[2].__class__, ReevaluateMarker)
522            self.assertEqual(plan.markers[3].__class__, StageMarker)
523
524        # Second request, this passes the first dummy stage
525        response = self.client.post(exec_url)
526        self.assertEqual(response.status_code, 302)
527
528        # third request, this should trigger the re-evaluate
529        # A get request will evaluate the policies and this will return stage 4
530        # but it won't save it, hence we can't check the plan
531        response = self.client.get(exec_url)
532        self.assertStageResponse(response, flow, component="ak-stage-dummy")
533
534        # fourth request, this confirms the last stage (dummy4)
535        # We do this request without the patch, so the policy results in false
536        response = self.client.post(exec_url)
537        self.assertEqual(response.status_code, 200)
538        self.assertStageRedirects(response, reverse("authentik_core:root-redirect"))
539
540    def test_stageview_user_identifier(self):
541        """Test PLAN_CONTEXT_PENDING_USER_IDENTIFIER"""
542        flow = create_test_flow(
543            FlowDesignation.AUTHENTICATION,
544        )
545        FlowStageBinding.objects.create(
546            target=flow, stage=DummyStage.objects.create(name=generate_id()), order=0
547        )
548
549        ident = "test-identifier"
550
551        user = User.objects.create(username="test-user")
552        request = self.request_factory.get(
553            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
554        )
555        request.user = user
556        planner = FlowPlanner(flow)
557        plan = planner.plan(request, default_context={PLAN_CONTEXT_PENDING_USER_IDENTIFIER: ident})
558
559        executor = FlowExecutorView()
560        executor.plan = plan
561        executor.flow = flow
562
563        stage_view = StageView(executor)
564        self.assertEqual(ident, stage_view.get_pending_user(for_display=True).username)
565
566    def test_invalid_restart(self):
567        """Test flow that restarts on invalid entry"""
568        flow = create_test_flow(
569            FlowDesignation.AUTHENTICATION,
570        )
571        # Stage 0 is a deny stage that is added dynamically
572        # when the reputation policy says so
573        deny_stage = DenyStage.objects.create(name=generate_id())
574        reputation_policy = ReputationPolicy.objects.create(
575            name=generate_id(), threshold=-1, check_ip=False
576        )
577        deny_binding = FlowStageBinding.objects.create(
578            target=flow,
579            stage=deny_stage,
580            order=0,
581            evaluate_on_plan=False,
582            re_evaluate_policies=True,
583        )
584        PolicyBinding.objects.create(policy=reputation_policy, target=deny_binding, order=0)
585
586        # Stage 1 is an identification stage
587        ident_stage = IdentificationStage.objects.create(
588            name=generate_id(),
589            user_fields=[UserFields.E_MAIL],
590            pretend_user_exists=False,
591        )
592        FlowStageBinding.objects.create(
593            target=flow,
594            stage=ident_stage,
595            order=1,
596            invalid_response_action=InvalidResponseAction.RESTART_WITH_CONTEXT,
597        )
598        exec_url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})
599        # First request, run the planner
600        response = self.client.get(exec_url)
601        self.assertStageResponse(
602            response,
603            flow,
604            component="ak-stage-identification",
605            password_fields=False,
606            primary_action="Log in",
607            sources=[],
608            show_source_labels=False,
609            user_fields=[UserFields.E_MAIL],
610        )
611        response = self.client.post(exec_url, {"uid_field": "invalid-string"}, follow=True)
612        self.assertStageResponse(response, flow, component="ak-stage-access-denied")
613
614    def test_re_evaluate_group_binding(self):
615        """Test re-evaluate stage binding that has a policy binding to a group"""
616        flow = create_test_flow()
617
618        user_group_membership = create_test_user()
619        user_direct_binding = create_test_user()
620        user_other = create_test_user()
621
622        group_a = Group.objects.create(name=generate_id())
623        user_group_membership.groups.add(group_a)
624
625        # Stage 0 is an identification stage
626        ident_stage = IdentificationStage.objects.create(
627            name=generate_id(),
628            user_fields=[UserFields.USERNAME],
629            pretend_user_exists=False,
630        )
631        FlowStageBinding.objects.create(
632            target=flow,
633            stage=ident_stage,
634            order=0,
635        )
636
637        # Stage 1 is a dummy stage that is only shown for users in group_a
638        dummy_stage = DummyStage.objects.create(name=generate_id())
639        dummy_binding = FlowStageBinding.objects.create(target=flow, stage=dummy_stage, order=1)
640        PolicyBinding.objects.create(group=group_a, target=dummy_binding, order=0)
641        PolicyBinding.objects.create(user=user_direct_binding, target=dummy_binding, order=0)
642
643        # Stage 2 is a deny stage that (in this case) only user_b will see
644        deny_stage = DenyStage.objects.create(name=generate_id())
645        FlowStageBinding.objects.create(target=flow, stage=deny_stage, order=2)
646
647        exec_url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})
648
649        with self.subTest(f"Test user access through group: {user_group_membership}"):
650            self.client.logout()
651            # First request, run the planner
652            response = self.client.get(exec_url)
653            self.assertStageResponse(response, flow, component="ak-stage-identification")
654            response = self.client.post(
655                exec_url, {"uid_field": user_group_membership.username}, follow=True
656            )
657            self.assertStageResponse(response, flow, component="ak-stage-dummy")
658        with self.subTest(f"Test user access through user: {user_direct_binding}"):
659            self.client.logout()
660            # First request, run the planner
661            response = self.client.get(exec_url)
662            self.assertStageResponse(response, flow, component="ak-stage-identification")
663            response = self.client.post(
664                exec_url, {"uid_field": user_direct_binding.username}, follow=True
665            )
666            self.assertStageResponse(response, flow, component="ak-stage-dummy")
667        with self.subTest(f"Test user has no access: {user_other}"):
668            self.client.logout()
669            # First request, run the planner
670            response = self.client.get(exec_url)
671            self.assertStageResponse(response, flow, component="ak-stage-identification")
672            response = self.client.post(exec_url, {"uid_field": user_other.username}, follow=True)
673            self.assertStageResponse(response, flow, component="ak-stage-access-denied")
674
675    @patch(
676        "authentik.flows.views.executor.to_stage_response",
677        TO_STAGE_RESPONSE_MOCK,
678    )
679    def test_invalid_json(self):
680        """Test invalid JSON body"""
681        flow = create_test_flow()
682        FlowStageBinding.objects.create(
683            target=flow, stage=DummyStage.objects.create(name=generate_id()), order=0
684        )
685        url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})
686
687        with override_settings(TEST=False, DEBUG=False):
688            self.client.logout()
689            response = self.client.post(url, data="{", content_type="application/json")
690            self.assertEqual(response.status_code, 200)
691
692        with self.assertRaises(ParseError):
693            self.client.logout()
694            response = self.client.post(url, data="{", content_type="application/json")
695            self.assertEqual(response.status_code, 200)
696
697    def test_cancel_next(self):
698        """Test cancel URL with ?next param set"""
699        flow = create_test_flow()
700
701        # Stage 0 is an identification stage
702        ident_stage = IdentificationStage.objects.create(
703            name=generate_id(),
704            user_fields=[UserFields.USERNAME],
705        )
706        FlowStageBinding.objects.create(
707            target=flow,
708            stage=ident_stage,
709            order=0,
710        )
711
712        # Stage 1 is a password stage
713        password_stage = PasswordStage.objects.create(name=generate_id(), backends=[])
714        FlowStageBinding.objects.create(
715            target=flow,
716            stage=password_stage,
717            order=1,
718        )
719        res = self.client.get(
720            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})
721            + f"?{urlencode({QS_QUERY: urlencode({NEXT_ARG_NAME: "/foo"})})}"
722        )
723        self.assertStageResponse(res, flow, component="ak-stage-identification")
724
725        res = self.client.post(
726            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})
727            + f"?{urlencode({QS_QUERY: urlencode({NEXT_ARG_NAME: "/foo"})})}",
728            data={"component": "ak-stage-identification", "uid_field": generate_id()},
729            follow=True,
730        )
731        self.assertEqual(res.status_code, 200)
732        self.assertStageResponse(
733            res,
734            flow,
735            flow_info={
736                "background": "/static/dist/assets/images/flow_background.jpg",
737                "background_themed_urls": None,
738                "cancel_url": "/flows/-/cancel/?next=%2Ffoo",
739                "layout": "stacked",
740                "title": flow.title,
741            },
742        )
POLICY_RETURN_FALSE

A mock intended to be used as a property, or other descriptor, on a class. PropertyMock provides __get__ and __set__ methods so you can specify a return value when it is fetched.

Fetching a PropertyMock instance from an object calls the mock, with no args. Setting it calls the mock with the value being set.

POLICY_RETURN_TRUE = <MagicMock id='139707766231056'>
def to_stage_response(request: django.http.request.HttpRequest, source: django.http.response.HttpResponse):
46def to_stage_response(request: HttpRequest, source: HttpResponse):
47    """Mock for to_stage_response that returns the original response, so we can check
48    inheritance and member attributes"""
49    return source

Mock for to_stage_response that returns the original response, so we can check inheritance and member attributes

TO_STAGE_RESPONSE_MOCK = <MagicMock id='139707766231392'>
class TestFlowExecutor(authentik.flows.tests.FlowTestCase):
 55class TestFlowExecutor(FlowTestCase):
 56    """Test executor"""
 57
 58    def setUp(self):
 59        self.request_factory = RequestFactory()
 60
 61    @patch(
 62        "authentik.flows.views.executor.to_stage_response",
 63        TO_STAGE_RESPONSE_MOCK,
 64    )
 65    def test_existing_plan_diff_flow(self):
 66        """Check that a plan for a different flow cancels the current plan"""
 67        flow = create_test_flow(
 68            FlowDesignation.AUTHENTICATION,
 69        )
 70        stage = DummyStage.objects.create(name=generate_id())
 71        binding = FlowStageBinding(target=flow, stage=stage, order=0)
 72        plan = FlowPlan(flow_pk=flow.pk.hex + "a", bindings=[binding], markers=[StageMarker()])
 73        session = self.client.session
 74        session[SESSION_KEY_PLAN] = plan
 75        session.save()
 76
 77        cancel_mock = MagicMock()
 78        with patch("authentik.flows.views.executor.FlowExecutorView.cancel", cancel_mock):
 79            response = self.client.get(
 80                reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
 81            )
 82            self.assertEqual(response.status_code, 302)
 83            self.assertEqual(cancel_mock.call_count, 2)
 84
 85    @patch(
 86        "authentik.flows.views.executor.to_stage_response",
 87        TO_STAGE_RESPONSE_MOCK,
 88    )
 89    @patch(
 90        "authentik.policies.engine.PolicyEngine.result",
 91        POLICY_RETURN_FALSE,
 92    )
 93    def test_invalid_non_applicable_flow(self):
 94        """Tests that a non-applicable flow returns the correct error message"""
 95        flow = create_test_flow(
 96            FlowDesignation.AUTHENTICATION,
 97        )
 98
 99        response = self.client.get(
100            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
101        )
102        self.assertStageResponse(
103            response,
104            flow=flow,
105            error_message="foo",
106            component="ak-stage-access-denied",
107        )
108
109    @patch(
110        "authentik.flows.views.executor.to_stage_response",
111        TO_STAGE_RESPONSE_MOCK,
112    )
113    @patch(
114        "authentik.policies.engine.PolicyEngine.result",
115        POLICY_RETURN_FALSE,
116    )
117    def test_invalid_non_applicable_flow_continue(self):
118        """Tests that a non-applicable flow with denied_action CONTINUE redirects"""
119        flow = create_test_flow(
120            FlowDesignation.AUTHENTICATION,
121            denied_action=FlowDeniedAction.CONTINUE,
122        )
123
124        response = self.client.get(
125            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
126        )
127        self.assertEqual(response.status_code, 302)
128        self.assertEqual(response.url, reverse("authentik_core:root-redirect"))
129
130    @patch(
131        "authentik.flows.views.executor.to_stage_response",
132        TO_STAGE_RESPONSE_MOCK,
133    )
134    def test_invalid_flow_redirect(self):
135        """Test invalid flow with valid redirect destination"""
136        flow = create_test_flow(
137            FlowDesignation.AUTHENTICATION,
138        )
139
140        dest = "/unique-string"
141        url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})
142        response = self.client.get(url + f"?{QS_QUERY}={urlencode({NEXT_ARG_NAME: dest})}")
143        self.assertEqual(response.status_code, 302)
144        self.assertEqual(response.url, "/unique-string")
145
146    @patch(
147        "authentik.flows.views.executor.to_stage_response",
148        TO_STAGE_RESPONSE_MOCK,
149    )
150    def test_invalid_flow_invalid_redirect(self):
151        """Test invalid flow redirect with an invalid URL"""
152        flow = create_test_flow(
153            FlowDesignation.AUTHENTICATION,
154        )
155
156        dest = "http://something.example.com/unique-string"
157        url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})
158
159        response = self.client.get(url + f"?{QS_QUERY}={urlencode({NEXT_ARG_NAME: dest})}")
160        self.assertEqual(response.status_code, 200)
161        self.assertStageResponse(
162            response,
163            flow,
164            component="ak-stage-access-denied",
165            error_message="Invalid next URL",
166        )
167
168    @patch(
169        "authentik.flows.views.executor.to_stage_response",
170        TO_STAGE_RESPONSE_MOCK,
171    )
172    def test_valid_flow_redirect(self):
173        """Test valid flow with valid redirect destination"""
174        flow = create_test_flow()
175
176        dest = "/unique-string"
177        url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})
178
179        response = self.client.get(url + f"?{QS_QUERY}={urlencode({NEXT_ARG_NAME: dest})}")
180        self.assertEqual(response.status_code, 302)
181        self.assertEqual(response.url, "/unique-string")
182
183    @patch(
184        "authentik.flows.views.executor.to_stage_response",
185        TO_STAGE_RESPONSE_MOCK,
186    )
187    def test_valid_flow_redirect_authenticated(self):
188        """Test valid flow with valid redirect destination, authenticated already"""
189        flow = create_test_flow()
190        flow.designation = FlowDesignation.AUTHENTICATION
191        flow.authentication = FlowAuthenticationRequirement.REQUIRE_UNAUTHENTICATED
192        flow.save()
193        self.client.force_login(create_test_user())
194
195        dest = "/unique-string"
196        url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})
197
198        response = self.client.get(url + f"?{QS_QUERY}={urlencode({NEXT_ARG_NAME: dest})}")
199        self.assertEqual(response.status_code, 302)
200        self.assertEqual(response.url, "/unique-string")
201
202    @patch(
203        "authentik.flows.views.executor.to_stage_response",
204        TO_STAGE_RESPONSE_MOCK,
205    )
206    def test_valid_flow_invalid_redirect(self):
207        """Test valid flow redirect with an invalid URL"""
208        flow = create_test_flow()
209
210        dest = "http://something.example.com/unique-string"
211        url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})
212
213        response = self.client.get(url + f"?{QS_QUERY}={urlencode({NEXT_ARG_NAME: dest})}")
214        self.assertEqual(response.status_code, 200)
215        self.assertStageResponse(
216            response,
217            flow,
218            component="ak-stage-access-denied",
219            error_message="Invalid next URL",
220        )
221
222    @patch(
223        "authentik.flows.views.executor.to_stage_response",
224        TO_STAGE_RESPONSE_MOCK,
225    )
226    def test_invalid_empty_flow(self):
227        """Tests that an empty flow returns the correct error message"""
228        flow = create_test_flow(
229            FlowDesignation.AUTHENTICATION,
230        )
231
232        response = self.client.get(
233            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
234        )
235        self.assertEqual(response.status_code, 302)
236        self.assertEqual(response.url, reverse("authentik_core:root-redirect"))
237
238    def test_multi_stage_flow(self):
239        """Test a full flow with multiple stages"""
240        flow = create_test_flow(
241            FlowDesignation.AUTHENTICATION,
242        )
243        FlowStageBinding.objects.create(
244            target=flow, stage=DummyStage.objects.create(name=generate_id()), order=0
245        )
246        FlowStageBinding.objects.create(
247            target=flow, stage=DummyStage.objects.create(name=generate_id()), order=1
248        )
249
250        exec_url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})
251        # First Request, start planning, renders form
252        response = self.client.get(exec_url)
253        self.assertEqual(response.status_code, 200)
254        # Check that two stages are in plan
255        session = self.client.session
256        plan: FlowPlan = session[SESSION_KEY_PLAN]
257        self.assertEqual(len(plan.bindings), 2)
258        # Second request, submit form, one stage left
259        response = self.client.post(exec_url)
260        # Second request redirects to the same URL
261        self.assertEqual(response.status_code, 302)
262        self.assertEqual(response.url, exec_url)
263        # Check that only one stage is left in plan
264        session = self.client.session
265        plan: FlowPlan = session[SESSION_KEY_PLAN]
266        self.assertEqual(len(plan.bindings), 1)
267
268    @patch(
269        "authentik.flows.views.executor.to_stage_response",
270        TO_STAGE_RESPONSE_MOCK,
271    )
272    def test_reevaluate_remove_last(self):
273        """Test planner with re-evaluate (last stage is removed)"""
274        flow = create_test_flow(
275            FlowDesignation.AUTHENTICATION,
276        )
277        false_policy = DummyPolicy.objects.create(
278            name=generate_id(), result=False, wait_min=1, wait_max=2
279        )
280
281        binding = FlowStageBinding.objects.create(
282            target=flow,
283            stage=DummyStage.objects.create(name=generate_id()),
284            order=0,
285            evaluate_on_plan=True,
286            re_evaluate_policies=False,
287        )
288        binding2 = FlowStageBinding.objects.create(
289            target=flow,
290            stage=DummyStage.objects.create(name=generate_id()),
291            order=1,
292            re_evaluate_policies=True,
293        )
294
295        PolicyBinding.objects.create(policy=false_policy, target=binding2, order=0)
296
297        # Here we patch the dummy policy to evaluate to true so the stage is included
298        with patch("authentik.policies.dummy.models.DummyPolicy.passes", POLICY_RETURN_TRUE):
299            exec_url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})
300            # First request, run the planner
301            response = self.client.get(exec_url)
302            self.assertEqual(response.status_code, 200)
303
304            plan: FlowPlan = self.client.session[SESSION_KEY_PLAN]
305
306            self.assertEqual(plan.bindings[0], binding)
307            self.assertEqual(plan.bindings[1], binding2)
308
309            self.assertEqual(plan.markers[0].__class__, StageMarker)
310            self.assertEqual(plan.markers[1].__class__, ReevaluateMarker)
311
312            # Second request, this passes the first dummy stage
313            response = self.client.post(exec_url)
314            self.assertEqual(response.status_code, 302)
315
316        # third request, this should trigger the re-evaluate
317        # We do this request without the patch, so the policy results in false
318        response = self.client.post(exec_url)
319        self.assertEqual(response.status_code, 302)
320        self.assertEqual(response.url, reverse("authentik_core:root-redirect"))
321
322    def test_reevaluate_remove_middle(self):
323        """Test planner with re-evaluate (middle stage is removed)"""
324        flow = create_test_flow(
325            FlowDesignation.AUTHENTICATION,
326        )
327        false_policy = DummyPolicy.objects.create(
328            name=generate_id(), result=False, wait_min=1, wait_max=2
329        )
330
331        binding = FlowStageBinding.objects.create(
332            target=flow,
333            stage=DummyStage.objects.create(name=generate_id()),
334            order=0,
335            evaluate_on_plan=True,
336            re_evaluate_policies=False,
337        )
338        binding2 = FlowStageBinding.objects.create(
339            target=flow,
340            stage=DummyStage.objects.create(name=generate_id()),
341            order=1,
342            re_evaluate_policies=True,
343        )
344        binding3 = FlowStageBinding.objects.create(
345            target=flow,
346            stage=DummyStage.objects.create(name=generate_id()),
347            order=2,
348            evaluate_on_plan=True,
349            re_evaluate_policies=False,
350        )
351
352        PolicyBinding.objects.create(policy=false_policy, target=binding2, order=0)
353
354        # Here we patch the dummy policy to evaluate to true so the stage is included
355        with patch("authentik.policies.dummy.models.DummyPolicy.passes", POLICY_RETURN_TRUE):
356            exec_url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})
357            # First request, run the planner
358            response = self.client.get(exec_url)
359
360            self.assertEqual(response.status_code, 200)
361            plan: FlowPlan = self.client.session[SESSION_KEY_PLAN]
362
363            self.assertEqual(plan.bindings[0], binding)
364            self.assertEqual(plan.bindings[1], binding2)
365            self.assertEqual(plan.bindings[2], binding3)
366
367            self.assertEqual(plan.markers[0].__class__, StageMarker)
368            self.assertEqual(plan.markers[1].__class__, ReevaluateMarker)
369            self.assertEqual(plan.markers[2].__class__, StageMarker)
370
371            # Second request, this passes the first dummy stage
372            response = self.client.post(exec_url)
373            self.assertEqual(response.status_code, 302)
374
375            plan: FlowPlan = self.client.session[SESSION_KEY_PLAN]
376
377            self.assertEqual(plan.bindings[0], binding2)
378            self.assertEqual(plan.bindings[1], binding3)
379
380            self.assertEqual(plan.markers[0].__class__, ReevaluateMarker)
381            self.assertEqual(plan.markers[1].__class__, StageMarker)
382
383        # third request, this should trigger the re-evaluate
384        # We do this request without the patch, so the policy results in false
385        response = self.client.post(exec_url)
386        self.assertEqual(response.status_code, 200)
387        self.assertStageRedirects(response, reverse("authentik_core:root-redirect"))
388
389    def test_reevaluate_keep(self):
390        """Test planner with re-evaluate (everything is kept)"""
391        flow = create_test_flow(
392            FlowDesignation.AUTHENTICATION,
393        )
394        true_policy = DummyPolicy.objects.create(
395            name=generate_id(), result=True, wait_min=1, wait_max=2
396        )
397
398        binding = FlowStageBinding.objects.create(
399            target=flow,
400            stage=DummyStage.objects.create(name=generate_id()),
401            order=0,
402            evaluate_on_plan=True,
403            re_evaluate_policies=False,
404        )
405        binding2 = FlowStageBinding.objects.create(
406            target=flow,
407            stage=DummyStage.objects.create(name=generate_id()),
408            order=1,
409            re_evaluate_policies=True,
410        )
411        binding3 = FlowStageBinding.objects.create(
412            target=flow,
413            stage=DummyStage.objects.create(name=generate_id()),
414            order=2,
415            evaluate_on_plan=True,
416            re_evaluate_policies=False,
417        )
418
419        PolicyBinding.objects.create(policy=true_policy, target=binding2, order=0)
420
421        # Here we patch the dummy policy to evaluate to true so the stage is included
422        with patch("authentik.policies.dummy.models.DummyPolicy.passes", POLICY_RETURN_TRUE):
423            exec_url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})
424            # First request, run the planner
425            response = self.client.get(exec_url)
426
427            self.assertEqual(response.status_code, 200)
428            plan: FlowPlan = self.client.session[SESSION_KEY_PLAN]
429
430            self.assertEqual(plan.bindings[0], binding)
431            self.assertEqual(plan.bindings[1], binding2)
432            self.assertEqual(plan.bindings[2], binding3)
433
434            self.assertEqual(plan.markers[0].__class__, StageMarker)
435            self.assertEqual(plan.markers[1].__class__, ReevaluateMarker)
436            self.assertEqual(plan.markers[2].__class__, StageMarker)
437
438            # Second request, this passes the first dummy stage
439            response = self.client.post(exec_url)
440            self.assertEqual(response.status_code, 302)
441
442            plan: FlowPlan = self.client.session[SESSION_KEY_PLAN]
443
444            self.assertEqual(plan.bindings[0], binding2)
445            self.assertEqual(plan.bindings[1], binding3)
446
447            self.assertEqual(plan.markers[0].__class__, ReevaluateMarker)
448            self.assertEqual(plan.markers[1].__class__, StageMarker)
449
450            # Third request, this passes the second dummy stage
451            response = self.client.post(exec_url)
452            self.assertEqual(response.status_code, 302)
453
454            plan: FlowPlan = self.client.session[SESSION_KEY_PLAN]
455
456            self.assertEqual(plan.bindings[0], binding3)
457
458            self.assertEqual(plan.markers[0].__class__, StageMarker)
459
460        # Fourth request, this passes the last remaining dummy stage
461        # We do this request without the patch; the flow finishes and redirects
462        response = self.client.post(exec_url)
463        self.assertEqual(response.status_code, 200)
464        self.assertStageRedirects(response, reverse("authentik_core:root-redirect"))
465
466    def test_reevaluate_remove_consecutive(self):
467        """Test planner with re-evaluate (consecutive stages are removed)"""
468        flow = create_test_flow(
469            FlowDesignation.AUTHENTICATION,
470        )
471        false_policy = DummyPolicy.objects.create(
472            name=generate_id(), result=False, wait_min=1, wait_max=2
473        )
474
475        binding = FlowStageBinding.objects.create(
476            target=flow,
477            stage=DummyStage.objects.create(name=generate_id()),
478            order=0,
479            evaluate_on_plan=True,
480            re_evaluate_policies=False,
481        )
482        binding2 = FlowStageBinding.objects.create(
483            target=flow,
484            stage=DummyStage.objects.create(name=generate_id()),
485            order=1,
486            re_evaluate_policies=True,
487        )
488        binding3 = FlowStageBinding.objects.create(
489            target=flow,
490            stage=DummyStage.objects.create(name=generate_id()),
491            order=2,
492            re_evaluate_policies=True,
493        )
494        binding4 = FlowStageBinding.objects.create(
495            target=flow,
496            stage=DummyStage.objects.create(name=generate_id()),
497            order=2,
498            evaluate_on_plan=True,
499            re_evaluate_policies=False,
500        )
501
502        PolicyBinding.objects.create(policy=false_policy, target=binding2, order=0)
503        PolicyBinding.objects.create(policy=false_policy, target=binding3, order=0)
504
505        # Here we patch the dummy policy to evaluate to true so the stage is included
506        with patch("authentik.policies.dummy.models.DummyPolicy.passes", POLICY_RETURN_TRUE):
507            exec_url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})
508            # First request, run the planner
509            response = self.client.get(exec_url)
510            self.assertEqual(response.status_code, 200)
511            self.assertStageResponse(response, flow, component="ak-stage-dummy")
512
513            plan: FlowPlan = self.client.session[SESSION_KEY_PLAN]
514
515            self.assertEqual(plan.bindings[0], binding)
516            self.assertEqual(plan.bindings[1], binding2)
517            self.assertEqual(plan.bindings[2], binding3)
518            self.assertEqual(plan.bindings[3], binding4)
519
520            self.assertEqual(plan.markers[0].__class__, StageMarker)
521            self.assertEqual(plan.markers[1].__class__, ReevaluateMarker)
522            self.assertEqual(plan.markers[2].__class__, ReevaluateMarker)
523            self.assertEqual(plan.markers[3].__class__, StageMarker)
524
525        # Second request, this passes the first dummy stage
526        response = self.client.post(exec_url)
527        self.assertEqual(response.status_code, 302)
528
529        # third request, this should trigger the re-evaluate
530        # A get request will evaluate the policies and this will return stage 4
531        # but it won't save it, hence we can't check the plan
532        response = self.client.get(exec_url)
533        self.assertStageResponse(response, flow, component="ak-stage-dummy")
534
535        # fourth request, this confirms the last stage (dummy4)
536        # We do this request without the patch, so the policy results in false
537        response = self.client.post(exec_url)
538        self.assertEqual(response.status_code, 200)
539        self.assertStageRedirects(response, reverse("authentik_core:root-redirect"))
540
541    def test_stageview_user_identifier(self):
542        """Test PLAN_CONTEXT_PENDING_USER_IDENTIFIER"""
543        flow = create_test_flow(
544            FlowDesignation.AUTHENTICATION,
545        )
546        FlowStageBinding.objects.create(
547            target=flow, stage=DummyStage.objects.create(name=generate_id()), order=0
548        )
549
550        ident = "test-identifier"
551
552        user = User.objects.create(username="test-user")
553        request = self.request_factory.get(
554            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
555        )
556        request.user = user
557        planner = FlowPlanner(flow)
558        plan = planner.plan(request, default_context={PLAN_CONTEXT_PENDING_USER_IDENTIFIER: ident})
559
560        executor = FlowExecutorView()
561        executor.plan = plan
562        executor.flow = flow
563
564        stage_view = StageView(executor)
565        self.assertEqual(ident, stage_view.get_pending_user(for_display=True).username)
566
567    def test_invalid_restart(self):
568        """Test flow that restarts on invalid entry"""
569        flow = create_test_flow(
570            FlowDesignation.AUTHENTICATION,
571        )
572        # Stage 0 is a deny stage that is added dynamically
573        # when the reputation policy says so
574        deny_stage = DenyStage.objects.create(name=generate_id())
575        reputation_policy = ReputationPolicy.objects.create(
576            name=generate_id(), threshold=-1, check_ip=False
577        )
578        deny_binding = FlowStageBinding.objects.create(
579            target=flow,
580            stage=deny_stage,
581            order=0,
582            evaluate_on_plan=False,
583            re_evaluate_policies=True,
584        )
585        PolicyBinding.objects.create(policy=reputation_policy, target=deny_binding, order=0)
586
587        # Stage 1 is an identification stage
588        ident_stage = IdentificationStage.objects.create(
589            name=generate_id(),
590            user_fields=[UserFields.E_MAIL],
591            pretend_user_exists=False,
592        )
593        FlowStageBinding.objects.create(
594            target=flow,
595            stage=ident_stage,
596            order=1,
597            invalid_response_action=InvalidResponseAction.RESTART_WITH_CONTEXT,
598        )
599        exec_url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})
600        # First request, run the planner
601        response = self.client.get(exec_url)
602        self.assertStageResponse(
603            response,
604            flow,
605            component="ak-stage-identification",
606            password_fields=False,
607            primary_action="Log in",
608            sources=[],
609            show_source_labels=False,
610            user_fields=[UserFields.E_MAIL],
611        )
612        response = self.client.post(exec_url, {"uid_field": "invalid-string"}, follow=True)
613        self.assertStageResponse(response, flow, component="ak-stage-access-denied")
614
615    def test_re_evaluate_group_binding(self):
616        """Test re-evaluate stage binding that has a policy binding to a group"""
617        flow = create_test_flow()
618
619        user_group_membership = create_test_user()
620        user_direct_binding = create_test_user()
621        user_other = create_test_user()
622
623        group_a = Group.objects.create(name=generate_id())
624        user_group_membership.groups.add(group_a)
625
626        # Stage 0 is an identification stage
627        ident_stage = IdentificationStage.objects.create(
628            name=generate_id(),
629            user_fields=[UserFields.USERNAME],
630            pretend_user_exists=False,
631        )
632        FlowStageBinding.objects.create(
633            target=flow,
634            stage=ident_stage,
635            order=0,
636        )
637
638        # Stage 1 is a dummy stage that is only shown for users in group_a
639        dummy_stage = DummyStage.objects.create(name=generate_id())
640        dummy_binding = FlowStageBinding.objects.create(target=flow, stage=dummy_stage, order=1)
641        PolicyBinding.objects.create(group=group_a, target=dummy_binding, order=0)
642        PolicyBinding.objects.create(user=user_direct_binding, target=dummy_binding, order=0)
643
644        # Stage 2 is a deny stage that (in this case) only user_other will see
645        deny_stage = DenyStage.objects.create(name=generate_id())
646        FlowStageBinding.objects.create(target=flow, stage=deny_stage, order=2)
647
648        exec_url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})
649
650        with self.subTest(f"Test user access through group: {user_group_membership}"):
651            self.client.logout()
652            # First request, run the planner
653            response = self.client.get(exec_url)
654            self.assertStageResponse(response, flow, component="ak-stage-identification")
655            response = self.client.post(
656                exec_url, {"uid_field": user_group_membership.username}, follow=True
657            )
658            self.assertStageResponse(response, flow, component="ak-stage-dummy")
659        with self.subTest(f"Test user access through user: {user_direct_binding}"):
660            self.client.logout()
661            # First request, run the planner
662            response = self.client.get(exec_url)
663            self.assertStageResponse(response, flow, component="ak-stage-identification")
664            response = self.client.post(
665                exec_url, {"uid_field": user_direct_binding.username}, follow=True
666            )
667            self.assertStageResponse(response, flow, component="ak-stage-dummy")
668        with self.subTest(f"Test user has no access: {user_other}"):
669            self.client.logout()
670            # First request, run the planner
671            response = self.client.get(exec_url)
672            self.assertStageResponse(response, flow, component="ak-stage-identification")
673            response = self.client.post(exec_url, {"uid_field": user_other.username}, follow=True)
674            self.assertStageResponse(response, flow, component="ak-stage-access-denied")
675
676    @patch(
677        "authentik.flows.views.executor.to_stage_response",
678        TO_STAGE_RESPONSE_MOCK,
679    )
680    def test_invalid_json(self):
681        """Test invalid JSON body"""
682        flow = create_test_flow()
683        FlowStageBinding.objects.create(
684            target=flow, stage=DummyStage.objects.create(name=generate_id()), order=0
685        )
686        url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})
687
688        with override_settings(TEST=False, DEBUG=False):
689            self.client.logout()
690            response = self.client.post(url, data="{", content_type="application/json")
691            self.assertEqual(response.status_code, 200)
692
693        with self.assertRaises(ParseError):
694            self.client.logout()
695            response = self.client.post(url, data="{", content_type="application/json")
696            self.assertEqual(response.status_code, 200)
697
698    def test_cancel_next(self):
699        """Test cancel URL with ?next param set"""
700        flow = create_test_flow()
701
702        # Stage 0 is an identification stage
703        ident_stage = IdentificationStage.objects.create(
704            name=generate_id(),
705            user_fields=[UserFields.USERNAME],
706        )
707        FlowStageBinding.objects.create(
708            target=flow,
709            stage=ident_stage,
710            order=0,
711        )
712
713        # Stage 1 is a password stage
714        password_stage = PasswordStage.objects.create(name=generate_id(), backends=[])
715        FlowStageBinding.objects.create(
716            target=flow,
717            stage=password_stage,
718            order=1,
719        )
720        res = self.client.get(
721            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})
722            + f"?{urlencode({QS_QUERY: urlencode({NEXT_ARG_NAME: "/foo"})})}"
723        )
724        self.assertStageResponse(res, flow, component="ak-stage-identification")
725
726        res = self.client.post(
727            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})
728            + f"?{urlencode({QS_QUERY: urlencode({NEXT_ARG_NAME: "/foo"})})}",
729            data={"component": "ak-stage-identification", "uid_field": generate_id()},
730            follow=True,
731        )
732        self.assertEqual(res.status_code, 200)
733        self.assertStageResponse(
734            res,
735            flow,
736            flow_info={
737                "background": "/static/dist/assets/images/flow_background.jpg",
738                "background_themed_urls": None,
739                "cancel_url": "/flows/-/cancel/?next=%2Ffoo",
740                "layout": "stacked",
741                "title": flow.title,
742            },
743        )

Test executor

def setUp(self):
58    def setUp(self):
59        self.request_factory = RequestFactory()

Hook method for setting up the test fixture before exercising it.

@patch('authentik.flows.views.executor.to_stage_response', TO_STAGE_RESPONSE_MOCK)
def test_existing_plan_diff_flow(self):
61    @patch(
62        "authentik.flows.views.executor.to_stage_response",
63        TO_STAGE_RESPONSE_MOCK,
64    )
65    def test_existing_plan_diff_flow(self):
66        """Check that a plan for a different flow cancels the current plan"""
67        flow = create_test_flow(
68            FlowDesignation.AUTHENTICATION,
69        )
70        stage = DummyStage.objects.create(name=generate_id())
71        binding = FlowStageBinding(target=flow, stage=stage, order=0)
72        plan = FlowPlan(flow_pk=flow.pk.hex + "a", bindings=[binding], markers=[StageMarker()])
73        session = self.client.session
74        session[SESSION_KEY_PLAN] = plan
75        session.save()
76
77        cancel_mock = MagicMock()
78        with patch("authentik.flows.views.executor.FlowExecutorView.cancel", cancel_mock):
79            response = self.client.get(
80                reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
81            )
82            self.assertEqual(response.status_code, 302)
83            self.assertEqual(cancel_mock.call_count, 2)

Check that a plan for a different flow cancels the current plan

@patch('authentik.flows.views.executor.to_stage_response', TO_STAGE_RESPONSE_MOCK)
@patch('authentik.policies.engine.PolicyEngine.result', POLICY_RETURN_FALSE)
def test_invalid_non_applicable_flow(self):
 85    @patch(
 86        "authentik.flows.views.executor.to_stage_response",
 87        TO_STAGE_RESPONSE_MOCK,
 88    )
 89    @patch(
 90        "authentik.policies.engine.PolicyEngine.result",
 91        POLICY_RETURN_FALSE,
 92    )
 93    def test_invalid_non_applicable_flow(self):
 94        """Tests that a non-applicable flow returns the correct error message"""
 95        flow = create_test_flow(
 96            FlowDesignation.AUTHENTICATION,
 97        )
 98
 99        response = self.client.get(
100            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
101        )
102        self.assertStageResponse(
103            response,
104            flow=flow,
105            error_message="foo",
106            component="ak-stage-access-denied",
107        )

Tests that a non-applicable flow returns the correct error message

@patch('authentik.flows.views.executor.to_stage_response', TO_STAGE_RESPONSE_MOCK)
@patch('authentik.policies.engine.PolicyEngine.result', POLICY_RETURN_FALSE)
def test_invalid_non_applicable_flow_continue(self):
109    @patch(
110        "authentik.flows.views.executor.to_stage_response",
111        TO_STAGE_RESPONSE_MOCK,
112    )
113    @patch(
114        "authentik.policies.engine.PolicyEngine.result",
115        POLICY_RETURN_FALSE,
116    )
117    def test_invalid_non_applicable_flow_continue(self):
118        """Tests that a non-applicable flow that should redirect"""
119        flow = create_test_flow(
120            FlowDesignation.AUTHENTICATION,
121            denied_action=FlowDeniedAction.CONTINUE,
122        )
123
124        response = self.client.get(
125            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
126        )
127        self.assertEqual(response.status_code, 302)
128        self.assertEqual(response.url, reverse("authentik_core:root-redirect"))

Tests that a non-applicable flow whose denied action is CONTINUE redirects

@patch('authentik.flows.views.executor.to_stage_response', TO_STAGE_RESPONSE_MOCK)
def test_invalid_flow_redirect(self):
130    @patch(
131        "authentik.flows.views.executor.to_stage_response",
132        TO_STAGE_RESPONSE_MOCK,
133    )
134    def test_invalid_flow_redirect(self):
135        """Test invalid flow with valid redirect destination"""
136        flow = create_test_flow(
137            FlowDesignation.AUTHENTICATION,
138        )
139
140        dest = "/unique-string"
141        url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})
142        response = self.client.get(url + f"?{QS_QUERY}={urlencode({NEXT_ARG_NAME: dest})}")
143        self.assertEqual(response.status_code, 302)
144        self.assertEqual(response.url, "/unique-string")

Test invalid flow with valid redirect destination

@patch('authentik.flows.views.executor.to_stage_response', TO_STAGE_RESPONSE_MOCK)
def test_invalid_flow_invalid_redirect(self):
146    @patch(
147        "authentik.flows.views.executor.to_stage_response",
148        TO_STAGE_RESPONSE_MOCK,
149    )
150    def test_invalid_flow_invalid_redirect(self):
151        """Test invalid flow redirect with an invalid URL"""
152        flow = create_test_flow(
153            FlowDesignation.AUTHENTICATION,
154        )
155
156        dest = "http://something.example.com/unique-string"
157        url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})
158
159        response = self.client.get(url + f"?{QS_QUERY}={urlencode({NEXT_ARG_NAME: dest})}")
160        self.assertEqual(response.status_code, 200)
161        self.assertStageResponse(
162            response,
163            flow,
164            component="ak-stage-access-denied",
165            error_message="Invalid next URL",
166        )

Test invalid flow redirect with an invalid URL

@patch('authentik.flows.views.executor.to_stage_response', TO_STAGE_RESPONSE_MOCK)
def test_valid_flow_redirect(self):
168    @patch(
169        "authentik.flows.views.executor.to_stage_response",
170        TO_STAGE_RESPONSE_MOCK,
171    )
172    def test_valid_flow_redirect(self):
173        """Test valid flow with valid redirect destination"""
174        flow = create_test_flow()
175
176        dest = "/unique-string"
177        url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})
178
179        response = self.client.get(url + f"?{QS_QUERY}={urlencode({NEXT_ARG_NAME: dest})}")
180        self.assertEqual(response.status_code, 302)
181        self.assertEqual(response.url, "/unique-string")

Test valid flow with valid redirect destination

@patch('authentik.flows.views.executor.to_stage_response', TO_STAGE_RESPONSE_MOCK)
def test_valid_flow_redirect_authenticated(self):
183    @patch(
184        "authentik.flows.views.executor.to_stage_response",
185        TO_STAGE_RESPONSE_MOCK,
186    )
187    def test_valid_flow_redirect_authenticated(self):
188        """Test valid flow with valid redirect destination, authenticated already"""
189        flow = create_test_flow()
190        flow.designation = FlowDesignation.AUTHENTICATION
191        flow.authentication = FlowAuthenticationRequirement.REQUIRE_UNAUTHENTICATED
192        flow.save()
193        self.client.force_login(create_test_user())
194
195        dest = "/unique-string"
196        url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})
197
198        response = self.client.get(url + f"?{QS_QUERY}={urlencode({NEXT_ARG_NAME: dest})}")
199        self.assertEqual(response.status_code, 302)
200        self.assertEqual(response.url, "/unique-string")

Test valid flow with valid redirect destination, authenticated already

@patch('authentik.flows.views.executor.to_stage_response', TO_STAGE_RESPONSE_MOCK)
def test_valid_flow_invalid_redirect(self):
202    @patch(
203        "authentik.flows.views.executor.to_stage_response",
204        TO_STAGE_RESPONSE_MOCK,
205    )
206    def test_valid_flow_invalid_redirect(self):
207        """Test valid flow redirect with an invalid URL"""
208        flow = create_test_flow()
209
210        dest = "http://something.example.com/unique-string"
211        url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})
212
213        response = self.client.get(url + f"?{QS_QUERY}={urlencode({NEXT_ARG_NAME: dest})}")
214        self.assertEqual(response.status_code, 200)
215        self.assertStageResponse(
216            response,
217            flow,
218            component="ak-stage-access-denied",
219            error_message="Invalid next URL",
220        )

Test valid flow redirect with an invalid URL

@patch('authentik.flows.views.executor.to_stage_response', TO_STAGE_RESPONSE_MOCK)
def test_invalid_empty_flow(self):
222    @patch(
223        "authentik.flows.views.executor.to_stage_response",
224        TO_STAGE_RESPONSE_MOCK,
225    )
226    def test_invalid_empty_flow(self):
227        """Tests that an empty flow returns the correct error message"""
228        flow = create_test_flow(
229            FlowDesignation.AUTHENTICATION,
230        )
231
232        response = self.client.get(
233            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
234        )
235        self.assertEqual(response.status_code, 302)
236        self.assertEqual(response.url, reverse("authentik_core:root-redirect"))

Tests that an empty flow redirects to the root URL

def test_multi_stage_flow(self):
238    def test_multi_stage_flow(self):
239        """Test a full flow with multiple stages"""
240        flow = create_test_flow(
241            FlowDesignation.AUTHENTICATION,
242        )
243        FlowStageBinding.objects.create(
244            target=flow, stage=DummyStage.objects.create(name=generate_id()), order=0
245        )
246        FlowStageBinding.objects.create(
247            target=flow, stage=DummyStage.objects.create(name=generate_id()), order=1
248        )
249
250        exec_url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})
251        # First Request, start planning, renders form
252        response = self.client.get(exec_url)
253        self.assertEqual(response.status_code, 200)
254        # Check that two stages are in plan
255        session = self.client.session
256        plan: FlowPlan = session[SESSION_KEY_PLAN]
257        self.assertEqual(len(plan.bindings), 2)
258        # Second request, submit form, one stage left
259        response = self.client.post(exec_url)
260        # Second request redirects to the same URL
261        self.assertEqual(response.status_code, 302)
262        self.assertEqual(response.url, exec_url)
263        # Check that two stages are in plan
264        session = self.client.session
265        plan: FlowPlan = session[SESSION_KEY_PLAN]
266        self.assertEqual(len(plan.bindings), 1)

Test a full flow with multiple stages

@patch('authentik.flows.views.executor.to_stage_response', TO_STAGE_RESPONSE_MOCK)
def test_reevaluate_remove_last(self):
268    @patch(
269        "authentik.flows.views.executor.to_stage_response",
270        TO_STAGE_RESPONSE_MOCK,
271    )
272    def test_reevaluate_remove_last(self):
273        """Test planner with re-evaluate (last stage is removed)"""
274        flow = create_test_flow(
275            FlowDesignation.AUTHENTICATION,
276        )
277        false_policy = DummyPolicy.objects.create(
278            name=generate_id(), result=False, wait_min=1, wait_max=2
279        )
280
281        binding = FlowStageBinding.objects.create(
282            target=flow,
283            stage=DummyStage.objects.create(name=generate_id()),
284            order=0,
285            evaluate_on_plan=True,
286            re_evaluate_policies=False,
287        )
288        binding2 = FlowStageBinding.objects.create(
289            target=flow,
290            stage=DummyStage.objects.create(name=generate_id()),
291            order=1,
292            re_evaluate_policies=True,
293        )
294
295        PolicyBinding.objects.create(policy=false_policy, target=binding2, order=0)
296
297        # Here we patch the dummy policy to evaluate to true so the stage is included
298        with patch("authentik.policies.dummy.models.DummyPolicy.passes", POLICY_RETURN_TRUE):
299            exec_url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})
300            # First request, run the planner
301            response = self.client.get(exec_url)
302            self.assertEqual(response.status_code, 200)
303
304            plan: FlowPlan = self.client.session[SESSION_KEY_PLAN]
305
306            self.assertEqual(plan.bindings[0], binding)
307            self.assertEqual(plan.bindings[1], binding2)
308
309            self.assertEqual(plan.markers[0].__class__, StageMarker)
310            self.assertEqual(plan.markers[1].__class__, ReevaluateMarker)
311
312            # Second request, this passes the first dummy stage
313            response = self.client.post(exec_url)
314            self.assertEqual(response.status_code, 302)
315
316        # third request, this should trigger the re-evaluate
317        # We do this request without the patch, so the policy results in false
318        response = self.client.post(exec_url)
319        self.assertEqual(response.status_code, 302)
320        self.assertEqual(response.url, reverse("authentik_core:root-redirect"))

Test planner with re-evaluate (last stage is removed)

def test_reevaluate_remove_middle(self):
322    def test_reevaluate_remove_middle(self):
323        """Test planner with re-evaluate (middle stage is removed)"""
324        flow = create_test_flow(
325            FlowDesignation.AUTHENTICATION,
326        )
327        false_policy = DummyPolicy.objects.create(
328            name=generate_id(), result=False, wait_min=1, wait_max=2
329        )
330
331        binding = FlowStageBinding.objects.create(
332            target=flow,
333            stage=DummyStage.objects.create(name=generate_id()),
334            order=0,
335            evaluate_on_plan=True,
336            re_evaluate_policies=False,
337        )
338        binding2 = FlowStageBinding.objects.create(
339            target=flow,
340            stage=DummyStage.objects.create(name=generate_id()),
341            order=1,
342            re_evaluate_policies=True,
343        )
344        binding3 = FlowStageBinding.objects.create(
345            target=flow,
346            stage=DummyStage.objects.create(name=generate_id()),
347            order=2,
348            evaluate_on_plan=True,
349            re_evaluate_policies=False,
350        )
351
352        PolicyBinding.objects.create(policy=false_policy, target=binding2, order=0)
353
354        # Here we patch the dummy policy to evaluate to true so the stage is included
355        with patch("authentik.policies.dummy.models.DummyPolicy.passes", POLICY_RETURN_TRUE):
356            exec_url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})
357            # First request, run the planner
358            response = self.client.get(exec_url)
359
360            self.assertEqual(response.status_code, 200)
361            plan: FlowPlan = self.client.session[SESSION_KEY_PLAN]
362
363            self.assertEqual(plan.bindings[0], binding)
364            self.assertEqual(plan.bindings[1], binding2)
365            self.assertEqual(plan.bindings[2], binding3)
366
367            self.assertEqual(plan.markers[0].__class__, StageMarker)
368            self.assertEqual(plan.markers[1].__class__, ReevaluateMarker)
369            self.assertEqual(plan.markers[2].__class__, StageMarker)
370
371            # Second request, this passes the first dummy stage
372            response = self.client.post(exec_url)
373            self.assertEqual(response.status_code, 302)
374
375            plan: FlowPlan = self.client.session[SESSION_KEY_PLAN]
376
377            self.assertEqual(plan.bindings[0], binding2)
378            self.assertEqual(plan.bindings[1], binding3)
379
380            self.assertEqual(plan.markers[0].__class__, ReevaluateMarker)
381            self.assertEqual(plan.markers[1].__class__, StageMarker)
382
383        # third request, this should trigger the re-evaluate
384        # We do this request without the patch, so the policy results in false
385        response = self.client.post(exec_url)
386        self.assertEqual(response.status_code, 200)
387        self.assertStageRedirects(response, reverse("authentik_core:root-redirect"))

Test planner with re-evaluate (middle stage is removed)

def test_reevaluate_keep(self):
389    def test_reevaluate_keep(self):
390        """Test planner with re-evaluate (everything is kept)"""
391        flow = create_test_flow(
392            FlowDesignation.AUTHENTICATION,
393        )
394        true_policy = DummyPolicy.objects.create(
395            name=generate_id(), result=True, wait_min=1, wait_max=2
396        )
397
398        binding = FlowStageBinding.objects.create(
399            target=flow,
400            stage=DummyStage.objects.create(name=generate_id()),
401            order=0,
402            evaluate_on_plan=True,
403            re_evaluate_policies=False,
404        )
405        binding2 = FlowStageBinding.objects.create(
406            target=flow,
407            stage=DummyStage.objects.create(name=generate_id()),
408            order=1,
409            re_evaluate_policies=True,
410        )
411        binding3 = FlowStageBinding.objects.create(
412            target=flow,
413            stage=DummyStage.objects.create(name=generate_id()),
414            order=2,
415            evaluate_on_plan=True,
416            re_evaluate_policies=False,
417        )
418
419        PolicyBinding.objects.create(policy=true_policy, target=binding2, order=0)
420
421        # Here we patch the dummy policy to evaluate to true so the stage is included
422        with patch("authentik.policies.dummy.models.DummyPolicy.passes", POLICY_RETURN_TRUE):
423            exec_url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})
424            # First request, run the planner
425            response = self.client.get(exec_url)
426
427            self.assertEqual(response.status_code, 200)
428            plan: FlowPlan = self.client.session[SESSION_KEY_PLAN]
429
430            self.assertEqual(plan.bindings[0], binding)
431            self.assertEqual(plan.bindings[1], binding2)
432            self.assertEqual(plan.bindings[2], binding3)
433
434            self.assertEqual(plan.markers[0].__class__, StageMarker)
435            self.assertEqual(plan.markers[1].__class__, ReevaluateMarker)
436            self.assertEqual(plan.markers[2].__class__, StageMarker)
437
438            # Second request, this passes the first dummy stage
439            response = self.client.post(exec_url)
440            self.assertEqual(response.status_code, 302)
441
442            plan: FlowPlan = self.client.session[SESSION_KEY_PLAN]
443
444            self.assertEqual(plan.bindings[0], binding2)
445            self.assertEqual(plan.bindings[1], binding3)
446
447            self.assertEqual(plan.markers[0].__class__, ReevaluateMarker)
448            self.assertEqual(plan.markers[1].__class__, StageMarker)
449
450            # Third request, this passes the first dummy stage
451            response = self.client.post(exec_url)
452            self.assertEqual(response.status_code, 302)
453
454            plan: FlowPlan = self.client.session[SESSION_KEY_PLAN]
455
456            self.assertEqual(plan.bindings[0], binding3)
457
458            self.assertEqual(plan.markers[0].__class__, StageMarker)
459
460        # third request, this should trigger the re-evaluate
461        # We do this request without the patch, so the policy results in false
462        response = self.client.post(exec_url)
463        self.assertEqual(response.status_code, 200)
464        self.assertStageRedirects(response, reverse("authentik_core:root-redirect"))

Test planner with re-evaluate (everything is kept)

def test_reevaluate_remove_consecutive(self):
466    def test_reevaluate_remove_consecutive(self):
467        """Test planner with re-evaluate (consecutive stages are removed)"""
468        flow = create_test_flow(
469            FlowDesignation.AUTHENTICATION,
470        )
471        false_policy = DummyPolicy.objects.create(
472            name=generate_id(), result=False, wait_min=1, wait_max=2
473        )
474
475        binding = FlowStageBinding.objects.create(
476            target=flow,
477            stage=DummyStage.objects.create(name=generate_id()),
478            order=0,
479            evaluate_on_plan=True,
480            re_evaluate_policies=False,
481        )
482        binding2 = FlowStageBinding.objects.create(
483            target=flow,
484            stage=DummyStage.objects.create(name=generate_id()),
485            order=1,
486            re_evaluate_policies=True,
487        )
488        binding3 = FlowStageBinding.objects.create(
489            target=flow,
490            stage=DummyStage.objects.create(name=generate_id()),
491            order=2,
492            re_evaluate_policies=True,
493        )
494        binding4 = FlowStageBinding.objects.create(
495            target=flow,
496            stage=DummyStage.objects.create(name=generate_id()),
497            order=2,
498            evaluate_on_plan=True,
499            re_evaluate_policies=False,
500        )
501
502        PolicyBinding.objects.create(policy=false_policy, target=binding2, order=0)
503        PolicyBinding.objects.create(policy=false_policy, target=binding3, order=0)
504
505        # Here we patch the dummy policy to evaluate to true so the stage is included
506        with patch("authentik.policies.dummy.models.DummyPolicy.passes", POLICY_RETURN_TRUE):
507            exec_url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})
508            # First request, run the planner
509            response = self.client.get(exec_url)
510            self.assertEqual(response.status_code, 200)
511            self.assertStageResponse(response, flow, component="ak-stage-dummy")
512
513            plan: FlowPlan = self.client.session[SESSION_KEY_PLAN]
514
515            self.assertEqual(plan.bindings[0], binding)
516            self.assertEqual(plan.bindings[1], binding2)
517            self.assertEqual(plan.bindings[2], binding3)
518            self.assertEqual(plan.bindings[3], binding4)
519
520            self.assertEqual(plan.markers[0].__class__, StageMarker)
521            self.assertEqual(plan.markers[1].__class__, ReevaluateMarker)
522            self.assertEqual(plan.markers[2].__class__, ReevaluateMarker)
523            self.assertEqual(plan.markers[3].__class__, StageMarker)
524
525        # Second request, this passes the first dummy stage
526        response = self.client.post(exec_url)
527        self.assertEqual(response.status_code, 302)
528
529        # third request, this should trigger the re-evaluate
530        # A get request will evaluate the policies and this will return stage 4
531        # but it won't save it, hence we can't check the plan
532        response = self.client.get(exec_url)
533        self.assertStageResponse(response, flow, component="ak-stage-dummy")
534
535        # fourth request, this confirms the last stage (dummy4)
536        # We do this request without the patch, so the policy results in false
537        response = self.client.post(exec_url)
538        self.assertEqual(response.status_code, 200)
539        self.assertStageRedirects(response, reverse("authentik_core:root-redirect"))

Test planner with re-evaluate (consecutive stages are removed)

def test_stageview_user_identifier(self):
541    def test_stageview_user_identifier(self):
542        """Test PLAN_CONTEXT_PENDING_USER_IDENTIFIER"""
543        flow = create_test_flow(
544            FlowDesignation.AUTHENTICATION,
545        )
546        FlowStageBinding.objects.create(
547            target=flow, stage=DummyStage.objects.create(name=generate_id()), order=0
548        )
549
550        ident = "test-identifier"
551
552        user = User.objects.create(username="test-user")
553        request = self.request_factory.get(
554            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
555        )
556        request.user = user
557        planner = FlowPlanner(flow)
558        plan = planner.plan(request, default_context={PLAN_CONTEXT_PENDING_USER_IDENTIFIER: ident})
559
560        executor = FlowExecutorView()
561        executor.plan = plan
562        executor.flow = flow
563
564        stage_view = StageView(executor)
565        self.assertEqual(ident, stage_view.get_pending_user(for_display=True).username)

Test PLAN_CONTEXT_PENDING_USER_IDENTIFIER

def test_invalid_restart(self):
567    def test_invalid_restart(self):
568        """Test flow that restarts on invalid entry"""
569        flow = create_test_flow(
570            FlowDesignation.AUTHENTICATION,
571        )
572        # Stage 0 is a deny stage that is added dynamically
573        # when the reputation policy says so
574        deny_stage = DenyStage.objects.create(name=generate_id())
575        reputation_policy = ReputationPolicy.objects.create(
576            name=generate_id(), threshold=-1, check_ip=False
577        )
578        deny_binding = FlowStageBinding.objects.create(
579            target=flow,
580            stage=deny_stage,
581            order=0,
582            evaluate_on_plan=False,
583            re_evaluate_policies=True,
584        )
585        PolicyBinding.objects.create(policy=reputation_policy, target=deny_binding, order=0)
586
587        # Stage 1 is an identification stage
588        ident_stage = IdentificationStage.objects.create(
589            name=generate_id(),
590            user_fields=[UserFields.E_MAIL],
591            pretend_user_exists=False,
592        )
593        FlowStageBinding.objects.create(
594            target=flow,
595            stage=ident_stage,
596            order=1,
597            invalid_response_action=InvalidResponseAction.RESTART_WITH_CONTEXT,
598        )
599        exec_url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})
600        # First request, run the planner
601        response = self.client.get(exec_url)
602        self.assertStageResponse(
603            response,
604            flow,
605            component="ak-stage-identification",
606            password_fields=False,
607            primary_action="Log in",
608            sources=[],
609            show_source_labels=False,
610            user_fields=[UserFields.E_MAIL],
611        )
612        response = self.client.post(exec_url, {"uid_field": "invalid-string"}, follow=True)
613        self.assertStageResponse(response, flow, component="ak-stage-access-denied")

Test flow that restarts on invalid entry

def test_re_evaluate_group_binding(self):
615    def test_re_evaluate_group_binding(self):
616        """Test re-evaluate stage binding that has a policy binding to a group"""
617        flow = create_test_flow()
618
619        user_group_membership = create_test_user()
620        user_direct_binding = create_test_user()
621        user_other = create_test_user()
622
623        group_a = Group.objects.create(name=generate_id())
624        user_group_membership.groups.add(group_a)
625
626        # Stage 0 is an identification stage
627        ident_stage = IdentificationStage.objects.create(
628            name=generate_id(),
629            user_fields=[UserFields.USERNAME],
630            pretend_user_exists=False,
631        )
632        FlowStageBinding.objects.create(
633            target=flow,
634            stage=ident_stage,
635            order=0,
636        )
637
638        # Stage 1 is a dummy stage that is only shown for users in group_a
639        dummy_stage = DummyStage.objects.create(name=generate_id())
640        dummy_binding = FlowStageBinding.objects.create(target=flow, stage=dummy_stage, order=1)
641        PolicyBinding.objects.create(group=group_a, target=dummy_binding, order=0)
642        PolicyBinding.objects.create(user=user_direct_binding, target=dummy_binding, order=0)
643
644        # Stage 2 is a deny stage that (in this case) only user_b will see
645        deny_stage = DenyStage.objects.create(name=generate_id())
646        FlowStageBinding.objects.create(target=flow, stage=deny_stage, order=2)
647
648        exec_url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})
649
650        with self.subTest(f"Test user access through group: {user_group_membership}"):
651            self.client.logout()
652            # First request, run the planner
653            response = self.client.get(exec_url)
654            self.assertStageResponse(response, flow, component="ak-stage-identification")
655            response = self.client.post(
656                exec_url, {"uid_field": user_group_membership.username}, follow=True
657            )
658            self.assertStageResponse(response, flow, component="ak-stage-dummy")
659        with self.subTest(f"Test user access through user: {user_direct_binding}"):
660            self.client.logout()
661            # First request, run the planner
662            response = self.client.get(exec_url)
663            self.assertStageResponse(response, flow, component="ak-stage-identification")
664            response = self.client.post(
665                exec_url, {"uid_field": user_direct_binding.username}, follow=True
666            )
667            self.assertStageResponse(response, flow, component="ak-stage-dummy")
668        with self.subTest(f"Test user has no access: {user_other}"):
669            self.client.logout()
670            # First request, run the planner
671            response = self.client.get(exec_url)
672            self.assertStageResponse(response, flow, component="ak-stage-identification")
673            response = self.client.post(exec_url, {"uid_field": user_other.username}, follow=True)
674            self.assertStageResponse(response, flow, component="ak-stage-access-denied")

Test re-evaluate stage binding that has a policy binding to a group

@patch('authentik.flows.views.executor.to_stage_response', TO_STAGE_RESPONSE_MOCK)
def test_invalid_json(self):
676    @patch(
677        "authentik.flows.views.executor.to_stage_response",
678        TO_STAGE_RESPONSE_MOCK,
679    )
680    def test_invalid_json(self):
681        """Test invalid JSON body"""
682        flow = create_test_flow()
683        FlowStageBinding.objects.create(
684            target=flow, stage=DummyStage.objects.create(name=generate_id()), order=0
685        )
686        url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})
687
688        with override_settings(TEST=False, DEBUG=False):
689            self.client.logout()
690            response = self.client.post(url, data="{", content_type="application/json")
691            self.assertEqual(response.status_code, 200)
692
693        with self.assertRaises(ParseError):
694            self.client.logout()
695            response = self.client.post(url, data="{", content_type="application/json")
696            self.assertEqual(response.status_code, 200)

Test invalid JSON body

def test_cancel_next(self):
698    def test_cancel_next(self):
699        """Test cancel URL with ?next param set"""
700        flow = create_test_flow()
701
702        # Stage 0 is an identification stage
703        ident_stage = IdentificationStage.objects.create(
704            name=generate_id(),
705            user_fields=[UserFields.USERNAME],
706        )
707        FlowStageBinding.objects.create(
708            target=flow,
709            stage=ident_stage,
710            order=0,
711        )
712
713        # Stage 1 is a password stage
714        password_stage = PasswordStage.objects.create(name=generate_id(), backends=[])
715        FlowStageBinding.objects.create(
716            target=flow,
717            stage=password_stage,
718            order=1,
719        )
720        res = self.client.get(
721            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})
722            + f"?{urlencode({QS_QUERY: urlencode({NEXT_ARG_NAME: "/foo"})})}"
723        )
724        self.assertStageResponse(res, flow, component="ak-stage-identification")
725
726        res = self.client.post(
727            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})
728            + f"?{urlencode({QS_QUERY: urlencode({NEXT_ARG_NAME: "/foo"})})}",
729            data={"component": "ak-stage-identification", "uid_field": generate_id()},
730            follow=True,
731        )
732        self.assertEqual(res.status_code, 200)
733        self.assertStageResponse(
734            res,
735            flow,
736            flow_info={
737                "background": "/static/dist/assets/images/flow_background.jpg",
738                "background_themed_urls": None,
739                "cancel_url": "/flows/-/cancel/?next=%2Ffoo",
740                "layout": "stacked",
741                "title": flow.title,
742            },
743        )

Test cancel URL with ?next param set