authentik.flows.tests.test_executor

flow views tests

  1"""flow views tests"""
  2
  3from datetime import timedelta
  4from unittest.mock import MagicMock, PropertyMock, patch
  5from urllib.parse import urlencode
  6
  7from django.http import HttpRequest, HttpResponse
  8from django.test import override_settings
  9from django.test.client import RequestFactory
 10from django.urls import reverse
 11from django.utils.timezone import now
 12from rest_framework.exceptions import ParseError
 13
 14from authentik.core.models import Group, User
 15from authentik.core.tests.utils import create_test_flow, create_test_user
 16from authentik.flows.markers import ReevaluateMarker, StageMarker
 17from authentik.flows.models import (
 18    FlowAuthenticationRequirement,
 19    FlowDeniedAction,
 20    FlowDesignation,
 21    FlowStageBinding,
 22    FlowToken,
 23    InvalidResponseAction,
 24)
 25from authentik.flows.planner import FlowPlan, FlowPlanner
 26from authentik.flows.stage import PLAN_CONTEXT_PENDING_USER_IDENTIFIER, StageView
 27from authentik.flows.tests import FlowTestCase
 28from authentik.flows.views.executor import (
 29    NEXT_ARG_NAME,
 30    QS_KEY_TOKEN,
 31    QS_QUERY,
 32    SESSION_KEY_PLAN,
 33    FlowExecutorView,
 34)
 35from authentik.lib.generators import generate_id
 36from authentik.policies.dummy.models import DummyPolicy
 37from authentik.policies.models import PolicyBinding
 38from authentik.policies.reputation.models import ReputationPolicy
 39from authentik.policies.types import PolicyResult
 40from authentik.stages.deny.models import DenyStage
 41from authentik.stages.dummy.models import DummyStage
 42from authentik.stages.identification.models import IdentificationStage, UserFields
 43from authentik.stages.password.models import PasswordStage
 44
 45POLICY_RETURN_FALSE = PropertyMock(return_value=PolicyResult(False, "foo"))
 46POLICY_RETURN_TRUE = MagicMock(return_value=PolicyResult(True))
 47
 48
 49def to_stage_response(request: HttpRequest, source: HttpResponse):
 50    """Mock for to_stage_response that returns the original response, so we can check
 51    inheritance and member attributes"""
 52    return source
 53
 54
 55TO_STAGE_RESPONSE_MOCK = MagicMock(side_effect=to_stage_response)
 56
 57
 58class TestFlowExecutor(FlowTestCase):
 59    """Test executor"""
 60
 61    def setUp(self):
 62        self.request_factory = RequestFactory()
 63
 64    @patch(
 65        "authentik.flows.views.executor.to_stage_response",
 66        TO_STAGE_RESPONSE_MOCK,
 67    )
 68    def test_existing_plan_diff_flow(self):
 69        """Check that a plan for a different flow cancels the current plan"""
 70        flow = create_test_flow(
 71            FlowDesignation.AUTHENTICATION,
 72        )
 73        stage = DummyStage.objects.create(name=generate_id())
 74        binding = FlowStageBinding(target=flow, stage=stage, order=0)
 75        plan = FlowPlan(flow_pk=flow.pk.hex + "a", bindings=[binding], markers=[StageMarker()])
 76        session = self.client.session
 77        session[SESSION_KEY_PLAN] = plan
 78        session.save()
 79
 80        cancel_mock = MagicMock()
 81        with patch("authentik.flows.views.executor.FlowExecutorView.cancel", cancel_mock):
 82            response = self.client.get(
 83                reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
 84            )
 85            self.assertEqual(response.status_code, 302)
 86            self.assertEqual(cancel_mock.call_count, 2)
 87
 88    @patch(
 89        "authentik.flows.views.executor.to_stage_response",
 90        TO_STAGE_RESPONSE_MOCK,
 91    )
 92    @patch(
 93        "authentik.policies.engine.PolicyEngine.result",
 94        POLICY_RETURN_FALSE,
 95    )
 96    def test_invalid_non_applicable_flow(self):
 97        """Tests that a non-applicable flow returns the correct error message"""
 98        flow = create_test_flow(
 99            FlowDesignation.AUTHENTICATION,
100        )
101
102        response = self.client.get(
103            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
104        )
105        self.assertStageResponse(
106            response,
107            flow=flow,
108            error_message="foo",
109            component="ak-stage-access-denied",
110        )
111
112    @patch(
113        "authentik.flows.views.executor.to_stage_response",
114        TO_STAGE_RESPONSE_MOCK,
115    )
116    @patch(
117        "authentik.policies.engine.PolicyEngine.result",
118        POLICY_RETURN_FALSE,
119    )
120    def test_invalid_non_applicable_flow_continue(self):
121        """Tests that a non-applicable flow that should redirect"""
122        flow = create_test_flow(
123            FlowDesignation.AUTHENTICATION,
124            denied_action=FlowDeniedAction.CONTINUE,
125        )
126
127        response = self.client.get(
128            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
129        )
130        self.assertEqual(response.status_code, 302)
131        self.assertEqual(response.url, reverse("authentik_core:root-redirect"))
132
133    @patch(
134        "authentik.flows.views.executor.to_stage_response",
135        TO_STAGE_RESPONSE_MOCK,
136    )
137    def test_invalid_flow_redirect(self):
138        """Test invalid flow with valid redirect destination"""
139        flow = create_test_flow(
140            FlowDesignation.AUTHENTICATION,
141        )
142
143        dest = "/unique-string"
144        url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})
145        response = self.client.get(url + f"?{QS_QUERY}={urlencode({NEXT_ARG_NAME: dest})}")
146        self.assertEqual(response.status_code, 302)
147        self.assertEqual(response.url, "/unique-string")
148
149    @patch(
150        "authentik.flows.views.executor.to_stage_response",
151        TO_STAGE_RESPONSE_MOCK,
152    )
153    def test_invalid_flow_invalid_redirect(self):
154        """Test invalid flow redirect with an invalid URL"""
155        flow = create_test_flow(
156            FlowDesignation.AUTHENTICATION,
157        )
158
159        dest = "http://something.example.com/unique-string"
160        url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})
161
162        response = self.client.get(url + f"?{QS_QUERY}={urlencode({NEXT_ARG_NAME: dest})}")
163        self.assertEqual(response.status_code, 200)
164        self.assertStageResponse(
165            response,
166            flow,
167            component="ak-stage-access-denied",
168            error_message="Invalid next URL",
169        )
170
171    @patch(
172        "authentik.flows.views.executor.to_stage_response",
173        TO_STAGE_RESPONSE_MOCK,
174    )
175    def test_valid_flow_redirect(self):
176        """Test valid flow with valid redirect destination"""
177        flow = create_test_flow()
178
179        dest = "/unique-string"
180        url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})
181
182        response = self.client.get(url + f"?{QS_QUERY}={urlencode({NEXT_ARG_NAME: dest})}")
183        self.assertEqual(response.status_code, 302)
184        self.assertEqual(response.url, "/unique-string")
185
186    @patch(
187        "authentik.flows.views.executor.to_stage_response",
188        TO_STAGE_RESPONSE_MOCK,
189    )
190    def test_valid_flow_redirect_authenticated(self):
191        """Test valid flow with valid redirect destination, authenticated already"""
192        flow = create_test_flow()
193        flow.designation = FlowDesignation.AUTHENTICATION
194        flow.authentication = FlowAuthenticationRequirement.REQUIRE_UNAUTHENTICATED
195        flow.save()
196        self.client.force_login(create_test_user())
197
198        dest = "/unique-string"
199        url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})
200
201        response = self.client.get(url + f"?{QS_QUERY}={urlencode({NEXT_ARG_NAME: dest})}")
202        self.assertEqual(response.status_code, 302)
203        self.assertEqual(response.url, "/unique-string")
204
205    @patch(
206        "authentik.flows.views.executor.to_stage_response",
207        TO_STAGE_RESPONSE_MOCK,
208    )
209    def test_valid_flow_invalid_redirect(self):
210        """Test valid flow redirect with an invalid URL"""
211        flow = create_test_flow()
212
213        dest = "http://something.example.com/unique-string"
214        url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})
215
216        response = self.client.get(url + f"?{QS_QUERY}={urlencode({NEXT_ARG_NAME: dest})}")
217        self.assertEqual(response.status_code, 200)
218        self.assertStageResponse(
219            response,
220            flow,
221            component="ak-stage-access-denied",
222            error_message="Invalid next URL",
223        )
224
225    @patch(
226        "authentik.flows.views.executor.to_stage_response",
227        TO_STAGE_RESPONSE_MOCK,
228    )
229    def test_invalid_empty_flow(self):
230        """Tests that an empty flow returns the correct error message"""
231        flow = create_test_flow(
232            FlowDesignation.AUTHENTICATION,
233        )
234
235        response = self.client.get(
236            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
237        )
238        self.assertEqual(response.status_code, 302)
239        self.assertEqual(response.url, reverse("authentik_core:root-redirect"))
240
241    def test_multi_stage_flow(self):
242        """Test a full flow with multiple stages"""
243        flow = create_test_flow(
244            FlowDesignation.AUTHENTICATION,
245        )
246        FlowStageBinding.objects.create(
247            target=flow, stage=DummyStage.objects.create(name=generate_id()), order=0
248        )
249        FlowStageBinding.objects.create(
250            target=flow, stage=DummyStage.objects.create(name=generate_id()), order=1
251        )
252
253        exec_url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})
254        # First Request, start planning, renders form
255        response = self.client.get(exec_url)
256        self.assertEqual(response.status_code, 200)
257        # Check that two stages are in plan
258        session = self.client.session
259        plan: FlowPlan = session[SESSION_KEY_PLAN]
260        self.assertEqual(len(plan.bindings), 2)
261        # Second request, submit form, one stage left
262        response = self.client.post(exec_url)
263        # Second request redirects to the same URL
264        self.assertEqual(response.status_code, 302)
265        self.assertEqual(response.url, exec_url)
266        # Check that two stages are in plan
267        session = self.client.session
268        plan: FlowPlan = session[SESSION_KEY_PLAN]
269        self.assertEqual(len(plan.bindings), 1)
270
271    @patch(
272        "authentik.flows.views.executor.to_stage_response",
273        TO_STAGE_RESPONSE_MOCK,
274    )
275    def test_reevaluate_remove_last(self):
276        """Test planner with re-evaluate (last stage is removed)"""
277        flow = create_test_flow(
278            FlowDesignation.AUTHENTICATION,
279        )
280        false_policy = DummyPolicy.objects.create(
281            name=generate_id(), result=False, wait_min=1, wait_max=2
282        )
283
284        binding = FlowStageBinding.objects.create(
285            target=flow,
286            stage=DummyStage.objects.create(name=generate_id()),
287            order=0,
288            evaluate_on_plan=True,
289            re_evaluate_policies=False,
290        )
291        binding2 = FlowStageBinding.objects.create(
292            target=flow,
293            stage=DummyStage.objects.create(name=generate_id()),
294            order=1,
295            re_evaluate_policies=True,
296        )
297
298        PolicyBinding.objects.create(policy=false_policy, target=binding2, order=0)
299
300        # Here we patch the dummy policy to evaluate to true so the stage is included
301        with patch("authentik.policies.dummy.models.DummyPolicy.passes", POLICY_RETURN_TRUE):
302            exec_url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})
303            # First request, run the planner
304            response = self.client.get(exec_url)
305            self.assertEqual(response.status_code, 200)
306
307            plan: FlowPlan = self.client.session[SESSION_KEY_PLAN]
308
309            self.assertEqual(plan.bindings[0], binding)
310            self.assertEqual(plan.bindings[1], binding2)
311
312            self.assertEqual(plan.markers[0].__class__, StageMarker)
313            self.assertEqual(plan.markers[1].__class__, ReevaluateMarker)
314
315            # Second request, this passes the first dummy stage
316            response = self.client.post(exec_url)
317            self.assertEqual(response.status_code, 302)
318
319        # third request, this should trigger the re-evaluate
320        # We do this request without the patch, so the policy results in false
321        response = self.client.post(exec_url)
322        self.assertEqual(response.status_code, 302)
323        self.assertEqual(response.url, reverse("authentik_core:root-redirect"))
324
325    def test_reevaluate_remove_middle(self):
326        """Test planner with re-evaluate (middle stage is removed)"""
327        flow = create_test_flow(
328            FlowDesignation.AUTHENTICATION,
329        )
330        false_policy = DummyPolicy.objects.create(
331            name=generate_id(), result=False, wait_min=1, wait_max=2
332        )
333
334        binding = FlowStageBinding.objects.create(
335            target=flow,
336            stage=DummyStage.objects.create(name=generate_id()),
337            order=0,
338            evaluate_on_plan=True,
339            re_evaluate_policies=False,
340        )
341        binding2 = FlowStageBinding.objects.create(
342            target=flow,
343            stage=DummyStage.objects.create(name=generate_id()),
344            order=1,
345            re_evaluate_policies=True,
346        )
347        binding3 = FlowStageBinding.objects.create(
348            target=flow,
349            stage=DummyStage.objects.create(name=generate_id()),
350            order=2,
351            evaluate_on_plan=True,
352            re_evaluate_policies=False,
353        )
354
355        PolicyBinding.objects.create(policy=false_policy, target=binding2, order=0)
356
357        # Here we patch the dummy policy to evaluate to true so the stage is included
358        with patch("authentik.policies.dummy.models.DummyPolicy.passes", POLICY_RETURN_TRUE):
359            exec_url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})
360            # First request, run the planner
361            response = self.client.get(exec_url)
362
363            self.assertEqual(response.status_code, 200)
364            plan: FlowPlan = self.client.session[SESSION_KEY_PLAN]
365
366            self.assertEqual(plan.bindings[0], binding)
367            self.assertEqual(plan.bindings[1], binding2)
368            self.assertEqual(plan.bindings[2], binding3)
369
370            self.assertEqual(plan.markers[0].__class__, StageMarker)
371            self.assertEqual(plan.markers[1].__class__, ReevaluateMarker)
372            self.assertEqual(plan.markers[2].__class__, StageMarker)
373
374            # Second request, this passes the first dummy stage
375            response = self.client.post(exec_url)
376            self.assertEqual(response.status_code, 302)
377
378            plan: FlowPlan = self.client.session[SESSION_KEY_PLAN]
379
380            self.assertEqual(plan.bindings[0], binding2)
381            self.assertEqual(plan.bindings[1], binding3)
382
383            self.assertEqual(plan.markers[0].__class__, ReevaluateMarker)
384            self.assertEqual(plan.markers[1].__class__, StageMarker)
385
386        # third request, this should trigger the re-evaluate
387        # We do this request without the patch, so the policy results in false
388        response = self.client.post(exec_url)
389        self.assertEqual(response.status_code, 200)
390        self.assertStageRedirects(response, reverse("authentik_core:root-redirect"))
391
392    def test_reevaluate_keep(self):
393        """Test planner with re-evaluate (everything is kept)"""
394        flow = create_test_flow(
395            FlowDesignation.AUTHENTICATION,
396        )
397        true_policy = DummyPolicy.objects.create(
398            name=generate_id(), result=True, wait_min=1, wait_max=2
399        )
400
401        binding = FlowStageBinding.objects.create(
402            target=flow,
403            stage=DummyStage.objects.create(name=generate_id()),
404            order=0,
405            evaluate_on_plan=True,
406            re_evaluate_policies=False,
407        )
408        binding2 = FlowStageBinding.objects.create(
409            target=flow,
410            stage=DummyStage.objects.create(name=generate_id()),
411            order=1,
412            re_evaluate_policies=True,
413        )
414        binding3 = FlowStageBinding.objects.create(
415            target=flow,
416            stage=DummyStage.objects.create(name=generate_id()),
417            order=2,
418            evaluate_on_plan=True,
419            re_evaluate_policies=False,
420        )
421
422        PolicyBinding.objects.create(policy=true_policy, target=binding2, order=0)
423
424        # Here we patch the dummy policy to evaluate to true so the stage is included
425        with patch("authentik.policies.dummy.models.DummyPolicy.passes", POLICY_RETURN_TRUE):
426            exec_url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})
427            # First request, run the planner
428            response = self.client.get(exec_url)
429
430            self.assertEqual(response.status_code, 200)
431            plan: FlowPlan = self.client.session[SESSION_KEY_PLAN]
432
433            self.assertEqual(plan.bindings[0], binding)
434            self.assertEqual(plan.bindings[1], binding2)
435            self.assertEqual(plan.bindings[2], binding3)
436
437            self.assertEqual(plan.markers[0].__class__, StageMarker)
438            self.assertEqual(plan.markers[1].__class__, ReevaluateMarker)
439            self.assertEqual(plan.markers[2].__class__, StageMarker)
440
441            # Second request, this passes the first dummy stage
442            response = self.client.post(exec_url)
443            self.assertEqual(response.status_code, 302)
444
445            plan: FlowPlan = self.client.session[SESSION_KEY_PLAN]
446
447            self.assertEqual(plan.bindings[0], binding2)
448            self.assertEqual(plan.bindings[1], binding3)
449
450            self.assertEqual(plan.markers[0].__class__, ReevaluateMarker)
451            self.assertEqual(plan.markers[1].__class__, StageMarker)
452
453            # Third request, this passes the first dummy stage
454            response = self.client.post(exec_url)
455            self.assertEqual(response.status_code, 302)
456
457            plan: FlowPlan = self.client.session[SESSION_KEY_PLAN]
458
459            self.assertEqual(plan.bindings[0], binding3)
460
461            self.assertEqual(plan.markers[0].__class__, StageMarker)
462
463        # third request, this should trigger the re-evaluate
464        # We do this request without the patch, so the policy results in false
465        response = self.client.post(exec_url)
466        self.assertEqual(response.status_code, 200)
467        self.assertStageRedirects(response, reverse("authentik_core:root-redirect"))
468
469    def test_reevaluate_remove_consecutive(self):
470        """Test planner with re-evaluate (consecutive stages are removed)"""
471        flow = create_test_flow(
472            FlowDesignation.AUTHENTICATION,
473        )
474        false_policy = DummyPolicy.objects.create(
475            name=generate_id(), result=False, wait_min=1, wait_max=2
476        )
477
478        binding = FlowStageBinding.objects.create(
479            target=flow,
480            stage=DummyStage.objects.create(name=generate_id()),
481            order=0,
482            evaluate_on_plan=True,
483            re_evaluate_policies=False,
484        )
485        binding2 = FlowStageBinding.objects.create(
486            target=flow,
487            stage=DummyStage.objects.create(name=generate_id()),
488            order=1,
489            re_evaluate_policies=True,
490        )
491        binding3 = FlowStageBinding.objects.create(
492            target=flow,
493            stage=DummyStage.objects.create(name=generate_id()),
494            order=2,
495            re_evaluate_policies=True,
496        )
497        binding4 = FlowStageBinding.objects.create(
498            target=flow,
499            stage=DummyStage.objects.create(name=generate_id()),
500            order=2,
501            evaluate_on_plan=True,
502            re_evaluate_policies=False,
503        )
504
505        PolicyBinding.objects.create(policy=false_policy, target=binding2, order=0)
506        PolicyBinding.objects.create(policy=false_policy, target=binding3, order=0)
507
508        # Here we patch the dummy policy to evaluate to true so the stage is included
509        with patch("authentik.policies.dummy.models.DummyPolicy.passes", POLICY_RETURN_TRUE):
510            exec_url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})
511            # First request, run the planner
512            response = self.client.get(exec_url)
513            self.assertEqual(response.status_code, 200)
514            self.assertStageResponse(response, flow, component="ak-stage-dummy")
515
516            plan: FlowPlan = self.client.session[SESSION_KEY_PLAN]
517
518            self.assertEqual(plan.bindings[0], binding)
519            self.assertEqual(plan.bindings[1], binding2)
520            self.assertEqual(plan.bindings[2], binding3)
521            self.assertEqual(plan.bindings[3], binding4)
522
523            self.assertEqual(plan.markers[0].__class__, StageMarker)
524            self.assertEqual(plan.markers[1].__class__, ReevaluateMarker)
525            self.assertEqual(plan.markers[2].__class__, ReevaluateMarker)
526            self.assertEqual(plan.markers[3].__class__, StageMarker)
527
528        # Second request, this passes the first dummy stage
529        response = self.client.post(exec_url)
530        self.assertEqual(response.status_code, 302)
531
532        # third request, this should trigger the re-evaluate
533        # A get request will evaluate the policies and this will return stage 4
534        # but it won't save it, hence we can't check the plan
535        response = self.client.get(exec_url)
536        self.assertStageResponse(response, flow, component="ak-stage-dummy")
537
538        # fourth request, this confirms the last stage (dummy4)
539        # We do this request without the patch, so the policy results in false
540        response = self.client.post(exec_url)
541        self.assertEqual(response.status_code, 200)
542        self.assertStageRedirects(response, reverse("authentik_core:root-redirect"))
543
544    def test_stageview_user_identifier(self):
545        """Test PLAN_CONTEXT_PENDING_USER_IDENTIFIER"""
546        flow = create_test_flow(
547            FlowDesignation.AUTHENTICATION,
548        )
549        FlowStageBinding.objects.create(
550            target=flow, stage=DummyStage.objects.create(name=generate_id()), order=0
551        )
552
553        ident = "test-identifier"
554
555        user = User.objects.create(username="test-user")
556        request = self.request_factory.get(
557            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
558        )
559        request.user = user
560        planner = FlowPlanner(flow)
561        plan = planner.plan(request, default_context={PLAN_CONTEXT_PENDING_USER_IDENTIFIER: ident})
562
563        executor = FlowExecutorView()
564        executor.plan = plan
565        executor.flow = flow
566
567        stage_view = StageView(executor)
568        self.assertEqual(ident, stage_view.get_pending_user(for_display=True).username)
569
570    def test_invalid_restart(self):
571        """Test flow that restarts on invalid entry"""
572        flow = create_test_flow(
573            FlowDesignation.AUTHENTICATION,
574        )
575        # Stage 0 is a deny stage that is added dynamically
576        # when the reputation policy says so
577        deny_stage = DenyStage.objects.create(name=generate_id())
578        reputation_policy = ReputationPolicy.objects.create(
579            name=generate_id(), threshold=-1, check_ip=False
580        )
581        deny_binding = FlowStageBinding.objects.create(
582            target=flow,
583            stage=deny_stage,
584            order=0,
585            evaluate_on_plan=False,
586            re_evaluate_policies=True,
587        )
588        PolicyBinding.objects.create(policy=reputation_policy, target=deny_binding, order=0)
589
590        # Stage 1 is an identification stage
591        ident_stage = IdentificationStage.objects.create(
592            name=generate_id(),
593            user_fields=[UserFields.E_MAIL],
594            pretend_user_exists=False,
595        )
596        FlowStageBinding.objects.create(
597            target=flow,
598            stage=ident_stage,
599            order=1,
600            invalid_response_action=InvalidResponseAction.RESTART_WITH_CONTEXT,
601        )
602        exec_url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})
603        # First request, run the planner
604        response = self.client.get(exec_url)
605        self.assertStageResponse(
606            response,
607            flow,
608            component="ak-stage-identification",
609            password_fields=False,
610            primary_action="Log in",
611            sources=[],
612            show_source_labels=False,
613            user_fields=[UserFields.E_MAIL],
614        )
615        response = self.client.post(exec_url, {"uid_field": "invalid-string"}, follow=True)
616        self.assertStageResponse(response, flow, component="ak-stage-access-denied")
617
618    def test_re_evaluate_group_binding(self):
619        """Test re-evaluate stage binding that has a policy binding to a group"""
620        flow = create_test_flow()
621
622        user_group_membership = create_test_user()
623        user_direct_binding = create_test_user()
624        user_other = create_test_user()
625
626        group_a = Group.objects.create(name=generate_id())
627        user_group_membership.groups.add(group_a)
628
629        # Stage 0 is an identification stage
630        ident_stage = IdentificationStage.objects.create(
631            name=generate_id(),
632            user_fields=[UserFields.USERNAME],
633            pretend_user_exists=False,
634        )
635        FlowStageBinding.objects.create(
636            target=flow,
637            stage=ident_stage,
638            order=0,
639        )
640
641        # Stage 1 is a dummy stage that is only shown for users in group_a
642        dummy_stage = DummyStage.objects.create(name=generate_id())
643        dummy_binding = FlowStageBinding.objects.create(target=flow, stage=dummy_stage, order=1)
644        PolicyBinding.objects.create(group=group_a, target=dummy_binding, order=0)
645        PolicyBinding.objects.create(user=user_direct_binding, target=dummy_binding, order=0)
646
647        # Stage 2 is a deny stage that (in this case) only user_b will see
648        deny_stage = DenyStage.objects.create(name=generate_id())
649        FlowStageBinding.objects.create(target=flow, stage=deny_stage, order=2)
650
651        exec_url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})
652
653        with self.subTest(f"Test user access through group: {user_group_membership}"):
654            self.client.logout()
655            # First request, run the planner
656            response = self.client.get(exec_url)
657            self.assertStageResponse(response, flow, component="ak-stage-identification")
658            response = self.client.post(
659                exec_url, {"uid_field": user_group_membership.username}, follow=True
660            )
661            self.assertStageResponse(response, flow, component="ak-stage-dummy")
662        with self.subTest(f"Test user access through user: {user_direct_binding}"):
663            self.client.logout()
664            # First request, run the planner
665            response = self.client.get(exec_url)
666            self.assertStageResponse(response, flow, component="ak-stage-identification")
667            response = self.client.post(
668                exec_url, {"uid_field": user_direct_binding.username}, follow=True
669            )
670            self.assertStageResponse(response, flow, component="ak-stage-dummy")
671        with self.subTest(f"Test user has no access: {user_other}"):
672            self.client.logout()
673            # First request, run the planner
674            response = self.client.get(exec_url)
675            self.assertStageResponse(response, flow, component="ak-stage-identification")
676            response = self.client.post(exec_url, {"uid_field": user_other.username}, follow=True)
677            self.assertStageResponse(response, flow, component="ak-stage-access-denied")
678
679    @patch(
680        "authentik.flows.views.executor.to_stage_response",
681        TO_STAGE_RESPONSE_MOCK,
682    )
683    def test_invalid_json(self):
684        """Test invalid JSON body"""
685        flow = create_test_flow()
686        FlowStageBinding.objects.create(
687            target=flow, stage=DummyStage.objects.create(name=generate_id()), order=0
688        )
689        url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})
690
691        with override_settings(TEST=False, DEBUG=False):
692            self.client.logout()
693            response = self.client.post(url, data="{", content_type="application/json")
694            self.assertEqual(response.status_code, 200)
695
696        with self.assertRaises(ParseError):
697            self.client.logout()
698            response = self.client.post(url, data="{", content_type="application/json")
699            self.assertEqual(response.status_code, 200)
700
701    def test_cancel_next(self):
702        """Test cancel URL with ?next param set"""
703        flow = create_test_flow()
704
705        # Stage 0 is an identification stage
706        ident_stage = IdentificationStage.objects.create(
707            name=generate_id(),
708            user_fields=[UserFields.USERNAME],
709        )
710        FlowStageBinding.objects.create(
711            target=flow,
712            stage=ident_stage,
713            order=0,
714        )
715
716        # Stage 1 is a password stage
717        password_stage = PasswordStage.objects.create(name=generate_id(), backends=[])
718        FlowStageBinding.objects.create(
719            target=flow,
720            stage=password_stage,
721            order=1,
722        )
723        res = self.client.get(
724            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})
725            + f"?{urlencode({QS_QUERY: urlencode({NEXT_ARG_NAME: "/foo"})})}"
726        )
727        self.assertStageResponse(res, flow, component="ak-stage-identification")
728
729        res = self.client.post(
730            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})
731            + f"?{urlencode({QS_QUERY: urlencode({NEXT_ARG_NAME: "/foo"})})}",
732            data={"component": "ak-stage-identification", "uid_field": generate_id()},
733            follow=True,
734        )
735        self.assertEqual(res.status_code, 200)
736        self.assertStageResponse(
737            res,
738            flow,
739            flow_info={
740                "background": "/static/dist/assets/images/flow_background.jpg",
741                "background_themed_urls": None,
742                "cancel_url": "/flows/-/cancel/?next=%2Ffoo",
743                "layout": "stacked",
744                "title": flow.title,
745            },
746        )
747
748    @patch(
749        "authentik.flows.views.executor.to_stage_response",
750        TO_STAGE_RESPONSE_MOCK,
751    )
752    def test_expired_flow_token(self):
753        """Test that an expired flow token shows an appropriate error message"""
754        flow = create_test_flow(
755            FlowDesignation.RECOVERY,
756            authentication=FlowAuthenticationRequirement.REQUIRE_TOKEN,
757        )
758        user = create_test_user()
759        plan = FlowPlan(flow_pk=flow.pk.hex, bindings=[], markers=[])
760
761        token = FlowToken.objects.create(
762            user=user,
763            identifier=generate_id(),
764            flow=flow,
765            _plan=FlowToken.pickle(plan),
766            expires=now() - timedelta(hours=1),
767        )
768
769        url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})
770        response = self.client.get(
771            url + f"?{urlencode({QS_QUERY: urlencode({QS_KEY_TOKEN: token.key})})}"
772        )
773        self.assertStageResponse(
774            response,
775            flow,
776            component="ak-stage-access-denied",
777            error_message="This link is invalid or has expired. Please request a new one.",
778        )
779
780    @patch(
781        "authentik.flows.views.executor.to_stage_response",
782        TO_STAGE_RESPONSE_MOCK,
783    )
784    def test_invalid_flow_token_require_token(self):
785        """Test that an invalid/nonexistent token on a REQUIRE_TOKEN flow shows error"""
786        flow = create_test_flow(
787            FlowDesignation.RECOVERY,
788            authentication=FlowAuthenticationRequirement.REQUIRE_TOKEN,
789        )
790
791        url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})
792        response = self.client.get(
793            url + f"?{urlencode({QS_QUERY: urlencode({QS_KEY_TOKEN: 'invalid-token'})})}"
794        )
795        self.assertStageResponse(
796            response,
797            flow,
798            component="ak-stage-access-denied",
799            error_message="This link is invalid or has expired. Please request a new one.",
800        )
801
802    @patch(
803        "authentik.flows.views.executor.to_stage_response",
804        TO_STAGE_RESPONSE_MOCK,
805    )
806    def test_no_token_require_token(self):
807        """Test that accessing a REQUIRE_TOKEN flow without any token shows error"""
808        flow = create_test_flow(
809            FlowDesignation.RECOVERY,
810            authentication=FlowAuthenticationRequirement.REQUIRE_TOKEN,
811        )
812
813        url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})
814        response = self.client.get(url)
815        self.assertStageResponse(
816            response,
817            flow,
818            component="ak-stage-access-denied",
819            error_message="This link is invalid or has expired. Please request a new one.",
820        )
POLICY_RETURN_FALSE

A mock intended to be used as a property, or other descriptor, on a class. PropertyMock provides __get__ and __set__ methods so you can specify a return value when it is fetched.

Fetching a PropertyMock instance from an object calls the mock, with no args. Setting it calls the mock with the value being set.

POLICY_RETURN_TRUE = <MagicMock id='139655627171760'>
def to_stage_response( request: django.http.request.HttpRequest, source: django.http.response.HttpResponse):
50def to_stage_response(request: HttpRequest, source: HttpResponse):
51    """Mock for to_stage_response that returns the original response, so we can check
52    inheritance and member attributes"""
53    return source

Mock for to_stage_response that returns the original response, so we can check inheritance and member attributes

TO_STAGE_RESPONSE_MOCK = <MagicMock id='139655627172096'>
class TestFlowExecutor(authentik.flows.tests.FlowTestCase):
 59class TestFlowExecutor(FlowTestCase):
 60    """Test executor"""
 61
 62    def setUp(self):
 63        self.request_factory = RequestFactory()
 64
 65    @patch(
 66        "authentik.flows.views.executor.to_stage_response",
 67        TO_STAGE_RESPONSE_MOCK,
 68    )
 69    def test_existing_plan_diff_flow(self):
 70        """Check that a plan for a different flow cancels the current plan"""
 71        flow = create_test_flow(
 72            FlowDesignation.AUTHENTICATION,
 73        )
 74        stage = DummyStage.objects.create(name=generate_id())
 75        binding = FlowStageBinding(target=flow, stage=stage, order=0)
 76        plan = FlowPlan(flow_pk=flow.pk.hex + "a", bindings=[binding], markers=[StageMarker()])
 77        session = self.client.session
 78        session[SESSION_KEY_PLAN] = plan
 79        session.save()
 80
 81        cancel_mock = MagicMock()
 82        with patch("authentik.flows.views.executor.FlowExecutorView.cancel", cancel_mock):
 83            response = self.client.get(
 84                reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
 85            )
 86            self.assertEqual(response.status_code, 302)
 87            self.assertEqual(cancel_mock.call_count, 2)
 88
 89    @patch(
 90        "authentik.flows.views.executor.to_stage_response",
 91        TO_STAGE_RESPONSE_MOCK,
 92    )
 93    @patch(
 94        "authentik.policies.engine.PolicyEngine.result",
 95        POLICY_RETURN_FALSE,
 96    )
 97    def test_invalid_non_applicable_flow(self):
 98        """Tests that a non-applicable flow returns the correct error message"""
 99        flow = create_test_flow(
100            FlowDesignation.AUTHENTICATION,
101        )
102
103        response = self.client.get(
104            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
105        )
106        self.assertStageResponse(
107            response,
108            flow=flow,
109            error_message="foo",
110            component="ak-stage-access-denied",
111        )
112
113    @patch(
114        "authentik.flows.views.executor.to_stage_response",
115        TO_STAGE_RESPONSE_MOCK,
116    )
117    @patch(
118        "authentik.policies.engine.PolicyEngine.result",
119        POLICY_RETURN_FALSE,
120    )
121    def test_invalid_non_applicable_flow_continue(self):
122        """Tests that a non-applicable flow that should redirect"""
123        flow = create_test_flow(
124            FlowDesignation.AUTHENTICATION,
125            denied_action=FlowDeniedAction.CONTINUE,
126        )
127
128        response = self.client.get(
129            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
130        )
131        self.assertEqual(response.status_code, 302)
132        self.assertEqual(response.url, reverse("authentik_core:root-redirect"))
133
134    @patch(
135        "authentik.flows.views.executor.to_stage_response",
136        TO_STAGE_RESPONSE_MOCK,
137    )
138    def test_invalid_flow_redirect(self):
139        """Test invalid flow with valid redirect destination"""
140        flow = create_test_flow(
141            FlowDesignation.AUTHENTICATION,
142        )
143
144        dest = "/unique-string"
145        url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})
146        response = self.client.get(url + f"?{QS_QUERY}={urlencode({NEXT_ARG_NAME: dest})}")
147        self.assertEqual(response.status_code, 302)
148        self.assertEqual(response.url, "/unique-string")
149
150    @patch(
151        "authentik.flows.views.executor.to_stage_response",
152        TO_STAGE_RESPONSE_MOCK,
153    )
154    def test_invalid_flow_invalid_redirect(self):
155        """Test invalid flow redirect with an invalid URL"""
156        flow = create_test_flow(
157            FlowDesignation.AUTHENTICATION,
158        )
159
160        dest = "http://something.example.com/unique-string"
161        url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})
162
163        response = self.client.get(url + f"?{QS_QUERY}={urlencode({NEXT_ARG_NAME: dest})}")
164        self.assertEqual(response.status_code, 200)
165        self.assertStageResponse(
166            response,
167            flow,
168            component="ak-stage-access-denied",
169            error_message="Invalid next URL",
170        )
171
172    @patch(
173        "authentik.flows.views.executor.to_stage_response",
174        TO_STAGE_RESPONSE_MOCK,
175    )
176    def test_valid_flow_redirect(self):
177        """Test valid flow with valid redirect destination"""
178        flow = create_test_flow()
179
180        dest = "/unique-string"
181        url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})
182
183        response = self.client.get(url + f"?{QS_QUERY}={urlencode({NEXT_ARG_NAME: dest})}")
184        self.assertEqual(response.status_code, 302)
185        self.assertEqual(response.url, "/unique-string")
186
187    @patch(
188        "authentik.flows.views.executor.to_stage_response",
189        TO_STAGE_RESPONSE_MOCK,
190    )
191    def test_valid_flow_redirect_authenticated(self):
192        """Test valid flow with valid redirect destination, authenticated already"""
193        flow = create_test_flow()
194        flow.designation = FlowDesignation.AUTHENTICATION
195        flow.authentication = FlowAuthenticationRequirement.REQUIRE_UNAUTHENTICATED
196        flow.save()
197        self.client.force_login(create_test_user())
198
199        dest = "/unique-string"
200        url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})
201
202        response = self.client.get(url + f"?{QS_QUERY}={urlencode({NEXT_ARG_NAME: dest})}")
203        self.assertEqual(response.status_code, 302)
204        self.assertEqual(response.url, "/unique-string")
205
206    @patch(
207        "authentik.flows.views.executor.to_stage_response",
208        TO_STAGE_RESPONSE_MOCK,
209    )
210    def test_valid_flow_invalid_redirect(self):
211        """Test valid flow redirect with an invalid URL"""
212        flow = create_test_flow()
213
214        dest = "http://something.example.com/unique-string"
215        url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})
216
217        response = self.client.get(url + f"?{QS_QUERY}={urlencode({NEXT_ARG_NAME: dest})}")
218        self.assertEqual(response.status_code, 200)
219        self.assertStageResponse(
220            response,
221            flow,
222            component="ak-stage-access-denied",
223            error_message="Invalid next URL",
224        )
225
226    @patch(
227        "authentik.flows.views.executor.to_stage_response",
228        TO_STAGE_RESPONSE_MOCK,
229    )
230    def test_invalid_empty_flow(self):
231        """Tests that an empty flow returns the correct error message"""
232        flow = create_test_flow(
233            FlowDesignation.AUTHENTICATION,
234        )
235
236        response = self.client.get(
237            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
238        )
239        self.assertEqual(response.status_code, 302)
240        self.assertEqual(response.url, reverse("authentik_core:root-redirect"))
241
242    def test_multi_stage_flow(self):
243        """Test a full flow with multiple stages"""
244        flow = create_test_flow(
245            FlowDesignation.AUTHENTICATION,
246        )
247        FlowStageBinding.objects.create(
248            target=flow, stage=DummyStage.objects.create(name=generate_id()), order=0
249        )
250        FlowStageBinding.objects.create(
251            target=flow, stage=DummyStage.objects.create(name=generate_id()), order=1
252        )
253
254        exec_url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})
255        # First Request, start planning, renders form
256        response = self.client.get(exec_url)
257        self.assertEqual(response.status_code, 200)
258        # Check that two stages are in plan
259        session = self.client.session
260        plan: FlowPlan = session[SESSION_KEY_PLAN]
261        self.assertEqual(len(plan.bindings), 2)
262        # Second request, submit form, one stage left
263        response = self.client.post(exec_url)
264        # Second request redirects to the same URL
265        self.assertEqual(response.status_code, 302)
266        self.assertEqual(response.url, exec_url)
267        # Check that two stages are in plan
268        session = self.client.session
269        plan: FlowPlan = session[SESSION_KEY_PLAN]
270        self.assertEqual(len(plan.bindings), 1)
271
272    @patch(
273        "authentik.flows.views.executor.to_stage_response",
274        TO_STAGE_RESPONSE_MOCK,
275    )
276    def test_reevaluate_remove_last(self):
277        """Test planner with re-evaluate (last stage is removed)"""
278        flow = create_test_flow(
279            FlowDesignation.AUTHENTICATION,
280        )
281        false_policy = DummyPolicy.objects.create(
282            name=generate_id(), result=False, wait_min=1, wait_max=2
283        )
284
285        binding = FlowStageBinding.objects.create(
286            target=flow,
287            stage=DummyStage.objects.create(name=generate_id()),
288            order=0,
289            evaluate_on_plan=True,
290            re_evaluate_policies=False,
291        )
292        binding2 = FlowStageBinding.objects.create(
293            target=flow,
294            stage=DummyStage.objects.create(name=generate_id()),
295            order=1,
296            re_evaluate_policies=True,
297        )
298
299        PolicyBinding.objects.create(policy=false_policy, target=binding2, order=0)
300
301        # Here we patch the dummy policy to evaluate to true so the stage is included
302        with patch("authentik.policies.dummy.models.DummyPolicy.passes", POLICY_RETURN_TRUE):
303            exec_url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})
304            # First request, run the planner
305            response = self.client.get(exec_url)
306            self.assertEqual(response.status_code, 200)
307
308            plan: FlowPlan = self.client.session[SESSION_KEY_PLAN]
309
310            self.assertEqual(plan.bindings[0], binding)
311            self.assertEqual(plan.bindings[1], binding2)
312
313            self.assertEqual(plan.markers[0].__class__, StageMarker)
314            self.assertEqual(plan.markers[1].__class__, ReevaluateMarker)
315
316            # Second request, this passes the first dummy stage
317            response = self.client.post(exec_url)
318            self.assertEqual(response.status_code, 302)
319
320        # third request, this should trigger the re-evaluate
321        # We do this request without the patch, so the policy results in false
322        response = self.client.post(exec_url)
323        self.assertEqual(response.status_code, 302)
324        self.assertEqual(response.url, reverse("authentik_core:root-redirect"))
325
326    def test_reevaluate_remove_middle(self):
327        """Test planner with re-evaluate (middle stage is removed)"""
328        flow = create_test_flow(
329            FlowDesignation.AUTHENTICATION,
330        )
331        false_policy = DummyPolicy.objects.create(
332            name=generate_id(), result=False, wait_min=1, wait_max=2
333        )
334
335        binding = FlowStageBinding.objects.create(
336            target=flow,
337            stage=DummyStage.objects.create(name=generate_id()),
338            order=0,
339            evaluate_on_plan=True,
340            re_evaluate_policies=False,
341        )
342        binding2 = FlowStageBinding.objects.create(
343            target=flow,
344            stage=DummyStage.objects.create(name=generate_id()),
345            order=1,
346            re_evaluate_policies=True,
347        )
348        binding3 = FlowStageBinding.objects.create(
349            target=flow,
350            stage=DummyStage.objects.create(name=generate_id()),
351            order=2,
352            evaluate_on_plan=True,
353            re_evaluate_policies=False,
354        )
355
356        PolicyBinding.objects.create(policy=false_policy, target=binding2, order=0)
357
358        # Here we patch the dummy policy to evaluate to true so the stage is included
359        with patch("authentik.policies.dummy.models.DummyPolicy.passes", POLICY_RETURN_TRUE):
360            exec_url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})
361            # First request, run the planner
362            response = self.client.get(exec_url)
363
364            self.assertEqual(response.status_code, 200)
365            plan: FlowPlan = self.client.session[SESSION_KEY_PLAN]
366
367            self.assertEqual(plan.bindings[0], binding)
368            self.assertEqual(plan.bindings[1], binding2)
369            self.assertEqual(plan.bindings[2], binding3)
370
371            self.assertEqual(plan.markers[0].__class__, StageMarker)
372            self.assertEqual(plan.markers[1].__class__, ReevaluateMarker)
373            self.assertEqual(plan.markers[2].__class__, StageMarker)
374
375            # Second request, this passes the first dummy stage
376            response = self.client.post(exec_url)
377            self.assertEqual(response.status_code, 302)
378
379            plan: FlowPlan = self.client.session[SESSION_KEY_PLAN]
380
381            self.assertEqual(plan.bindings[0], binding2)
382            self.assertEqual(plan.bindings[1], binding3)
383
384            self.assertEqual(plan.markers[0].__class__, ReevaluateMarker)
385            self.assertEqual(plan.markers[1].__class__, StageMarker)
386
387        # third request, this should trigger the re-evaluate
388        # We do this request without the patch, so the policy results in false
389        response = self.client.post(exec_url)
390        self.assertEqual(response.status_code, 200)
391        self.assertStageRedirects(response, reverse("authentik_core:root-redirect"))
392
393    def test_reevaluate_keep(self):
394        """Test planner with re-evaluate (everything is kept)"""
395        flow = create_test_flow(
396            FlowDesignation.AUTHENTICATION,
397        )
398        true_policy = DummyPolicy.objects.create(
399            name=generate_id(), result=True, wait_min=1, wait_max=2
400        )
401
402        binding = FlowStageBinding.objects.create(
403            target=flow,
404            stage=DummyStage.objects.create(name=generate_id()),
405            order=0,
406            evaluate_on_plan=True,
407            re_evaluate_policies=False,
408        )
409        binding2 = FlowStageBinding.objects.create(
410            target=flow,
411            stage=DummyStage.objects.create(name=generate_id()),
412            order=1,
413            re_evaluate_policies=True,
414        )
415        binding3 = FlowStageBinding.objects.create(
416            target=flow,
417            stage=DummyStage.objects.create(name=generate_id()),
418            order=2,
419            evaluate_on_plan=True,
420            re_evaluate_policies=False,
421        )
422
423        PolicyBinding.objects.create(policy=true_policy, target=binding2, order=0)
424
425        # Here we patch the dummy policy to evaluate to true so the stage is included
426        with patch("authentik.policies.dummy.models.DummyPolicy.passes", POLICY_RETURN_TRUE):
427            exec_url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})
428            # First request, run the planner
429            response = self.client.get(exec_url)
430
431            self.assertEqual(response.status_code, 200)
432            plan: FlowPlan = self.client.session[SESSION_KEY_PLAN]
433
434            self.assertEqual(plan.bindings[0], binding)
435            self.assertEqual(plan.bindings[1], binding2)
436            self.assertEqual(plan.bindings[2], binding3)
437
438            self.assertEqual(plan.markers[0].__class__, StageMarker)
439            self.assertEqual(plan.markers[1].__class__, ReevaluateMarker)
440            self.assertEqual(plan.markers[2].__class__, StageMarker)
441
442            # Second request, this passes the first dummy stage
443            response = self.client.post(exec_url)
444            self.assertEqual(response.status_code, 302)
445
446            plan: FlowPlan = self.client.session[SESSION_KEY_PLAN]
447
448            self.assertEqual(plan.bindings[0], binding2)
449            self.assertEqual(plan.bindings[1], binding3)
450
451            self.assertEqual(plan.markers[0].__class__, ReevaluateMarker)
452            self.assertEqual(plan.markers[1].__class__, StageMarker)
453
454            # Third request, this passes the first dummy stage
455            response = self.client.post(exec_url)
456            self.assertEqual(response.status_code, 302)
457
458            plan: FlowPlan = self.client.session[SESSION_KEY_PLAN]
459
460            self.assertEqual(plan.bindings[0], binding3)
461
462            self.assertEqual(plan.markers[0].__class__, StageMarker)
463
464        # third request, this should trigger the re-evaluate
465        # We do this request without the patch, so the policy results in false
466        response = self.client.post(exec_url)
467        self.assertEqual(response.status_code, 200)
468        self.assertStageRedirects(response, reverse("authentik_core:root-redirect"))
469
470    def test_reevaluate_remove_consecutive(self):
471        """Test planner with re-evaluate (consecutive stages are removed)"""
472        flow = create_test_flow(
473            FlowDesignation.AUTHENTICATION,
474        )
475        false_policy = DummyPolicy.objects.create(
476            name=generate_id(), result=False, wait_min=1, wait_max=2
477        )
478
479        binding = FlowStageBinding.objects.create(
480            target=flow,
481            stage=DummyStage.objects.create(name=generate_id()),
482            order=0,
483            evaluate_on_plan=True,
484            re_evaluate_policies=False,
485        )
486        binding2 = FlowStageBinding.objects.create(
487            target=flow,
488            stage=DummyStage.objects.create(name=generate_id()),
489            order=1,
490            re_evaluate_policies=True,
491        )
492        binding3 = FlowStageBinding.objects.create(
493            target=flow,
494            stage=DummyStage.objects.create(name=generate_id()),
495            order=2,
496            re_evaluate_policies=True,
497        )
498        binding4 = FlowStageBinding.objects.create(
499            target=flow,
500            stage=DummyStage.objects.create(name=generate_id()),
501            order=2,
502            evaluate_on_plan=True,
503            re_evaluate_policies=False,
504        )
505
506        PolicyBinding.objects.create(policy=false_policy, target=binding2, order=0)
507        PolicyBinding.objects.create(policy=false_policy, target=binding3, order=0)
508
509        # Here we patch the dummy policy to evaluate to true so the stage is included
510        with patch("authentik.policies.dummy.models.DummyPolicy.passes", POLICY_RETURN_TRUE):
511            exec_url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})
512            # First request, run the planner
513            response = self.client.get(exec_url)
514            self.assertEqual(response.status_code, 200)
515            self.assertStageResponse(response, flow, component="ak-stage-dummy")
516
517            plan: FlowPlan = self.client.session[SESSION_KEY_PLAN]
518
519            self.assertEqual(plan.bindings[0], binding)
520            self.assertEqual(plan.bindings[1], binding2)
521            self.assertEqual(plan.bindings[2], binding3)
522            self.assertEqual(plan.bindings[3], binding4)
523
524            self.assertEqual(plan.markers[0].__class__, StageMarker)
525            self.assertEqual(plan.markers[1].__class__, ReevaluateMarker)
526            self.assertEqual(plan.markers[2].__class__, ReevaluateMarker)
527            self.assertEqual(plan.markers[3].__class__, StageMarker)
528
529        # Second request, this passes the first dummy stage
530        response = self.client.post(exec_url)
531        self.assertEqual(response.status_code, 302)
532
533        # third request, this should trigger the re-evaluate
534        # A get request will evaluate the policies and this will return stage 4
535        # but it won't save it, hence we can't check the plan
536        response = self.client.get(exec_url)
537        self.assertStageResponse(response, flow, component="ak-stage-dummy")
538
539        # fourth request, this confirms the last stage (dummy4)
540        # We do this request without the patch, so the policy results in false
541        response = self.client.post(exec_url)
542        self.assertEqual(response.status_code, 200)
543        self.assertStageRedirects(response, reverse("authentik_core:root-redirect"))
544
545    def test_stageview_user_identifier(self):
546        """Test PLAN_CONTEXT_PENDING_USER_IDENTIFIER"""
547        flow = create_test_flow(
548            FlowDesignation.AUTHENTICATION,
549        )
550        FlowStageBinding.objects.create(
551            target=flow, stage=DummyStage.objects.create(name=generate_id()), order=0
552        )
553
554        ident = "test-identifier"
555
556        user = User.objects.create(username="test-user")
557        request = self.request_factory.get(
558            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
559        )
560        request.user = user
561        planner = FlowPlanner(flow)
562        plan = planner.plan(request, default_context={PLAN_CONTEXT_PENDING_USER_IDENTIFIER: ident})
563
564        executor = FlowExecutorView()
565        executor.plan = plan
566        executor.flow = flow
567
568        stage_view = StageView(executor)
569        self.assertEqual(ident, stage_view.get_pending_user(for_display=True).username)
570
571    def test_invalid_restart(self):
572        """Test flow that restarts on invalid entry"""
573        flow = create_test_flow(
574            FlowDesignation.AUTHENTICATION,
575        )
576        # Stage 0 is a deny stage that is added dynamically
577        # when the reputation policy says so
578        deny_stage = DenyStage.objects.create(name=generate_id())
579        reputation_policy = ReputationPolicy.objects.create(
580            name=generate_id(), threshold=-1, check_ip=False
581        )
582        deny_binding = FlowStageBinding.objects.create(
583            target=flow,
584            stage=deny_stage,
585            order=0,
586            evaluate_on_plan=False,
587            re_evaluate_policies=True,
588        )
589        PolicyBinding.objects.create(policy=reputation_policy, target=deny_binding, order=0)
590
591        # Stage 1 is an identification stage
592        ident_stage = IdentificationStage.objects.create(
593            name=generate_id(),
594            user_fields=[UserFields.E_MAIL],
595            pretend_user_exists=False,
596        )
597        FlowStageBinding.objects.create(
598            target=flow,
599            stage=ident_stage,
600            order=1,
601            invalid_response_action=InvalidResponseAction.RESTART_WITH_CONTEXT,
602        )
603        exec_url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})
604        # First request, run the planner
605        response = self.client.get(exec_url)
606        self.assertStageResponse(
607            response,
608            flow,
609            component="ak-stage-identification",
610            password_fields=False,
611            primary_action="Log in",
612            sources=[],
613            show_source_labels=False,
614            user_fields=[UserFields.E_MAIL],
615        )
616        response = self.client.post(exec_url, {"uid_field": "invalid-string"}, follow=True)
617        self.assertStageResponse(response, flow, component="ak-stage-access-denied")
618
619    def test_re_evaluate_group_binding(self):
620        """Test re-evaluate stage binding that has a policy binding to a group"""
621        flow = create_test_flow()
622
623        user_group_membership = create_test_user()
624        user_direct_binding = create_test_user()
625        user_other = create_test_user()
626
627        group_a = Group.objects.create(name=generate_id())
628        user_group_membership.groups.add(group_a)
629
630        # Stage 0 is an identification stage
631        ident_stage = IdentificationStage.objects.create(
632            name=generate_id(),
633            user_fields=[UserFields.USERNAME],
634            pretend_user_exists=False,
635        )
636        FlowStageBinding.objects.create(
637            target=flow,
638            stage=ident_stage,
639            order=0,
640        )
641
642        # Stage 1 is a dummy stage that is only shown for users in group_a
643        dummy_stage = DummyStage.objects.create(name=generate_id())
644        dummy_binding = FlowStageBinding.objects.create(target=flow, stage=dummy_stage, order=1)
645        PolicyBinding.objects.create(group=group_a, target=dummy_binding, order=0)
646        PolicyBinding.objects.create(user=user_direct_binding, target=dummy_binding, order=0)
647
648        # Stage 2 is a deny stage that (in this case) only user_b will see
649        deny_stage = DenyStage.objects.create(name=generate_id())
650        FlowStageBinding.objects.create(target=flow, stage=deny_stage, order=2)
651
652        exec_url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})
653
654        with self.subTest(f"Test user access through group: {user_group_membership}"):
655            self.client.logout()
656            # First request, run the planner
657            response = self.client.get(exec_url)
658            self.assertStageResponse(response, flow, component="ak-stage-identification")
659            response = self.client.post(
660                exec_url, {"uid_field": user_group_membership.username}, follow=True
661            )
662            self.assertStageResponse(response, flow, component="ak-stage-dummy")
663        with self.subTest(f"Test user access through user: {user_direct_binding}"):
664            self.client.logout()
665            # First request, run the planner
666            response = self.client.get(exec_url)
667            self.assertStageResponse(response, flow, component="ak-stage-identification")
668            response = self.client.post(
669                exec_url, {"uid_field": user_direct_binding.username}, follow=True
670            )
671            self.assertStageResponse(response, flow, component="ak-stage-dummy")
672        with self.subTest(f"Test user has no access: {user_other}"):
673            self.client.logout()
674            # First request, run the planner
675            response = self.client.get(exec_url)
676            self.assertStageResponse(response, flow, component="ak-stage-identification")
677            response = self.client.post(exec_url, {"uid_field": user_other.username}, follow=True)
678            self.assertStageResponse(response, flow, component="ak-stage-access-denied")
679
680    @patch(
681        "authentik.flows.views.executor.to_stage_response",
682        TO_STAGE_RESPONSE_MOCK,
683    )
684    def test_invalid_json(self):
685        """Test invalid JSON body"""
686        flow = create_test_flow()
687        FlowStageBinding.objects.create(
688            target=flow, stage=DummyStage.objects.create(name=generate_id()), order=0
689        )
690        url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})
691
692        with override_settings(TEST=False, DEBUG=False):
693            self.client.logout()
694            response = self.client.post(url, data="{", content_type="application/json")
695            self.assertEqual(response.status_code, 200)
696
697        with self.assertRaises(ParseError):
698            self.client.logout()
699            response = self.client.post(url, data="{", content_type="application/json")
700            self.assertEqual(response.status_code, 200)
701
702    def test_cancel_next(self):
703        """Test cancel URL with ?next param set"""
704        flow = create_test_flow()
705
706        # Stage 0 is an identification stage
707        ident_stage = IdentificationStage.objects.create(
708            name=generate_id(),
709            user_fields=[UserFields.USERNAME],
710        )
711        FlowStageBinding.objects.create(
712            target=flow,
713            stage=ident_stage,
714            order=0,
715        )
716
717        # Stage 1 is a password stage
718        password_stage = PasswordStage.objects.create(name=generate_id(), backends=[])
719        FlowStageBinding.objects.create(
720            target=flow,
721            stage=password_stage,
722            order=1,
723        )
724        res = self.client.get(
725            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})
726            + f"?{urlencode({QS_QUERY: urlencode({NEXT_ARG_NAME: "/foo"})})}"
727        )
728        self.assertStageResponse(res, flow, component="ak-stage-identification")
729
730        res = self.client.post(
731            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})
732            + f"?{urlencode({QS_QUERY: urlencode({NEXT_ARG_NAME: "/foo"})})}",
733            data={"component": "ak-stage-identification", "uid_field": generate_id()},
734            follow=True,
735        )
736        self.assertEqual(res.status_code, 200)
737        self.assertStageResponse(
738            res,
739            flow,
740            flow_info={
741                "background": "/static/dist/assets/images/flow_background.jpg",
742                "background_themed_urls": None,
743                "cancel_url": "/flows/-/cancel/?next=%2Ffoo",
744                "layout": "stacked",
745                "title": flow.title,
746            },
747        )
748
749    @patch(
750        "authentik.flows.views.executor.to_stage_response",
751        TO_STAGE_RESPONSE_MOCK,
752    )
753    def test_expired_flow_token(self):
754        """Test that an expired flow token shows an appropriate error message"""
755        flow = create_test_flow(
756            FlowDesignation.RECOVERY,
757            authentication=FlowAuthenticationRequirement.REQUIRE_TOKEN,
758        )
759        user = create_test_user()
760        plan = FlowPlan(flow_pk=flow.pk.hex, bindings=[], markers=[])
761
762        token = FlowToken.objects.create(
763            user=user,
764            identifier=generate_id(),
765            flow=flow,
766            _plan=FlowToken.pickle(plan),
767            expires=now() - timedelta(hours=1),
768        )
769
770        url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})
771        response = self.client.get(
772            url + f"?{urlencode({QS_QUERY: urlencode({QS_KEY_TOKEN: token.key})})}"
773        )
774        self.assertStageResponse(
775            response,
776            flow,
777            component="ak-stage-access-denied",
778            error_message="This link is invalid or has expired. Please request a new one.",
779        )
780
781    @patch(
782        "authentik.flows.views.executor.to_stage_response",
783        TO_STAGE_RESPONSE_MOCK,
784    )
785    def test_invalid_flow_token_require_token(self):
786        """Test that an invalid/nonexistent token on a REQUIRE_TOKEN flow shows error"""
787        flow = create_test_flow(
788            FlowDesignation.RECOVERY,
789            authentication=FlowAuthenticationRequirement.REQUIRE_TOKEN,
790        )
791
792        url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})
793        response = self.client.get(
794            url + f"?{urlencode({QS_QUERY: urlencode({QS_KEY_TOKEN: 'invalid-token'})})}"
795        )
796        self.assertStageResponse(
797            response,
798            flow,
799            component="ak-stage-access-denied",
800            error_message="This link is invalid or has expired. Please request a new one.",
801        )
802
803    @patch(
804        "authentik.flows.views.executor.to_stage_response",
805        TO_STAGE_RESPONSE_MOCK,
806    )
807    def test_no_token_require_token(self):
808        """Test that accessing a REQUIRE_TOKEN flow without any token shows error"""
809        flow = create_test_flow(
810            FlowDesignation.RECOVERY,
811            authentication=FlowAuthenticationRequirement.REQUIRE_TOKEN,
812        )
813
814        url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})
815        response = self.client.get(url)
816        self.assertStageResponse(
817            response,
818            flow,
819            component="ak-stage-access-denied",
820            error_message="This link is invalid or has expired. Please request a new one.",
821        )

Test executor

def setUp(self):
62    def setUp(self):
63        self.request_factory = RequestFactory()

Hook method for setting up the test fixture before exercising it.

@patch('authentik.flows.views.executor.to_stage_response', TO_STAGE_RESPONSE_MOCK)
def test_existing_plan_diff_flow(self):
65    @patch(
66        "authentik.flows.views.executor.to_stage_response",
67        TO_STAGE_RESPONSE_MOCK,
68    )
69    def test_existing_plan_diff_flow(self):
70        """Check that a plan for a different flow cancels the current plan"""
71        flow = create_test_flow(
72            FlowDesignation.AUTHENTICATION,
73        )
74        stage = DummyStage.objects.create(name=generate_id())
75        binding = FlowStageBinding(target=flow, stage=stage, order=0)
76        plan = FlowPlan(flow_pk=flow.pk.hex + "a", bindings=[binding], markers=[StageMarker()])
77        session = self.client.session
78        session[SESSION_KEY_PLAN] = plan
79        session.save()
80
81        cancel_mock = MagicMock()
82        with patch("authentik.flows.views.executor.FlowExecutorView.cancel", cancel_mock):
83            response = self.client.get(
84                reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
85            )
86            self.assertEqual(response.status_code, 302)
87            self.assertEqual(cancel_mock.call_count, 2)

Check that a plan for a different flow cancels the current plan

@patch('authentik.flows.views.executor.to_stage_response', TO_STAGE_RESPONSE_MOCK)
@patch('authentik.policies.engine.PolicyEngine.result', POLICY_RETURN_FALSE)
def test_invalid_non_applicable_flow(self):
 89    @patch(
 90        "authentik.flows.views.executor.to_stage_response",
 91        TO_STAGE_RESPONSE_MOCK,
 92    )
 93    @patch(
 94        "authentik.policies.engine.PolicyEngine.result",
 95        POLICY_RETURN_FALSE,
 96    )
 97    def test_invalid_non_applicable_flow(self):
 98        """Tests that a non-applicable flow returns the correct error message"""
 99        flow = create_test_flow(
100            FlowDesignation.AUTHENTICATION,
101        )
102
103        response = self.client.get(
104            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
105        )
106        self.assertStageResponse(
107            response,
108            flow=flow,
109            error_message="foo",
110            component="ak-stage-access-denied",
111        )

Tests that a non-applicable flow returns the correct error message

@patch('authentik.flows.views.executor.to_stage_response', TO_STAGE_RESPONSE_MOCK)
@patch('authentik.policies.engine.PolicyEngine.result', POLICY_RETURN_FALSE)
def test_invalid_non_applicable_flow_continue(self):
113    @patch(
114        "authentik.flows.views.executor.to_stage_response",
115        TO_STAGE_RESPONSE_MOCK,
116    )
117    @patch(
118        "authentik.policies.engine.PolicyEngine.result",
119        POLICY_RETURN_FALSE,
120    )
121    def test_invalid_non_applicable_flow_continue(self):
122        """Tests that a non-applicable flow that should redirect"""
123        flow = create_test_flow(
124            FlowDesignation.AUTHENTICATION,
125            denied_action=FlowDeniedAction.CONTINUE,
126        )
127
128        response = self.client.get(
129            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
130        )
131        self.assertEqual(response.status_code, 302)
132        self.assertEqual(response.url, reverse("authentik_core:root-redirect"))

Tests that a non-applicable flow that should redirect

@patch('authentik.flows.views.executor.to_stage_response', TO_STAGE_RESPONSE_MOCK)
def test_invalid_flow_redirect(self):
134    @patch(
135        "authentik.flows.views.executor.to_stage_response",
136        TO_STAGE_RESPONSE_MOCK,
137    )
138    def test_invalid_flow_redirect(self):
139        """Test invalid flow with valid redirect destination"""
140        flow = create_test_flow(
141            FlowDesignation.AUTHENTICATION,
142        )
143
144        dest = "/unique-string"
145        url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})
146        response = self.client.get(url + f"?{QS_QUERY}={urlencode({NEXT_ARG_NAME: dest})}")
147        self.assertEqual(response.status_code, 302)
148        self.assertEqual(response.url, "/unique-string")

Test invalid flow with valid redirect destination

@patch('authentik.flows.views.executor.to_stage_response', TO_STAGE_RESPONSE_MOCK)
def test_invalid_flow_invalid_redirect(self):
150    @patch(
151        "authentik.flows.views.executor.to_stage_response",
152        TO_STAGE_RESPONSE_MOCK,
153    )
154    def test_invalid_flow_invalid_redirect(self):
155        """Test invalid flow redirect with an invalid URL"""
156        flow = create_test_flow(
157            FlowDesignation.AUTHENTICATION,
158        )
159
160        dest = "http://something.example.com/unique-string"
161        url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})
162
163        response = self.client.get(url + f"?{QS_QUERY}={urlencode({NEXT_ARG_NAME: dest})}")
164        self.assertEqual(response.status_code, 200)
165        self.assertStageResponse(
166            response,
167            flow,
168            component="ak-stage-access-denied",
169            error_message="Invalid next URL",
170        )

Test invalid flow redirect with an invalid URL

@patch('authentik.flows.views.executor.to_stage_response', TO_STAGE_RESPONSE_MOCK)
def test_valid_flow_redirect(self):
172    @patch(
173        "authentik.flows.views.executor.to_stage_response",
174        TO_STAGE_RESPONSE_MOCK,
175    )
176    def test_valid_flow_redirect(self):
177        """Test valid flow with valid redirect destination"""
178        flow = create_test_flow()
179
180        dest = "/unique-string"
181        url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})
182
183        response = self.client.get(url + f"?{QS_QUERY}={urlencode({NEXT_ARG_NAME: dest})}")
184        self.assertEqual(response.status_code, 302)
185        self.assertEqual(response.url, "/unique-string")

Test valid flow with valid redirect destination

@patch('authentik.flows.views.executor.to_stage_response', TO_STAGE_RESPONSE_MOCK)
def test_valid_flow_redirect_authenticated(self):
187    @patch(
188        "authentik.flows.views.executor.to_stage_response",
189        TO_STAGE_RESPONSE_MOCK,
190    )
191    def test_valid_flow_redirect_authenticated(self):
192        """Test valid flow with valid redirect destination, authenticated already"""
193        flow = create_test_flow()
194        flow.designation = FlowDesignation.AUTHENTICATION
195        flow.authentication = FlowAuthenticationRequirement.REQUIRE_UNAUTHENTICATED
196        flow.save()
197        self.client.force_login(create_test_user())
198
199        dest = "/unique-string"
200        url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})
201
202        response = self.client.get(url + f"?{QS_QUERY}={urlencode({NEXT_ARG_NAME: dest})}")
203        self.assertEqual(response.status_code, 302)
204        self.assertEqual(response.url, "/unique-string")

Test valid flow with valid redirect destination, authenticated already

@patch('authentik.flows.views.executor.to_stage_response', TO_STAGE_RESPONSE_MOCK)
def test_valid_flow_invalid_redirect(self):
206    @patch(
207        "authentik.flows.views.executor.to_stage_response",
208        TO_STAGE_RESPONSE_MOCK,
209    )
210    def test_valid_flow_invalid_redirect(self):
211        """Test valid flow redirect with an invalid URL"""
212        flow = create_test_flow()
213
214        dest = "http://something.example.com/unique-string"
215        url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})
216
217        response = self.client.get(url + f"?{QS_QUERY}={urlencode({NEXT_ARG_NAME: dest})}")
218        self.assertEqual(response.status_code, 200)
219        self.assertStageResponse(
220            response,
221            flow,
222            component="ak-stage-access-denied",
223            error_message="Invalid next URL",
224        )

Test valid flow redirect with an invalid URL

@patch('authentik.flows.views.executor.to_stage_response', TO_STAGE_RESPONSE_MOCK)
def test_invalid_empty_flow(self):
226    @patch(
227        "authentik.flows.views.executor.to_stage_response",
228        TO_STAGE_RESPONSE_MOCK,
229    )
230    def test_invalid_empty_flow(self):
231        """Tests that an empty flow returns the correct error message"""
232        flow = create_test_flow(
233            FlowDesignation.AUTHENTICATION,
234        )
235
236        response = self.client.get(
237            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
238        )
239        self.assertEqual(response.status_code, 302)
240        self.assertEqual(response.url, reverse("authentik_core:root-redirect"))

Tests that an empty flow returns the correct error message

def test_multi_stage_flow(self):
242    def test_multi_stage_flow(self):
243        """Test a full flow with multiple stages"""
244        flow = create_test_flow(
245            FlowDesignation.AUTHENTICATION,
246        )
247        FlowStageBinding.objects.create(
248            target=flow, stage=DummyStage.objects.create(name=generate_id()), order=0
249        )
250        FlowStageBinding.objects.create(
251            target=flow, stage=DummyStage.objects.create(name=generate_id()), order=1
252        )
253
254        exec_url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})
255        # First Request, start planning, renders form
256        response = self.client.get(exec_url)
257        self.assertEqual(response.status_code, 200)
258        # Check that two stages are in plan
259        session = self.client.session
260        plan: FlowPlan = session[SESSION_KEY_PLAN]
261        self.assertEqual(len(plan.bindings), 2)
262        # Second request, submit form, one stage left
263        response = self.client.post(exec_url)
264        # Second request redirects to the same URL
265        self.assertEqual(response.status_code, 302)
266        self.assertEqual(response.url, exec_url)
267        # Check that two stages are in plan
268        session = self.client.session
269        plan: FlowPlan = session[SESSION_KEY_PLAN]
270        self.assertEqual(len(plan.bindings), 1)

Test a full flow with multiple stages

@patch('authentik.flows.views.executor.to_stage_response', TO_STAGE_RESPONSE_MOCK)
def test_reevaluate_remove_last(self):
272    @patch(
273        "authentik.flows.views.executor.to_stage_response",
274        TO_STAGE_RESPONSE_MOCK,
275    )
276    def test_reevaluate_remove_last(self):
277        """Test planner with re-evaluate (last stage is removed)"""
278        flow = create_test_flow(
279            FlowDesignation.AUTHENTICATION,
280        )
281        false_policy = DummyPolicy.objects.create(
282            name=generate_id(), result=False, wait_min=1, wait_max=2
283        )
284
285        binding = FlowStageBinding.objects.create(
286            target=flow,
287            stage=DummyStage.objects.create(name=generate_id()),
288            order=0,
289            evaluate_on_plan=True,
290            re_evaluate_policies=False,
291        )
292        binding2 = FlowStageBinding.objects.create(
293            target=flow,
294            stage=DummyStage.objects.create(name=generate_id()),
295            order=1,
296            re_evaluate_policies=True,
297        )
298
299        PolicyBinding.objects.create(policy=false_policy, target=binding2, order=0)
300
301        # Here we patch the dummy policy to evaluate to true so the stage is included
302        with patch("authentik.policies.dummy.models.DummyPolicy.passes", POLICY_RETURN_TRUE):
303            exec_url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})
304            # First request, run the planner
305            response = self.client.get(exec_url)
306            self.assertEqual(response.status_code, 200)
307
308            plan: FlowPlan = self.client.session[SESSION_KEY_PLAN]
309
310            self.assertEqual(plan.bindings[0], binding)
311            self.assertEqual(plan.bindings[1], binding2)
312
313            self.assertEqual(plan.markers[0].__class__, StageMarker)
314            self.assertEqual(plan.markers[1].__class__, ReevaluateMarker)
315
316            # Second request, this passes the first dummy stage
317            response = self.client.post(exec_url)
318            self.assertEqual(response.status_code, 302)
319
320        # third request, this should trigger the re-evaluate
321        # We do this request without the patch, so the policy results in false
322        response = self.client.post(exec_url)
323        self.assertEqual(response.status_code, 302)
324        self.assertEqual(response.url, reverse("authentik_core:root-redirect"))

Test planner with re-evaluate (last stage is removed)

def test_reevaluate_remove_middle(self):
326    def test_reevaluate_remove_middle(self):
327        """Test planner with re-evaluate (middle stage is removed)"""
328        flow = create_test_flow(
329            FlowDesignation.AUTHENTICATION,
330        )
331        false_policy = DummyPolicy.objects.create(
332            name=generate_id(), result=False, wait_min=1, wait_max=2
333        )
334
335        binding = FlowStageBinding.objects.create(
336            target=flow,
337            stage=DummyStage.objects.create(name=generate_id()),
338            order=0,
339            evaluate_on_plan=True,
340            re_evaluate_policies=False,
341        )
342        binding2 = FlowStageBinding.objects.create(
343            target=flow,
344            stage=DummyStage.objects.create(name=generate_id()),
345            order=1,
346            re_evaluate_policies=True,
347        )
348        binding3 = FlowStageBinding.objects.create(
349            target=flow,
350            stage=DummyStage.objects.create(name=generate_id()),
351            order=2,
352            evaluate_on_plan=True,
353            re_evaluate_policies=False,
354        )
355
356        PolicyBinding.objects.create(policy=false_policy, target=binding2, order=0)
357
358        # Here we patch the dummy policy to evaluate to true so the stage is included
359        with patch("authentik.policies.dummy.models.DummyPolicy.passes", POLICY_RETURN_TRUE):
360            exec_url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})
361            # First request, run the planner
362            response = self.client.get(exec_url)
363
364            self.assertEqual(response.status_code, 200)
365            plan: FlowPlan = self.client.session[SESSION_KEY_PLAN]
366
367            self.assertEqual(plan.bindings[0], binding)
368            self.assertEqual(plan.bindings[1], binding2)
369            self.assertEqual(plan.bindings[2], binding3)
370
371            self.assertEqual(plan.markers[0].__class__, StageMarker)
372            self.assertEqual(plan.markers[1].__class__, ReevaluateMarker)
373            self.assertEqual(plan.markers[2].__class__, StageMarker)
374
375            # Second request, this passes the first dummy stage
376            response = self.client.post(exec_url)
377            self.assertEqual(response.status_code, 302)
378
379            plan: FlowPlan = self.client.session[SESSION_KEY_PLAN]
380
381            self.assertEqual(plan.bindings[0], binding2)
382            self.assertEqual(plan.bindings[1], binding3)
383
384            self.assertEqual(plan.markers[0].__class__, ReevaluateMarker)
385            self.assertEqual(plan.markers[1].__class__, StageMarker)
386
387        # third request, this should trigger the re-evaluate
388        # We do this request without the patch, so the policy results in false
389        response = self.client.post(exec_url)
390        self.assertEqual(response.status_code, 200)
391        self.assertStageRedirects(response, reverse("authentik_core:root-redirect"))

Test planner with re-evaluate (middle stage is removed)

def test_reevaluate_keep(self):
393    def test_reevaluate_keep(self):
394        """Test planner with re-evaluate (everything is kept)"""
395        flow = create_test_flow(
396            FlowDesignation.AUTHENTICATION,
397        )
398        true_policy = DummyPolicy.objects.create(
399            name=generate_id(), result=True, wait_min=1, wait_max=2
400        )
401
402        binding = FlowStageBinding.objects.create(
403            target=flow,
404            stage=DummyStage.objects.create(name=generate_id()),
405            order=0,
406            evaluate_on_plan=True,
407            re_evaluate_policies=False,
408        )
409        binding2 = FlowStageBinding.objects.create(
410            target=flow,
411            stage=DummyStage.objects.create(name=generate_id()),
412            order=1,
413            re_evaluate_policies=True,
414        )
415        binding3 = FlowStageBinding.objects.create(
416            target=flow,
417            stage=DummyStage.objects.create(name=generate_id()),
418            order=2,
419            evaluate_on_plan=True,
420            re_evaluate_policies=False,
421        )
422
423        PolicyBinding.objects.create(policy=true_policy, target=binding2, order=0)
424
425        # Here we patch the dummy policy to evaluate to true so the stage is included
426        with patch("authentik.policies.dummy.models.DummyPolicy.passes", POLICY_RETURN_TRUE):
427            exec_url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})
428            # First request, run the planner
429            response = self.client.get(exec_url)
430
431            self.assertEqual(response.status_code, 200)
432            plan: FlowPlan = self.client.session[SESSION_KEY_PLAN]
433
434            self.assertEqual(plan.bindings[0], binding)
435            self.assertEqual(plan.bindings[1], binding2)
436            self.assertEqual(plan.bindings[2], binding3)
437
438            self.assertEqual(plan.markers[0].__class__, StageMarker)
439            self.assertEqual(plan.markers[1].__class__, ReevaluateMarker)
440            self.assertEqual(plan.markers[2].__class__, StageMarker)
441
442            # Second request, this passes the first dummy stage
443            response = self.client.post(exec_url)
444            self.assertEqual(response.status_code, 302)
445
446            plan: FlowPlan = self.client.session[SESSION_KEY_PLAN]
447
448            self.assertEqual(plan.bindings[0], binding2)
449            self.assertEqual(plan.bindings[1], binding3)
450
451            self.assertEqual(plan.markers[0].__class__, ReevaluateMarker)
452            self.assertEqual(plan.markers[1].__class__, StageMarker)
453
454            # Third request, this passes the first dummy stage
455            response = self.client.post(exec_url)
456            self.assertEqual(response.status_code, 302)
457
458            plan: FlowPlan = self.client.session[SESSION_KEY_PLAN]
459
460            self.assertEqual(plan.bindings[0], binding3)
461
462            self.assertEqual(plan.markers[0].__class__, StageMarker)
463
464        # third request, this should trigger the re-evaluate
465        # We do this request without the patch, so the policy results in false
466        response = self.client.post(exec_url)
467        self.assertEqual(response.status_code, 200)
468        self.assertStageRedirects(response, reverse("authentik_core:root-redirect"))

Test planner with re-evaluate (everything is kept)

def test_reevaluate_remove_consecutive(self):
470    def test_reevaluate_remove_consecutive(self):
471        """Test planner with re-evaluate (consecutive stages are removed)"""
472        flow = create_test_flow(
473            FlowDesignation.AUTHENTICATION,
474        )
475        false_policy = DummyPolicy.objects.create(
476            name=generate_id(), result=False, wait_min=1, wait_max=2
477        )
478
479        binding = FlowStageBinding.objects.create(
480            target=flow,
481            stage=DummyStage.objects.create(name=generate_id()),
482            order=0,
483            evaluate_on_plan=True,
484            re_evaluate_policies=False,
485        )
486        binding2 = FlowStageBinding.objects.create(
487            target=flow,
488            stage=DummyStage.objects.create(name=generate_id()),
489            order=1,
490            re_evaluate_policies=True,
491        )
492        binding3 = FlowStageBinding.objects.create(
493            target=flow,
494            stage=DummyStage.objects.create(name=generate_id()),
495            order=2,
496            re_evaluate_policies=True,
497        )
498        binding4 = FlowStageBinding.objects.create(
499            target=flow,
500            stage=DummyStage.objects.create(name=generate_id()),
501            order=2,
502            evaluate_on_plan=True,
503            re_evaluate_policies=False,
504        )
505
506        PolicyBinding.objects.create(policy=false_policy, target=binding2, order=0)
507        PolicyBinding.objects.create(policy=false_policy, target=binding3, order=0)
508
509        # Here we patch the dummy policy to evaluate to true so the stage is included
510        with patch("authentik.policies.dummy.models.DummyPolicy.passes", POLICY_RETURN_TRUE):
511            exec_url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})
512            # First request, run the planner
513            response = self.client.get(exec_url)
514            self.assertEqual(response.status_code, 200)
515            self.assertStageResponse(response, flow, component="ak-stage-dummy")
516
517            plan: FlowPlan = self.client.session[SESSION_KEY_PLAN]
518
519            self.assertEqual(plan.bindings[0], binding)
520            self.assertEqual(plan.bindings[1], binding2)
521            self.assertEqual(plan.bindings[2], binding3)
522            self.assertEqual(plan.bindings[3], binding4)
523
524            self.assertEqual(plan.markers[0].__class__, StageMarker)
525            self.assertEqual(plan.markers[1].__class__, ReevaluateMarker)
526            self.assertEqual(plan.markers[2].__class__, ReevaluateMarker)
527            self.assertEqual(plan.markers[3].__class__, StageMarker)
528
529        # Second request, this passes the first dummy stage
530        response = self.client.post(exec_url)
531        self.assertEqual(response.status_code, 302)
532
533        # third request, this should trigger the re-evaluate
534        # A get request will evaluate the policies and this will return stage 4
535        # but it won't save it, hence we can't check the plan
536        response = self.client.get(exec_url)
537        self.assertStageResponse(response, flow, component="ak-stage-dummy")
538
539        # fourth request, this confirms the last stage (dummy4)
540        # We do this request without the patch, so the policy results in false
541        response = self.client.post(exec_url)
542        self.assertEqual(response.status_code, 200)
543        self.assertStageRedirects(response, reverse("authentik_core:root-redirect"))

Test planner with re-evaluate (consecutive stages are removed)

def test_stageview_user_identifier(self):
545    def test_stageview_user_identifier(self):
546        """Test PLAN_CONTEXT_PENDING_USER_IDENTIFIER"""
547        flow = create_test_flow(
548            FlowDesignation.AUTHENTICATION,
549        )
550        FlowStageBinding.objects.create(
551            target=flow, stage=DummyStage.objects.create(name=generate_id()), order=0
552        )
553
554        ident = "test-identifier"
555
556        user = User.objects.create(username="test-user")
557        request = self.request_factory.get(
558            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
559        )
560        request.user = user
561        planner = FlowPlanner(flow)
562        plan = planner.plan(request, default_context={PLAN_CONTEXT_PENDING_USER_IDENTIFIER: ident})
563
564        executor = FlowExecutorView()
565        executor.plan = plan
566        executor.flow = flow
567
568        stage_view = StageView(executor)
569        self.assertEqual(ident, stage_view.get_pending_user(for_display=True).username)

Test PLAN_CONTEXT_PENDING_USER_IDENTIFIER

def test_invalid_restart(self):
571    def test_invalid_restart(self):
572        """Test flow that restarts on invalid entry"""
573        flow = create_test_flow(
574            FlowDesignation.AUTHENTICATION,
575        )
576        # Stage 0 is a deny stage that is added dynamically
577        # when the reputation policy says so
578        deny_stage = DenyStage.objects.create(name=generate_id())
579        reputation_policy = ReputationPolicy.objects.create(
580            name=generate_id(), threshold=-1, check_ip=False
581        )
582        deny_binding = FlowStageBinding.objects.create(
583            target=flow,
584            stage=deny_stage,
585            order=0,
586            evaluate_on_plan=False,
587            re_evaluate_policies=True,
588        )
589        PolicyBinding.objects.create(policy=reputation_policy, target=deny_binding, order=0)
590
591        # Stage 1 is an identification stage
592        ident_stage = IdentificationStage.objects.create(
593            name=generate_id(),
594            user_fields=[UserFields.E_MAIL],
595            pretend_user_exists=False,
596        )
597        FlowStageBinding.objects.create(
598            target=flow,
599            stage=ident_stage,
600            order=1,
601            invalid_response_action=InvalidResponseAction.RESTART_WITH_CONTEXT,
602        )
603        exec_url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})
604        # First request, run the planner
605        response = self.client.get(exec_url)
606        self.assertStageResponse(
607            response,
608            flow,
609            component="ak-stage-identification",
610            password_fields=False,
611            primary_action="Log in",
612            sources=[],
613            show_source_labels=False,
614            user_fields=[UserFields.E_MAIL],
615        )
616        response = self.client.post(exec_url, {"uid_field": "invalid-string"}, follow=True)
617        self.assertStageResponse(response, flow, component="ak-stage-access-denied")

Test flow that restarts on invalid entry

def test_re_evaluate_group_binding(self):
619    def test_re_evaluate_group_binding(self):
620        """Test re-evaluate stage binding that has a policy binding to a group"""
621        flow = create_test_flow()
622
623        user_group_membership = create_test_user()
624        user_direct_binding = create_test_user()
625        user_other = create_test_user()
626
627        group_a = Group.objects.create(name=generate_id())
628        user_group_membership.groups.add(group_a)
629
630        # Stage 0 is an identification stage
631        ident_stage = IdentificationStage.objects.create(
632            name=generate_id(),
633            user_fields=[UserFields.USERNAME],
634            pretend_user_exists=False,
635        )
636        FlowStageBinding.objects.create(
637            target=flow,
638            stage=ident_stage,
639            order=0,
640        )
641
642        # Stage 1 is a dummy stage that is only shown for users in group_a
643        dummy_stage = DummyStage.objects.create(name=generate_id())
644        dummy_binding = FlowStageBinding.objects.create(target=flow, stage=dummy_stage, order=1)
645        PolicyBinding.objects.create(group=group_a, target=dummy_binding, order=0)
646        PolicyBinding.objects.create(user=user_direct_binding, target=dummy_binding, order=0)
647
648        # Stage 2 is a deny stage that (in this case) only user_b will see
649        deny_stage = DenyStage.objects.create(name=generate_id())
650        FlowStageBinding.objects.create(target=flow, stage=deny_stage, order=2)
651
652        exec_url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})
653
654        with self.subTest(f"Test user access through group: {user_group_membership}"):
655            self.client.logout()
656            # First request, run the planner
657            response = self.client.get(exec_url)
658            self.assertStageResponse(response, flow, component="ak-stage-identification")
659            response = self.client.post(
660                exec_url, {"uid_field": user_group_membership.username}, follow=True
661            )
662            self.assertStageResponse(response, flow, component="ak-stage-dummy")
663        with self.subTest(f"Test user access through user: {user_direct_binding}"):
664            self.client.logout()
665            # First request, run the planner
666            response = self.client.get(exec_url)
667            self.assertStageResponse(response, flow, component="ak-stage-identification")
668            response = self.client.post(
669                exec_url, {"uid_field": user_direct_binding.username}, follow=True
670            )
671            self.assertStageResponse(response, flow, component="ak-stage-dummy")
672        with self.subTest(f"Test user has no access: {user_other}"):
673            self.client.logout()
674            # First request, run the planner
675            response = self.client.get(exec_url)
676            self.assertStageResponse(response, flow, component="ak-stage-identification")
677            response = self.client.post(exec_url, {"uid_field": user_other.username}, follow=True)
678            self.assertStageResponse(response, flow, component="ak-stage-access-denied")

Test re-evaluate stage binding that has a policy binding to a group

@patch('authentik.flows.views.executor.to_stage_response', TO_STAGE_RESPONSE_MOCK)
def test_invalid_json(self):
680    @patch(
681        "authentik.flows.views.executor.to_stage_response",
682        TO_STAGE_RESPONSE_MOCK,
683    )
684    def test_invalid_json(self):
685        """Test invalid JSON body"""
686        flow = create_test_flow()
687        FlowStageBinding.objects.create(
688            target=flow, stage=DummyStage.objects.create(name=generate_id()), order=0
689        )
690        url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})
691
692        with override_settings(TEST=False, DEBUG=False):
693            self.client.logout()
694            response = self.client.post(url, data="{", content_type="application/json")
695            self.assertEqual(response.status_code, 200)
696
697        with self.assertRaises(ParseError):
698            self.client.logout()
699            response = self.client.post(url, data="{", content_type="application/json")
700            self.assertEqual(response.status_code, 200)

Test invalid JSON body

def test_cancel_next(self):
702    def test_cancel_next(self):
703        """Test cancel URL with ?next param set"""
704        flow = create_test_flow()
705
706        # Stage 0 is an identification stage
707        ident_stage = IdentificationStage.objects.create(
708            name=generate_id(),
709            user_fields=[UserFields.USERNAME],
710        )
711        FlowStageBinding.objects.create(
712            target=flow,
713            stage=ident_stage,
714            order=0,
715        )
716
717        # Stage 1 is a password stage
718        password_stage = PasswordStage.objects.create(name=generate_id(), backends=[])
719        FlowStageBinding.objects.create(
720            target=flow,
721            stage=password_stage,
722            order=1,
723        )
724        res = self.client.get(
725            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})
726            + f"?{urlencode({QS_QUERY: urlencode({NEXT_ARG_NAME: "/foo"})})}"
727        )
728        self.assertStageResponse(res, flow, component="ak-stage-identification")
729
730        res = self.client.post(
731            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})
732            + f"?{urlencode({QS_QUERY: urlencode({NEXT_ARG_NAME: "/foo"})})}",
733            data={"component": "ak-stage-identification", "uid_field": generate_id()},
734            follow=True,
735        )
736        self.assertEqual(res.status_code, 200)
737        self.assertStageResponse(
738            res,
739            flow,
740            flow_info={
741                "background": "/static/dist/assets/images/flow_background.jpg",
742                "background_themed_urls": None,
743                "cancel_url": "/flows/-/cancel/?next=%2Ffoo",
744                "layout": "stacked",
745                "title": flow.title,
746            },
747        )

Test cancel URL with ?next param set

@patch('authentik.flows.views.executor.to_stage_response', TO_STAGE_RESPONSE_MOCK)
def test_expired_flow_token(self):
749    @patch(
750        "authentik.flows.views.executor.to_stage_response",
751        TO_STAGE_RESPONSE_MOCK,
752    )
753    def test_expired_flow_token(self):
754        """Test that an expired flow token shows an appropriate error message"""
755        flow = create_test_flow(
756            FlowDesignation.RECOVERY,
757            authentication=FlowAuthenticationRequirement.REQUIRE_TOKEN,
758        )
759        user = create_test_user()
760        plan = FlowPlan(flow_pk=flow.pk.hex, bindings=[], markers=[])
761
762        token = FlowToken.objects.create(
763            user=user,
764            identifier=generate_id(),
765            flow=flow,
766            _plan=FlowToken.pickle(plan),
767            expires=now() - timedelta(hours=1),
768        )
769
770        url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})
771        response = self.client.get(
772            url + f"?{urlencode({QS_QUERY: urlencode({QS_KEY_TOKEN: token.key})})}"
773        )
774        self.assertStageResponse(
775            response,
776            flow,
777            component="ak-stage-access-denied",
778            error_message="This link is invalid or has expired. Please request a new one.",
779        )

Test that an expired flow token shows an appropriate error message

@patch('authentik.flows.views.executor.to_stage_response', TO_STAGE_RESPONSE_MOCK)
def test_invalid_flow_token_require_token(self):
781    @patch(
782        "authentik.flows.views.executor.to_stage_response",
783        TO_STAGE_RESPONSE_MOCK,
784    )
785    def test_invalid_flow_token_require_token(self):
786        """Test that an invalid/nonexistent token on a REQUIRE_TOKEN flow shows error"""
787        flow = create_test_flow(
788            FlowDesignation.RECOVERY,
789            authentication=FlowAuthenticationRequirement.REQUIRE_TOKEN,
790        )
791
792        url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})
793        response = self.client.get(
794            url + f"?{urlencode({QS_QUERY: urlencode({QS_KEY_TOKEN: 'invalid-token'})})}"
795        )
796        self.assertStageResponse(
797            response,
798            flow,
799            component="ak-stage-access-denied",
800            error_message="This link is invalid or has expired. Please request a new one.",
801        )

Test that an invalid/nonexistent token on a REQUIRE_TOKEN flow shows error

@patch('authentik.flows.views.executor.to_stage_response', TO_STAGE_RESPONSE_MOCK)
def test_no_token_require_token(self):
803    @patch(
804        "authentik.flows.views.executor.to_stage_response",
805        TO_STAGE_RESPONSE_MOCK,
806    )
807    def test_no_token_require_token(self):
808        """Test that accessing a REQUIRE_TOKEN flow without any token shows error"""
809        flow = create_test_flow(
810            FlowDesignation.RECOVERY,
811            authentication=FlowAuthenticationRequirement.REQUIRE_TOKEN,
812        )
813
814        url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})
815        response = self.client.get(url)
816        self.assertStageResponse(
817            response,
818            flow,
819            component="ak-stage-access-denied",
820            error_message="This link is invalid or has expired. Please request a new one.",
821        )

Test that accessing a REQUIRE_TOKEN flow without any token shows error