authentik.flows.tests.test_planner
flow planner tests
"""flow planner tests"""

from unittest.mock import MagicMock, Mock, PropertyMock, patch

from django.core.cache import cache
from django.http import HttpRequest
from django.shortcuts import redirect
from django.test import TestCase
from django.urls import reverse

from authentik.blueprints.tests import reconcile_app
from authentik.core.models import User
from authentik.core.tests.utils import (
    RequestFactory,
    create_test_admin_user,
    create_test_flow,
    dummy_get_response,
)
from authentik.flows.exceptions import EmptyFlowException, FlowNonApplicableException
from authentik.flows.markers import ReevaluateMarker, StageMarker
from authentik.flows.models import (
    FlowAuthenticationRequirement,
    FlowDesignation,
    FlowStageBinding,
    in_memory_stage,
)
from authentik.flows.planner import (
    PLAN_CONTEXT_IS_REDIRECTED,
    PLAN_CONTEXT_IS_RESTORED,
    PLAN_CONTEXT_PENDING_USER,
    FlowPlanner,
    cache_key,
)
from authentik.flows.stage import StageView
from authentik.lib.generators import generate_id
from authentik.outposts.apps import MANAGED_OUTPOST
from authentik.outposts.models import Outpost
from authentik.policies.dummy.models import DummyPolicy
from authentik.policies.models import PolicyBinding
from authentik.policies.types import PolicyResult
from authentik.root.middleware import ClientIPMiddleware
from authentik.stages.dummy.models import DummyStage

# Patched over `PolicyEngine.result`; a PropertyMock so attribute access yields the result
POLICY_RETURN_FALSE = PropertyMock(return_value=PolicyResult(False))
# Wraps the real cache so calls still go through while being recorded
CACHE_MOCK = Mock(wraps=cache)

# Patched over `DummyPolicy.passes`; calling it yields a passing result
POLICY_RETURN_TRUE = MagicMock(return_value=PolicyResult(True))


class TestFlowPlanner(TestCase):
    """Test planner logic"""

    def setUp(self):
        self.request_factory = RequestFactory()

    def _executor_request(self, flow, **extra):
        """Build a GET request for the flow-executor API URL of `flow`.

        Extra keyword arguments are forwarded unchanged to the request factory."""
        return self.request_factory.get(
            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
            **extra,
        )

    @staticmethod
    def _lenient_planner(flow):
        """Return a FlowPlanner for `flow` with `allow_empty_flows` enabled."""
        planner = FlowPlanner(flow)
        planner.allow_empty_flows = True
        return planner

    def test_empty_plan(self):
        """Test that empty plan raises exception"""
        flow = create_test_flow()
        request = self._executor_request(flow)

        with self.assertRaises(EmptyFlowException):
            FlowPlanner(flow).plan(request)

    def test_authentication(self):
        """Test flow authentication"""
        flow = create_test_flow()
        flow.authentication = FlowAuthenticationRequirement.NONE
        request = self._executor_request(flow)
        self._lenient_planner(flow).plan(request)

        # Before an admin user is attached to the request, both stricter requirements reject it
        for requirement in (
            FlowAuthenticationRequirement.REQUIRE_AUTHENTICATED,
            FlowAuthenticationRequirement.REQUIRE_SUPERUSER,
        ):
            flow.authentication = requirement
            with self.assertRaises(FlowNonApplicableException):
                FlowPlanner(flow).plan(request)

        # Flow still requires a superuser at this point; the admin user satisfies it
        request.user = create_test_admin_user()
        self._lenient_planner(flow).plan(request)

    def test_authentication_redirect_required(self):
        """Test flow authentication (redirect required)"""
        flow = create_test_flow()
        flow.authentication = FlowAuthenticationRequirement.REQUIRE_REDIRECT
        request = self._executor_request(flow)
        planner = self._lenient_planner(flow)

        with self.assertRaises(FlowNonApplicableException):
            planner.plan(request)

        # With the redirect marker in the context, planning must succeed
        planner.plan(request, {PLAN_CONTEXT_IS_REDIRECTED: create_test_flow()})

    @reconcile_app("authentik_outposts")
    def test_authentication_outpost(self):
        """Test flow authentication (outpost)"""
        flow = create_test_flow()
        flow.authentication = FlowAuthenticationRequirement.REQUIRE_OUTPOST
        request = self._executor_request(flow)
        with self.assertRaises(FlowNonApplicableException):
            self._lenient_planner(flow).plan(request)

        outpost = Outpost.objects.filter(managed=MANAGED_OUTPOST).first()
        request = self._executor_request(
            flow,
            HTTP_X_AUTHENTIK_OUTPOST_TOKEN=outpost.token.key,
            HTTP_X_AUTHENTIK_REMOTE_IP="1.2.3.4",
        )
        # The request is passed through ClientIPMiddleware before planning
        ClientIPMiddleware(dummy_get_response)(request)

        self._lenient_planner(flow).plan(request)

    def test_authentication_require_token(self):
        """Test flow authentication (require_token)"""
        flow = create_test_flow()
        flow.authentication = FlowAuthenticationRequirement.REQUIRE_TOKEN
        request = self._executor_request(flow)
        planner = self._lenient_planner(flow)

        with self.assertRaises(FlowNonApplicableException):
            planner.plan(request)

        # With the restored marker in the context, planning must succeed
        planner.plan(request, {PLAN_CONTEXT_IS_RESTORED: True})

    @patch(
        "authentik.policies.engine.PolicyEngine.result",
        POLICY_RETURN_FALSE,
    )
    def test_non_applicable_plan(self):
        """Test that a non-applicable flow (policy result is False) raises exception"""
        flow = create_test_flow()
        request = self._executor_request(flow)

        with self.assertRaises(FlowNonApplicableException):
            FlowPlanner(flow).plan(request)

    @patch("authentik.flows.planner.cache", CACHE_MOCK)
    def test_planner_cache(self):
        """Test planner cache"""
        # CACHE_MOCK is shared at module level; drop any previously recorded calls so the
        # absolute call counts asserted below only reflect this test run
        CACHE_MOCK.reset_mock()
        flow = create_test_flow(FlowDesignation.AUTHENTICATION)
        FlowStageBinding.objects.create(
            target=flow, stage=DummyStage.objects.create(name=generate_id()), order=0
        )
        request = self._executor_request(flow)

        FlowPlanner(flow).plan(request)
        self.assertEqual(CACHE_MOCK.set.call_count, 1)  # Ensure plan is written to cache
        FlowPlanner(flow).plan(request)
        self.assertEqual(CACHE_MOCK.set.call_count, 1)  # Ensure nothing is written to cache
        self.assertEqual(CACHE_MOCK.get.call_count, 2)  # Get is called twice

    def test_planner_default_context(self):
        """Test planner with default_context"""
        flow = create_test_flow()
        FlowStageBinding.objects.create(
            target=flow, stage=DummyStage.objects.create(name=generate_id()), order=0
        )

        user = User.objects.create(username=generate_id())
        request = self._executor_request(flow)
        request.user = user
        FlowPlanner(flow).plan(request, default_context={PLAN_CONTEXT_PENDING_USER: user})
        # The plan must be retrievable under the key derived from the flow and the user
        self.assertIsNotNone(cache.get(cache_key(flow, user)))

    def test_planner_marker_reevaluate(self):
        """Test that the planner creates the proper marker"""
        flow = create_test_flow()

        FlowStageBinding.objects.create(
            target=flow,
            stage=DummyStage.objects.create(name=generate_id()),
            order=0,
            re_evaluate_policies=True,
        )

        plan = FlowPlanner(flow).plan(self._executor_request(flow))

        self.assertEqual(plan.markers[0].__class__, ReevaluateMarker)

    def test_planner_reevaluate_actual(self):
        """Test planner with re-evaluate"""
        flow = create_test_flow()
        false_policy = DummyPolicy.objects.create(result=False, wait_min=1, wait_max=2)

        binding = FlowStageBinding.objects.create(
            target=flow,
            stage=DummyStage.objects.create(name=generate_id()),
            order=0,
            re_evaluate_policies=False,
        )
        binding2 = FlowStageBinding.objects.create(
            target=flow,
            stage=DummyStage.objects.create(name=generate_id()),
            order=1,
            re_evaluate_policies=True,
        )

        PolicyBinding.objects.create(policy=false_policy, target=binding2, order=0)

        request = self._executor_request(flow)

        # Here we patch the dummy policy to evaluate to true so the stage is included
        with patch("authentik.policies.dummy.models.DummyPolicy.passes", POLICY_RETURN_TRUE):
            plan = FlowPlanner(flow).plan(request)

        self.assertEqual(plan.bindings[0], binding)
        self.assertEqual(plan.bindings[1], binding2)

        # Exact-class checks plus isinstance checks on both markers
        self.assertEqual(plan.markers[0].__class__, StageMarker)
        self.assertEqual(plan.markers[1].__class__, ReevaluateMarker)
        self.assertIsInstance(plan.markers[0], StageMarker)
        self.assertIsInstance(plan.markers[1], ReevaluateMarker)

    def test_to_redirect(self):
        """Test to_redirect and skipping the flow executor"""
        flow = create_test_flow()
        flow.authentication = FlowAuthenticationRequirement.NONE
        request = self._executor_request(flow)

        plan = self._lenient_planner(flow).plan(request)
        self.assertTrue(plan.requires_flow_executor())
        self.assertEqual(
            plan.to_redirect(request, flow).url,
            reverse("authentik_core:if-flow", kwargs={"flow_slug": flow.slug}),
        )

    def test_to_redirect_skip_simple(self):
        """Test to_redirect and skipping the flow executor
        (with only an allowed silent stage in the plan)"""
        flow = create_test_flow()
        flow.authentication = FlowAuthenticationRequirement.NONE
        request = self._executor_request(flow)
        plan = self._lenient_planner(flow).plan(request)

        class TStageView(StageView):
            def dispatch(self, request: HttpRequest, *args, **kwargs):
                return redirect("https://authentik.company")

        plan.append_stage(in_memory_stage(TStageView))
        self.assertFalse(plan.requires_flow_executor(allowed_silent_types=[TStageView]))
        self.assertEqual(
            plan.to_redirect(request, flow, allowed_silent_types=[TStageView]).url,
            "https://authentik.company",
        )

    def test_to_redirect_skip_stage(self):
        """Test to_redirect and skipping the flow executor
        (with a stage bound that cannot be skipped)"""
        flow = create_test_flow()
        flow.authentication = FlowAuthenticationRequirement.NONE

        # Generated name, consistent with the other tests in this class
        FlowStageBinding.objects.create(
            target=flow, stage=DummyStage.objects.create(name=generate_id()), order=0
        )

        request = self._executor_request(flow)
        plan = self._lenient_planner(flow).plan(request)

        class TStageView(StageView):
            def dispatch(self, request: HttpRequest, *args, **kwargs):
                return redirect("https://authentik.company")

        plan.append_stage(in_memory_stage(TStageView))
        self.assertTrue(plan.requires_flow_executor(allowed_silent_types=[TStageView]))

    def test_to_redirect_skip_policies(self):
        """Test to_redirect and skipping the flow executor
        (with a marker on the stage view type that can be skipped)

        Note that this is not actually used anywhere in the code; all stages that are
        dynamically added are statically added"""
        flow = create_test_flow()
        flow.authentication = FlowAuthenticationRequirement.NONE

        request = self._executor_request(flow)
        plan = self._lenient_planner(flow).plan(request)

        class TStageView(StageView):
            def dispatch(self, request: HttpRequest, *args, **kwargs):
                return redirect("https://authentik.company")

        plan.append_stage(in_memory_stage(TStageView), ReevaluateMarker(None))
        self.assertTrue(plan.requires_flow_executor(allowed_silent_types=[TStageView]))
A mock intended to be used as a property, or other descriptor, on a class.
PropertyMock provides __get__ and __set__ methods so you can specify
a return value when it is fetched.
Fetching a PropertyMock instance from an object calls the mock, with
no args. Setting it calls the mock with the value being set.
class TestFlowPlanner(TestCase):
    """Test planner logic"""

    def setUp(self):
        self.request_factory = RequestFactory()

    def _executor_request(self, flow, **extra):
        """Build a GET request for the flow-executor API URL of `flow`.

        Extra keyword arguments are forwarded unchanged to the request factory."""
        return self.request_factory.get(
            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
            **extra,
        )

    @staticmethod
    def _lenient_planner(flow):
        """Return a FlowPlanner for `flow` with `allow_empty_flows` enabled."""
        planner = FlowPlanner(flow)
        planner.allow_empty_flows = True
        return planner

    def test_empty_plan(self):
        """Test that empty plan raises exception"""
        flow = create_test_flow()
        request = self._executor_request(flow)

        with self.assertRaises(EmptyFlowException):
            FlowPlanner(flow).plan(request)

    def test_authentication(self):
        """Test flow authentication"""
        flow = create_test_flow()
        flow.authentication = FlowAuthenticationRequirement.NONE
        request = self._executor_request(flow)
        self._lenient_planner(flow).plan(request)

        # Before an admin user is attached to the request, both stricter requirements reject it
        for requirement in (
            FlowAuthenticationRequirement.REQUIRE_AUTHENTICATED,
            FlowAuthenticationRequirement.REQUIRE_SUPERUSER,
        ):
            flow.authentication = requirement
            with self.assertRaises(FlowNonApplicableException):
                FlowPlanner(flow).plan(request)

        # Flow still requires a superuser at this point; the admin user satisfies it
        request.user = create_test_admin_user()
        self._lenient_planner(flow).plan(request)

    def test_authentication_redirect_required(self):
        """Test flow authentication (redirect required)"""
        flow = create_test_flow()
        flow.authentication = FlowAuthenticationRequirement.REQUIRE_REDIRECT
        request = self._executor_request(flow)
        planner = self._lenient_planner(flow)

        with self.assertRaises(FlowNonApplicableException):
            planner.plan(request)

        # With the redirect marker in the context, planning must succeed
        planner.plan(request, {PLAN_CONTEXT_IS_REDIRECTED: create_test_flow()})

    @reconcile_app("authentik_outposts")
    def test_authentication_outpost(self):
        """Test flow authentication (outpost)"""
        flow = create_test_flow()
        flow.authentication = FlowAuthenticationRequirement.REQUIRE_OUTPOST
        request = self._executor_request(flow)
        with self.assertRaises(FlowNonApplicableException):
            self._lenient_planner(flow).plan(request)

        outpost = Outpost.objects.filter(managed=MANAGED_OUTPOST).first()
        request = self._executor_request(
            flow,
            HTTP_X_AUTHENTIK_OUTPOST_TOKEN=outpost.token.key,
            HTTP_X_AUTHENTIK_REMOTE_IP="1.2.3.4",
        )
        # The request is passed through ClientIPMiddleware before planning
        ClientIPMiddleware(dummy_get_response)(request)

        self._lenient_planner(flow).plan(request)

    def test_authentication_require_token(self):
        """Test flow authentication (require_token)"""
        flow = create_test_flow()
        flow.authentication = FlowAuthenticationRequirement.REQUIRE_TOKEN
        request = self._executor_request(flow)
        planner = self._lenient_planner(flow)

        with self.assertRaises(FlowNonApplicableException):
            planner.plan(request)

        # With the restored marker in the context, planning must succeed
        planner.plan(request, {PLAN_CONTEXT_IS_RESTORED: True})

    @patch(
        "authentik.policies.engine.PolicyEngine.result",
        POLICY_RETURN_FALSE,
    )
    def test_non_applicable_plan(self):
        """Test that a non-applicable flow (policy result is False) raises exception"""
        flow = create_test_flow()
        request = self._executor_request(flow)

        with self.assertRaises(FlowNonApplicableException):
            FlowPlanner(flow).plan(request)

    @patch("authentik.flows.planner.cache", CACHE_MOCK)
    def test_planner_cache(self):
        """Test planner cache"""
        # CACHE_MOCK is shared at module level; drop any previously recorded calls so the
        # absolute call counts asserted below only reflect this test run
        CACHE_MOCK.reset_mock()
        flow = create_test_flow(FlowDesignation.AUTHENTICATION)
        FlowStageBinding.objects.create(
            target=flow, stage=DummyStage.objects.create(name=generate_id()), order=0
        )
        request = self._executor_request(flow)

        FlowPlanner(flow).plan(request)
        self.assertEqual(CACHE_MOCK.set.call_count, 1)  # Ensure plan is written to cache
        FlowPlanner(flow).plan(request)
        self.assertEqual(CACHE_MOCK.set.call_count, 1)  # Ensure nothing is written to cache
        self.assertEqual(CACHE_MOCK.get.call_count, 2)  # Get is called twice

    def test_planner_default_context(self):
        """Test planner with default_context"""
        flow = create_test_flow()
        FlowStageBinding.objects.create(
            target=flow, stage=DummyStage.objects.create(name=generate_id()), order=0
        )

        user = User.objects.create(username=generate_id())
        request = self._executor_request(flow)
        request.user = user
        FlowPlanner(flow).plan(request, default_context={PLAN_CONTEXT_PENDING_USER: user})
        # The plan must be retrievable under the key derived from the flow and the user
        self.assertIsNotNone(cache.get(cache_key(flow, user)))

    def test_planner_marker_reevaluate(self):
        """Test that the planner creates the proper marker"""
        flow = create_test_flow()

        FlowStageBinding.objects.create(
            target=flow,
            stage=DummyStage.objects.create(name=generate_id()),
            order=0,
            re_evaluate_policies=True,
        )

        plan = FlowPlanner(flow).plan(self._executor_request(flow))

        self.assertEqual(plan.markers[0].__class__, ReevaluateMarker)

    def test_planner_reevaluate_actual(self):
        """Test planner with re-evaluate"""
        flow = create_test_flow()
        false_policy = DummyPolicy.objects.create(result=False, wait_min=1, wait_max=2)

        binding = FlowStageBinding.objects.create(
            target=flow,
            stage=DummyStage.objects.create(name=generate_id()),
            order=0,
            re_evaluate_policies=False,
        )
        binding2 = FlowStageBinding.objects.create(
            target=flow,
            stage=DummyStage.objects.create(name=generate_id()),
            order=1,
            re_evaluate_policies=True,
        )

        PolicyBinding.objects.create(policy=false_policy, target=binding2, order=0)

        request = self._executor_request(flow)

        # Here we patch the dummy policy to evaluate to true so the stage is included
        with patch("authentik.policies.dummy.models.DummyPolicy.passes", POLICY_RETURN_TRUE):
            plan = FlowPlanner(flow).plan(request)

        self.assertEqual(plan.bindings[0], binding)
        self.assertEqual(plan.bindings[1], binding2)

        # Exact-class checks plus isinstance checks on both markers
        self.assertEqual(plan.markers[0].__class__, StageMarker)
        self.assertEqual(plan.markers[1].__class__, ReevaluateMarker)
        self.assertIsInstance(plan.markers[0], StageMarker)
        self.assertIsInstance(plan.markers[1], ReevaluateMarker)

    def test_to_redirect(self):
        """Test to_redirect and skipping the flow executor"""
        flow = create_test_flow()
        flow.authentication = FlowAuthenticationRequirement.NONE
        request = self._executor_request(flow)

        plan = self._lenient_planner(flow).plan(request)
        self.assertTrue(plan.requires_flow_executor())
        self.assertEqual(
            plan.to_redirect(request, flow).url,
            reverse("authentik_core:if-flow", kwargs={"flow_slug": flow.slug}),
        )

    def test_to_redirect_skip_simple(self):
        """Test to_redirect and skipping the flow executor
        (with only an allowed silent stage in the plan)"""
        flow = create_test_flow()
        flow.authentication = FlowAuthenticationRequirement.NONE
        request = self._executor_request(flow)
        plan = self._lenient_planner(flow).plan(request)

        class TStageView(StageView):
            def dispatch(self, request: HttpRequest, *args, **kwargs):
                return redirect("https://authentik.company")

        plan.append_stage(in_memory_stage(TStageView))
        self.assertFalse(plan.requires_flow_executor(allowed_silent_types=[TStageView]))
        self.assertEqual(
            plan.to_redirect(request, flow, allowed_silent_types=[TStageView]).url,
            "https://authentik.company",
        )

    def test_to_redirect_skip_stage(self):
        """Test to_redirect and skipping the flow executor
        (with a stage bound that cannot be skipped)"""
        flow = create_test_flow()
        flow.authentication = FlowAuthenticationRequirement.NONE

        # Generated name, consistent with the other tests in this class
        FlowStageBinding.objects.create(
            target=flow, stage=DummyStage.objects.create(name=generate_id()), order=0
        )

        request = self._executor_request(flow)
        plan = self._lenient_planner(flow).plan(request)

        class TStageView(StageView):
            def dispatch(self, request: HttpRequest, *args, **kwargs):
                return redirect("https://authentik.company")

        plan.append_stage(in_memory_stage(TStageView))
        self.assertTrue(plan.requires_flow_executor(allowed_silent_types=[TStageView]))

    def test_to_redirect_skip_policies(self):
        """Test to_redirect and skipping the flow executor
        (with a marker on the stage view type that can be skipped)

        Note that this is not actually used anywhere in the code; all stages that are
        dynamically added are statically added"""
        flow = create_test_flow()
        flow.authentication = FlowAuthenticationRequirement.NONE

        request = self._executor_request(flow)
        plan = self._lenient_planner(flow).plan(request)

        class TStageView(StageView):
            def dispatch(self, request: HttpRequest, *args, **kwargs):
                return redirect("https://authentik.company")

        plan.append_stage(in_memory_stage(TStageView), ReevaluateMarker(None))
        self.assertTrue(plan.requires_flow_executor(allowed_silent_types=[TStageView]))
Test planner logic
def test_empty_plan(self):
    """Planning a freshly created flow (no stage bindings are added) must raise
    EmptyFlowException"""
    empty_flow = create_test_flow()
    executor_url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": empty_flow.slug})
    http_request = self.request_factory.get(executor_url)

    with self.assertRaises(EmptyFlowException):
        FlowPlanner(empty_flow).plan(http_request)
Test that empty plan raises exception
def test_authentication(self):
    """Exercise the planner against several flow authentication requirements"""
    flow = create_test_flow()
    flow.authentication = FlowAuthenticationRequirement.NONE
    executor_url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})
    request = self.request_factory.get(executor_url)

    # No requirement: planning succeeds (empty flows are explicitly allowed)
    open_planner = FlowPlanner(flow)
    open_planner.allow_empty_flows = True
    open_planner.plan(request)

    # Before an admin user is attached to the request, both stricter requirements reject it
    for requirement in (
        FlowAuthenticationRequirement.REQUIRE_AUTHENTICATED,
        FlowAuthenticationRequirement.REQUIRE_SUPERUSER,
    ):
        flow.authentication = requirement
        with self.assertRaises(FlowNonApplicableException):
            FlowPlanner(flow).plan(request)

    # The flow still requires a superuser here; with an admin user planning succeeds
    request.user = create_test_admin_user()
    admin_planner = FlowPlanner(flow)
    admin_planner.allow_empty_flows = True
    admin_planner.plan(request)
Test flow authentication
def test_authentication_redirect_required(self):
    """A REQUIRE_REDIRECT flow is rejected unless the plan context carries
    PLAN_CONTEXT_IS_REDIRECTED"""
    flow = create_test_flow()
    flow.authentication = FlowAuthenticationRequirement.REQUIRE_REDIRECT
    executor_url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})
    request = self.request_factory.get(executor_url)
    planner = FlowPlanner(flow)
    planner.allow_empty_flows = True

    # Without the redirect marker in the context the flow is not applicable
    with self.assertRaises(FlowNonApplicableException):
        planner.plan(request)

    # With the marker set (to another flow) planning goes through
    redirected_context = {PLAN_CONTEXT_IS_REDIRECTED: create_test_flow()}
    planner.plan(request, redirected_context)
Test flow authentication (redirect required)
@reconcile_app("authentik_outposts")
def test_authentication_outpost(self):
    """A REQUIRE_OUTPOST flow is rejected for a plain request and accepted for a request
    carrying the managed outpost's token"""
    flow = create_test_flow()
    flow.authentication = FlowAuthenticationRequirement.REQUIRE_OUTPOST
    executor_url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})

    plain_request = self.request_factory.get(executor_url)
    with self.assertRaises(FlowNonApplicableException):
        rejected_planner = FlowPlanner(flow)
        rejected_planner.allow_empty_flows = True
        rejected_planner.plan(plain_request)

    managed_outpost = Outpost.objects.filter(managed=MANAGED_OUTPOST).first()
    outpost_headers = {
        "HTTP_X_AUTHENTIK_OUTPOST_TOKEN": managed_outpost.token.key,
        "HTTP_X_AUTHENTIK_REMOTE_IP": "1.2.3.4",
    }
    outpost_request = self.request_factory.get(executor_url, **outpost_headers)
    # The request is passed through ClientIPMiddleware before planning
    ClientIPMiddleware(dummy_get_response)(outpost_request)

    accepted_planner = FlowPlanner(flow)
    accepted_planner.allow_empty_flows = True
    accepted_planner.plan(outpost_request)
Test flow authentication (outpost)
def test_authentication_require_token(self):
    """A REQUIRE_TOKEN flow is rejected unless the plan context carries
    PLAN_CONTEXT_IS_RESTORED"""
    flow = create_test_flow()
    flow.authentication = FlowAuthenticationRequirement.REQUIRE_TOKEN
    executor_url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})
    request = self.request_factory.get(executor_url)
    planner = FlowPlanner(flow)
    planner.allow_empty_flows = True

    # Without the restored marker in the context the flow is not applicable
    with self.assertRaises(FlowNonApplicableException):
        planner.plan(request)

    # With the marker set planning goes through
    restored_context = {PLAN_CONTEXT_IS_RESTORED: True}
    planner.plan(request, restored_context)
Test flow authentication (require_token)
@patch(
    "authentik.policies.engine.PolicyEngine.result",
    POLICY_RETURN_FALSE,
)
def test_non_applicable_plan(self):
    """Test that a non-applicable flow (policy result is False) raises exception"""
    # PolicyEngine.result is patched to a failing PolicyResult for the whole test
    flow = create_test_flow()
    request = self.request_factory.get(
        reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
    )

    with self.assertRaises(FlowNonApplicableException):
        planner = FlowPlanner(flow)
        planner.plan(request)
Test that a non-applicable flow (policy result is False) raises exception
@patch("authentik.flows.planner.cache", CACHE_MOCK)
def test_planner_cache(self):
    """Test planner cache: the first plan is written to the cache, the second is not"""
    # CACHE_MOCK is shared at module level; drop any previously recorded calls so the
    # absolute call counts asserted below only reflect this test run
    CACHE_MOCK.reset_mock()
    flow = create_test_flow(FlowDesignation.AUTHENTICATION)
    FlowStageBinding.objects.create(
        target=flow, stage=DummyStage.objects.create(name=generate_id()), order=0
    )
    request = self.request_factory.get(
        reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
    )

    planner = FlowPlanner(flow)
    planner.plan(request)
    self.assertEqual(CACHE_MOCK.set.call_count, 1)  # Ensure plan is written to cache
    planner = FlowPlanner(flow)
    planner.plan(request)
    self.assertEqual(CACHE_MOCK.set.call_count, 1)  # Ensure nothing is written to cache
    self.assertEqual(CACHE_MOCK.get.call_count, 2)  # Get is called twice
Test planner cache
def test_planner_default_context(self):
    """Test planner with default_context"""
    flow = create_test_flow()
    FlowStageBinding.objects.create(
        target=flow, stage=DummyStage.objects.create(name=generate_id()), order=0
    )

    # Generated username, consistent with the generated stage names used in this file
    user = User.objects.create(username=generate_id())
    request = self.request_factory.get(
        reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
    )
    request.user = user
    planner = FlowPlanner(flow)
    planner.plan(request, default_context={PLAN_CONTEXT_PENDING_USER: user})
    # The plan must be retrievable under the key derived from the flow and the user
    key = cache_key(flow, user)
    self.assertIsNotNone(cache.get(key))
Test planner with default_context
def test_planner_marker_reevaluate(self):
    """A binding with re_evaluate_policies=True must yield a ReevaluateMarker in the plan"""
    flow = create_test_flow()

    reevaluated_stage = DummyStage.objects.create(name=generate_id())
    FlowStageBinding.objects.create(
        target=flow,
        stage=reevaluated_stage,
        order=0,
        re_evaluate_policies=True,
    )

    executor_url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})
    request = self.request_factory.get(executor_url)

    plan = FlowPlanner(flow).plan(request)
    first_marker = plan.markers[0]

    # Exact class comparison, not just an isinstance check
    self.assertEqual(first_marker.__class__, ReevaluateMarker)
Test that the planner creates the proper marker
def test_planner_reevaluate_actual(self):
    """Plan a flow with two bindings where only the second re-evaluates its policies,
    then check the bindings and the marker type of each"""
    flow = create_test_flow()
    denying_policy = DummyPolicy.objects.create(result=False, wait_min=1, wait_max=2)

    first_stage = DummyStage.objects.create(name=generate_id())
    first_binding = FlowStageBinding.objects.create(
        target=flow,
        stage=first_stage,
        order=0,
        re_evaluate_policies=False,
    )
    second_stage = DummyStage.objects.create(name=generate_id())
    second_binding = FlowStageBinding.objects.create(
        target=flow,
        stage=second_stage,
        order=1,
        re_evaluate_policies=True,
    )

    PolicyBinding.objects.create(policy=denying_policy, target=second_binding, order=0)

    executor_url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})
    request = self.request_factory.get(executor_url)

    # Here we patch the dummy policy to evaluate to true so the stage is included
    with patch("authentik.policies.dummy.models.DummyPolicy.passes", POLICY_RETURN_TRUE):
        plan = FlowPlanner(flow).plan(request)

    self.assertEqual(plan.bindings[0], first_binding)
    self.assertEqual(plan.bindings[1], second_binding)

    plain_marker = plan.markers[0]
    reevaluate_marker = plan.markers[1]
    # Exact-class checks first, then isinstance checks, on both markers
    self.assertEqual(plain_marker.__class__, StageMarker)
    self.assertEqual(reevaluate_marker.__class__, ReevaluateMarker)
    self.assertIsInstance(plain_marker, StageMarker)
    self.assertIsInstance(reevaluate_marker, ReevaluateMarker)
Test planner with re-evaluate
def test_to_redirect(self):
    """A plan without appended stages requires the flow executor, and to_redirect
    points at the `authentik_core:if-flow` URL of the flow"""
    flow = create_test_flow()
    flow.authentication = FlowAuthenticationRequirement.NONE
    executor_url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})
    request = self.request_factory.get(executor_url)

    planner = FlowPlanner(flow)
    planner.allow_empty_flows = True
    plan = planner.plan(request)

    self.assertTrue(plan.requires_flow_executor())
    redirect_response = plan.to_redirect(request, flow)
    expected_url = reverse("authentik_core:if-flow", kwargs={"flow_slug": flow.slug})
    self.assertEqual(redirect_response.url, expected_url)
Test to_redirect and skipping the flow executor
def test_to_redirect_skip_simple(self):
    """When the only stage in the plan is an allowed silent type, the flow executor is
    not required and to_redirect yields the URL the stage view redirects to"""
    flow = create_test_flow()
    flow.authentication = FlowAuthenticationRequirement.NONE
    executor_url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})
    request = self.request_factory.get(executor_url)
    planner = FlowPlanner(flow)
    planner.allow_empty_flows = True
    plan = planner.plan(request)

    # Minimal stage view that redirects immediately
    class TStageView(StageView):
        def dispatch(self, request: HttpRequest, *args, **kwargs):
            return redirect("https://authentik.company")

    silent_types = [TStageView]
    plan.append_stage(in_memory_stage(TStageView))
    self.assertFalse(plan.requires_flow_executor(allowed_silent_types=silent_types))
    redirect_response = plan.to_redirect(request, flow, allowed_silent_types=silent_types)
    self.assertEqual(redirect_response.url, "https://authentik.company")
Test to_redirect and skipping the flow executor
def test_to_redirect_skip_stage(self):
    """Test to_redirect and skipping the flow executor
    (with a stage bound that cannot be skipped)"""
    flow = create_test_flow()
    flow.authentication = FlowAuthenticationRequirement.NONE

    # Generated name instead of a fixed literal, consistent with the other tests here
    FlowStageBinding.objects.create(
        target=flow, stage=DummyStage.objects.create(name=generate_id()), order=0
    )

    request = self.request_factory.get(
        reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
    )
    planner = FlowPlanner(flow)
    planner.allow_empty_flows = True
    plan = planner.plan(request)

    # Minimal stage view that redirects immediately
    class TStageView(StageView):
        def dispatch(self, request: HttpRequest, *args, **kwargs):
            return redirect("https://authentik.company")

    plan.append_stage(in_memory_stage(TStageView))
    # The bound DummyStage is not an allowed silent type, so the executor is still needed
    self.assertTrue(plan.requires_flow_executor(allowed_silent_types=[TStageView]))
Test to_redirect and skipping the flow executor (with a stage bound that cannot be skipped)
def test_to_redirect_skip_policies(self):
    """Test to_redirect and skipping the flow executor
    (an allowed silent stage appended with a ReevaluateMarker still requires the executor)

    Note that this is not actually used anywhere in the code; all stages that are
    dynamically added are statically added"""
    flow = create_test_flow()
    flow.authentication = FlowAuthenticationRequirement.NONE

    executor_url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})
    request = self.request_factory.get(executor_url)
    planner = FlowPlanner(flow)
    planner.allow_empty_flows = True
    plan = planner.plan(request)

    # Minimal stage view that redirects immediately
    class TStageView(StageView):
        def dispatch(self, request: HttpRequest, *args, **kwargs):
            return redirect("https://authentik.company")

    silent_stage = in_memory_stage(TStageView)
    reevaluate_marker = ReevaluateMarker(None)
    plan.append_stage(silent_stage, reevaluate_marker)
    self.assertTrue(plan.requires_flow_executor(allowed_silent_types=[TStageView]))
Test to_redirect and skipping the flow executor (with a marker on the stage view type that can be skipped)
Note that this is not actually used anywhere in the code; all stages that are dynamically added are statically added.