authentik.stages.consent.tests

consent tests

  1"""consent tests"""
  2
  3from datetime import timedelta
  4
  5from django.urls import reverse
  6from freezegun import freeze_time
  7
  8from authentik.core.models import Application
  9from authentik.core.tasks import clean_expired_models
 10from authentik.core.tests.utils import create_test_admin_user, create_test_flow
 11from authentik.flows.challenge import PermissionDict
 12from authentik.flows.markers import StageMarker
 13from authentik.flows.models import FlowDesignation, FlowStageBinding
 14from authentik.flows.planner import PLAN_CONTEXT_APPLICATION, FlowPlan
 15from authentik.flows.tests import FlowTestCase
 16from authentik.flows.views.executor import SESSION_KEY_PLAN
 17from authentik.lib.generators import generate_id
 18from authentik.stages.consent.models import ConsentMode, ConsentStage, UserConsent
 19from authentik.stages.consent.stage import (
 20    PLAN_CONTEXT_CONSENT_HEADER,
 21    PLAN_CONTEXT_CONSENT_PERMISSIONS,
 22    PLAN_CONTEXT_CONSENT_TOKEN,
 23)
 24
 25
 26class TestConsentStage(FlowTestCase):
 27    """Consent tests"""
 28
 29    def setUp(self):
 30        super().setUp()
 31        self.user = create_test_admin_user()
 32        self.application = Application.objects.create(
 33            name=generate_id(),
 34            slug=generate_id(),
 35        )
 36
 37    def test_mismatched_token(self):
 38        """Test incorrect token"""
 39        flow = create_test_flow(FlowDesignation.AUTHENTICATION)
 40        stage = ConsentStage.objects.create(name=generate_id(), mode=ConsentMode.ALWAYS_REQUIRE)
 41        binding = FlowStageBinding.objects.create(target=flow, stage=stage, order=2)
 42
 43        plan = FlowPlan(flow_pk=flow.pk.hex, bindings=[binding], markers=[StageMarker()])
 44        session = self.client.session
 45        session[SESSION_KEY_PLAN] = plan
 46        session.save()
 47        response = self.client.get(
 48            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
 49        )
 50        self.assertEqual(response.status_code, 200)
 51
 52        session = self.client.session
 53        response = self.client.post(
 54            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
 55            {
 56                "token": generate_id(),
 57            },
 58        )
 59
 60        self.assertEqual(response.status_code, 200)
 61        self.assertStageResponse(
 62            response,
 63            flow,
 64            component="ak-stage-consent",
 65            response_errors={
 66                "token": [{"string": "Invalid consent token, re-showing prompt", "code": "invalid"}]
 67            },
 68        )
 69        self.assertFalse(UserConsent.objects.filter(user=self.user).exists())
 70
 71    def test_always_required(self):
 72        """Test always required consent"""
 73        flow = create_test_flow(FlowDesignation.AUTHENTICATION)
 74        stage = ConsentStage.objects.create(name=generate_id(), mode=ConsentMode.ALWAYS_REQUIRE)
 75        binding = FlowStageBinding.objects.create(target=flow, stage=stage, order=2)
 76
 77        plan = FlowPlan(flow_pk=flow.pk.hex, bindings=[binding], markers=[StageMarker()])
 78        session = self.client.session
 79        session[SESSION_KEY_PLAN] = plan
 80        session.save()
 81        response = self.client.get(
 82            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
 83        )
 84        self.assertEqual(response.status_code, 200)
 85
 86        response = self.client.post(
 87            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
 88            {
 89                "token": self.get_flow_plan().context[PLAN_CONTEXT_CONSENT_TOKEN],
 90            },
 91        )
 92
 93        self.assertEqual(response.status_code, 200)
 94        self.assertStageRedirects(response, reverse("authentik_core:root-redirect"))
 95        self.assertFalse(UserConsent.objects.filter(user=self.user).exists())
 96
 97    def test_permanent(self):
 98        """Test permanent consent from user"""
 99        self.client.force_login(self.user)
100        flow = create_test_flow(FlowDesignation.AUTHENTICATION)
101        stage = ConsentStage.objects.create(name=generate_id(), mode=ConsentMode.PERMANENT)
102        binding = FlowStageBinding.objects.create(target=flow, stage=stage, order=2)
103
104        plan = FlowPlan(
105            flow_pk=flow.pk.hex,
106            bindings=[binding],
107            markers=[StageMarker()],
108            context={
109                PLAN_CONTEXT_APPLICATION: self.application,
110            },
111        )
112        session = self.client.session
113        session[SESSION_KEY_PLAN] = plan
114        session.save()
115        response = self.client.get(
116            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
117        )
118        self.assertEqual(response.status_code, 200)
119
120        session = self.client.session
121        response = self.client.post(
122            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
123            {
124                "token": self.get_flow_plan().context[PLAN_CONTEXT_CONSENT_TOKEN],
125            },
126        )
127        self.assertEqual(response.status_code, 200)
128        self.assertStageRedirects(response, reverse("authentik_core:root-redirect"))
129        self.assertTrue(
130            UserConsent.objects.filter(user=self.user, application=self.application).exists()
131        )
132
133    def test_expire(self):
134        """Test expiring consent from user"""
135        self.client.force_login(self.user)
136        flow = create_test_flow(FlowDesignation.AUTHENTICATION)
137        stage = ConsentStage.objects.create(
138            name=generate_id(), mode=ConsentMode.EXPIRING, consent_expire_in="seconds=1"
139        )
140        binding = FlowStageBinding.objects.create(target=flow, stage=stage, order=2)
141
142        plan = FlowPlan(
143            flow_pk=flow.pk.hex,
144            bindings=[binding],
145            markers=[StageMarker()],
146            context={PLAN_CONTEXT_APPLICATION: self.application},
147        )
148        session = self.client.session
149        session[SESSION_KEY_PLAN] = plan
150        session.save()
151        response = self.client.get(
152            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
153            {},
154        )
155        self.assertEqual(response.status_code, 200)
156        raw_res = self.assertStageResponse(
157            response,
158            flow,
159            self.user,
160            permissions=[],
161            additional_permissions=[],
162        )
163        response = self.client.post(
164            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
165            {
166                "token": raw_res["token"],
167            },
168        )
169        self.assertEqual(response.status_code, 200)
170        self.assertStageRedirects(response, reverse("authentik_core:root-redirect"))
171        self.assertTrue(
172            UserConsent.objects.filter(user=self.user, application=self.application).exists()
173        )
174        with freeze_time() as frozen_time:
175            frozen_time.tick(timedelta(seconds=3))
176            clean_expired_models.send()
177            self.assertFalse(
178                UserConsent.objects.filter(user=self.user, application=self.application).exists()
179            )
180
181    def test_permanent_more_perms(self):
182        """Test permanent consent from user"""
183        self.client.force_login(self.user)
184        flow = create_test_flow(FlowDesignation.AUTHENTICATION)
185        stage = ConsentStage.objects.create(name=generate_id(), mode=ConsentMode.PERMANENT)
186        binding = FlowStageBinding.objects.create(target=flow, stage=stage, order=2)
187
188        plan = FlowPlan(
189            flow_pk=flow.pk.hex,
190            bindings=[binding],
191            markers=[StageMarker()],
192            context={
193                PLAN_CONTEXT_APPLICATION: self.application,
194                PLAN_CONTEXT_CONSENT_PERMISSIONS: [PermissionDict(id="foo", name="foo-desc")],
195                PLAN_CONTEXT_CONSENT_HEADER: "test header",
196            },
197        )
198        session = self.client.session
199        session[SESSION_KEY_PLAN] = plan
200        session.save()
201
202        # First, consent with a single permission
203        response = self.client.get(
204            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
205            {},
206        )
207        self.assertEqual(response.status_code, 200)
208        raw_res = self.assertStageResponse(
209            response,
210            flow,
211            self.user,
212            permissions=[
213                {"id": "foo", "name": "foo-desc"},
214            ],
215            additional_permissions=[],
216        )
217        response = self.client.post(
218            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
219            {
220                "token": raw_res["token"],
221            },
222        )
223        self.assertEqual(response.status_code, 200)
224        self.assertStageRedirects(response, reverse("authentik_core:root-redirect"))
225        self.assertTrue(
226            UserConsent.objects.filter(
227                user=self.user, application=self.application, permissions="foo"
228            ).exists()
229        )
230
231        # Request again with more perms
232        plan = FlowPlan(
233            flow_pk=flow.pk.hex,
234            bindings=[binding],
235            markers=[StageMarker()],
236            context={
237                PLAN_CONTEXT_APPLICATION: self.application,
238                PLAN_CONTEXT_CONSENT_PERMISSIONS: [
239                    PermissionDict(id="foo", name="foo-desc"),
240                    PermissionDict(id="bar", name="bar-desc"),
241                ],
242            },
243        )
244        session = self.client.session
245        session[SESSION_KEY_PLAN] = plan
246        session.save()
247
248        response = self.client.get(
249            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
250            {},
251        )
252        self.assertEqual(response.status_code, 200)
253        raw_res = self.assertStageResponse(
254            response,
255            flow,
256            self.user,
257            permissions=[
258                {"id": "foo", "name": "foo-desc"},
259            ],
260            additional_permissions=[
261                {"id": "bar", "name": "bar-desc"},
262            ],
263        )
264        response = self.client.post(
265            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
266            {
267                "token": raw_res["token"],
268            },
269        )
270        self.assertEqual(response.status_code, 200)
271        self.assertStageRedirects(response, reverse("authentik_core:root-redirect"))
272        self.assertTrue(
273            UserConsent.objects.filter(
274                user=self.user, application=self.application, permissions="foo bar"
275            ).exists()
276        )
277
278    def test_permanent_same(self):
279        """Test permanent consent from user"""
280        self.client.force_login(self.user)
281        flow = create_test_flow(FlowDesignation.AUTHENTICATION)
282        stage = ConsentStage.objects.create(name=generate_id(), mode=ConsentMode.PERMANENT)
283        binding = FlowStageBinding.objects.create(target=flow, stage=stage, order=2)
284
285        plan = FlowPlan(
286            flow_pk=flow.pk.hex,
287            bindings=[binding],
288            markers=[StageMarker()],
289            context={
290                PLAN_CONTEXT_APPLICATION: self.application,
291            },
292        )
293        session = self.client.session
294        session[SESSION_KEY_PLAN] = plan
295        session.save()
296
297        # First, consent with a single permission
298        response = self.client.get(
299            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
300            {},
301        )
302        self.assertEqual(response.status_code, 200)
303        raw_res = self.assertStageResponse(
304            response,
305            flow,
306            self.user,
307            permissions=[],
308            additional_permissions=[],
309        )
310        response = self.client.post(
311            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
312            {
313                "token": raw_res["token"],
314            },
315        )
316        self.assertEqual(response.status_code, 200)
317        self.assertStageRedirects(response, reverse("authentik_core:root-redirect"))
318        self.assertTrue(
319            UserConsent.objects.filter(
320                user=self.user, application=self.application, permissions=""
321            ).exists()
322        )
323
324        # Request again with the same perms
325        plan = FlowPlan(
326            flow_pk=flow.pk.hex,
327            bindings=[binding],
328            markers=[StageMarker()],
329            context={
330                PLAN_CONTEXT_APPLICATION: self.application,
331                PLAN_CONTEXT_CONSENT_PERMISSIONS: [],
332            },
333        )
334        session = self.client.session
335        session[SESSION_KEY_PLAN] = plan
336        session.save()
337
338        response = self.client.get(
339            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
340            {},
341        )
342        self.assertEqual(response.status_code, 200)
343        self.assertStageResponse(response, component="xak-flow-redirect")
class TestConsentStage(authentik.flows.tests.FlowTestCase):
 27class TestConsentStage(FlowTestCase):
 28    """Consent tests"""
 29
 30    def setUp(self):
 31        super().setUp()
 32        self.user = create_test_admin_user()
 33        self.application = Application.objects.create(
 34            name=generate_id(),
 35            slug=generate_id(),
 36        )
 37
 38    def test_mismatched_token(self):
 39        """Test incorrect token"""
 40        flow = create_test_flow(FlowDesignation.AUTHENTICATION)
 41        stage = ConsentStage.objects.create(name=generate_id(), mode=ConsentMode.ALWAYS_REQUIRE)
 42        binding = FlowStageBinding.objects.create(target=flow, stage=stage, order=2)
 43
 44        plan = FlowPlan(flow_pk=flow.pk.hex, bindings=[binding], markers=[StageMarker()])
 45        session = self.client.session
 46        session[SESSION_KEY_PLAN] = plan
 47        session.save()
 48        response = self.client.get(
 49            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
 50        )
 51        self.assertEqual(response.status_code, 200)
 52
 53        session = self.client.session
 54        response = self.client.post(
 55            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
 56            {
 57                "token": generate_id(),
 58            },
 59        )
 60
 61        self.assertEqual(response.status_code, 200)
 62        self.assertStageResponse(
 63            response,
 64            flow,
 65            component="ak-stage-consent",
 66            response_errors={
 67                "token": [{"string": "Invalid consent token, re-showing prompt", "code": "invalid"}]
 68            },
 69        )
 70        self.assertFalse(UserConsent.objects.filter(user=self.user).exists())
 71
 72    def test_always_required(self):
 73        """Test always required consent"""
 74        flow = create_test_flow(FlowDesignation.AUTHENTICATION)
 75        stage = ConsentStage.objects.create(name=generate_id(), mode=ConsentMode.ALWAYS_REQUIRE)
 76        binding = FlowStageBinding.objects.create(target=flow, stage=stage, order=2)
 77
 78        plan = FlowPlan(flow_pk=flow.pk.hex, bindings=[binding], markers=[StageMarker()])
 79        session = self.client.session
 80        session[SESSION_KEY_PLAN] = plan
 81        session.save()
 82        response = self.client.get(
 83            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
 84        )
 85        self.assertEqual(response.status_code, 200)
 86
 87        response = self.client.post(
 88            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
 89            {
 90                "token": self.get_flow_plan().context[PLAN_CONTEXT_CONSENT_TOKEN],
 91            },
 92        )
 93
 94        self.assertEqual(response.status_code, 200)
 95        self.assertStageRedirects(response, reverse("authentik_core:root-redirect"))
 96        self.assertFalse(UserConsent.objects.filter(user=self.user).exists())
 97
 98    def test_permanent(self):
 99        """Test permanent consent from user"""
100        self.client.force_login(self.user)
101        flow = create_test_flow(FlowDesignation.AUTHENTICATION)
102        stage = ConsentStage.objects.create(name=generate_id(), mode=ConsentMode.PERMANENT)
103        binding = FlowStageBinding.objects.create(target=flow, stage=stage, order=2)
104
105        plan = FlowPlan(
106            flow_pk=flow.pk.hex,
107            bindings=[binding],
108            markers=[StageMarker()],
109            context={
110                PLAN_CONTEXT_APPLICATION: self.application,
111            },
112        )
113        session = self.client.session
114        session[SESSION_KEY_PLAN] = plan
115        session.save()
116        response = self.client.get(
117            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
118        )
119        self.assertEqual(response.status_code, 200)
120
121        session = self.client.session
122        response = self.client.post(
123            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
124            {
125                "token": self.get_flow_plan().context[PLAN_CONTEXT_CONSENT_TOKEN],
126            },
127        )
128        self.assertEqual(response.status_code, 200)
129        self.assertStageRedirects(response, reverse("authentik_core:root-redirect"))
130        self.assertTrue(
131            UserConsent.objects.filter(user=self.user, application=self.application).exists()
132        )
133
134    def test_expire(self):
135        """Test expiring consent from user"""
136        self.client.force_login(self.user)
137        flow = create_test_flow(FlowDesignation.AUTHENTICATION)
138        stage = ConsentStage.objects.create(
139            name=generate_id(), mode=ConsentMode.EXPIRING, consent_expire_in="seconds=1"
140        )
141        binding = FlowStageBinding.objects.create(target=flow, stage=stage, order=2)
142
143        plan = FlowPlan(
144            flow_pk=flow.pk.hex,
145            bindings=[binding],
146            markers=[StageMarker()],
147            context={PLAN_CONTEXT_APPLICATION: self.application},
148        )
149        session = self.client.session
150        session[SESSION_KEY_PLAN] = plan
151        session.save()
152        response = self.client.get(
153            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
154            {},
155        )
156        self.assertEqual(response.status_code, 200)
157        raw_res = self.assertStageResponse(
158            response,
159            flow,
160            self.user,
161            permissions=[],
162            additional_permissions=[],
163        )
164        response = self.client.post(
165            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
166            {
167                "token": raw_res["token"],
168            },
169        )
170        self.assertEqual(response.status_code, 200)
171        self.assertStageRedirects(response, reverse("authentik_core:root-redirect"))
172        self.assertTrue(
173            UserConsent.objects.filter(user=self.user, application=self.application).exists()
174        )
175        with freeze_time() as frozen_time:
176            frozen_time.tick(timedelta(seconds=3))
177            clean_expired_models.send()
178            self.assertFalse(
179                UserConsent.objects.filter(user=self.user, application=self.application).exists()
180            )
181
182    def test_permanent_more_perms(self):
183        """Test permanent consent from user"""
184        self.client.force_login(self.user)
185        flow = create_test_flow(FlowDesignation.AUTHENTICATION)
186        stage = ConsentStage.objects.create(name=generate_id(), mode=ConsentMode.PERMANENT)
187        binding = FlowStageBinding.objects.create(target=flow, stage=stage, order=2)
188
189        plan = FlowPlan(
190            flow_pk=flow.pk.hex,
191            bindings=[binding],
192            markers=[StageMarker()],
193            context={
194                PLAN_CONTEXT_APPLICATION: self.application,
195                PLAN_CONTEXT_CONSENT_PERMISSIONS: [PermissionDict(id="foo", name="foo-desc")],
196                PLAN_CONTEXT_CONSENT_HEADER: "test header",
197            },
198        )
199        session = self.client.session
200        session[SESSION_KEY_PLAN] = plan
201        session.save()
202
203        # First, consent with a single permission
204        response = self.client.get(
205            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
206            {},
207        )
208        self.assertEqual(response.status_code, 200)
209        raw_res = self.assertStageResponse(
210            response,
211            flow,
212            self.user,
213            permissions=[
214                {"id": "foo", "name": "foo-desc"},
215            ],
216            additional_permissions=[],
217        )
218        response = self.client.post(
219            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
220            {
221                "token": raw_res["token"],
222            },
223        )
224        self.assertEqual(response.status_code, 200)
225        self.assertStageRedirects(response, reverse("authentik_core:root-redirect"))
226        self.assertTrue(
227            UserConsent.objects.filter(
228                user=self.user, application=self.application, permissions="foo"
229            ).exists()
230        )
231
232        # Request again with more perms
233        plan = FlowPlan(
234            flow_pk=flow.pk.hex,
235            bindings=[binding],
236            markers=[StageMarker()],
237            context={
238                PLAN_CONTEXT_APPLICATION: self.application,
239                PLAN_CONTEXT_CONSENT_PERMISSIONS: [
240                    PermissionDict(id="foo", name="foo-desc"),
241                    PermissionDict(id="bar", name="bar-desc"),
242                ],
243            },
244        )
245        session = self.client.session
246        session[SESSION_KEY_PLAN] = plan
247        session.save()
248
249        response = self.client.get(
250            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
251            {},
252        )
253        self.assertEqual(response.status_code, 200)
254        raw_res = self.assertStageResponse(
255            response,
256            flow,
257            self.user,
258            permissions=[
259                {"id": "foo", "name": "foo-desc"},
260            ],
261            additional_permissions=[
262                {"id": "bar", "name": "bar-desc"},
263            ],
264        )
265        response = self.client.post(
266            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
267            {
268                "token": raw_res["token"],
269            },
270        )
271        self.assertEqual(response.status_code, 200)
272        self.assertStageRedirects(response, reverse("authentik_core:root-redirect"))
273        self.assertTrue(
274            UserConsent.objects.filter(
275                user=self.user, application=self.application, permissions="foo bar"
276            ).exists()
277        )
278
279    def test_permanent_same(self):
280        """Test permanent consent from user"""
281        self.client.force_login(self.user)
282        flow = create_test_flow(FlowDesignation.AUTHENTICATION)
283        stage = ConsentStage.objects.create(name=generate_id(), mode=ConsentMode.PERMANENT)
284        binding = FlowStageBinding.objects.create(target=flow, stage=stage, order=2)
285
286        plan = FlowPlan(
287            flow_pk=flow.pk.hex,
288            bindings=[binding],
289            markers=[StageMarker()],
290            context={
291                PLAN_CONTEXT_APPLICATION: self.application,
292            },
293        )
294        session = self.client.session
295        session[SESSION_KEY_PLAN] = plan
296        session.save()
297
298        # First, consent with a single permission
299        response = self.client.get(
300            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
301            {},
302        )
303        self.assertEqual(response.status_code, 200)
304        raw_res = self.assertStageResponse(
305            response,
306            flow,
307            self.user,
308            permissions=[],
309            additional_permissions=[],
310        )
311        response = self.client.post(
312            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
313            {
314                "token": raw_res["token"],
315            },
316        )
317        self.assertEqual(response.status_code, 200)
318        self.assertStageRedirects(response, reverse("authentik_core:root-redirect"))
319        self.assertTrue(
320            UserConsent.objects.filter(
321                user=self.user, application=self.application, permissions=""
322            ).exists()
323        )
324
325        # Request again with the same perms
326        plan = FlowPlan(
327            flow_pk=flow.pk.hex,
328            bindings=[binding],
329            markers=[StageMarker()],
330            context={
331                PLAN_CONTEXT_APPLICATION: self.application,
332                PLAN_CONTEXT_CONSENT_PERMISSIONS: [],
333            },
334        )
335        session = self.client.session
336        session[SESSION_KEY_PLAN] = plan
337        session.save()
338
339        response = self.client.get(
340            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
341            {},
342        )
343        self.assertEqual(response.status_code, 200)
344        self.assertStageResponse(response, component="xak-flow-redirect")

Consent tests

def setUp(self):
30    def setUp(self):
31        super().setUp()
32        self.user = create_test_admin_user()
33        self.application = Application.objects.create(
34            name=generate_id(),
35            slug=generate_id(),
36        )

Hook method for setting up the test fixture before exercising it.

def test_mismatched_token(self):
38    def test_mismatched_token(self):
39        """Test incorrect token"""
40        flow = create_test_flow(FlowDesignation.AUTHENTICATION)
41        stage = ConsentStage.objects.create(name=generate_id(), mode=ConsentMode.ALWAYS_REQUIRE)
42        binding = FlowStageBinding.objects.create(target=flow, stage=stage, order=2)
43
44        plan = FlowPlan(flow_pk=flow.pk.hex, bindings=[binding], markers=[StageMarker()])
45        session = self.client.session
46        session[SESSION_KEY_PLAN] = plan
47        session.save()
48        response = self.client.get(
49            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
50        )
51        self.assertEqual(response.status_code, 200)
52
53        session = self.client.session
54        response = self.client.post(
55            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
56            {
57                "token": generate_id(),
58            },
59        )
60
61        self.assertEqual(response.status_code, 200)
62        self.assertStageResponse(
63            response,
64            flow,
65            component="ak-stage-consent",
66            response_errors={
67                "token": [{"string": "Invalid consent token, re-showing prompt", "code": "invalid"}]
68            },
69        )
70        self.assertFalse(UserConsent.objects.filter(user=self.user).exists())

Test incorrect token

def test_always_required(self):
72    def test_always_required(self):
73        """Test always required consent"""
74        flow = create_test_flow(FlowDesignation.AUTHENTICATION)
75        stage = ConsentStage.objects.create(name=generate_id(), mode=ConsentMode.ALWAYS_REQUIRE)
76        binding = FlowStageBinding.objects.create(target=flow, stage=stage, order=2)
77
78        plan = FlowPlan(flow_pk=flow.pk.hex, bindings=[binding], markers=[StageMarker()])
79        session = self.client.session
80        session[SESSION_KEY_PLAN] = plan
81        session.save()
82        response = self.client.get(
83            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
84        )
85        self.assertEqual(response.status_code, 200)
86
87        response = self.client.post(
88            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
89            {
90                "token": self.get_flow_plan().context[PLAN_CONTEXT_CONSENT_TOKEN],
91            },
92        )
93
94        self.assertEqual(response.status_code, 200)
95        self.assertStageRedirects(response, reverse("authentik_core:root-redirect"))
96        self.assertFalse(UserConsent.objects.filter(user=self.user).exists())

Test always required consent

def test_permanent(self):
 98    def test_permanent(self):
 99        """Test permanent consent from user"""
100        self.client.force_login(self.user)
101        flow = create_test_flow(FlowDesignation.AUTHENTICATION)
102        stage = ConsentStage.objects.create(name=generate_id(), mode=ConsentMode.PERMANENT)
103        binding = FlowStageBinding.objects.create(target=flow, stage=stage, order=2)
104
105        plan = FlowPlan(
106            flow_pk=flow.pk.hex,
107            bindings=[binding],
108            markers=[StageMarker()],
109            context={
110                PLAN_CONTEXT_APPLICATION: self.application,
111            },
112        )
113        session = self.client.session
114        session[SESSION_KEY_PLAN] = plan
115        session.save()
116        response = self.client.get(
117            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
118        )
119        self.assertEqual(response.status_code, 200)
120
121        session = self.client.session
122        response = self.client.post(
123            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
124            {
125                "token": self.get_flow_plan().context[PLAN_CONTEXT_CONSENT_TOKEN],
126            },
127        )
128        self.assertEqual(response.status_code, 200)
129        self.assertStageRedirects(response, reverse("authentik_core:root-redirect"))
130        self.assertTrue(
131            UserConsent.objects.filter(user=self.user, application=self.application).exists()
132        )

Test permanent consent from user

def test_expire(self):
134    def test_expire(self):
135        """Test expiring consent from user"""
136        self.client.force_login(self.user)
137        flow = create_test_flow(FlowDesignation.AUTHENTICATION)
138        stage = ConsentStage.objects.create(
139            name=generate_id(), mode=ConsentMode.EXPIRING, consent_expire_in="seconds=1"
140        )
141        binding = FlowStageBinding.objects.create(target=flow, stage=stage, order=2)
142
143        plan = FlowPlan(
144            flow_pk=flow.pk.hex,
145            bindings=[binding],
146            markers=[StageMarker()],
147            context={PLAN_CONTEXT_APPLICATION: self.application},
148        )
149        session = self.client.session
150        session[SESSION_KEY_PLAN] = plan
151        session.save()
152        response = self.client.get(
153            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
154            {},
155        )
156        self.assertEqual(response.status_code, 200)
157        raw_res = self.assertStageResponse(
158            response,
159            flow,
160            self.user,
161            permissions=[],
162            additional_permissions=[],
163        )
164        response = self.client.post(
165            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
166            {
167                "token": raw_res["token"],
168            },
169        )
170        self.assertEqual(response.status_code, 200)
171        self.assertStageRedirects(response, reverse("authentik_core:root-redirect"))
172        self.assertTrue(
173            UserConsent.objects.filter(user=self.user, application=self.application).exists()
174        )
175        with freeze_time() as frozen_time:
176            frozen_time.tick(timedelta(seconds=3))
177            clean_expired_models.send()
178            self.assertFalse(
179                UserConsent.objects.filter(user=self.user, application=self.application).exists()
180            )

Test expiring consent from user

def test_permanent_more_perms(self):
182    def test_permanent_more_perms(self):
183        """Test permanent consent from user"""
184        self.client.force_login(self.user)
185        flow = create_test_flow(FlowDesignation.AUTHENTICATION)
186        stage = ConsentStage.objects.create(name=generate_id(), mode=ConsentMode.PERMANENT)
187        binding = FlowStageBinding.objects.create(target=flow, stage=stage, order=2)
188
189        plan = FlowPlan(
190            flow_pk=flow.pk.hex,
191            bindings=[binding],
192            markers=[StageMarker()],
193            context={
194                PLAN_CONTEXT_APPLICATION: self.application,
195                PLAN_CONTEXT_CONSENT_PERMISSIONS: [PermissionDict(id="foo", name="foo-desc")],
196                PLAN_CONTEXT_CONSENT_HEADER: "test header",
197            },
198        )
199        session = self.client.session
200        session[SESSION_KEY_PLAN] = plan
201        session.save()
202
203        # First, consent with a single permission
204        response = self.client.get(
205            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
206            {},
207        )
208        self.assertEqual(response.status_code, 200)
209        raw_res = self.assertStageResponse(
210            response,
211            flow,
212            self.user,
213            permissions=[
214                {"id": "foo", "name": "foo-desc"},
215            ],
216            additional_permissions=[],
217        )
218        response = self.client.post(
219            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
220            {
221                "token": raw_res["token"],
222            },
223        )
224        self.assertEqual(response.status_code, 200)
225        self.assertStageRedirects(response, reverse("authentik_core:root-redirect"))
226        self.assertTrue(
227            UserConsent.objects.filter(
228                user=self.user, application=self.application, permissions="foo"
229            ).exists()
230        )
231
232        # Request again with more perms
233        plan = FlowPlan(
234            flow_pk=flow.pk.hex,
235            bindings=[binding],
236            markers=[StageMarker()],
237            context={
238                PLAN_CONTEXT_APPLICATION: self.application,
239                PLAN_CONTEXT_CONSENT_PERMISSIONS: [
240                    PermissionDict(id="foo", name="foo-desc"),
241                    PermissionDict(id="bar", name="bar-desc"),
242                ],
243            },
244        )
245        session = self.client.session
246        session[SESSION_KEY_PLAN] = plan
247        session.save()
248
249        response = self.client.get(
250            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
251            {},
252        )
253        self.assertEqual(response.status_code, 200)
254        raw_res = self.assertStageResponse(
255            response,
256            flow,
257            self.user,
258            permissions=[
259                {"id": "foo", "name": "foo-desc"},
260            ],
261            additional_permissions=[
262                {"id": "bar", "name": "bar-desc"},
263            ],
264        )
265        response = self.client.post(
266            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
267            {
268                "token": raw_res["token"],
269            },
270        )
271        self.assertEqual(response.status_code, 200)
272        self.assertStageRedirects(response, reverse("authentik_core:root-redirect"))
273        self.assertTrue(
274            UserConsent.objects.filter(
275                user=self.user, application=self.application, permissions="foo bar"
276            ).exists()
277        )

Test permanent consent from user

def test_permanent_same(self):
279    def test_permanent_same(self):
280        """Test permanent consent from user"""
281        self.client.force_login(self.user)
282        flow = create_test_flow(FlowDesignation.AUTHENTICATION)
283        stage = ConsentStage.objects.create(name=generate_id(), mode=ConsentMode.PERMANENT)
284        binding = FlowStageBinding.objects.create(target=flow, stage=stage, order=2)
285
286        plan = FlowPlan(
287            flow_pk=flow.pk.hex,
288            bindings=[binding],
289            markers=[StageMarker()],
290            context={
291                PLAN_CONTEXT_APPLICATION: self.application,
292            },
293        )
294        session = self.client.session
295        session[SESSION_KEY_PLAN] = plan
296        session.save()
297
298        # First, consent with a single permission
299        response = self.client.get(
300            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
301            {},
302        )
303        self.assertEqual(response.status_code, 200)
304        raw_res = self.assertStageResponse(
305            response,
306            flow,
307            self.user,
308            permissions=[],
309            additional_permissions=[],
310        )
311        response = self.client.post(
312            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
313            {
314                "token": raw_res["token"],
315            },
316        )
317        self.assertEqual(response.status_code, 200)
318        self.assertStageRedirects(response, reverse("authentik_core:root-redirect"))
319        self.assertTrue(
320            UserConsent.objects.filter(
321                user=self.user, application=self.application, permissions=""
322            ).exists()
323        )
324
325        # Request again with the same perms
326        plan = FlowPlan(
327            flow_pk=flow.pk.hex,
328            bindings=[binding],
329            markers=[StageMarker()],
330            context={
331                PLAN_CONTEXT_APPLICATION: self.application,
332                PLAN_CONTEXT_CONSENT_PERMISSIONS: [],
333            },
334        )
335        session = self.client.session
336        session[SESSION_KEY_PLAN] = plan
337        session.save()
338
339        response = self.client.get(
340            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
341            {},
342        )
343        self.assertEqual(response.status_code, 200)
344        self.assertStageResponse(response, component="xak-flow-redirect")

Test permanent consent from user