authentik.flows.tests.test_planner

flow planner tests

  1"""flow planner tests"""
  2
  3from unittest.mock import MagicMock, Mock, PropertyMock, patch
  4
  5from django.core.cache import cache
  6from django.http import HttpRequest
  7from django.shortcuts import redirect
  8from django.test import TestCase
  9from django.urls import reverse
 10
 11from authentik.blueprints.tests import reconcile_app
 12from authentik.core.models import User
 13from authentik.core.tests.utils import (
 14    RequestFactory,
 15    create_test_admin_user,
 16    create_test_flow,
 17    dummy_get_response,
 18)
 19from authentik.flows.exceptions import EmptyFlowException, FlowNonApplicableException
 20from authentik.flows.markers import ReevaluateMarker, StageMarker
 21from authentik.flows.models import (
 22    FlowAuthenticationRequirement,
 23    FlowDesignation,
 24    FlowStageBinding,
 25    in_memory_stage,
 26)
 27from authentik.flows.planner import (
 28    PLAN_CONTEXT_IS_REDIRECTED,
 29    PLAN_CONTEXT_PENDING_USER,
 30    FlowPlanner,
 31    cache_key,
 32)
 33from authentik.flows.stage import StageView
 34from authentik.lib.generators import generate_id
 35from authentik.outposts.apps import MANAGED_OUTPOST
 36from authentik.outposts.models import Outpost
 37from authentik.policies.dummy.models import DummyPolicy
 38from authentik.policies.models import PolicyBinding
 39from authentik.policies.types import PolicyResult
 40from authentik.root.middleware import ClientIPMiddleware
 41from authentik.stages.dummy.models import DummyStage
 42
 43POLICY_RETURN_FALSE = PropertyMock(return_value=PolicyResult(False))
 44CACHE_MOCK = Mock(wraps=cache)
 45
 46POLICY_RETURN_TRUE = MagicMock(return_value=PolicyResult(True))
 47
 48
 49class TestFlowPlanner(TestCase):
 50    """Test planner logic"""
 51
 52    def setUp(self):
 53        self.request_factory = RequestFactory()
 54
 55    def test_empty_plan(self):
 56        """Test that empty plan raises exception"""
 57        flow = create_test_flow()
 58        request = self.request_factory.get(
 59            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
 60        )
 61
 62        with self.assertRaises(EmptyFlowException):
 63            planner = FlowPlanner(flow)
 64            planner.plan(request)
 65
 66    def test_authentication(self):
 67        """Test flow authentication"""
 68        flow = create_test_flow()
 69        flow.authentication = FlowAuthenticationRequirement.NONE
 70        request = self.request_factory.get(
 71            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
 72        )
 73        planner = FlowPlanner(flow)
 74        planner.allow_empty_flows = True
 75        planner.plan(request)
 76
 77        with self.assertRaises(FlowNonApplicableException):
 78            flow.authentication = FlowAuthenticationRequirement.REQUIRE_AUTHENTICATED
 79            FlowPlanner(flow).plan(request)
 80        with self.assertRaises(FlowNonApplicableException):
 81            flow.authentication = FlowAuthenticationRequirement.REQUIRE_SUPERUSER
 82            FlowPlanner(flow).plan(request)
 83
 84        request.user = create_test_admin_user()
 85        planner = FlowPlanner(flow)
 86        planner.allow_empty_flows = True
 87        planner.plan(request)
 88
 89    def test_authentication_redirect_required(self):
 90        """Test flow authentication (redirect required)"""
 91        flow = create_test_flow()
 92        flow.authentication = FlowAuthenticationRequirement.REQUIRE_REDIRECT
 93        request = self.request_factory.get(
 94            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
 95        )
 96        planner = FlowPlanner(flow)
 97        planner.allow_empty_flows = True
 98
 99        with self.assertRaises(FlowNonApplicableException):
100            planner.plan(request)
101
102        context = {}
103        context[PLAN_CONTEXT_IS_REDIRECTED] = create_test_flow()
104        planner.plan(request, context)
105
106    @reconcile_app("authentik_outposts")
107    def test_authentication_outpost(self):
108        """Test flow authentication (outpost)"""
109        flow = create_test_flow()
110        flow.authentication = FlowAuthenticationRequirement.REQUIRE_OUTPOST
111        request = self.request_factory.get(
112            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
113        )
114        with self.assertRaises(FlowNonApplicableException):
115            planner = FlowPlanner(flow)
116            planner.allow_empty_flows = True
117            planner.plan(request)
118
119        outpost = Outpost.objects.filter(managed=MANAGED_OUTPOST).first()
120        request = self.request_factory.get(
121            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
122            HTTP_X_AUTHENTIK_OUTPOST_TOKEN=outpost.token.key,
123            HTTP_X_AUTHENTIK_REMOTE_IP="1.2.3.4",
124        )
125        middleware = ClientIPMiddleware(dummy_get_response)
126        middleware(request)
127
128        planner = FlowPlanner(flow)
129        planner.allow_empty_flows = True
130        planner.plan(request)
131
132    @patch(
133        "authentik.policies.engine.PolicyEngine.result",
134        POLICY_RETURN_FALSE,
135    )
136    def test_non_applicable_plan(self):
137        """Test that empty plan raises exception"""
138        flow = create_test_flow()
139        request = self.request_factory.get(
140            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
141        )
142
143        with self.assertRaises(FlowNonApplicableException):
144            planner = FlowPlanner(flow)
145            planner.plan(request)
146
147    @patch("authentik.flows.planner.cache", CACHE_MOCK)
148    def test_planner_cache(self):
149        """Test planner cache"""
150        flow = create_test_flow(FlowDesignation.AUTHENTICATION)
151        FlowStageBinding.objects.create(
152            target=flow, stage=DummyStage.objects.create(name=generate_id()), order=0
153        )
154        request = self.request_factory.get(
155            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
156        )
157
158        planner = FlowPlanner(flow)
159        planner.plan(request)
160        self.assertEqual(CACHE_MOCK.set.call_count, 1)  # Ensure plan is written to cache
161        planner = FlowPlanner(flow)
162        planner.plan(request)
163        self.assertEqual(CACHE_MOCK.set.call_count, 1)  # Ensure nothing is written to cache
164        self.assertEqual(CACHE_MOCK.get.call_count, 2)  # Get is called twice
165
166    def test_planner_default_context(self):
167        """Test planner with default_context"""
168        flow = create_test_flow()
169        FlowStageBinding.objects.create(
170            target=flow, stage=DummyStage.objects.create(name=generate_id()), order=0
171        )
172
173        user = User.objects.create(username="test-user")
174        request = self.request_factory.get(
175            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
176        )
177        request.user = user
178        planner = FlowPlanner(flow)
179        planner.plan(request, default_context={PLAN_CONTEXT_PENDING_USER: user})
180        key = cache_key(flow, user)
181        self.assertTrue(cache.get(key) is not None)
182
183    def test_planner_marker_reevaluate(self):
184        """Test that the planner creates the proper marker"""
185        flow = create_test_flow()
186
187        FlowStageBinding.objects.create(
188            target=flow,
189            stage=DummyStage.objects.create(name=generate_id()),
190            order=0,
191            re_evaluate_policies=True,
192        )
193
194        request = self.request_factory.get(
195            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
196        )
197
198        planner = FlowPlanner(flow)
199        plan = planner.plan(request)
200
201        self.assertEqual(plan.markers[0].__class__, ReevaluateMarker)
202
203    def test_planner_reevaluate_actual(self):
204        """Test planner with re-evaluate"""
205        flow = create_test_flow()
206        false_policy = DummyPolicy.objects.create(result=False, wait_min=1, wait_max=2)
207
208        binding = FlowStageBinding.objects.create(
209            target=flow,
210            stage=DummyStage.objects.create(name=generate_id()),
211            order=0,
212            re_evaluate_policies=False,
213        )
214        binding2 = FlowStageBinding.objects.create(
215            target=flow,
216            stage=DummyStage.objects.create(name=generate_id()),
217            order=1,
218            re_evaluate_policies=True,
219        )
220
221        PolicyBinding.objects.create(policy=false_policy, target=binding2, order=0)
222
223        request = self.request_factory.get(
224            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
225        )
226
227        # Here we patch the dummy policy to evaluate to true so the stage is included
228        with patch("authentik.policies.dummy.models.DummyPolicy.passes", POLICY_RETURN_TRUE):
229            planner = FlowPlanner(flow)
230            plan = planner.plan(request)
231
232            self.assertEqual(plan.bindings[0], binding)
233            self.assertEqual(plan.bindings[1], binding2)
234
235            self.assertEqual(plan.markers[0].__class__, StageMarker)
236            self.assertEqual(plan.markers[1].__class__, ReevaluateMarker)
237            self.assertIsInstance(plan.markers[0], StageMarker)
238            self.assertIsInstance(plan.markers[1], ReevaluateMarker)
239
240    def test_to_redirect(self):
241        """Test to_redirect and skipping the flow executor"""
242        flow = create_test_flow()
243        flow.authentication = FlowAuthenticationRequirement.NONE
244        request = self.request_factory.get(
245            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
246        )
247
248        planner = FlowPlanner(flow)
249        planner.allow_empty_flows = True
250        plan = planner.plan(request)
251        self.assertTrue(plan.requires_flow_executor())
252        self.assertEqual(
253            plan.to_redirect(request, flow).url,
254            reverse("authentik_core:if-flow", kwargs={"flow_slug": flow.slug}),
255        )
256
257    def test_to_redirect_skip_simple(self):
258        """Test to_redirect and skipping the flow executor"""
259        flow = create_test_flow()
260        flow.authentication = FlowAuthenticationRequirement.NONE
261        request = self.request_factory.get(
262            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
263        )
264        planner = FlowPlanner(flow)
265        planner.allow_empty_flows = True
266        plan = planner.plan(request)
267
268        class TStageView(StageView):
269            def dispatch(self, request: HttpRequest, *args, **kwargs):
270                return redirect("https://authentik.company")
271
272        plan.append_stage(in_memory_stage(TStageView))
273        self.assertFalse(plan.requires_flow_executor(allowed_silent_types=[TStageView]))
274        self.assertEqual(
275            plan.to_redirect(request, flow, allowed_silent_types=[TStageView]).url,
276            "https://authentik.company",
277        )
278
279    def test_to_redirect_skip_stage(self):
280        """Test to_redirect and skipping the flow executor
281        (with a stage bound that cannot be skipped)"""
282        flow = create_test_flow()
283        flow.authentication = FlowAuthenticationRequirement.NONE
284
285        FlowStageBinding.objects.create(
286            target=flow, stage=DummyStage.objects.create(name="dummy"), order=0
287        )
288
289        request = self.request_factory.get(
290            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
291        )
292        planner = FlowPlanner(flow)
293        planner.allow_empty_flows = True
294        plan = planner.plan(request)
295
296        class TStageView(StageView):
297            def dispatch(self, request: HttpRequest, *args, **kwargs):
298                return redirect("https://authentik.company")
299
300        plan.append_stage(in_memory_stage(TStageView))
301        self.assertTrue(plan.requires_flow_executor(allowed_silent_types=[TStageView]))
302
303    def test_to_redirect_skip_policies(self):
304        """Test to_redirect and skipping the flow executor
305        (with a marker on the stage view type that can be skipped)
306
307        Note that this is not actually used anywhere in the code, all stages that are dynamically
308        added are statically added"""
309        flow = create_test_flow()
310        flow.authentication = FlowAuthenticationRequirement.NONE
311
312        request = self.request_factory.get(
313            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
314        )
315        planner = FlowPlanner(flow)
316        planner.allow_empty_flows = True
317        plan = planner.plan(request)
318
319        class TStageView(StageView):
320            def dispatch(self, request: HttpRequest, *args, **kwargs):
321                return redirect("https://authentik.company")
322
323        plan.append_stage(in_memory_stage(TStageView), ReevaluateMarker(None))
324        self.assertTrue(plan.requires_flow_executor(allowed_silent_types=[TStageView]))
POLICY_RETURN_FALSE

A mock intended to be used as a property, or other descriptor, on a class. PropertyMock provides __get__ and __set__ methods so you can specify a return value when it is fetched.

Fetching a PropertyMock instance from an object calls the mock, with no args. Setting it calls the mock with the value being set.

CACHE_MOCK = <Mock id='139707766232400'>
POLICY_RETURN_TRUE = <MagicMock id='139707766232736'>
class TestFlowPlanner(django.test.testcases.TestCase):
 50class TestFlowPlanner(TestCase):
 51    """Test planner logic"""
 52
 53    def setUp(self):
 54        self.request_factory = RequestFactory()
 55
 56    def test_empty_plan(self):
 57        """Test that empty plan raises exception"""
 58        flow = create_test_flow()
 59        request = self.request_factory.get(
 60            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
 61        )
 62
 63        with self.assertRaises(EmptyFlowException):
 64            planner = FlowPlanner(flow)
 65            planner.plan(request)
 66
 67    def test_authentication(self):
 68        """Test flow authentication"""
 69        flow = create_test_flow()
 70        flow.authentication = FlowAuthenticationRequirement.NONE
 71        request = self.request_factory.get(
 72            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
 73        )
 74        planner = FlowPlanner(flow)
 75        planner.allow_empty_flows = True
 76        planner.plan(request)
 77
 78        with self.assertRaises(FlowNonApplicableException):
 79            flow.authentication = FlowAuthenticationRequirement.REQUIRE_AUTHENTICATED
 80            FlowPlanner(flow).plan(request)
 81        with self.assertRaises(FlowNonApplicableException):
 82            flow.authentication = FlowAuthenticationRequirement.REQUIRE_SUPERUSER
 83            FlowPlanner(flow).plan(request)
 84
 85        request.user = create_test_admin_user()
 86        planner = FlowPlanner(flow)
 87        planner.allow_empty_flows = True
 88        planner.plan(request)
 89
 90    def test_authentication_redirect_required(self):
 91        """Test flow authentication (redirect required)"""
 92        flow = create_test_flow()
 93        flow.authentication = FlowAuthenticationRequirement.REQUIRE_REDIRECT
 94        request = self.request_factory.get(
 95            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
 96        )
 97        planner = FlowPlanner(flow)
 98        planner.allow_empty_flows = True
 99
100        with self.assertRaises(FlowNonApplicableException):
101            planner.plan(request)
102
103        context = {}
104        context[PLAN_CONTEXT_IS_REDIRECTED] = create_test_flow()
105        planner.plan(request, context)
106
107    @reconcile_app("authentik_outposts")
108    def test_authentication_outpost(self):
109        """Test flow authentication (outpost)"""
110        flow = create_test_flow()
111        flow.authentication = FlowAuthenticationRequirement.REQUIRE_OUTPOST
112        request = self.request_factory.get(
113            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
114        )
115        with self.assertRaises(FlowNonApplicableException):
116            planner = FlowPlanner(flow)
117            planner.allow_empty_flows = True
118            planner.plan(request)
119
120        outpost = Outpost.objects.filter(managed=MANAGED_OUTPOST).first()
121        request = self.request_factory.get(
122            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
123            HTTP_X_AUTHENTIK_OUTPOST_TOKEN=outpost.token.key,
124            HTTP_X_AUTHENTIK_REMOTE_IP="1.2.3.4",
125        )
126        middleware = ClientIPMiddleware(dummy_get_response)
127        middleware(request)
128
129        planner = FlowPlanner(flow)
130        planner.allow_empty_flows = True
131        planner.plan(request)
132
133    @patch(
134        "authentik.policies.engine.PolicyEngine.result",
135        POLICY_RETURN_FALSE,
136    )
137    def test_non_applicable_plan(self):
138        """Test that empty plan raises exception"""
139        flow = create_test_flow()
140        request = self.request_factory.get(
141            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
142        )
143
144        with self.assertRaises(FlowNonApplicableException):
145            planner = FlowPlanner(flow)
146            planner.plan(request)
147
148    @patch("authentik.flows.planner.cache", CACHE_MOCK)
149    def test_planner_cache(self):
150        """Test planner cache"""
151        flow = create_test_flow(FlowDesignation.AUTHENTICATION)
152        FlowStageBinding.objects.create(
153            target=flow, stage=DummyStage.objects.create(name=generate_id()), order=0
154        )
155        request = self.request_factory.get(
156            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
157        )
158
159        planner = FlowPlanner(flow)
160        planner.plan(request)
161        self.assertEqual(CACHE_MOCK.set.call_count, 1)  # Ensure plan is written to cache
162        planner = FlowPlanner(flow)
163        planner.plan(request)
164        self.assertEqual(CACHE_MOCK.set.call_count, 1)  # Ensure nothing is written to cache
165        self.assertEqual(CACHE_MOCK.get.call_count, 2)  # Get is called twice
166
167    def test_planner_default_context(self):
168        """Test planner with default_context"""
169        flow = create_test_flow()
170        FlowStageBinding.objects.create(
171            target=flow, stage=DummyStage.objects.create(name=generate_id()), order=0
172        )
173
174        user = User.objects.create(username="test-user")
175        request = self.request_factory.get(
176            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
177        )
178        request.user = user
179        planner = FlowPlanner(flow)
180        planner.plan(request, default_context={PLAN_CONTEXT_PENDING_USER: user})
181        key = cache_key(flow, user)
182        self.assertTrue(cache.get(key) is not None)
183
184    def test_planner_marker_reevaluate(self):
185        """Test that the planner creates the proper marker"""
186        flow = create_test_flow()
187
188        FlowStageBinding.objects.create(
189            target=flow,
190            stage=DummyStage.objects.create(name=generate_id()),
191            order=0,
192            re_evaluate_policies=True,
193        )
194
195        request = self.request_factory.get(
196            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
197        )
198
199        planner = FlowPlanner(flow)
200        plan = planner.plan(request)
201
202        self.assertEqual(plan.markers[0].__class__, ReevaluateMarker)
203
204    def test_planner_reevaluate_actual(self):
205        """Test planner with re-evaluate"""
206        flow = create_test_flow()
207        false_policy = DummyPolicy.objects.create(result=False, wait_min=1, wait_max=2)
208
209        binding = FlowStageBinding.objects.create(
210            target=flow,
211            stage=DummyStage.objects.create(name=generate_id()),
212            order=0,
213            re_evaluate_policies=False,
214        )
215        binding2 = FlowStageBinding.objects.create(
216            target=flow,
217            stage=DummyStage.objects.create(name=generate_id()),
218            order=1,
219            re_evaluate_policies=True,
220        )
221
222        PolicyBinding.objects.create(policy=false_policy, target=binding2, order=0)
223
224        request = self.request_factory.get(
225            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
226        )
227
228        # Here we patch the dummy policy to evaluate to true so the stage is included
229        with patch("authentik.policies.dummy.models.DummyPolicy.passes", POLICY_RETURN_TRUE):
230            planner = FlowPlanner(flow)
231            plan = planner.plan(request)
232
233            self.assertEqual(plan.bindings[0], binding)
234            self.assertEqual(plan.bindings[1], binding2)
235
236            self.assertEqual(plan.markers[0].__class__, StageMarker)
237            self.assertEqual(plan.markers[1].__class__, ReevaluateMarker)
238            self.assertIsInstance(plan.markers[0], StageMarker)
239            self.assertIsInstance(plan.markers[1], ReevaluateMarker)
240
241    def test_to_redirect(self):
242        """Test to_redirect and skipping the flow executor"""
243        flow = create_test_flow()
244        flow.authentication = FlowAuthenticationRequirement.NONE
245        request = self.request_factory.get(
246            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
247        )
248
249        planner = FlowPlanner(flow)
250        planner.allow_empty_flows = True
251        plan = planner.plan(request)
252        self.assertTrue(plan.requires_flow_executor())
253        self.assertEqual(
254            plan.to_redirect(request, flow).url,
255            reverse("authentik_core:if-flow", kwargs={"flow_slug": flow.slug}),
256        )
257
258    def test_to_redirect_skip_simple(self):
259        """Test to_redirect and skipping the flow executor"""
260        flow = create_test_flow()
261        flow.authentication = FlowAuthenticationRequirement.NONE
262        request = self.request_factory.get(
263            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
264        )
265        planner = FlowPlanner(flow)
266        planner.allow_empty_flows = True
267        plan = planner.plan(request)
268
269        class TStageView(StageView):
270            def dispatch(self, request: HttpRequest, *args, **kwargs):
271                return redirect("https://authentik.company")
272
273        plan.append_stage(in_memory_stage(TStageView))
274        self.assertFalse(plan.requires_flow_executor(allowed_silent_types=[TStageView]))
275        self.assertEqual(
276            plan.to_redirect(request, flow, allowed_silent_types=[TStageView]).url,
277            "https://authentik.company",
278        )
279
280    def test_to_redirect_skip_stage(self):
281        """Test to_redirect and skipping the flow executor
282        (with a stage bound that cannot be skipped)"""
283        flow = create_test_flow()
284        flow.authentication = FlowAuthenticationRequirement.NONE
285
286        FlowStageBinding.objects.create(
287            target=flow, stage=DummyStage.objects.create(name="dummy"), order=0
288        )
289
290        request = self.request_factory.get(
291            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
292        )
293        planner = FlowPlanner(flow)
294        planner.allow_empty_flows = True
295        plan = planner.plan(request)
296
297        class TStageView(StageView):
298            def dispatch(self, request: HttpRequest, *args, **kwargs):
299                return redirect("https://authentik.company")
300
301        plan.append_stage(in_memory_stage(TStageView))
302        self.assertTrue(plan.requires_flow_executor(allowed_silent_types=[TStageView]))
303
304    def test_to_redirect_skip_policies(self):
305        """Test to_redirect and skipping the flow executor
306        (with a marker on the stage view type that can be skipped)
307
308        Note that this is not actually used anywhere in the code, all stages that are dynamically
309        added are statically added"""
310        flow = create_test_flow()
311        flow.authentication = FlowAuthenticationRequirement.NONE
312
313        request = self.request_factory.get(
314            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
315        )
316        planner = FlowPlanner(flow)
317        planner.allow_empty_flows = True
318        plan = planner.plan(request)
319
320        class TStageView(StageView):
321            def dispatch(self, request: HttpRequest, *args, **kwargs):
322                return redirect("https://authentik.company")
323
324        plan.append_stage(in_memory_stage(TStageView), ReevaluateMarker(None))
325        self.assertTrue(plan.requires_flow_executor(allowed_silent_types=[TStageView]))

Test planner logic

def setUp(self):
53    def setUp(self):
54        self.request_factory = RequestFactory()

Hook method for setting up the test fixture before exercising it.

def test_empty_plan(self):
56    def test_empty_plan(self):
57        """Test that empty plan raises exception"""
58        flow = create_test_flow()
59        request = self.request_factory.get(
60            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
61        )
62
63        with self.assertRaises(EmptyFlowException):
64            planner = FlowPlanner(flow)
65            planner.plan(request)

Test that empty plan raises exception

def test_authentication(self):
67    def test_authentication(self):
68        """Test flow authentication"""
69        flow = create_test_flow()
70        flow.authentication = FlowAuthenticationRequirement.NONE
71        request = self.request_factory.get(
72            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
73        )
74        planner = FlowPlanner(flow)
75        planner.allow_empty_flows = True
76        planner.plan(request)
77
78        with self.assertRaises(FlowNonApplicableException):
79            flow.authentication = FlowAuthenticationRequirement.REQUIRE_AUTHENTICATED
80            FlowPlanner(flow).plan(request)
81        with self.assertRaises(FlowNonApplicableException):
82            flow.authentication = FlowAuthenticationRequirement.REQUIRE_SUPERUSER
83            FlowPlanner(flow).plan(request)
84
85        request.user = create_test_admin_user()
86        planner = FlowPlanner(flow)
87        planner.allow_empty_flows = True
88        planner.plan(request)

Test flow authentication

def test_authentication_redirect_required(self):
 90    def test_authentication_redirect_required(self):
 91        """Test flow authentication (redirect required)"""
 92        flow = create_test_flow()
 93        flow.authentication = FlowAuthenticationRequirement.REQUIRE_REDIRECT
 94        request = self.request_factory.get(
 95            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
 96        )
 97        planner = FlowPlanner(flow)
 98        planner.allow_empty_flows = True
 99
100        with self.assertRaises(FlowNonApplicableException):
101            planner.plan(request)
102
103        context = {}
104        context[PLAN_CONTEXT_IS_REDIRECTED] = create_test_flow()
105        planner.plan(request, context)

Test flow authentication (redirect required)

@reconcile_app('authentik_outposts')
def test_authentication_outpost(self):
107    @reconcile_app("authentik_outposts")
108    def test_authentication_outpost(self):
109        """Test flow authentication (outpost)"""
110        flow = create_test_flow()
111        flow.authentication = FlowAuthenticationRequirement.REQUIRE_OUTPOST
112        request = self.request_factory.get(
113            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
114        )
115        with self.assertRaises(FlowNonApplicableException):
116            planner = FlowPlanner(flow)
117            planner.allow_empty_flows = True
118            planner.plan(request)
119
120        outpost = Outpost.objects.filter(managed=MANAGED_OUTPOST).first()
121        request = self.request_factory.get(
122            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
123            HTTP_X_AUTHENTIK_OUTPOST_TOKEN=outpost.token.key,
124            HTTP_X_AUTHENTIK_REMOTE_IP="1.2.3.4",
125        )
126        middleware = ClientIPMiddleware(dummy_get_response)
127        middleware(request)
128
129        planner = FlowPlanner(flow)
130        planner.allow_empty_flows = True
131        planner.plan(request)

Test flow authentication (outpost)

@patch('authentik.policies.engine.PolicyEngine.result', POLICY_RETURN_FALSE)
def test_non_applicable_plan(self):
133    @patch(
134        "authentik.policies.engine.PolicyEngine.result",
135        POLICY_RETURN_FALSE,
136    )
137    def test_non_applicable_plan(self):
138        """Test that empty plan raises exception"""
139        flow = create_test_flow()
140        request = self.request_factory.get(
141            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
142        )
143
144        with self.assertRaises(FlowNonApplicableException):
145            planner = FlowPlanner(flow)
146            planner.plan(request)

Test that a non-applicable plan raises exception

@patch('authentik.flows.planner.cache', CACHE_MOCK)
def test_planner_cache(self):
148    @patch("authentik.flows.planner.cache", CACHE_MOCK)
149    def test_planner_cache(self):
150        """Test planner cache"""
151        flow = create_test_flow(FlowDesignation.AUTHENTICATION)
152        FlowStageBinding.objects.create(
153            target=flow, stage=DummyStage.objects.create(name=generate_id()), order=0
154        )
155        request = self.request_factory.get(
156            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
157        )
158
159        planner = FlowPlanner(flow)
160        planner.plan(request)
161        self.assertEqual(CACHE_MOCK.set.call_count, 1)  # Ensure plan is written to cache
162        planner = FlowPlanner(flow)
163        planner.plan(request)
164        self.assertEqual(CACHE_MOCK.set.call_count, 1)  # Ensure nothing is written to cache
165        self.assertEqual(CACHE_MOCK.get.call_count, 2)  # Get is called twice

Test planner cache

def test_planner_default_context(self):
167    def test_planner_default_context(self):
168        """Test planner with default_context"""
169        flow = create_test_flow()
170        FlowStageBinding.objects.create(
171            target=flow, stage=DummyStage.objects.create(name=generate_id()), order=0
172        )
173
174        user = User.objects.create(username="test-user")
175        request = self.request_factory.get(
176            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
177        )
178        request.user = user
179        planner = FlowPlanner(flow)
180        planner.plan(request, default_context={PLAN_CONTEXT_PENDING_USER: user})
181        key = cache_key(flow, user)
182        self.assertTrue(cache.get(key) is not None)

Test planner with default_context

def test_planner_marker_reevaluate(self):
184    def test_planner_marker_reevaluate(self):
185        """Test that the planner creates the proper marker"""
186        flow = create_test_flow()
187
188        FlowStageBinding.objects.create(
189            target=flow,
190            stage=DummyStage.objects.create(name=generate_id()),
191            order=0,
192            re_evaluate_policies=True,
193        )
194
195        request = self.request_factory.get(
196            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
197        )
198
199        planner = FlowPlanner(flow)
200        plan = planner.plan(request)
201
202        self.assertEqual(plan.markers[0].__class__, ReevaluateMarker)

Test that the planner creates the proper marker

def test_planner_reevaluate_actual(self):
204    def test_planner_reevaluate_actual(self):
205        """Test planner with re-evaluate"""
206        flow = create_test_flow()
207        false_policy = DummyPolicy.objects.create(result=False, wait_min=1, wait_max=2)
208
209        binding = FlowStageBinding.objects.create(
210            target=flow,
211            stage=DummyStage.objects.create(name=generate_id()),
212            order=0,
213            re_evaluate_policies=False,
214        )
215        binding2 = FlowStageBinding.objects.create(
216            target=flow,
217            stage=DummyStage.objects.create(name=generate_id()),
218            order=1,
219            re_evaluate_policies=True,
220        )
221
222        PolicyBinding.objects.create(policy=false_policy, target=binding2, order=0)
223
224        request = self.request_factory.get(
225            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
226        )
227
228        # Here we patch the dummy policy to evaluate to true so the stage is included
229        with patch("authentik.policies.dummy.models.DummyPolicy.passes", POLICY_RETURN_TRUE):
230            planner = FlowPlanner(flow)
231            plan = planner.plan(request)
232
233            self.assertEqual(plan.bindings[0], binding)
234            self.assertEqual(plan.bindings[1], binding2)
235
236            self.assertEqual(plan.markers[0].__class__, StageMarker)
237            self.assertEqual(plan.markers[1].__class__, ReevaluateMarker)
238            self.assertIsInstance(plan.markers[0], StageMarker)
239            self.assertIsInstance(plan.markers[1], ReevaluateMarker)

Test planner with re-evaluate

def test_to_redirect(self):
241    def test_to_redirect(self):
242        """Test to_redirect and skipping the flow executor"""
243        flow = create_test_flow()
244        flow.authentication = FlowAuthenticationRequirement.NONE
245        request = self.request_factory.get(
246            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
247        )
248
249        planner = FlowPlanner(flow)
250        planner.allow_empty_flows = True
251        plan = planner.plan(request)
252        self.assertTrue(plan.requires_flow_executor())
253        self.assertEqual(
254            plan.to_redirect(request, flow).url,
255            reverse("authentik_core:if-flow", kwargs={"flow_slug": flow.slug}),
256        )

Test to_redirect and skipping the flow executor

def test_to_redirect_skip_simple(self):
258    def test_to_redirect_skip_simple(self):
259        """Test to_redirect and skipping the flow executor"""
260        flow = create_test_flow()
261        flow.authentication = FlowAuthenticationRequirement.NONE
262        request = self.request_factory.get(
263            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
264        )
265        planner = FlowPlanner(flow)
266        planner.allow_empty_flows = True
267        plan = planner.plan(request)
268
269        class TStageView(StageView):
270            def dispatch(self, request: HttpRequest, *args, **kwargs):
271                return redirect("https://authentik.company")
272
273        plan.append_stage(in_memory_stage(TStageView))
274        self.assertFalse(plan.requires_flow_executor(allowed_silent_types=[TStageView]))
275        self.assertEqual(
276            plan.to_redirect(request, flow, allowed_silent_types=[TStageView]).url,
277            "https://authentik.company",
278        )

Test to_redirect and skipping the flow executor

def test_to_redirect_skip_stage(self):
280    def test_to_redirect_skip_stage(self):
281        """Test to_redirect and skipping the flow executor
282        (with a stage bound that cannot be skipped)"""
283        flow = create_test_flow()
284        flow.authentication = FlowAuthenticationRequirement.NONE
285
286        FlowStageBinding.objects.create(
287            target=flow, stage=DummyStage.objects.create(name="dummy"), order=0
288        )
289
290        request = self.request_factory.get(
291            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
292        )
293        planner = FlowPlanner(flow)
294        planner.allow_empty_flows = True
295        plan = planner.plan(request)
296
297        class TStageView(StageView):
298            def dispatch(self, request: HttpRequest, *args, **kwargs):
299                return redirect("https://authentik.company")
300
301        plan.append_stage(in_memory_stage(TStageView))
302        self.assertTrue(plan.requires_flow_executor(allowed_silent_types=[TStageView]))

Test to_redirect and skipping the flow executor (with a stage bound that cannot be skipped)

def test_to_redirect_skip_policies(self):
304    def test_to_redirect_skip_policies(self):
305        """Test to_redirect and skipping the flow executor
306        (with a marker on the stage view type that can be skipped)
307
308        Note that this is not actually used anywhere in the code, all stages that are dynamically
309        added are statically added"""
310        flow = create_test_flow()
311        flow.authentication = FlowAuthenticationRequirement.NONE
312
313        request = self.request_factory.get(
314            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
315        )
316        planner = FlowPlanner(flow)
317        planner.allow_empty_flows = True
318        plan = planner.plan(request)
319
320        class TStageView(StageView):
321            def dispatch(self, request: HttpRequest, *args, **kwargs):
322                return redirect("https://authentik.company")
323
324        plan.append_stage(in_memory_stage(TStageView), ReevaluateMarker(None))
325        self.assertTrue(plan.requires_flow_executor(allowed_silent_types=[TStageView]))

Test to_redirect and skipping the flow executor (with a marker on the stage view type that can be skipped)

Note that this is not actually used anywhere in the code, all stages that are dynamically added are statically added