authentik.flows.tests.test_planner
flow planner tests
"""flow planner tests"""

from unittest.mock import MagicMock, Mock, PropertyMock, patch

from django.core.cache import cache
from django.http import HttpRequest
from django.shortcuts import redirect
from django.test import TestCase
from django.urls import reverse

from authentik.blueprints.tests import reconcile_app
from authentik.core.models import User
from authentik.core.tests.utils import (
    RequestFactory,
    create_test_admin_user,
    create_test_flow,
    dummy_get_response,
)
from authentik.flows.exceptions import EmptyFlowException, FlowNonApplicableException
from authentik.flows.markers import ReevaluateMarker, StageMarker
from authentik.flows.models import (
    FlowAuthenticationRequirement,
    FlowDesignation,
    FlowStageBinding,
    in_memory_stage,
)
from authentik.flows.planner import (
    PLAN_CONTEXT_IS_REDIRECTED,
    PLAN_CONTEXT_PENDING_USER,
    FlowPlanner,
    cache_key,
)
from authentik.flows.stage import StageView
from authentik.lib.generators import generate_id
from authentik.outposts.apps import MANAGED_OUTPOST
from authentik.outposts.models import Outpost
from authentik.policies.dummy.models import DummyPolicy
from authentik.policies.models import PolicyBinding
from authentik.policies.types import PolicyResult
from authentik.root.middleware import ClientIPMiddleware
from authentik.stages.dummy.models import DummyStage

# Patched over ``PolicyEngine.result`` (a property, hence PropertyMock) to force a failing result
POLICY_RETURN_FALSE = PropertyMock(return_value=PolicyResult(False))
# Wraps the real Django cache so calls still go through while being counted
CACHE_MOCK = Mock(wraps=cache)

# Patched over ``DummyPolicy.passes`` to force a passing result
POLICY_RETURN_TRUE = MagicMock(return_value=PolicyResult(True))


class TestFlowPlanner(TestCase):
    """Test planner logic"""

    def setUp(self):
        self.request_factory = RequestFactory()

    def test_empty_plan(self):
        """Test that planning a flow without any stage bindings added raises EmptyFlowException

        ``allow_empty_flows`` is deliberately not set on the planner here.
        """
        flow = create_test_flow()
        executor_url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})
        request = self.request_factory.get(executor_url)

        with self.assertRaises(EmptyFlowException):
            FlowPlanner(flow).plan(request)

    def test_authentication(self):
        """Test the planner against several flow authentication requirements

        With the request as built by the factory, planning is expected to work for ``NONE``
        and to raise FlowNonApplicableException for ``REQUIRE_AUTHENTICATED`` and
        ``REQUIRE_SUPERUSER``. Once a test admin user is attached to the request, planning
        is expected to work again.
        """
        flow = create_test_flow()
        flow.authentication = FlowAuthenticationRequirement.NONE
        executor_url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})
        request = self.request_factory.get(executor_url)

        open_planner = FlowPlanner(flow)
        open_planner.allow_empty_flows = True
        open_planner.plan(request)

        # Both stricter requirements are expected to reject this request
        for requirement in (
            FlowAuthenticationRequirement.REQUIRE_AUTHENTICATED,
            FlowAuthenticationRequirement.REQUIRE_SUPERUSER,
        ):
            flow.authentication = requirement
            with self.assertRaises(FlowNonApplicableException):
                FlowPlanner(flow).plan(request)

        # The flow is still set to REQUIRE_SUPERUSER at this point
        request.user = create_test_admin_user()
        admin_planner = FlowPlanner(flow)
        admin_planner.allow_empty_flows = True
        admin_planner.plan(request)

    def test_authentication_redirect_required(self):
        """Test flow authentication (redirect required)

        Planning is expected to fail without ``PLAN_CONTEXT_IS_REDIRECTED`` in the context
        and to work once that key is set (here, to another test flow).
        """
        flow = create_test_flow()
        flow.authentication = FlowAuthenticationRequirement.REQUIRE_REDIRECT
        executor_url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})
        request = self.request_factory.get(executor_url)
        planner = FlowPlanner(flow)
        planner.allow_empty_flows = True

        with self.assertRaises(FlowNonApplicableException):
            planner.plan(request)

        redirected_context = {PLAN_CONTEXT_IS_REDIRECTED: create_test_flow()}
        planner.plan(request, redirected_context)

    @reconcile_app("authentik_outposts")
    def test_authentication_outpost(self):
        """Test flow authentication (outpost)

        A plain request is expected to be rejected. A request carrying the managed outpost's
        token header, after being passed through ClientIPMiddleware, is expected to be planned
        without error.
        """
        flow = create_test_flow()
        flow.authentication = FlowAuthenticationRequirement.REQUIRE_OUTPOST
        executor_url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})

        plain_request = self.request_factory.get(executor_url)
        with self.assertRaises(FlowNonApplicableException):
            rejecting_planner = FlowPlanner(flow)
            rejecting_planner.allow_empty_flows = True
            rejecting_planner.plan(plain_request)

        managed_outpost = Outpost.objects.filter(managed=MANAGED_OUTPOST).first()
        outpost_headers = {
            "HTTP_X_AUTHENTIK_OUTPOST_TOKEN": managed_outpost.token.key,
            "HTTP_X_AUTHENTIK_REMOTE_IP": "1.2.3.4",
        }
        outpost_request = self.request_factory.get(executor_url, **outpost_headers)
        # Run the request through the client IP middleware before planning
        ClientIPMiddleware(dummy_get_response)(outpost_request)

        accepting_planner = FlowPlanner(flow)
        accepting_planner.allow_empty_flows = True
        accepting_planner.plan(outpost_request)

    @patch(
        "authentik.policies.engine.PolicyEngine.result",
        POLICY_RETURN_FALSE,
    )
    def test_non_applicable_plan(self):
        """Test that a non-applicable plan raises FlowNonApplicableException

        ``PolicyEngine.result`` is patched with ``POLICY_RETURN_FALSE`` for this test.
        """
        flow = create_test_flow()
        request = self.request_factory.get(
            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
        )
        # Build the planner outside the assertion so that only plan() can satisfy it
        planner = FlowPlanner(flow)

        with self.assertRaises(FlowNonApplicableException):
            planner.plan(request)

    @patch("authentik.flows.planner.cache", CACHE_MOCK)
    def test_planner_cache(self):
        """Test planner cache

        The planner module's ``cache`` is replaced with ``CACHE_MOCK`` so that reads and
        writes can be counted across two planning runs of the same flow.
        """
        flow = create_test_flow(FlowDesignation.AUTHENTICATION)
        dummy_stage = DummyStage.objects.create(name=generate_id())
        FlowStageBinding.objects.create(target=flow, stage=dummy_stage, order=0)
        executor_url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})
        request = self.request_factory.get(executor_url)

        FlowPlanner(flow).plan(request)
        # First run: ensure the plan is written to the cache
        self.assertEqual(CACHE_MOCK.set.call_count, 1)

        FlowPlanner(flow).plan(request)
        # Second run: ensure nothing new is written, while get was called on both runs
        self.assertEqual(CACHE_MOCK.set.call_count, 1)
        self.assertEqual(CACHE_MOCK.get.call_count, 2)

    def test_planner_default_context(self):
        """Test planner with default_context

        After planning with ``PLAN_CONTEXT_PENDING_USER`` set, a cache entry is expected
        under ``cache_key(flow, user)``.
        """
        flow = create_test_flow()
        FlowStageBinding.objects.create(
            target=flow, stage=DummyStage.objects.create(name=generate_id()), order=0
        )

        user = User.objects.create(username="test-user")
        request = self.request_factory.get(
            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
        )
        request.user = user
        planner = FlowPlanner(flow)
        planner.plan(request, default_context={PLAN_CONTEXT_PENDING_USER: user})
        key = cache_key(flow, user)
        # assertIsNotNone gives a clearer failure message than assertTrue(x is not None)
        self.assertIsNotNone(cache.get(key))

    def test_planner_marker_reevaluate(self):
        """Test that a binding with re_evaluate_policies=True gets a ReevaluateMarker"""
        flow = create_test_flow()
        dummy_stage = DummyStage.objects.create(name=generate_id())
        FlowStageBinding.objects.create(
            target=flow,
            stage=dummy_stage,
            order=0,
            re_evaluate_policies=True,
        )
        executor_url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})
        request = self.request_factory.get(executor_url)

        plan = FlowPlanner(flow).plan(request)

        first_marker = plan.markers[0]
        self.assertEqual(first_marker.__class__, ReevaluateMarker)

    def test_planner_reevaluate_actual(self):
        """Test planner with re-evaluate

        Two stages are bound. Only the second has ``re_evaluate_policies=True`` and a
        DummyPolicy (created with ``result=False``) bound to it.
        """
        flow = create_test_flow()
        false_policy = DummyPolicy.objects.create(result=False, wait_min=1, wait_max=2)

        first_stage = DummyStage.objects.create(name=generate_id())
        static_binding = FlowStageBinding.objects.create(
            target=flow,
            stage=first_stage,
            order=0,
            re_evaluate_policies=False,
        )
        second_stage = DummyStage.objects.create(name=generate_id())
        reevaluated_binding = FlowStageBinding.objects.create(
            target=flow,
            stage=second_stage,
            order=1,
            re_evaluate_policies=True,
        )

        PolicyBinding.objects.create(policy=false_policy, target=reevaluated_binding, order=0)

        executor_url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})
        request = self.request_factory.get(executor_url)

        # Here we patch the dummy policy to evaluate to true so the stage is included
        with patch("authentik.policies.dummy.models.DummyPolicy.passes", POLICY_RETURN_TRUE):
            plan = FlowPlanner(flow).plan(request)

            self.assertEqual(plan.bindings[0], static_binding)
            self.assertEqual(plan.bindings[1], reevaluated_binding)

            # Exact class checks first, then the looser isinstance checks
            self.assertEqual(plan.markers[0].__class__, StageMarker)
            self.assertEqual(plan.markers[1].__class__, ReevaluateMarker)
            self.assertIsInstance(plan.markers[0], StageMarker)
            self.assertIsInstance(plan.markers[1], ReevaluateMarker)

    def test_to_redirect(self):
        """Test to_redirect for a plan without any stage added in this test

        The plan is expected to require the flow executor and to redirect to the
        ``authentik_core:if-flow`` URL of the flow.
        """
        flow = create_test_flow()
        flow.authentication = FlowAuthenticationRequirement.NONE
        executor_url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})
        request = self.request_factory.get(executor_url)

        planner = FlowPlanner(flow)
        planner.allow_empty_flows = True
        plan = planner.plan(request)

        self.assertTrue(plan.requires_flow_executor())
        response = plan.to_redirect(request, flow)
        interface_url = reverse("authentik_core:if-flow", kwargs={"flow_slug": flow.slug})
        self.assertEqual(response.url, interface_url)

    def test_to_redirect_skip_simple(self):
        """Test to_redirect and skipping the flow executor
        (with a single in-memory stage whose view type is allowed to run silently)"""
        flow = create_test_flow()
        flow.authentication = FlowAuthenticationRequirement.NONE
        executor_url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})
        request = self.request_factory.get(executor_url)

        class TStageView(StageView):
            def dispatch(self, request: HttpRequest, *args, **kwargs):
                return redirect("https://authentik.company")

        planner = FlowPlanner(flow)
        planner.allow_empty_flows = True
        plan = planner.plan(request)
        plan.append_stage(in_memory_stage(TStageView))

        self.assertFalse(plan.requires_flow_executor(allowed_silent_types=[TStageView]))
        silent_redirect = plan.to_redirect(request, flow, allowed_silent_types=[TStageView])
        self.assertEqual(silent_redirect.url, "https://authentik.company")

    def test_to_redirect_skip_stage(self):
        """Test to_redirect and skipping the flow executor
        (with a stage bound that cannot be skipped)"""
        flow = create_test_flow()
        flow.authentication = FlowAuthenticationRequirement.NONE

        bound_stage = DummyStage.objects.create(name="dummy")
        FlowStageBinding.objects.create(target=flow, stage=bound_stage, order=0)

        executor_url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})
        request = self.request_factory.get(executor_url)

        class TStageView(StageView):
            def dispatch(self, request: HttpRequest, *args, **kwargs):
                return redirect("https://authentik.company")

        planner = FlowPlanner(flow)
        planner.allow_empty_flows = True
        plan = planner.plan(request)
        plan.append_stage(in_memory_stage(TStageView))

        self.assertTrue(plan.requires_flow_executor(allowed_silent_types=[TStageView]))

    def test_to_redirect_skip_policies(self):
        """Test to_redirect and skipping the flow executor
        (with a marker on the stage view type that can be skipped)

        Note that this is not actually used anywhere in the code, all stages that are dynamically
        added are statically added"""
        flow = create_test_flow()
        flow.authentication = FlowAuthenticationRequirement.NONE
        executor_url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})
        request = self.request_factory.get(executor_url)

        class TStageView(StageView):
            def dispatch(self, request: HttpRequest, *args, **kwargs):
                return redirect("https://authentik.company")

        planner = FlowPlanner(flow)
        planner.allow_empty_flows = True
        plan = planner.plan(request)
        # The ReevaluateMarker is what keeps the executor required in this case
        plan.append_stage(in_memory_stage(TStageView), ReevaluateMarker(None))

        self.assertTrue(plan.requires_flow_executor(allowed_silent_types=[TStageView]))
A mock intended to be used as a property, or other descriptor, on a class.
PropertyMock provides __get__ and __set__ methods so you can specify
a return value when it is fetched.
Fetching a PropertyMock instance from an object calls the mock, with
no args. Setting it calls the mock with the value being set.
class TestFlowPlanner(TestCase):
    """Test planner logic"""

    def setUp(self):
        self.request_factory = RequestFactory()

    def test_empty_plan(self):
        """Test that planning a flow without any stage bindings added raises EmptyFlowException

        ``allow_empty_flows`` is deliberately not set on the planner here.
        """
        flow = create_test_flow()
        executor_url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})
        request = self.request_factory.get(executor_url)

        with self.assertRaises(EmptyFlowException):
            FlowPlanner(flow).plan(request)

    def test_authentication(self):
        """Test the planner against several flow authentication requirements

        With the request as built by the factory, planning is expected to work for ``NONE``
        and to raise FlowNonApplicableException for ``REQUIRE_AUTHENTICATED`` and
        ``REQUIRE_SUPERUSER``. Once a test admin user is attached to the request, planning
        is expected to work again.
        """
        flow = create_test_flow()
        flow.authentication = FlowAuthenticationRequirement.NONE
        executor_url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})
        request = self.request_factory.get(executor_url)

        open_planner = FlowPlanner(flow)
        open_planner.allow_empty_flows = True
        open_planner.plan(request)

        # Both stricter requirements are expected to reject this request
        for requirement in (
            FlowAuthenticationRequirement.REQUIRE_AUTHENTICATED,
            FlowAuthenticationRequirement.REQUIRE_SUPERUSER,
        ):
            flow.authentication = requirement
            with self.assertRaises(FlowNonApplicableException):
                FlowPlanner(flow).plan(request)

        # The flow is still set to REQUIRE_SUPERUSER at this point
        request.user = create_test_admin_user()
        admin_planner = FlowPlanner(flow)
        admin_planner.allow_empty_flows = True
        admin_planner.plan(request)

    def test_authentication_redirect_required(self):
        """Test flow authentication (redirect required)

        Planning is expected to fail without ``PLAN_CONTEXT_IS_REDIRECTED`` in the context
        and to work once that key is set (here, to another test flow).
        """
        flow = create_test_flow()
        flow.authentication = FlowAuthenticationRequirement.REQUIRE_REDIRECT
        executor_url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})
        request = self.request_factory.get(executor_url)
        planner = FlowPlanner(flow)
        planner.allow_empty_flows = True

        with self.assertRaises(FlowNonApplicableException):
            planner.plan(request)

        redirected_context = {PLAN_CONTEXT_IS_REDIRECTED: create_test_flow()}
        planner.plan(request, redirected_context)

    @reconcile_app("authentik_outposts")
    def test_authentication_outpost(self):
        """Test flow authentication (outpost)

        A plain request is expected to be rejected. A request carrying the managed outpost's
        token header, after being passed through ClientIPMiddleware, is expected to be planned
        without error.
        """
        flow = create_test_flow()
        flow.authentication = FlowAuthenticationRequirement.REQUIRE_OUTPOST
        executor_url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})

        plain_request = self.request_factory.get(executor_url)
        with self.assertRaises(FlowNonApplicableException):
            rejecting_planner = FlowPlanner(flow)
            rejecting_planner.allow_empty_flows = True
            rejecting_planner.plan(plain_request)

        managed_outpost = Outpost.objects.filter(managed=MANAGED_OUTPOST).first()
        outpost_headers = {
            "HTTP_X_AUTHENTIK_OUTPOST_TOKEN": managed_outpost.token.key,
            "HTTP_X_AUTHENTIK_REMOTE_IP": "1.2.3.4",
        }
        outpost_request = self.request_factory.get(executor_url, **outpost_headers)
        # Run the request through the client IP middleware before planning
        ClientIPMiddleware(dummy_get_response)(outpost_request)

        accepting_planner = FlowPlanner(flow)
        accepting_planner.allow_empty_flows = True
        accepting_planner.plan(outpost_request)

    @patch(
        "authentik.policies.engine.PolicyEngine.result",
        POLICY_RETURN_FALSE,
    )
    def test_non_applicable_plan(self):
        """Test that a non-applicable plan raises FlowNonApplicableException

        ``PolicyEngine.result`` is patched with ``POLICY_RETURN_FALSE`` for this test.
        """
        flow = create_test_flow()
        request = self.request_factory.get(
            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
        )
        # Build the planner outside the assertion so that only plan() can satisfy it
        planner = FlowPlanner(flow)

        with self.assertRaises(FlowNonApplicableException):
            planner.plan(request)

    @patch("authentik.flows.planner.cache", CACHE_MOCK)
    def test_planner_cache(self):
        """Test planner cache

        The planner module's ``cache`` is replaced with ``CACHE_MOCK`` so that reads and
        writes can be counted across two planning runs of the same flow.
        """
        flow = create_test_flow(FlowDesignation.AUTHENTICATION)
        dummy_stage = DummyStage.objects.create(name=generate_id())
        FlowStageBinding.objects.create(target=flow, stage=dummy_stage, order=0)
        executor_url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})
        request = self.request_factory.get(executor_url)

        FlowPlanner(flow).plan(request)
        # First run: ensure the plan is written to the cache
        self.assertEqual(CACHE_MOCK.set.call_count, 1)

        FlowPlanner(flow).plan(request)
        # Second run: ensure nothing new is written, while get was called on both runs
        self.assertEqual(CACHE_MOCK.set.call_count, 1)
        self.assertEqual(CACHE_MOCK.get.call_count, 2)

    def test_planner_default_context(self):
        """Test planner with default_context

        After planning with ``PLAN_CONTEXT_PENDING_USER`` set, a cache entry is expected
        under ``cache_key(flow, user)``.
        """
        flow = create_test_flow()
        FlowStageBinding.objects.create(
            target=flow, stage=DummyStage.objects.create(name=generate_id()), order=0
        )

        user = User.objects.create(username="test-user")
        request = self.request_factory.get(
            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
        )
        request.user = user
        planner = FlowPlanner(flow)
        planner.plan(request, default_context={PLAN_CONTEXT_PENDING_USER: user})
        key = cache_key(flow, user)
        # assertIsNotNone gives a clearer failure message than assertTrue(x is not None)
        self.assertIsNotNone(cache.get(key))

    def test_planner_marker_reevaluate(self):
        """Test that a binding with re_evaluate_policies=True gets a ReevaluateMarker"""
        flow = create_test_flow()
        dummy_stage = DummyStage.objects.create(name=generate_id())
        FlowStageBinding.objects.create(
            target=flow,
            stage=dummy_stage,
            order=0,
            re_evaluate_policies=True,
        )
        executor_url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})
        request = self.request_factory.get(executor_url)

        plan = FlowPlanner(flow).plan(request)

        first_marker = plan.markers[0]
        self.assertEqual(first_marker.__class__, ReevaluateMarker)

    def test_planner_reevaluate_actual(self):
        """Test planner with re-evaluate

        Two stages are bound. Only the second has ``re_evaluate_policies=True`` and a
        DummyPolicy (created with ``result=False``) bound to it.
        """
        flow = create_test_flow()
        false_policy = DummyPolicy.objects.create(result=False, wait_min=1, wait_max=2)

        first_stage = DummyStage.objects.create(name=generate_id())
        static_binding = FlowStageBinding.objects.create(
            target=flow,
            stage=first_stage,
            order=0,
            re_evaluate_policies=False,
        )
        second_stage = DummyStage.objects.create(name=generate_id())
        reevaluated_binding = FlowStageBinding.objects.create(
            target=flow,
            stage=second_stage,
            order=1,
            re_evaluate_policies=True,
        )

        PolicyBinding.objects.create(policy=false_policy, target=reevaluated_binding, order=0)

        executor_url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})
        request = self.request_factory.get(executor_url)

        # Here we patch the dummy policy to evaluate to true so the stage is included
        with patch("authentik.policies.dummy.models.DummyPolicy.passes", POLICY_RETURN_TRUE):
            plan = FlowPlanner(flow).plan(request)

            self.assertEqual(plan.bindings[0], static_binding)
            self.assertEqual(plan.bindings[1], reevaluated_binding)

            # Exact class checks first, then the looser isinstance checks
            self.assertEqual(plan.markers[0].__class__, StageMarker)
            self.assertEqual(plan.markers[1].__class__, ReevaluateMarker)
            self.assertIsInstance(plan.markers[0], StageMarker)
            self.assertIsInstance(plan.markers[1], ReevaluateMarker)

    def test_to_redirect(self):
        """Test to_redirect for a plan without any stage added in this test

        The plan is expected to require the flow executor and to redirect to the
        ``authentik_core:if-flow`` URL of the flow.
        """
        flow = create_test_flow()
        flow.authentication = FlowAuthenticationRequirement.NONE
        executor_url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})
        request = self.request_factory.get(executor_url)

        planner = FlowPlanner(flow)
        planner.allow_empty_flows = True
        plan = planner.plan(request)

        self.assertTrue(plan.requires_flow_executor())
        response = plan.to_redirect(request, flow)
        interface_url = reverse("authentik_core:if-flow", kwargs={"flow_slug": flow.slug})
        self.assertEqual(response.url, interface_url)

    def test_to_redirect_skip_simple(self):
        """Test to_redirect and skipping the flow executor
        (with a single in-memory stage whose view type is allowed to run silently)"""
        flow = create_test_flow()
        flow.authentication = FlowAuthenticationRequirement.NONE
        executor_url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})
        request = self.request_factory.get(executor_url)

        class TStageView(StageView):
            def dispatch(self, request: HttpRequest, *args, **kwargs):
                return redirect("https://authentik.company")

        planner = FlowPlanner(flow)
        planner.allow_empty_flows = True
        plan = planner.plan(request)
        plan.append_stage(in_memory_stage(TStageView))

        self.assertFalse(plan.requires_flow_executor(allowed_silent_types=[TStageView]))
        silent_redirect = plan.to_redirect(request, flow, allowed_silent_types=[TStageView])
        self.assertEqual(silent_redirect.url, "https://authentik.company")

    def test_to_redirect_skip_stage(self):
        """Test to_redirect and skipping the flow executor
        (with a stage bound that cannot be skipped)"""
        flow = create_test_flow()
        flow.authentication = FlowAuthenticationRequirement.NONE

        bound_stage = DummyStage.objects.create(name="dummy")
        FlowStageBinding.objects.create(target=flow, stage=bound_stage, order=0)

        executor_url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})
        request = self.request_factory.get(executor_url)

        class TStageView(StageView):
            def dispatch(self, request: HttpRequest, *args, **kwargs):
                return redirect("https://authentik.company")

        planner = FlowPlanner(flow)
        planner.allow_empty_flows = True
        plan = planner.plan(request)
        plan.append_stage(in_memory_stage(TStageView))

        self.assertTrue(plan.requires_flow_executor(allowed_silent_types=[TStageView]))

    def test_to_redirect_skip_policies(self):
        """Test to_redirect and skipping the flow executor
        (with a marker on the stage view type that can be skipped)

        Note that this is not actually used anywhere in the code, all stages that are dynamically
        added are statically added"""
        flow = create_test_flow()
        flow.authentication = FlowAuthenticationRequirement.NONE
        executor_url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})
        request = self.request_factory.get(executor_url)

        class TStageView(StageView):
            def dispatch(self, request: HttpRequest, *args, **kwargs):
                return redirect("https://authentik.company")

        planner = FlowPlanner(flow)
        planner.allow_empty_flows = True
        plan = planner.plan(request)
        # The ReevaluateMarker is what keeps the executor required in this case
        plan.append_stage(in_memory_stage(TStageView), ReevaluateMarker(None))

        self.assertTrue(plan.requires_flow_executor(allowed_silent_types=[TStageView]))
Test planner logic
def test_empty_plan(self):
    """Test that planning a flow without any stage bindings added raises EmptyFlowException

    ``allow_empty_flows`` is deliberately not set on the planner here.
    """
    flow = create_test_flow()
    executor_url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})
    request = self.request_factory.get(executor_url)

    with self.assertRaises(EmptyFlowException):
        FlowPlanner(flow).plan(request)
Test that an empty plan raises an exception
def test_authentication(self):
    """Test the planner against several flow authentication requirements

    With the request as built by the factory, planning is expected to work for ``NONE``
    and to raise FlowNonApplicableException for ``REQUIRE_AUTHENTICATED`` and
    ``REQUIRE_SUPERUSER``. Once a test admin user is attached to the request, planning
    is expected to work again.
    """
    flow = create_test_flow()
    flow.authentication = FlowAuthenticationRequirement.NONE
    executor_url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})
    request = self.request_factory.get(executor_url)

    open_planner = FlowPlanner(flow)
    open_planner.allow_empty_flows = True
    open_planner.plan(request)

    # Both stricter requirements are expected to reject this request
    for requirement in (
        FlowAuthenticationRequirement.REQUIRE_AUTHENTICATED,
        FlowAuthenticationRequirement.REQUIRE_SUPERUSER,
    ):
        flow.authentication = requirement
        with self.assertRaises(FlowNonApplicableException):
            FlowPlanner(flow).plan(request)

    # The flow is still set to REQUIRE_SUPERUSER at this point
    request.user = create_test_admin_user()
    admin_planner = FlowPlanner(flow)
    admin_planner.allow_empty_flows = True
    admin_planner.plan(request)
Test flow authentication
def test_authentication_redirect_required(self):
    """Test flow authentication (redirect required)

    Planning is expected to fail without ``PLAN_CONTEXT_IS_REDIRECTED`` in the context
    and to work once that key is set (here, to another test flow).
    """
    flow = create_test_flow()
    flow.authentication = FlowAuthenticationRequirement.REQUIRE_REDIRECT
    executor_url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})
    request = self.request_factory.get(executor_url)
    planner = FlowPlanner(flow)
    planner.allow_empty_flows = True

    with self.assertRaises(FlowNonApplicableException):
        planner.plan(request)

    redirected_context = {PLAN_CONTEXT_IS_REDIRECTED: create_test_flow()}
    planner.plan(request, redirected_context)
Test flow authentication (redirect required)
@reconcile_app("authentik_outposts")
def test_authentication_outpost(self):
    """Test flow authentication (outpost)

    A plain request is expected to be rejected. A request carrying the managed outpost's
    token header, after being passed through ClientIPMiddleware, is expected to be planned
    without error.
    """
    flow = create_test_flow()
    flow.authentication = FlowAuthenticationRequirement.REQUIRE_OUTPOST
    executor_url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})

    plain_request = self.request_factory.get(executor_url)
    with self.assertRaises(FlowNonApplicableException):
        rejecting_planner = FlowPlanner(flow)
        rejecting_planner.allow_empty_flows = True
        rejecting_planner.plan(plain_request)

    managed_outpost = Outpost.objects.filter(managed=MANAGED_OUTPOST).first()
    outpost_headers = {
        "HTTP_X_AUTHENTIK_OUTPOST_TOKEN": managed_outpost.token.key,
        "HTTP_X_AUTHENTIK_REMOTE_IP": "1.2.3.4",
    }
    outpost_request = self.request_factory.get(executor_url, **outpost_headers)
    # Run the request through the client IP middleware before planning
    ClientIPMiddleware(dummy_get_response)(outpost_request)

    accepting_planner = FlowPlanner(flow)
    accepting_planner.allow_empty_flows = True
    accepting_planner.plan(outpost_request)
Test flow authentication (outpost)
@patch(
    "authentik.policies.engine.PolicyEngine.result",
    POLICY_RETURN_FALSE,
)
def test_non_applicable_plan(self):
    """Test that a non-applicable plan raises FlowNonApplicableException

    ``PolicyEngine.result`` is patched with ``POLICY_RETURN_FALSE`` for this test.
    """
    flow = create_test_flow()
    request = self.request_factory.get(
        reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
    )
    # Build the planner outside the assertion so that only plan() can satisfy it
    planner = FlowPlanner(flow)

    with self.assertRaises(FlowNonApplicableException):
        planner.plan(request)
Test that a non-applicable plan raises FlowNonApplicableException
@patch("authentik.flows.planner.cache", CACHE_MOCK)
def test_planner_cache(self):
    """Test planner cache

    The planner module's ``cache`` is replaced with ``CACHE_MOCK`` so that reads and
    writes can be counted across two planning runs of the same flow.
    """
    flow = create_test_flow(FlowDesignation.AUTHENTICATION)
    dummy_stage = DummyStage.objects.create(name=generate_id())
    FlowStageBinding.objects.create(target=flow, stage=dummy_stage, order=0)
    executor_url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})
    request = self.request_factory.get(executor_url)

    FlowPlanner(flow).plan(request)
    # First run: ensure the plan is written to the cache
    self.assertEqual(CACHE_MOCK.set.call_count, 1)

    FlowPlanner(flow).plan(request)
    # Second run: ensure nothing new is written, while get was called on both runs
    self.assertEqual(CACHE_MOCK.set.call_count, 1)
    self.assertEqual(CACHE_MOCK.get.call_count, 2)
Test planner cache
def test_planner_default_context(self):
    """Test planner with default_context

    After planning with ``PLAN_CONTEXT_PENDING_USER`` set, a cache entry is expected
    under ``cache_key(flow, user)``.
    """
    flow = create_test_flow()
    FlowStageBinding.objects.create(
        target=flow, stage=DummyStage.objects.create(name=generate_id()), order=0
    )

    user = User.objects.create(username="test-user")
    request = self.request_factory.get(
        reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
    )
    request.user = user
    planner = FlowPlanner(flow)
    planner.plan(request, default_context={PLAN_CONTEXT_PENDING_USER: user})
    key = cache_key(flow, user)
    # assertIsNotNone gives a clearer failure message than assertTrue(x is not None)
    self.assertIsNotNone(cache.get(key))
Test planner with default_context
def test_planner_marker_reevaluate(self):
    """Test that a binding with re_evaluate_policies=True gets a ReevaluateMarker"""
    flow = create_test_flow()
    dummy_stage = DummyStage.objects.create(name=generate_id())
    FlowStageBinding.objects.create(
        target=flow,
        stage=dummy_stage,
        order=0,
        re_evaluate_policies=True,
    )
    executor_url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})
    request = self.request_factory.get(executor_url)

    plan = FlowPlanner(flow).plan(request)

    first_marker = plan.markers[0]
    self.assertEqual(first_marker.__class__, ReevaluateMarker)
Test that the planner creates the proper marker
def test_planner_reevaluate_actual(self):
    """Test planner with re-evaluate

    Two stages are bound. Only the second has ``re_evaluate_policies=True`` and a
    DummyPolicy (created with ``result=False``) bound to it.
    """
    flow = create_test_flow()
    false_policy = DummyPolicy.objects.create(result=False, wait_min=1, wait_max=2)

    first_stage = DummyStage.objects.create(name=generate_id())
    static_binding = FlowStageBinding.objects.create(
        target=flow,
        stage=first_stage,
        order=0,
        re_evaluate_policies=False,
    )
    second_stage = DummyStage.objects.create(name=generate_id())
    reevaluated_binding = FlowStageBinding.objects.create(
        target=flow,
        stage=second_stage,
        order=1,
        re_evaluate_policies=True,
    )

    PolicyBinding.objects.create(policy=false_policy, target=reevaluated_binding, order=0)

    executor_url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})
    request = self.request_factory.get(executor_url)

    # Here we patch the dummy policy to evaluate to true so the stage is included
    with patch("authentik.policies.dummy.models.DummyPolicy.passes", POLICY_RETURN_TRUE):
        plan = FlowPlanner(flow).plan(request)

        self.assertEqual(plan.bindings[0], static_binding)
        self.assertEqual(plan.bindings[1], reevaluated_binding)

        # Exact class checks first, then the looser isinstance checks
        self.assertEqual(plan.markers[0].__class__, StageMarker)
        self.assertEqual(plan.markers[1].__class__, ReevaluateMarker)
        self.assertIsInstance(plan.markers[0], StageMarker)
        self.assertIsInstance(plan.markers[1], ReevaluateMarker)
Test planner with re-evaluate
def test_to_redirect(self):
    """Test to_redirect for a plan without any stage added in this test

    The plan is expected to require the flow executor and to redirect to the
    ``authentik_core:if-flow`` URL of the flow.
    """
    flow = create_test_flow()
    flow.authentication = FlowAuthenticationRequirement.NONE
    executor_url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})
    request = self.request_factory.get(executor_url)

    planner = FlowPlanner(flow)
    planner.allow_empty_flows = True
    plan = planner.plan(request)

    self.assertTrue(plan.requires_flow_executor())
    response = plan.to_redirect(request, flow)
    interface_url = reverse("authentik_core:if-flow", kwargs={"flow_slug": flow.slug})
    self.assertEqual(response.url, interface_url)
Test to_redirect and skipping the flow executor
def test_to_redirect_skip_simple(self):
    """Test to_redirect and skipping the flow executor
    (with a single in-memory stage whose view type is allowed to run silently)"""
    flow = create_test_flow()
    flow.authentication = FlowAuthenticationRequirement.NONE
    executor_url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})
    request = self.request_factory.get(executor_url)

    class TStageView(StageView):
        def dispatch(self, request: HttpRequest, *args, **kwargs):
            return redirect("https://authentik.company")

    planner = FlowPlanner(flow)
    planner.allow_empty_flows = True
    plan = planner.plan(request)
    plan.append_stage(in_memory_stage(TStageView))

    self.assertFalse(plan.requires_flow_executor(allowed_silent_types=[TStageView]))
    silent_redirect = plan.to_redirect(request, flow, allowed_silent_types=[TStageView])
    self.assertEqual(silent_redirect.url, "https://authentik.company")
Test to_redirect and skipping the flow executor (with a single in-memory stage whose view type is allowed to run silently)
def test_to_redirect_skip_stage(self):
    """Test to_redirect and skipping the flow executor
    (with a stage bound that cannot be skipped)"""
    flow = create_test_flow()
    flow.authentication = FlowAuthenticationRequirement.NONE

    bound_stage = DummyStage.objects.create(name="dummy")
    FlowStageBinding.objects.create(target=flow, stage=bound_stage, order=0)

    executor_url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})
    request = self.request_factory.get(executor_url)

    class TStageView(StageView):
        def dispatch(self, request: HttpRequest, *args, **kwargs):
            return redirect("https://authentik.company")

    planner = FlowPlanner(flow)
    planner.allow_empty_flows = True
    plan = planner.plan(request)
    plan.append_stage(in_memory_stage(TStageView))

    self.assertTrue(plan.requires_flow_executor(allowed_silent_types=[TStageView]))
Test to_redirect and skipping the flow executor (with a stage bound that cannot be skipped)
def test_to_redirect_skip_policies(self):
    """Test to_redirect and skipping the flow executor
    (with a marker on the stage view type that can be skipped)

    Note that this is not actually used anywhere in the code, all stages that are dynamically
    added are statically added"""
    flow = create_test_flow()
    flow.authentication = FlowAuthenticationRequirement.NONE
    executor_url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})
    request = self.request_factory.get(executor_url)

    class TStageView(StageView):
        def dispatch(self, request: HttpRequest, *args, **kwargs):
            return redirect("https://authentik.company")

    planner = FlowPlanner(flow)
    planner.allow_empty_flows = True
    plan = planner.plan(request)
    # The ReevaluateMarker is what keeps the executor required in this case
    plan.append_stage(in_memory_stage(TStageView), ReevaluateMarker(None))

    self.assertTrue(plan.requires_flow_executor(allowed_silent_types=[TStageView]))
Test to_redirect and skipping the flow executor (with a marker on the stage view type that can be skipped)
Note that this is not actually used anywhere in the code; all stages that are dynamically added are statically added.