authentik.flows.tests.test_planner

flow planner tests

  1"""flow planner tests"""
  2
  3from unittest.mock import MagicMock, Mock, PropertyMock, patch
  4
  5from django.core.cache import cache
  6from django.http import HttpRequest
  7from django.shortcuts import redirect
  8from django.test import TestCase
  9from django.urls import reverse
 10
 11from authentik.blueprints.tests import reconcile_app
 12from authentik.core.models import User
 13from authentik.core.tests.utils import (
 14    RequestFactory,
 15    create_test_admin_user,
 16    create_test_flow,
 17    dummy_get_response,
 18)
 19from authentik.flows.exceptions import EmptyFlowException, FlowNonApplicableException
 20from authentik.flows.markers import ReevaluateMarker, StageMarker
 21from authentik.flows.models import (
 22    FlowAuthenticationRequirement,
 23    FlowDesignation,
 24    FlowStageBinding,
 25    in_memory_stage,
 26)
 27from authentik.flows.planner import (
 28    PLAN_CONTEXT_IS_REDIRECTED,
 29    PLAN_CONTEXT_IS_RESTORED,
 30    PLAN_CONTEXT_PENDING_USER,
 31    FlowPlanner,
 32    cache_key,
 33)
 34from authentik.flows.stage import StageView
 35from authentik.lib.generators import generate_id
 36from authentik.outposts.apps import MANAGED_OUTPOST
 37from authentik.outposts.models import Outpost
 38from authentik.policies.dummy.models import DummyPolicy
 39from authentik.policies.models import PolicyBinding
 40from authentik.policies.types import PolicyResult
 41from authentik.root.middleware import ClientIPMiddleware
 42from authentik.stages.dummy.models import DummyStage
 43
 44POLICY_RETURN_FALSE = PropertyMock(return_value=PolicyResult(False))
 45CACHE_MOCK = Mock(wraps=cache)
 46
 47POLICY_RETURN_TRUE = MagicMock(return_value=PolicyResult(True))
 48
 49
 50class TestFlowPlanner(TestCase):
 51    """Test planner logic"""
 52
 53    def setUp(self):
 54        self.request_factory = RequestFactory()
 55
 56    def test_empty_plan(self):
 57        """Test that empty plan raises exception"""
 58        flow = create_test_flow()
 59        request = self.request_factory.get(
 60            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
 61        )
 62
 63        with self.assertRaises(EmptyFlowException):
 64            planner = FlowPlanner(flow)
 65            planner.plan(request)
 66
 67    def test_authentication(self):
 68        """Test flow authentication"""
 69        flow = create_test_flow()
 70        flow.authentication = FlowAuthenticationRequirement.NONE
 71        request = self.request_factory.get(
 72            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
 73        )
 74        planner = FlowPlanner(flow)
 75        planner.allow_empty_flows = True
 76        planner.plan(request)
 77
 78        with self.assertRaises(FlowNonApplicableException):
 79            flow.authentication = FlowAuthenticationRequirement.REQUIRE_AUTHENTICATED
 80            FlowPlanner(flow).plan(request)
 81        with self.assertRaises(FlowNonApplicableException):
 82            flow.authentication = FlowAuthenticationRequirement.REQUIRE_SUPERUSER
 83            FlowPlanner(flow).plan(request)
 84
 85        request.user = create_test_admin_user()
 86        planner = FlowPlanner(flow)
 87        planner.allow_empty_flows = True
 88        planner.plan(request)
 89
 90    def test_authentication_redirect_required(self):
 91        """Test flow authentication (redirect required)"""
 92        flow = create_test_flow()
 93        flow.authentication = FlowAuthenticationRequirement.REQUIRE_REDIRECT
 94        request = self.request_factory.get(
 95            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
 96        )
 97        planner = FlowPlanner(flow)
 98        planner.allow_empty_flows = True
 99
100        with self.assertRaises(FlowNonApplicableException):
101            planner.plan(request)
102
103        context = {}
104        context[PLAN_CONTEXT_IS_REDIRECTED] = create_test_flow()
105        planner.plan(request, context)
106
107    @reconcile_app("authentik_outposts")
108    def test_authentication_outpost(self):
109        """Test flow authentication (outpost)"""
110        flow = create_test_flow()
111        flow.authentication = FlowAuthenticationRequirement.REQUIRE_OUTPOST
112        request = self.request_factory.get(
113            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
114        )
115        with self.assertRaises(FlowNonApplicableException):
116            planner = FlowPlanner(flow)
117            planner.allow_empty_flows = True
118            planner.plan(request)
119
120        outpost = Outpost.objects.filter(managed=MANAGED_OUTPOST).first()
121        request = self.request_factory.get(
122            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
123            HTTP_X_AUTHENTIK_OUTPOST_TOKEN=outpost.token.key,
124            HTTP_X_AUTHENTIK_REMOTE_IP="1.2.3.4",
125        )
126        middleware = ClientIPMiddleware(dummy_get_response)
127        middleware(request)
128
129        planner = FlowPlanner(flow)
130        planner.allow_empty_flows = True
131        planner.plan(request)
132
133    def test_authentication_require_token(self):
134        """Test flow authentication (require_token)"""
135        flow = create_test_flow()
136        flow.authentication = FlowAuthenticationRequirement.REQUIRE_TOKEN
137        request = self.request_factory.get(
138            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
139        )
140        planner = FlowPlanner(flow)
141        planner.allow_empty_flows = True
142
143        with self.assertRaises(FlowNonApplicableException):
144            planner.plan(request)
145
146        context = {PLAN_CONTEXT_IS_RESTORED: True}
147        planner.plan(request, context)
148
149    @patch(
150        "authentik.policies.engine.PolicyEngine.result",
151        POLICY_RETURN_FALSE,
152    )
153    def test_non_applicable_plan(self):
154        """Test that empty plan raises exception"""
155        flow = create_test_flow()
156        request = self.request_factory.get(
157            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
158        )
159
160        with self.assertRaises(FlowNonApplicableException):
161            planner = FlowPlanner(flow)
162            planner.plan(request)
163
164    @patch("authentik.flows.planner.cache", CACHE_MOCK)
165    def test_planner_cache(self):
166        """Test planner cache"""
167        flow = create_test_flow(FlowDesignation.AUTHENTICATION)
168        FlowStageBinding.objects.create(
169            target=flow, stage=DummyStage.objects.create(name=generate_id()), order=0
170        )
171        request = self.request_factory.get(
172            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
173        )
174
175        planner = FlowPlanner(flow)
176        planner.plan(request)
177        self.assertEqual(CACHE_MOCK.set.call_count, 1)  # Ensure plan is written to cache
178        planner = FlowPlanner(flow)
179        planner.plan(request)
180        self.assertEqual(CACHE_MOCK.set.call_count, 1)  # Ensure nothing is written to cache
181        self.assertEqual(CACHE_MOCK.get.call_count, 2)  # Get is called twice
182
183    def test_planner_default_context(self):
184        """Test planner with default_context"""
185        flow = create_test_flow()
186        FlowStageBinding.objects.create(
187            target=flow, stage=DummyStage.objects.create(name=generate_id()), order=0
188        )
189
190        user = User.objects.create(username="test-user")
191        request = self.request_factory.get(
192            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
193        )
194        request.user = user
195        planner = FlowPlanner(flow)
196        planner.plan(request, default_context={PLAN_CONTEXT_PENDING_USER: user})
197        key = cache_key(flow, user)
198        self.assertTrue(cache.get(key) is not None)
199
200    def test_planner_marker_reevaluate(self):
201        """Test that the planner creates the proper marker"""
202        flow = create_test_flow()
203
204        FlowStageBinding.objects.create(
205            target=flow,
206            stage=DummyStage.objects.create(name=generate_id()),
207            order=0,
208            re_evaluate_policies=True,
209        )
210
211        request = self.request_factory.get(
212            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
213        )
214
215        planner = FlowPlanner(flow)
216        plan = planner.plan(request)
217
218        self.assertEqual(plan.markers[0].__class__, ReevaluateMarker)
219
220    def test_planner_reevaluate_actual(self):
221        """Test planner with re-evaluate"""
222        flow = create_test_flow()
223        false_policy = DummyPolicy.objects.create(result=False, wait_min=1, wait_max=2)
224
225        binding = FlowStageBinding.objects.create(
226            target=flow,
227            stage=DummyStage.objects.create(name=generate_id()),
228            order=0,
229            re_evaluate_policies=False,
230        )
231        binding2 = FlowStageBinding.objects.create(
232            target=flow,
233            stage=DummyStage.objects.create(name=generate_id()),
234            order=1,
235            re_evaluate_policies=True,
236        )
237
238        PolicyBinding.objects.create(policy=false_policy, target=binding2, order=0)
239
240        request = self.request_factory.get(
241            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
242        )
243
244        # Here we patch the dummy policy to evaluate to true so the stage is included
245        with patch("authentik.policies.dummy.models.DummyPolicy.passes", POLICY_RETURN_TRUE):
246            planner = FlowPlanner(flow)
247            plan = planner.plan(request)
248
249            self.assertEqual(plan.bindings[0], binding)
250            self.assertEqual(plan.bindings[1], binding2)
251
252            self.assertEqual(plan.markers[0].__class__, StageMarker)
253            self.assertEqual(plan.markers[1].__class__, ReevaluateMarker)
254            self.assertIsInstance(plan.markers[0], StageMarker)
255            self.assertIsInstance(plan.markers[1], ReevaluateMarker)
256
257    def test_to_redirect(self):
258        """Test to_redirect and skipping the flow executor"""
259        flow = create_test_flow()
260        flow.authentication = FlowAuthenticationRequirement.NONE
261        request = self.request_factory.get(
262            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
263        )
264
265        planner = FlowPlanner(flow)
266        planner.allow_empty_flows = True
267        plan = planner.plan(request)
268        self.assertTrue(plan.requires_flow_executor())
269        self.assertEqual(
270            plan.to_redirect(request, flow).url,
271            reverse("authentik_core:if-flow", kwargs={"flow_slug": flow.slug}),
272        )
273
274    def test_to_redirect_skip_simple(self):
275        """Test to_redirect and skipping the flow executor"""
276        flow = create_test_flow()
277        flow.authentication = FlowAuthenticationRequirement.NONE
278        request = self.request_factory.get(
279            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
280        )
281        planner = FlowPlanner(flow)
282        planner.allow_empty_flows = True
283        plan = planner.plan(request)
284
285        class TStageView(StageView):
286            def dispatch(self, request: HttpRequest, *args, **kwargs):
287                return redirect("https://authentik.company")
288
289        plan.append_stage(in_memory_stage(TStageView))
290        self.assertFalse(plan.requires_flow_executor(allowed_silent_types=[TStageView]))
291        self.assertEqual(
292            plan.to_redirect(request, flow, allowed_silent_types=[TStageView]).url,
293            "https://authentik.company",
294        )
295
296    def test_to_redirect_skip_stage(self):
297        """Test to_redirect and skipping the flow executor
298        (with a stage bound that cannot be skipped)"""
299        flow = create_test_flow()
300        flow.authentication = FlowAuthenticationRequirement.NONE
301
302        FlowStageBinding.objects.create(
303            target=flow, stage=DummyStage.objects.create(name="dummy"), order=0
304        )
305
306        request = self.request_factory.get(
307            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
308        )
309        planner = FlowPlanner(flow)
310        planner.allow_empty_flows = True
311        plan = planner.plan(request)
312
313        class TStageView(StageView):
314            def dispatch(self, request: HttpRequest, *args, **kwargs):
315                return redirect("https://authentik.company")
316
317        plan.append_stage(in_memory_stage(TStageView))
318        self.assertTrue(plan.requires_flow_executor(allowed_silent_types=[TStageView]))
319
320    def test_to_redirect_skip_policies(self):
321        """Test to_redirect and skipping the flow executor
322        (with a marker on the stage view type that can be skipped)
323
324        Note that this is not actually used anywhere in the code, all stages that are dynamically
325        added are statically added"""
326        flow = create_test_flow()
327        flow.authentication = FlowAuthenticationRequirement.NONE
328
329        request = self.request_factory.get(
330            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
331        )
332        planner = FlowPlanner(flow)
333        planner.allow_empty_flows = True
334        plan = planner.plan(request)
335
336        class TStageView(StageView):
337            def dispatch(self, request: HttpRequest, *args, **kwargs):
338                return redirect("https://authentik.company")
339
340        plan.append_stage(in_memory_stage(TStageView), ReevaluateMarker(None))
341        self.assertTrue(plan.requires_flow_executor(allowed_silent_types=[TStageView]))
POLICY_RETURN_FALSE

A mock intended to be used as a property, or other descriptor, on a class. PropertyMock provides __get__ and __set__ methods so you can specify a return value when it is fetched.

Fetching a PropertyMock instance from an object calls the mock, with no args. Setting it calls the mock with the value being set.

CACHE_MOCK = <Mock id='139655627173440'>
POLICY_RETURN_TRUE = <MagicMock id='139655627173776'>
class TestFlowPlanner(django.test.testcases.TestCase):
 51class TestFlowPlanner(TestCase):
 52    """Test planner logic"""
 53
 54    def setUp(self):
 55        self.request_factory = RequestFactory()
 56
 57    def test_empty_plan(self):
 58        """Test that empty plan raises exception"""
 59        flow = create_test_flow()
 60        request = self.request_factory.get(
 61            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
 62        )
 63
 64        with self.assertRaises(EmptyFlowException):
 65            planner = FlowPlanner(flow)
 66            planner.plan(request)
 67
 68    def test_authentication(self):
 69        """Test flow authentication"""
 70        flow = create_test_flow()
 71        flow.authentication = FlowAuthenticationRequirement.NONE
 72        request = self.request_factory.get(
 73            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
 74        )
 75        planner = FlowPlanner(flow)
 76        planner.allow_empty_flows = True
 77        planner.plan(request)
 78
 79        with self.assertRaises(FlowNonApplicableException):
 80            flow.authentication = FlowAuthenticationRequirement.REQUIRE_AUTHENTICATED
 81            FlowPlanner(flow).plan(request)
 82        with self.assertRaises(FlowNonApplicableException):
 83            flow.authentication = FlowAuthenticationRequirement.REQUIRE_SUPERUSER
 84            FlowPlanner(flow).plan(request)
 85
 86        request.user = create_test_admin_user()
 87        planner = FlowPlanner(flow)
 88        planner.allow_empty_flows = True
 89        planner.plan(request)
 90
 91    def test_authentication_redirect_required(self):
 92        """Test flow authentication (redirect required)"""
 93        flow = create_test_flow()
 94        flow.authentication = FlowAuthenticationRequirement.REQUIRE_REDIRECT
 95        request = self.request_factory.get(
 96            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
 97        )
 98        planner = FlowPlanner(flow)
 99        planner.allow_empty_flows = True
100
101        with self.assertRaises(FlowNonApplicableException):
102            planner.plan(request)
103
104        context = {}
105        context[PLAN_CONTEXT_IS_REDIRECTED] = create_test_flow()
106        planner.plan(request, context)
107
108    @reconcile_app("authentik_outposts")
109    def test_authentication_outpost(self):
110        """Test flow authentication (outpost)"""
111        flow = create_test_flow()
112        flow.authentication = FlowAuthenticationRequirement.REQUIRE_OUTPOST
113        request = self.request_factory.get(
114            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
115        )
116        with self.assertRaises(FlowNonApplicableException):
117            planner = FlowPlanner(flow)
118            planner.allow_empty_flows = True
119            planner.plan(request)
120
121        outpost = Outpost.objects.filter(managed=MANAGED_OUTPOST).first()
122        request = self.request_factory.get(
123            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
124            HTTP_X_AUTHENTIK_OUTPOST_TOKEN=outpost.token.key,
125            HTTP_X_AUTHENTIK_REMOTE_IP="1.2.3.4",
126        )
127        middleware = ClientIPMiddleware(dummy_get_response)
128        middleware(request)
129
130        planner = FlowPlanner(flow)
131        planner.allow_empty_flows = True
132        planner.plan(request)
133
134    def test_authentication_require_token(self):
135        """Test flow authentication (require_token)"""
136        flow = create_test_flow()
137        flow.authentication = FlowAuthenticationRequirement.REQUIRE_TOKEN
138        request = self.request_factory.get(
139            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
140        )
141        planner = FlowPlanner(flow)
142        planner.allow_empty_flows = True
143
144        with self.assertRaises(FlowNonApplicableException):
145            planner.plan(request)
146
147        context = {PLAN_CONTEXT_IS_RESTORED: True}
148        planner.plan(request, context)
149
150    @patch(
151        "authentik.policies.engine.PolicyEngine.result",
152        POLICY_RETURN_FALSE,
153    )
154    def test_non_applicable_plan(self):
155        """Test that empty plan raises exception"""
156        flow = create_test_flow()
157        request = self.request_factory.get(
158            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
159        )
160
161        with self.assertRaises(FlowNonApplicableException):
162            planner = FlowPlanner(flow)
163            planner.plan(request)
164
165    @patch("authentik.flows.planner.cache", CACHE_MOCK)
166    def test_planner_cache(self):
167        """Test planner cache"""
168        flow = create_test_flow(FlowDesignation.AUTHENTICATION)
169        FlowStageBinding.objects.create(
170            target=flow, stage=DummyStage.objects.create(name=generate_id()), order=0
171        )
172        request = self.request_factory.get(
173            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
174        )
175
176        planner = FlowPlanner(flow)
177        planner.plan(request)
178        self.assertEqual(CACHE_MOCK.set.call_count, 1)  # Ensure plan is written to cache
179        planner = FlowPlanner(flow)
180        planner.plan(request)
181        self.assertEqual(CACHE_MOCK.set.call_count, 1)  # Ensure nothing is written to cache
182        self.assertEqual(CACHE_MOCK.get.call_count, 2)  # Get is called twice
183
184    def test_planner_default_context(self):
185        """Test planner with default_context"""
186        flow = create_test_flow()
187        FlowStageBinding.objects.create(
188            target=flow, stage=DummyStage.objects.create(name=generate_id()), order=0
189        )
190
191        user = User.objects.create(username="test-user")
192        request = self.request_factory.get(
193            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
194        )
195        request.user = user
196        planner = FlowPlanner(flow)
197        planner.plan(request, default_context={PLAN_CONTEXT_PENDING_USER: user})
198        key = cache_key(flow, user)
199        self.assertTrue(cache.get(key) is not None)
200
201    def test_planner_marker_reevaluate(self):
202        """Test that the planner creates the proper marker"""
203        flow = create_test_flow()
204
205        FlowStageBinding.objects.create(
206            target=flow,
207            stage=DummyStage.objects.create(name=generate_id()),
208            order=0,
209            re_evaluate_policies=True,
210        )
211
212        request = self.request_factory.get(
213            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
214        )
215
216        planner = FlowPlanner(flow)
217        plan = planner.plan(request)
218
219        self.assertEqual(plan.markers[0].__class__, ReevaluateMarker)
220
221    def test_planner_reevaluate_actual(self):
222        """Test planner with re-evaluate"""
223        flow = create_test_flow()
224        false_policy = DummyPolicy.objects.create(result=False, wait_min=1, wait_max=2)
225
226        binding = FlowStageBinding.objects.create(
227            target=flow,
228            stage=DummyStage.objects.create(name=generate_id()),
229            order=0,
230            re_evaluate_policies=False,
231        )
232        binding2 = FlowStageBinding.objects.create(
233            target=flow,
234            stage=DummyStage.objects.create(name=generate_id()),
235            order=1,
236            re_evaluate_policies=True,
237        )
238
239        PolicyBinding.objects.create(policy=false_policy, target=binding2, order=0)
240
241        request = self.request_factory.get(
242            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
243        )
244
245        # Here we patch the dummy policy to evaluate to true so the stage is included
246        with patch("authentik.policies.dummy.models.DummyPolicy.passes", POLICY_RETURN_TRUE):
247            planner = FlowPlanner(flow)
248            plan = planner.plan(request)
249
250            self.assertEqual(plan.bindings[0], binding)
251            self.assertEqual(plan.bindings[1], binding2)
252
253            self.assertEqual(plan.markers[0].__class__, StageMarker)
254            self.assertEqual(plan.markers[1].__class__, ReevaluateMarker)
255            self.assertIsInstance(plan.markers[0], StageMarker)
256            self.assertIsInstance(plan.markers[1], ReevaluateMarker)
257
258    def test_to_redirect(self):
259        """Test to_redirect and skipping the flow executor"""
260        flow = create_test_flow()
261        flow.authentication = FlowAuthenticationRequirement.NONE
262        request = self.request_factory.get(
263            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
264        )
265
266        planner = FlowPlanner(flow)
267        planner.allow_empty_flows = True
268        plan = planner.plan(request)
269        self.assertTrue(plan.requires_flow_executor())
270        self.assertEqual(
271            plan.to_redirect(request, flow).url,
272            reverse("authentik_core:if-flow", kwargs={"flow_slug": flow.slug}),
273        )
274
275    def test_to_redirect_skip_simple(self):
276        """Test to_redirect and skipping the flow executor"""
277        flow = create_test_flow()
278        flow.authentication = FlowAuthenticationRequirement.NONE
279        request = self.request_factory.get(
280            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
281        )
282        planner = FlowPlanner(flow)
283        planner.allow_empty_flows = True
284        plan = planner.plan(request)
285
286        class TStageView(StageView):
287            def dispatch(self, request: HttpRequest, *args, **kwargs):
288                return redirect("https://authentik.company")
289
290        plan.append_stage(in_memory_stage(TStageView))
291        self.assertFalse(plan.requires_flow_executor(allowed_silent_types=[TStageView]))
292        self.assertEqual(
293            plan.to_redirect(request, flow, allowed_silent_types=[TStageView]).url,
294            "https://authentik.company",
295        )
296
297    def test_to_redirect_skip_stage(self):
298        """Test to_redirect and skipping the flow executor
299        (with a stage bound that cannot be skipped)"""
300        flow = create_test_flow()
301        flow.authentication = FlowAuthenticationRequirement.NONE
302
303        FlowStageBinding.objects.create(
304            target=flow, stage=DummyStage.objects.create(name="dummy"), order=0
305        )
306
307        request = self.request_factory.get(
308            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
309        )
310        planner = FlowPlanner(flow)
311        planner.allow_empty_flows = True
312        plan = planner.plan(request)
313
314        class TStageView(StageView):
315            def dispatch(self, request: HttpRequest, *args, **kwargs):
316                return redirect("https://authentik.company")
317
318        plan.append_stage(in_memory_stage(TStageView))
319        self.assertTrue(plan.requires_flow_executor(allowed_silent_types=[TStageView]))
320
321    def test_to_redirect_skip_policies(self):
322        """Test to_redirect and skipping the flow executor
323        (with a marker on the stage view type that can be skipped)
324
325        Note that this is not actually used anywhere in the code, all stages that are dynamically
326        added are statically added"""
327        flow = create_test_flow()
328        flow.authentication = FlowAuthenticationRequirement.NONE
329
330        request = self.request_factory.get(
331            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
332        )
333        planner = FlowPlanner(flow)
334        planner.allow_empty_flows = True
335        plan = planner.plan(request)
336
337        class TStageView(StageView):
338            def dispatch(self, request: HttpRequest, *args, **kwargs):
339                return redirect("https://authentik.company")
340
341        plan.append_stage(in_memory_stage(TStageView), ReevaluateMarker(None))
342        self.assertTrue(plan.requires_flow_executor(allowed_silent_types=[TStageView]))

Test planner logic

def setUp(self):
54    def setUp(self):
55        self.request_factory = RequestFactory()

Hook method for setting up the test fixture before exercising it.

def test_empty_plan(self):
57    def test_empty_plan(self):
58        """Test that empty plan raises exception"""
59        flow = create_test_flow()
60        request = self.request_factory.get(
61            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
62        )
63
64        with self.assertRaises(EmptyFlowException):
65            planner = FlowPlanner(flow)
66            planner.plan(request)

Test that empty plan raises exception

def test_authentication(self):
68    def test_authentication(self):
69        """Test flow authentication"""
70        flow = create_test_flow()
71        flow.authentication = FlowAuthenticationRequirement.NONE
72        request = self.request_factory.get(
73            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
74        )
75        planner = FlowPlanner(flow)
76        planner.allow_empty_flows = True
77        planner.plan(request)
78
79        with self.assertRaises(FlowNonApplicableException):
80            flow.authentication = FlowAuthenticationRequirement.REQUIRE_AUTHENTICATED
81            FlowPlanner(flow).plan(request)
82        with self.assertRaises(FlowNonApplicableException):
83            flow.authentication = FlowAuthenticationRequirement.REQUIRE_SUPERUSER
84            FlowPlanner(flow).plan(request)
85
86        request.user = create_test_admin_user()
87        planner = FlowPlanner(flow)
88        planner.allow_empty_flows = True
89        planner.plan(request)

Test flow authentication

def test_authentication_redirect_required(self):
 91    def test_authentication_redirect_required(self):
 92        """Test flow authentication (redirect required)"""
 93        flow = create_test_flow()
 94        flow.authentication = FlowAuthenticationRequirement.REQUIRE_REDIRECT
 95        request = self.request_factory.get(
 96            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
 97        )
 98        planner = FlowPlanner(flow)
 99        planner.allow_empty_flows = True
100
101        with self.assertRaises(FlowNonApplicableException):
102            planner.plan(request)
103
104        context = {}
105        context[PLAN_CONTEXT_IS_REDIRECTED] = create_test_flow()
106        planner.plan(request, context)

Test flow authentication (redirect required)

@reconcile_app('authentik_outposts')
def test_authentication_outpost(self):
108    @reconcile_app("authentik_outposts")
109    def test_authentication_outpost(self):
110        """Test flow authentication (outpost)"""
111        flow = create_test_flow()
112        flow.authentication = FlowAuthenticationRequirement.REQUIRE_OUTPOST
113        request = self.request_factory.get(
114            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
115        )
116        with self.assertRaises(FlowNonApplicableException):
117            planner = FlowPlanner(flow)
118            planner.allow_empty_flows = True
119            planner.plan(request)
120
121        outpost = Outpost.objects.filter(managed=MANAGED_OUTPOST).first()
122        request = self.request_factory.get(
123            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
124            HTTP_X_AUTHENTIK_OUTPOST_TOKEN=outpost.token.key,
125            HTTP_X_AUTHENTIK_REMOTE_IP="1.2.3.4",
126        )
127        middleware = ClientIPMiddleware(dummy_get_response)
128        middleware(request)
129
130        planner = FlowPlanner(flow)
131        planner.allow_empty_flows = True
132        planner.plan(request)

Test flow authentication (outpost)

def test_authentication_require_token(self):
134    def test_authentication_require_token(self):
135        """Test flow authentication (require_token)"""
136        flow = create_test_flow()
137        flow.authentication = FlowAuthenticationRequirement.REQUIRE_TOKEN
138        request = self.request_factory.get(
139            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
140        )
141        planner = FlowPlanner(flow)
142        planner.allow_empty_flows = True
143
144        with self.assertRaises(FlowNonApplicableException):
145            planner.plan(request)
146
147        context = {PLAN_CONTEXT_IS_RESTORED: True}
148        planner.plan(request, context)

Test flow authentication (require_token)

@patch('authentik.policies.engine.PolicyEngine.result', POLICY_RETURN_FALSE)
def test_non_applicable_plan(self):
150    @patch(
151        "authentik.policies.engine.PolicyEngine.result",
152        POLICY_RETURN_FALSE,
153    )
154    def test_non_applicable_plan(self):
155        """Test that empty plan raises exception"""
156        flow = create_test_flow()
157        request = self.request_factory.get(
158            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
159        )
160
161        with self.assertRaises(FlowNonApplicableException):
162            planner = FlowPlanner(flow)
163            planner.plan(request)

Test that a non-applicable plan raises an exception

@patch('authentik.flows.planner.cache', CACHE_MOCK)
def test_planner_cache(self):
165    @patch("authentik.flows.planner.cache", CACHE_MOCK)
166    def test_planner_cache(self):
167        """Test planner cache"""
168        flow = create_test_flow(FlowDesignation.AUTHENTICATION)
169        FlowStageBinding.objects.create(
170            target=flow, stage=DummyStage.objects.create(name=generate_id()), order=0
171        )
172        request = self.request_factory.get(
173            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
174        )
175
176        planner = FlowPlanner(flow)
177        planner.plan(request)
178        self.assertEqual(CACHE_MOCK.set.call_count, 1)  # Ensure plan is written to cache
179        planner = FlowPlanner(flow)
180        planner.plan(request)
181        self.assertEqual(CACHE_MOCK.set.call_count, 1)  # Ensure nothing is written to cache
182        self.assertEqual(CACHE_MOCK.get.call_count, 2)  # Get is called twice

Test planner cache

def test_planner_default_context(self):
184    def test_planner_default_context(self):
185        """Test planner with default_context"""
186        flow = create_test_flow()
187        FlowStageBinding.objects.create(
188            target=flow, stage=DummyStage.objects.create(name=generate_id()), order=0
189        )
190
191        user = User.objects.create(username="test-user")
192        request = self.request_factory.get(
193            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
194        )
195        request.user = user
196        planner = FlowPlanner(flow)
197        planner.plan(request, default_context={PLAN_CONTEXT_PENDING_USER: user})
198        key = cache_key(flow, user)
199        self.assertTrue(cache.get(key) is not None)

Test planner with default_context

def test_planner_marker_reevaluate(self):
201    def test_planner_marker_reevaluate(self):
202        """Test that the planner creates the proper marker"""
203        flow = create_test_flow()
204
205        FlowStageBinding.objects.create(
206            target=flow,
207            stage=DummyStage.objects.create(name=generate_id()),
208            order=0,
209            re_evaluate_policies=True,
210        )
211
212        request = self.request_factory.get(
213            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
214        )
215
216        planner = FlowPlanner(flow)
217        plan = planner.plan(request)
218
219        self.assertEqual(plan.markers[0].__class__, ReevaluateMarker)

Test that the planner creates the proper marker

def test_planner_reevaluate_actual(self):
221    def test_planner_reevaluate_actual(self):
222        """Test planner with re-evaluate"""
223        flow = create_test_flow()
224        false_policy = DummyPolicy.objects.create(result=False, wait_min=1, wait_max=2)
225
226        binding = FlowStageBinding.objects.create(
227            target=flow,
228            stage=DummyStage.objects.create(name=generate_id()),
229            order=0,
230            re_evaluate_policies=False,
231        )
232        binding2 = FlowStageBinding.objects.create(
233            target=flow,
234            stage=DummyStage.objects.create(name=generate_id()),
235            order=1,
236            re_evaluate_policies=True,
237        )
238
239        PolicyBinding.objects.create(policy=false_policy, target=binding2, order=0)
240
241        request = self.request_factory.get(
242            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
243        )
244
245        # Here we patch the dummy policy to evaluate to true so the stage is included
246        with patch("authentik.policies.dummy.models.DummyPolicy.passes", POLICY_RETURN_TRUE):
247            planner = FlowPlanner(flow)
248            plan = planner.plan(request)
249
250            self.assertEqual(plan.bindings[0], binding)
251            self.assertEqual(plan.bindings[1], binding2)
252
253            self.assertEqual(plan.markers[0].__class__, StageMarker)
254            self.assertEqual(plan.markers[1].__class__, ReevaluateMarker)
255            self.assertIsInstance(plan.markers[0], StageMarker)
256            self.assertIsInstance(plan.markers[1], ReevaluateMarker)

Test planner with re-evaluate

def test_to_redirect(self):
258    def test_to_redirect(self):
259        """Test to_redirect and skipping the flow executor"""
260        flow = create_test_flow()
261        flow.authentication = FlowAuthenticationRequirement.NONE
262        request = self.request_factory.get(
263            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
264        )
265
266        planner = FlowPlanner(flow)
267        planner.allow_empty_flows = True
268        plan = planner.plan(request)
269        self.assertTrue(plan.requires_flow_executor())
270        self.assertEqual(
271            plan.to_redirect(request, flow).url,
272            reverse("authentik_core:if-flow", kwargs={"flow_slug": flow.slug}),
273        )

Test to_redirect and skipping the flow executor

def test_to_redirect_skip_simple(self):
275    def test_to_redirect_skip_simple(self):
276        """Test to_redirect and skipping the flow executor"""
277        flow = create_test_flow()
278        flow.authentication = FlowAuthenticationRequirement.NONE
279        request = self.request_factory.get(
280            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
281        )
282        planner = FlowPlanner(flow)
283        planner.allow_empty_flows = True
284        plan = planner.plan(request)
285
286        class TStageView(StageView):
287            def dispatch(self, request: HttpRequest, *args, **kwargs):
288                return redirect("https://authentik.company")
289
290        plan.append_stage(in_memory_stage(TStageView))
291        self.assertFalse(plan.requires_flow_executor(allowed_silent_types=[TStageView]))
292        self.assertEqual(
293            plan.to_redirect(request, flow, allowed_silent_types=[TStageView]).url,
294            "https://authentik.company",
295        )

Test to_redirect and skipping the flow executor

def test_to_redirect_skip_stage(self):
297    def test_to_redirect_skip_stage(self):
298        """Test to_redirect and skipping the flow executor
299        (with a stage bound that cannot be skipped)"""
300        flow = create_test_flow()
301        flow.authentication = FlowAuthenticationRequirement.NONE
302
303        FlowStageBinding.objects.create(
304            target=flow, stage=DummyStage.objects.create(name="dummy"), order=0
305        )
306
307        request = self.request_factory.get(
308            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
309        )
310        planner = FlowPlanner(flow)
311        planner.allow_empty_flows = True
312        plan = planner.plan(request)
313
314        class TStageView(StageView):
315            def dispatch(self, request: HttpRequest, *args, **kwargs):
316                return redirect("https://authentik.company")
317
318        plan.append_stage(in_memory_stage(TStageView))
319        self.assertTrue(plan.requires_flow_executor(allowed_silent_types=[TStageView]))

Test to_redirect and skipping the flow executor (with a stage bound that cannot be skipped)

def test_to_redirect_skip_policies(self):
321    def test_to_redirect_skip_policies(self):
322        """Test to_redirect and skipping the flow executor
323        (with a marker on the stage view type that can be skipped)
324
325        Note that this is not actually used anywhere in the code, all stages that are dynamically
326        added are statically added"""
327        flow = create_test_flow()
328        flow.authentication = FlowAuthenticationRequirement.NONE
329
330        request = self.request_factory.get(
331            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
332        )
333        planner = FlowPlanner(flow)
334        planner.allow_empty_flows = True
335        plan = planner.plan(request)
336
337        class TStageView(StageView):
338            def dispatch(self, request: HttpRequest, *args, **kwargs):
339                return redirect("https://authentik.company")
340
341        plan.append_stage(in_memory_stage(TStageView), ReevaluateMarker(None))
342        self.assertTrue(plan.requires_flow_executor(allowed_silent_types=[TStageView]))

Test to_redirect and skipping the flow executor (with a marker on the stage view type that can be skipped)

Note that this is not actually used anywhere in the code, all stages that are dynamically added are statically added