authentik.stages.consent.tests
consent tests
"""consent tests"""

from datetime import timedelta

from django.urls import reverse
from freezegun import freeze_time

from authentik.core.models import Application
from authentik.core.tasks import clean_expired_models
from authentik.core.tests.utils import create_test_admin_user, create_test_flow
from authentik.flows.challenge import PermissionDict
from authentik.flows.markers import StageMarker
from authentik.flows.models import FlowDesignation, FlowStageBinding
from authentik.flows.planner import PLAN_CONTEXT_APPLICATION, FlowPlan
from authentik.flows.tests import FlowTestCase
from authentik.flows.views.executor import SESSION_KEY_PLAN
from authentik.lib.generators import generate_id
from authentik.stages.consent.models import ConsentMode, ConsentStage, UserConsent
from authentik.stages.consent.stage import (
    PLAN_CONTEXT_CONSENT_HEADER,
    PLAN_CONTEXT_CONSENT_PERMISSIONS,
    PLAN_CONTEXT_CONSENT_TOKEN,
)


class TestConsentStage(FlowTestCase):
    """Consent tests"""

    def setUp(self):
        """Create the user and the application shared by all tests."""
        super().setUp()
        self.user = create_test_admin_user()
        self.application = Application.objects.create(
            name=generate_id(),
            slug=generate_id(),
        )

    def _create_flow_binding(self, mode, **stage_kwargs):
        """Create a flow with one bound consent stage and return (flow, binding)."""
        flow = create_test_flow(FlowDesignation.AUTHENTICATION)
        stage = ConsentStage.objects.create(name=generate_id(), mode=mode, **stage_kwargs)
        binding = FlowStageBinding.objects.create(target=flow, stage=stage, order=2)
        return flow, binding

    def _store_plan(self, flow, binding, context=None):
        """Save a single-binding FlowPlan for `flow` into the client session."""
        plan_kwargs = {
            "flow_pk": flow.pk.hex,
            "bindings": [binding],
            "markers": [StageMarker()],
        }
        # Only pass `context` when a test supplies one, so FlowPlan's own
        # default is used otherwise.
        if context is not None:
            plan_kwargs["context"] = context
        session = self.client.session
        session[SESSION_KEY_PLAN] = FlowPlan(**plan_kwargs)
        session.save()

    @staticmethod
    def _executor_url(flow):
        """Return the flow executor API URL for `flow`."""
        return reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})

    def _get_executor(self, flow):
        """GET the flow executor, assert HTTP 200 and return the response."""
        response = self.client.get(self._executor_url(flow))
        self.assertEqual(response.status_code, 200)
        return response

    def _post_token(self, flow, token):
        """POST `token` to the flow executor, assert HTTP 200 and return the response."""
        response = self.client.post(self._executor_url(flow), {"token": token})
        self.assertEqual(response.status_code, 200)
        return response

    def _consents(self, **filters):
        """Return the UserConsent queryset for the test user and application."""
        return UserConsent.objects.filter(
            user=self.user, application=self.application, **filters
        )

    def test_mismatched_token(self):
        """Test incorrect token"""
        flow, binding = self._create_flow_binding(ConsentMode.ALWAYS_REQUIRE)
        self._store_plan(flow, binding)
        self._get_executor(flow)

        # A freshly generated ID stands in for a wrong token.
        response = self._post_token(flow, generate_id())
        self.assertStageResponse(
            response,
            flow,
            component="ak-stage-consent",
            response_errors={
                "token": [{"string": "Invalid consent token, re-showing prompt", "code": "invalid"}]
            },
        )
        self.assertFalse(UserConsent.objects.filter(user=self.user).exists())

    def test_always_required(self):
        """Test always required consent"""
        flow, binding = self._create_flow_binding(ConsentMode.ALWAYS_REQUIRE)
        self._store_plan(flow, binding)
        self._get_executor(flow)

        token = self.get_flow_plan().context[PLAN_CONTEXT_CONSENT_TOKEN]
        response = self._post_token(flow, token)
        self.assertStageRedirects(response, reverse("authentik_core:root-redirect"))
        self.assertFalse(UserConsent.objects.filter(user=self.user).exists())

    def test_permanent(self):
        """Test permanent consent from user"""
        self.client.force_login(self.user)
        flow, binding = self._create_flow_binding(ConsentMode.PERMANENT)
        self._store_plan(flow, binding, {PLAN_CONTEXT_APPLICATION: self.application})
        self._get_executor(flow)

        token = self.get_flow_plan().context[PLAN_CONTEXT_CONSENT_TOKEN]
        response = self._post_token(flow, token)
        self.assertStageRedirects(response, reverse("authentik_core:root-redirect"))
        self.assertTrue(self._consents().exists())

    def test_expire(self):
        """Test expiring consent from user"""
        self.client.force_login(self.user)
        flow, binding = self._create_flow_binding(
            ConsentMode.EXPIRING, consent_expire_in="seconds=1"
        )
        self._store_plan(flow, binding, {PLAN_CONTEXT_APPLICATION: self.application})

        response = self._get_executor(flow)
        raw_res = self.assertStageResponse(
            response,
            flow,
            self.user,
            permissions=[],
            additional_permissions=[],
        )
        response = self._post_token(flow, raw_res["token"])
        self.assertStageRedirects(response, reverse("authentik_core:root-redirect"))
        self.assertTrue(self._consents().exists())

        # Move the frozen clock past the 1-second expiry before running cleanup.
        with freeze_time() as frozen_time:
            frozen_time.tick(timedelta(seconds=3))
            clean_expired_models.send()
            self.assertFalse(self._consents().exists())

    def test_permanent_more_perms(self):
        """Test permanent consent when a later request asks for more permissions"""
        self.client.force_login(self.user)
        flow, binding = self._create_flow_binding(ConsentMode.PERMANENT)
        self._store_plan(
            flow,
            binding,
            {
                PLAN_CONTEXT_APPLICATION: self.application,
                PLAN_CONTEXT_CONSENT_PERMISSIONS: [PermissionDict(id="foo", name="foo-desc")],
                PLAN_CONTEXT_CONSENT_HEADER: "test header",
            },
        )

        # First, consent with a single permission
        response = self._get_executor(flow)
        raw_res = self.assertStageResponse(
            response,
            flow,
            self.user,
            permissions=[
                {"id": "foo", "name": "foo-desc"},
            ],
            additional_permissions=[],
        )
        response = self._post_token(flow, raw_res["token"])
        self.assertStageRedirects(response, reverse("authentik_core:root-redirect"))
        self.assertTrue(self._consents(permissions="foo").exists())

        # Request again with more perms
        self._store_plan(
            flow,
            binding,
            {
                PLAN_CONTEXT_APPLICATION: self.application,
                PLAN_CONTEXT_CONSENT_PERMISSIONS: [
                    PermissionDict(id="foo", name="foo-desc"),
                    PermissionDict(id="bar", name="bar-desc"),
                ],
            },
        )
        response = self._get_executor(flow)
        raw_res = self.assertStageResponse(
            response,
            flow,
            self.user,
            permissions=[
                {"id": "foo", "name": "foo-desc"},
            ],
            additional_permissions=[
                {"id": "bar", "name": "bar-desc"},
            ],
        )
        response = self._post_token(flow, raw_res["token"])
        self.assertStageRedirects(response, reverse("authentik_core:root-redirect"))
        self.assertTrue(self._consents(permissions="foo bar").exists())

    def test_permanent_same(self):
        """Test permanent consent when the same (empty) permissions are requested again"""
        self.client.force_login(self.user)
        flow, binding = self._create_flow_binding(ConsentMode.PERMANENT)
        self._store_plan(flow, binding, {PLAN_CONTEXT_APPLICATION: self.application})

        # First, consent without any permissions
        response = self._get_executor(flow)
        raw_res = self.assertStageResponse(
            response,
            flow,
            self.user,
            permissions=[],
            additional_permissions=[],
        )
        response = self._post_token(flow, raw_res["token"])
        self.assertStageRedirects(response, reverse("authentik_core:root-redirect"))
        self.assertTrue(self._consents(permissions="").exists())

        # Request again with the same perms
        self._store_plan(
            flow,
            binding,
            {
                PLAN_CONTEXT_APPLICATION: self.application,
                PLAN_CONTEXT_CONSENT_PERMISSIONS: [],
            },
        )
        response = self._get_executor(flow)
        self.assertStageResponse(response, component="xak-flow-redirect")
class TestConsentStage(FlowTestCase):
    """Consent tests"""

    def setUp(self):
        """Create the user and the application shared by all tests."""
        super().setUp()
        self.user = create_test_admin_user()
        self.application = Application.objects.create(
            name=generate_id(),
            slug=generate_id(),
        )

    def _create_flow_binding(self, mode, **stage_kwargs):
        """Create a flow with one bound consent stage and return (flow, binding)."""
        flow = create_test_flow(FlowDesignation.AUTHENTICATION)
        stage = ConsentStage.objects.create(name=generate_id(), mode=mode, **stage_kwargs)
        binding = FlowStageBinding.objects.create(target=flow, stage=stage, order=2)
        return flow, binding

    def _store_plan(self, flow, binding, context=None):
        """Save a single-binding FlowPlan for `flow` into the client session."""
        plan_kwargs = {
            "flow_pk": flow.pk.hex,
            "bindings": [binding],
            "markers": [StageMarker()],
        }
        # Only pass `context` when a test supplies one, so FlowPlan's own
        # default is used otherwise.
        if context is not None:
            plan_kwargs["context"] = context
        session = self.client.session
        session[SESSION_KEY_PLAN] = FlowPlan(**plan_kwargs)
        session.save()

    @staticmethod
    def _executor_url(flow):
        """Return the flow executor API URL for `flow`."""
        return reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})

    def _get_executor(self, flow):
        """GET the flow executor, assert HTTP 200 and return the response."""
        response = self.client.get(self._executor_url(flow))
        self.assertEqual(response.status_code, 200)
        return response

    def _post_token(self, flow, token):
        """POST `token` to the flow executor, assert HTTP 200 and return the response."""
        response = self.client.post(self._executor_url(flow), {"token": token})
        self.assertEqual(response.status_code, 200)
        return response

    def _consents(self, **filters):
        """Return the UserConsent queryset for the test user and application."""
        return UserConsent.objects.filter(
            user=self.user, application=self.application, **filters
        )

    def test_mismatched_token(self):
        """Test incorrect token"""
        flow, binding = self._create_flow_binding(ConsentMode.ALWAYS_REQUIRE)
        self._store_plan(flow, binding)
        self._get_executor(flow)

        # A freshly generated ID stands in for a wrong token.
        response = self._post_token(flow, generate_id())
        self.assertStageResponse(
            response,
            flow,
            component="ak-stage-consent",
            response_errors={
                "token": [{"string": "Invalid consent token, re-showing prompt", "code": "invalid"}]
            },
        )
        self.assertFalse(UserConsent.objects.filter(user=self.user).exists())

    def test_always_required(self):
        """Test always required consent"""
        flow, binding = self._create_flow_binding(ConsentMode.ALWAYS_REQUIRE)
        self._store_plan(flow, binding)
        self._get_executor(flow)

        token = self.get_flow_plan().context[PLAN_CONTEXT_CONSENT_TOKEN]
        response = self._post_token(flow, token)
        self.assertStageRedirects(response, reverse("authentik_core:root-redirect"))
        self.assertFalse(UserConsent.objects.filter(user=self.user).exists())

    def test_permanent(self):
        """Test permanent consent from user"""
        self.client.force_login(self.user)
        flow, binding = self._create_flow_binding(ConsentMode.PERMANENT)
        self._store_plan(flow, binding, {PLAN_CONTEXT_APPLICATION: self.application})
        self._get_executor(flow)

        token = self.get_flow_plan().context[PLAN_CONTEXT_CONSENT_TOKEN]
        response = self._post_token(flow, token)
        self.assertStageRedirects(response, reverse("authentik_core:root-redirect"))
        self.assertTrue(self._consents().exists())

    def test_expire(self):
        """Test expiring consent from user"""
        self.client.force_login(self.user)
        flow, binding = self._create_flow_binding(
            ConsentMode.EXPIRING, consent_expire_in="seconds=1"
        )
        self._store_plan(flow, binding, {PLAN_CONTEXT_APPLICATION: self.application})

        response = self._get_executor(flow)
        raw_res = self.assertStageResponse(
            response,
            flow,
            self.user,
            permissions=[],
            additional_permissions=[],
        )
        response = self._post_token(flow, raw_res["token"])
        self.assertStageRedirects(response, reverse("authentik_core:root-redirect"))
        self.assertTrue(self._consents().exists())

        # Move the frozen clock past the 1-second expiry before running cleanup.
        with freeze_time() as frozen_time:
            frozen_time.tick(timedelta(seconds=3))
            clean_expired_models.send()
            self.assertFalse(self._consents().exists())

    def test_permanent_more_perms(self):
        """Test permanent consent when a later request asks for more permissions"""
        self.client.force_login(self.user)
        flow, binding = self._create_flow_binding(ConsentMode.PERMANENT)
        self._store_plan(
            flow,
            binding,
            {
                PLAN_CONTEXT_APPLICATION: self.application,
                PLAN_CONTEXT_CONSENT_PERMISSIONS: [PermissionDict(id="foo", name="foo-desc")],
                PLAN_CONTEXT_CONSENT_HEADER: "test header",
            },
        )

        # First, consent with a single permission
        response = self._get_executor(flow)
        raw_res = self.assertStageResponse(
            response,
            flow,
            self.user,
            permissions=[
                {"id": "foo", "name": "foo-desc"},
            ],
            additional_permissions=[],
        )
        response = self._post_token(flow, raw_res["token"])
        self.assertStageRedirects(response, reverse("authentik_core:root-redirect"))
        self.assertTrue(self._consents(permissions="foo").exists())

        # Request again with more perms
        self._store_plan(
            flow,
            binding,
            {
                PLAN_CONTEXT_APPLICATION: self.application,
                PLAN_CONTEXT_CONSENT_PERMISSIONS: [
                    PermissionDict(id="foo", name="foo-desc"),
                    PermissionDict(id="bar", name="bar-desc"),
                ],
            },
        )
        response = self._get_executor(flow)
        raw_res = self.assertStageResponse(
            response,
            flow,
            self.user,
            permissions=[
                {"id": "foo", "name": "foo-desc"},
            ],
            additional_permissions=[
                {"id": "bar", "name": "bar-desc"},
            ],
        )
        response = self._post_token(flow, raw_res["token"])
        self.assertStageRedirects(response, reverse("authentik_core:root-redirect"))
        self.assertTrue(self._consents(permissions="foo bar").exists())

    def test_permanent_same(self):
        """Test permanent consent when the same (empty) permissions are requested again"""
        self.client.force_login(self.user)
        flow, binding = self._create_flow_binding(ConsentMode.PERMANENT)
        self._store_plan(flow, binding, {PLAN_CONTEXT_APPLICATION: self.application})

        # First, consent without any permissions
        response = self._get_executor(flow)
        raw_res = self.assertStageResponse(
            response,
            flow,
            self.user,
            permissions=[],
            additional_permissions=[],
        )
        response = self._post_token(flow, raw_res["token"])
        self.assertStageRedirects(response, reverse("authentik_core:root-redirect"))
        self.assertTrue(self._consents(permissions="").exists())

        # Request again with the same perms
        self._store_plan(
            flow,
            binding,
            {
                PLAN_CONTEXT_APPLICATION: self.application,
                PLAN_CONTEXT_CONSENT_PERMISSIONS: [],
            },
        )
        response = self._get_executor(flow)
        self.assertStageResponse(response, component="xak-flow-redirect")
Consent tests
def
setUp(self):
def setUp(self):
    """Prepare the fixtures every test relies on.

    Stores the user returned by ``create_test_admin_user`` on ``self.user``
    and a newly created ``Application`` (generated name and slug) on
    ``self.application``.
    """
    super().setUp()
    self.user = create_test_admin_user()
    # Generated identifiers: name first, then slug.
    app_name, app_slug = generate_id(), generate_id()
    self.application = Application.objects.create(name=app_name, slug=app_slug)
Hook method for setting up the test fixture before exercising it.
def
test_mismatched_token(self):
def test_mismatched_token(self):
    """Test incorrect token

    Posting a freshly generated token to an always-require consent stage
    must return the consent challenge again with an "invalid" error on the
    ``token`` field, and must not create a ``UserConsent`` for the test user.
    """
    flow = create_test_flow(FlowDesignation.AUTHENTICATION)
    stage = ConsentStage.objects.create(name=generate_id(), mode=ConsentMode.ALWAYS_REQUIRE)
    binding = FlowStageBinding.objects.create(target=flow, stage=stage, order=2)
    executor_url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})

    # Seed the client session with a plan that contains only the consent binding.
    plan = FlowPlan(flow_pk=flow.pk.hex, bindings=[binding], markers=[StageMarker()])
    session = self.client.session
    session[SESSION_KEY_PLAN] = plan
    session.save()

    response = self.client.get(executor_url)
    self.assertEqual(response.status_code, 200)

    # Submit a random token instead of the one issued for this plan.
    response = self.client.post(executor_url, {"token": generate_id()})

    self.assertEqual(response.status_code, 200)
    self.assertStageResponse(
        response,
        flow,
        component="ak-stage-consent",
        response_errors={
            "token": [{"string": "Invalid consent token, re-showing prompt", "code": "invalid"}]
        },
    )
    self.assertFalse(UserConsent.objects.filter(user=self.user).exists())
Test incorrect token
def
test_always_required(self):
def test_always_required(self):
    """Always-require mode: the plan's token completes the flow, nothing is stored.

    After submitting the token found in the flow plan context, the executor
    must redirect to the root redirect and no ``UserConsent`` may exist for
    the test user.
    """
    test_flow = create_test_flow(FlowDesignation.AUTHENTICATION)
    consent_stage = ConsentStage.objects.create(
        name=generate_id(),
        mode=ConsentMode.ALWAYS_REQUIRE,
    )
    stage_binding = FlowStageBinding.objects.create(
        target=test_flow,
        stage=consent_stage,
        order=2,
    )
    executor_url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": test_flow.slug})

    # Put a plan with only the consent binding into the session.
    flow_plan = FlowPlan(
        flow_pk=test_flow.pk.hex,
        bindings=[stage_binding],
        markers=[StageMarker()],
    )
    store = self.client.session
    store[SESSION_KEY_PLAN] = flow_plan
    store.save()

    challenge = self.client.get(executor_url)
    self.assertEqual(challenge.status_code, 200)

    # Read the token back from the plan and submit it.
    consent_token = self.get_flow_plan().context[PLAN_CONTEXT_CONSENT_TOKEN]
    submitted = self.client.post(executor_url, {"token": consent_token})

    self.assertEqual(submitted.status_code, 200)
    self.assertStageRedirects(submitted, reverse("authentik_core:root-redirect"))
    self.assertFalse(UserConsent.objects.filter(user=self.user).exists())
Test always required consent
def
test_permanent(self):
def test_permanent(self):
    """Test permanent consent from user

    With a logged-in user and an application in the plan context, submitting
    the plan's consent token must redirect to the root redirect and leave a
    ``UserConsent`` for that user and application.
    """
    self.client.force_login(self.user)
    flow = create_test_flow(FlowDesignation.AUTHENTICATION)
    stage = ConsentStage.objects.create(name=generate_id(), mode=ConsentMode.PERMANENT)
    binding = FlowStageBinding.objects.create(target=flow, stage=stage, order=2)
    executor_url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug})

    plan = FlowPlan(
        flow_pk=flow.pk.hex,
        bindings=[binding],
        markers=[StageMarker()],
        context={
            PLAN_CONTEXT_APPLICATION: self.application,
        },
    )
    session = self.client.session
    session[SESSION_KEY_PLAN] = plan
    session.save()

    response = self.client.get(executor_url)
    self.assertEqual(response.status_code, 200)

    # Submit the token that is now present in the plan context.
    response = self.client.post(
        executor_url,
        {
            "token": self.get_flow_plan().context[PLAN_CONTEXT_CONSENT_TOKEN],
        },
    )
    self.assertEqual(response.status_code, 200)
    self.assertStageRedirects(response, reverse("authentik_core:root-redirect"))
    self.assertTrue(
        UserConsent.objects.filter(user=self.user, application=self.application).exists()
    )
Test permanent consent from user
def
test_expire(self):
def test_expire(self):
    """Expiring mode: consent is stored, then removed by cleanup once expired.

    The stage is created with ``consent_expire_in="seconds=1"``. After the
    consent is given, the frozen clock is advanced by three seconds and
    ``clean_expired_models`` is sent; the consent must no longer exist.
    """

    def consent_exists():
        # Fresh query on every call.
        return UserConsent.objects.filter(
            user=self.user, application=self.application
        ).exists()

    self.client.force_login(self.user)
    test_flow = create_test_flow(FlowDesignation.AUTHENTICATION)
    expiring_stage = ConsentStage.objects.create(
        name=generate_id(),
        mode=ConsentMode.EXPIRING,
        consent_expire_in="seconds=1",
    )
    stage_binding = FlowStageBinding.objects.create(
        target=test_flow,
        stage=expiring_stage,
        order=2,
    )
    executor_url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": test_flow.slug})

    flow_plan = FlowPlan(
        flow_pk=test_flow.pk.hex,
        bindings=[stage_binding],
        markers=[StageMarker()],
        context={PLAN_CONTEXT_APPLICATION: self.application},
    )
    store = self.client.session
    store[SESSION_KEY_PLAN] = flow_plan
    store.save()

    challenge = self.client.get(executor_url, {})
    self.assertEqual(challenge.status_code, 200)
    challenge_body = self.assertStageResponse(
        challenge,
        test_flow,
        self.user,
        permissions=[],
        additional_permissions=[],
    )

    submitted = self.client.post(executor_url, {"token": challenge_body["token"]})
    self.assertEqual(submitted.status_code, 200)
    self.assertStageRedirects(submitted, reverse("authentik_core:root-redirect"))
    self.assertTrue(consent_exists())

    # Advance time beyond the one-second expiry, then run the cleanup task.
    with freeze_time() as frozen_time:
        frozen_time.tick(timedelta(seconds=3))
        clean_expired_models.send()
        self.assertFalse(consent_exists())
Test expiring consent from user
def
test_permanent_more_perms(self):
def test_permanent_more_perms(self):
    """Permanent mode: a later request with more permissions prompts again.

    Round one consents to ``foo`` and expects a stored consent with
    permissions ``"foo"``. Round two requests ``foo`` and ``bar``; the
    challenge must list ``bar`` under ``additional_permissions`` and, after
    submitting, the stored consent must have permissions ``"foo bar"``.
    """
    self.client.force_login(self.user)
    test_flow = create_test_flow(FlowDesignation.AUTHENTICATION)
    consent_stage = ConsentStage.objects.create(name=generate_id(), mode=ConsentMode.PERMANENT)
    stage_binding = FlowStageBinding.objects.create(
        target=test_flow,
        stage=consent_stage,
        order=2,
    )
    executor_url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": test_flow.slug})

    # Round one: a single permission plus a header in the plan context.
    first_plan = FlowPlan(
        flow_pk=test_flow.pk.hex,
        bindings=[stage_binding],
        markers=[StageMarker()],
        context={
            PLAN_CONTEXT_APPLICATION: self.application,
            PLAN_CONTEXT_CONSENT_PERMISSIONS: [PermissionDict(id="foo", name="foo-desc")],
            PLAN_CONTEXT_CONSENT_HEADER: "test header",
        },
    )
    store = self.client.session
    store[SESSION_KEY_PLAN] = first_plan
    store.save()

    challenge = self.client.get(executor_url, {})
    self.assertEqual(challenge.status_code, 200)
    challenge_body = self.assertStageResponse(
        challenge,
        test_flow,
        self.user,
        permissions=[{"id": "foo", "name": "foo-desc"}],
        additional_permissions=[],
    )
    submitted = self.client.post(executor_url, {"token": challenge_body["token"]})
    self.assertEqual(submitted.status_code, 200)
    self.assertStageRedirects(submitted, reverse("authentik_core:root-redirect"))
    stored_foo = UserConsent.objects.filter(
        user=self.user, application=self.application, permissions="foo"
    )
    self.assertTrue(stored_foo.exists())

    # Round two: the same permission plus a new one, no header.
    second_plan = FlowPlan(
        flow_pk=test_flow.pk.hex,
        bindings=[stage_binding],
        markers=[StageMarker()],
        context={
            PLAN_CONTEXT_APPLICATION: self.application,
            PLAN_CONTEXT_CONSENT_PERMISSIONS: [
                PermissionDict(id="foo", name="foo-desc"),
                PermissionDict(id="bar", name="bar-desc"),
            ],
        },
    )
    store = self.client.session
    store[SESSION_KEY_PLAN] = second_plan
    store.save()

    challenge = self.client.get(executor_url, {})
    self.assertEqual(challenge.status_code, 200)
    challenge_body = self.assertStageResponse(
        challenge,
        test_flow,
        self.user,
        permissions=[{"id": "foo", "name": "foo-desc"}],
        additional_permissions=[{"id": "bar", "name": "bar-desc"}],
    )
    submitted = self.client.post(executor_url, {"token": challenge_body["token"]})
    self.assertEqual(submitted.status_code, 200)
    self.assertStageRedirects(submitted, reverse("authentik_core:root-redirect"))
    stored_foo_bar = UserConsent.objects.filter(
        user=self.user, application=self.application, permissions="foo bar"
    )
    self.assertTrue(stored_foo_bar.exists())
Test permanent consent from user when a later request asks for additional permissions
def
test_permanent_same(self):
def test_permanent_same(self):
    """Permanent mode: requesting the same permissions again skips the prompt.

    Round one consents with no permissions and expects a stored consent with
    permissions ``""``. Round two uses a plan with an explicit empty
    permission list; the executor must answer with the
    ``xak-flow-redirect`` component.
    """
    self.client.force_login(self.user)
    test_flow = create_test_flow(FlowDesignation.AUTHENTICATION)
    consent_stage = ConsentStage.objects.create(name=generate_id(), mode=ConsentMode.PERMANENT)
    stage_binding = FlowStageBinding.objects.create(
        target=test_flow,
        stage=consent_stage,
        order=2,
    )
    executor_url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": test_flow.slug})

    # Round one: only the application is in the plan context.
    first_plan = FlowPlan(
        flow_pk=test_flow.pk.hex,
        bindings=[stage_binding],
        markers=[StageMarker()],
        context={PLAN_CONTEXT_APPLICATION: self.application},
    )
    store = self.client.session
    store[SESSION_KEY_PLAN] = first_plan
    store.save()

    challenge = self.client.get(executor_url, {})
    self.assertEqual(challenge.status_code, 200)
    challenge_body = self.assertStageResponse(
        challenge,
        test_flow,
        self.user,
        permissions=[],
        additional_permissions=[],
    )
    submitted = self.client.post(executor_url, {"token": challenge_body["token"]})
    self.assertEqual(submitted.status_code, 200)
    self.assertStageRedirects(submitted, reverse("authentik_core:root-redirect"))
    stored_empty = UserConsent.objects.filter(
        user=self.user, application=self.application, permissions=""
    )
    self.assertTrue(stored_empty.exists())

    # Round two: same application, explicit empty permission list.
    second_plan = FlowPlan(
        flow_pk=test_flow.pk.hex,
        bindings=[stage_binding],
        markers=[StageMarker()],
        context={
            PLAN_CONTEXT_APPLICATION: self.application,
            PLAN_CONTEXT_CONSENT_PERMISSIONS: [],
        },
    )
    store = self.client.session
    store[SESSION_KEY_PLAN] = second_plan
    store.save()

    repeat = self.client.get(executor_url, {})
    self.assertEqual(repeat.status_code, 200)
    self.assertStageResponse(repeat, component="xak-flow-redirect")
Test permanent consent from user when the same permissions are requested again