authentik.stages.authenticator_validate.tests.test_totp

Test validator stage

  1"""Test validator stage"""
  2
  3from datetime import datetime, timedelta
  4from hashlib import sha256
  5from time import sleep
  6
  7from django.test.client import RequestFactory
  8from django.urls.base import reverse
  9from jwt import encode
 10from rest_framework.exceptions import ValidationError
 11
 12from authentik.core.tests.utils import create_test_admin_user, create_test_flow
 13from authentik.flows.models import FlowDesignation, FlowStageBinding, NotConfiguredAction
 14from authentik.flows.stage import StageView
 15from authentik.flows.tests import FlowTestCase
 16from authentik.flows.views.executor import FlowExecutorView
 17from authentik.lib.generators import generate_id
 18from authentik.root.install_id import get_install_id
 19from authentik.stages.authenticator.oath import TOTP
 20from authentik.stages.authenticator_totp.models import TOTPDevice
 21from authentik.stages.authenticator_validate.challenge import (
 22    get_challenge_for_device,
 23    validate_challenge_code,
 24)
 25from authentik.stages.authenticator_validate.models import AuthenticatorValidateStage, DeviceClasses
 26from authentik.stages.authenticator_validate.stage import COOKIE_NAME_MFA
 27from authentik.stages.identification.models import IdentificationStage, UserFields
 28
 29
 30class AuthenticatorValidateStageTOTPTests(FlowTestCase):
 31    """Test validator stage"""
 32
 33    def setUp(self) -> None:
 34        self.user = create_test_admin_user()
 35        self.request_factory = RequestFactory()
 36        self.flow = create_test_flow(FlowDesignation.AUTHENTICATION)
 37
 38    def test_last_auth_threshold(self):
 39        """Test last_auth_threshold"""
 40        ident_stage = IdentificationStage.objects.create(
 41            name=generate_id(),
 42            user_fields=[
 43                UserFields.USERNAME,
 44            ],
 45        )
 46        device: TOTPDevice = TOTPDevice.objects.create(
 47            user=self.user,
 48            confirmed=True,
 49        )
 50        # Verify token once here to set last_t etc
 51        totp = TOTP(device.bin_key)
 52        sleep(1)
 53        self.assertTrue(device.verify_token(totp.token()))
 54        stage = AuthenticatorValidateStage.objects.create(
 55            name=generate_id(),
 56            last_auth_threshold="milliseconds=0",
 57            not_configured_action=NotConfiguredAction.CONFIGURE,
 58            device_classes=[DeviceClasses.TOTP],
 59        )
 60        stage.configuration_stages.set([ident_stage])
 61        FlowStageBinding.objects.create(target=self.flow, stage=ident_stage, order=0)
 62        FlowStageBinding.objects.create(target=self.flow, stage=stage, order=1)
 63
 64        response = self.client.post(
 65            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
 66            {"uid_field": self.user.username},
 67        )
 68        self.assertEqual(response.status_code, 302)
 69        response = self.client.get(
 70            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
 71            follow=True,
 72        )
 73        self.assertStageResponse(
 74            response,
 75            self.flow,
 76            component="ak-stage-authenticator-validate",
 77        )
 78
 79    def test_last_auth_threshold_valid(self):
 80        """Test last_auth_threshold"""
 81        ident_stage = IdentificationStage.objects.create(
 82            name=generate_id(),
 83            user_fields=[
 84                UserFields.USERNAME,
 85            ],
 86        )
 87        device: TOTPDevice = TOTPDevice.objects.create(
 88            user=self.user,
 89            confirmed=True,
 90        )
 91        stage = AuthenticatorValidateStage.objects.create(
 92            name=generate_id(),
 93            last_auth_threshold="hours=1",
 94            not_configured_action=NotConfiguredAction.CONFIGURE,
 95            device_classes=[DeviceClasses.TOTP],
 96        )
 97        stage.configuration_stages.set([ident_stage])
 98        FlowStageBinding.objects.create(target=self.flow, stage=ident_stage, order=0)
 99        FlowStageBinding.objects.create(target=self.flow, stage=stage, order=1)
100
101        response = self.client.post(
102            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
103            {"uid_field": self.user.username},
104        )
105        self.assertEqual(response.status_code, 302)
106        response = self.client.get(
107            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
108        )
109        # Verify token once here to set last_t etc
110        totp = TOTP(device.bin_key)
111        sleep(1)
112        response = self.client.post(
113            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
114            {"code": str(totp.token())},
115        )
116        self.assertIn(COOKIE_NAME_MFA, response.cookies)
117        self.assertStageResponse(response, component="xak-flow-redirect", to="/")
118
119    def test_last_auth_skip(self):
120        """Test valid cookie"""
121        ident_stage = IdentificationStage.objects.create(
122            name=generate_id(),
123            user_fields=[
124                UserFields.USERNAME,
125            ],
126        )
127        device: TOTPDevice = TOTPDevice.objects.create(
128            user=self.user,
129            confirmed=True,
130        )
131        stage = AuthenticatorValidateStage.objects.create(
132            name=generate_id(),
133            last_auth_threshold="hours=1",
134            not_configured_action=NotConfiguredAction.CONFIGURE,
135            device_classes=[DeviceClasses.TOTP],
136        )
137        stage.configuration_stages.set([ident_stage])
138        FlowStageBinding.objects.create(target=self.flow, stage=ident_stage, order=0)
139        FlowStageBinding.objects.create(target=self.flow, stage=stage, order=1)
140
141        response = self.client.post(
142            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
143            {"uid_field": self.user.username},
144        )
145        self.assertEqual(response.status_code, 302)
146        response = self.client.get(
147            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
148        )
149        # Verify token once here to set last_t etc
150        totp = TOTP(device.bin_key)
151        sleep(1)
152        response = self.client.post(
153            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
154            {"code": str(totp.token())},
155        )
156        self.assertIn(COOKIE_NAME_MFA, response.cookies)
157        self.assertStageResponse(response, component="xak-flow-redirect", to="/")
158        mfa_cookie = response.cookies[COOKIE_NAME_MFA]
159        self.client.logout()
160        self.client.cookies[COOKIE_NAME_MFA] = mfa_cookie
161        response = self.client.post(
162            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
163            {"uid_field": self.user.username},
164        )
165        self.assertEqual(response.status_code, 302)
166        response = self.client.get(
167            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
168        )
169        self.assertStageResponse(response, component="xak-flow-redirect", to="/")
170
171    def test_last_auth_stage_pk(self):
172        """Test MFA cookie with wrong stage PK"""
173        ident_stage = IdentificationStage.objects.create(
174            name=generate_id(),
175            user_fields=[
176                UserFields.USERNAME,
177            ],
178        )
179        device: TOTPDevice = TOTPDevice.objects.create(
180            user=self.user,
181            confirmed=True,
182        )
183        stage = AuthenticatorValidateStage.objects.create(
184            name=generate_id(),
185            last_auth_threshold="hours=1",
186            not_configured_action=NotConfiguredAction.CONFIGURE,
187            device_classes=[DeviceClasses.TOTP],
188        )
189        stage.configuration_stages.set([ident_stage])
190        FlowStageBinding.objects.create(target=self.flow, stage=ident_stage, order=0)
191        FlowStageBinding.objects.create(target=self.flow, stage=stage, order=1)
192        self.client.cookies[COOKIE_NAME_MFA] = encode(
193            payload={
194                "device": device.pk,
195                "stage": stage.pk.hex + generate_id(),
196                "exp": (datetime.now() + timedelta(days=3)).timestamp(),
197            },
198            key=sha256(f"{get_install_id()}:{stage.pk.hex}".encode("ascii")).hexdigest(),
199        )
200        response = self.client.post(
201            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
202            {"uid_field": self.user.username},
203        )
204        self.assertEqual(response.status_code, 302)
205        response = self.client.get(
206            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
207        )
208        self.assertStageResponse(response, component="ak-stage-authenticator-validate")
209
210    def test_last_auth_stage_device(self):
211        """Test MFA cookie with wrong device PK"""
212        ident_stage = IdentificationStage.objects.create(
213            name=generate_id(),
214            user_fields=[
215                UserFields.USERNAME,
216            ],
217        )
218        device: TOTPDevice = TOTPDevice.objects.create(
219            user=self.user,
220            confirmed=True,
221        )
222        stage = AuthenticatorValidateStage.objects.create(
223            name=generate_id(),
224            last_auth_threshold="hours=1",
225            not_configured_action=NotConfiguredAction.CONFIGURE,
226            device_classes=[DeviceClasses.TOTP],
227        )
228        stage.configuration_stages.set([ident_stage])
229        FlowStageBinding.objects.create(target=self.flow, stage=ident_stage, order=0)
230        FlowStageBinding.objects.create(target=self.flow, stage=stage, order=1)
231        self.client.cookies[COOKIE_NAME_MFA] = encode(
232            payload={
233                "device": device.pk + 1,
234                "stage": stage.pk.hex,
235                "exp": (datetime.now() + timedelta(days=3)).timestamp(),
236            },
237            key=sha256(f"{get_install_id()}:{stage.pk.hex}".encode("ascii")).hexdigest(),
238        )
239        response = self.client.post(
240            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
241            {"uid_field": self.user.username},
242        )
243        self.assertEqual(response.status_code, 302)
244        response = self.client.get(
245            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
246        )
247        self.assertStageResponse(response, component="ak-stage-authenticator-validate")
248
249    def test_last_auth_stage_expired(self):
250        """Test MFA cookie with expired cookie"""
251        ident_stage = IdentificationStage.objects.create(
252            name=generate_id(),
253            user_fields=[
254                UserFields.USERNAME,
255            ],
256        )
257        device: TOTPDevice = TOTPDevice.objects.create(
258            user=self.user,
259            confirmed=True,
260        )
261        stage = AuthenticatorValidateStage.objects.create(
262            name=generate_id(),
263            last_auth_threshold="hours=1",
264            not_configured_action=NotConfiguredAction.CONFIGURE,
265            device_classes=[DeviceClasses.TOTP],
266        )
267        stage.configuration_stages.set([ident_stage])
268        FlowStageBinding.objects.create(target=self.flow, stage=ident_stage, order=0)
269        FlowStageBinding.objects.create(target=self.flow, stage=stage, order=1)
270        self.client.cookies[COOKIE_NAME_MFA] = encode(
271            payload={
272                "device": device.pk,
273                "stage": stage.pk.hex,
274                "exp": (datetime.now() - timedelta(days=3)).timestamp(),
275            },
276            key=sha256(f"{get_install_id()}:{stage.pk.hex}".encode("ascii")).hexdigest(),
277        )
278        response = self.client.post(
279            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
280            {"uid_field": self.user.username},
281        )
282        self.assertEqual(response.status_code, 302)
283        response = self.client.get(
284            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
285        )
286        self.assertStageResponse(response, component="ak-stage-authenticator-validate")
287
288    def test_device_challenge_totp(self):
289        """Test device challenge"""
290        request = self.request_factory.get("/")
291        totp_device = TOTPDevice.objects.create(user=self.user, confirmed=True, digits=6)
292        stage = AuthenticatorValidateStage.objects.create(
293            name=generate_id(),
294            last_auth_threshold="hours=1",
295            not_configured_action=NotConfiguredAction.CONFIGURE,
296            device_classes=[DeviceClasses.TOTP],
297        )
298        self.assertEqual(get_challenge_for_device(request, stage, totp_device), {})
299        with self.assertRaises(ValidationError):
300            validate_challenge_code(
301                "1234", StageView(FlowExecutorView(current_stage=stage), request=request), self.user
302            )
class AuthenticatorValidateStageTOTPTests(authentik.flows.tests.FlowTestCase):
 31class AuthenticatorValidateStageTOTPTests(FlowTestCase):
 32    """Test validator stage"""
 33
 34    def setUp(self) -> None:
 35        self.user = create_test_admin_user()
 36        self.request_factory = RequestFactory()
 37        self.flow = create_test_flow(FlowDesignation.AUTHENTICATION)
 38
 39    def test_last_auth_threshold(self):
 40        """Test last_auth_threshold"""
 41        ident_stage = IdentificationStage.objects.create(
 42            name=generate_id(),
 43            user_fields=[
 44                UserFields.USERNAME,
 45            ],
 46        )
 47        device: TOTPDevice = TOTPDevice.objects.create(
 48            user=self.user,
 49            confirmed=True,
 50        )
 51        # Verify token once here to set last_t etc
 52        totp = TOTP(device.bin_key)
 53        sleep(1)
 54        self.assertTrue(device.verify_token(totp.token()))
 55        stage = AuthenticatorValidateStage.objects.create(
 56            name=generate_id(),
 57            last_auth_threshold="milliseconds=0",
 58            not_configured_action=NotConfiguredAction.CONFIGURE,
 59            device_classes=[DeviceClasses.TOTP],
 60        )
 61        stage.configuration_stages.set([ident_stage])
 62        FlowStageBinding.objects.create(target=self.flow, stage=ident_stage, order=0)
 63        FlowStageBinding.objects.create(target=self.flow, stage=stage, order=1)
 64
 65        response = self.client.post(
 66            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
 67            {"uid_field": self.user.username},
 68        )
 69        self.assertEqual(response.status_code, 302)
 70        response = self.client.get(
 71            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
 72            follow=True,
 73        )
 74        self.assertStageResponse(
 75            response,
 76            self.flow,
 77            component="ak-stage-authenticator-validate",
 78        )
 79
 80    def test_last_auth_threshold_valid(self):
 81        """Test last_auth_threshold"""
 82        ident_stage = IdentificationStage.objects.create(
 83            name=generate_id(),
 84            user_fields=[
 85                UserFields.USERNAME,
 86            ],
 87        )
 88        device: TOTPDevice = TOTPDevice.objects.create(
 89            user=self.user,
 90            confirmed=True,
 91        )
 92        stage = AuthenticatorValidateStage.objects.create(
 93            name=generate_id(),
 94            last_auth_threshold="hours=1",
 95            not_configured_action=NotConfiguredAction.CONFIGURE,
 96            device_classes=[DeviceClasses.TOTP],
 97        )
 98        stage.configuration_stages.set([ident_stage])
 99        FlowStageBinding.objects.create(target=self.flow, stage=ident_stage, order=0)
100        FlowStageBinding.objects.create(target=self.flow, stage=stage, order=1)
101
102        response = self.client.post(
103            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
104            {"uid_field": self.user.username},
105        )
106        self.assertEqual(response.status_code, 302)
107        response = self.client.get(
108            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
109        )
110        # Verify token once here to set last_t etc
111        totp = TOTP(device.bin_key)
112        sleep(1)
113        response = self.client.post(
114            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
115            {"code": str(totp.token())},
116        )
117        self.assertIn(COOKIE_NAME_MFA, response.cookies)
118        self.assertStageResponse(response, component="xak-flow-redirect", to="/")
119
120    def test_last_auth_skip(self):
121        """Test valid cookie"""
122        ident_stage = IdentificationStage.objects.create(
123            name=generate_id(),
124            user_fields=[
125                UserFields.USERNAME,
126            ],
127        )
128        device: TOTPDevice = TOTPDevice.objects.create(
129            user=self.user,
130            confirmed=True,
131        )
132        stage = AuthenticatorValidateStage.objects.create(
133            name=generate_id(),
134            last_auth_threshold="hours=1",
135            not_configured_action=NotConfiguredAction.CONFIGURE,
136            device_classes=[DeviceClasses.TOTP],
137        )
138        stage.configuration_stages.set([ident_stage])
139        FlowStageBinding.objects.create(target=self.flow, stage=ident_stage, order=0)
140        FlowStageBinding.objects.create(target=self.flow, stage=stage, order=1)
141
142        response = self.client.post(
143            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
144            {"uid_field": self.user.username},
145        )
146        self.assertEqual(response.status_code, 302)
147        response = self.client.get(
148            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
149        )
150        # Verify token once here to set last_t etc
151        totp = TOTP(device.bin_key)
152        sleep(1)
153        response = self.client.post(
154            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
155            {"code": str(totp.token())},
156        )
157        self.assertIn(COOKIE_NAME_MFA, response.cookies)
158        self.assertStageResponse(response, component="xak-flow-redirect", to="/")
159        mfa_cookie = response.cookies[COOKIE_NAME_MFA]
160        self.client.logout()
161        self.client.cookies[COOKIE_NAME_MFA] = mfa_cookie
162        response = self.client.post(
163            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
164            {"uid_field": self.user.username},
165        )
166        self.assertEqual(response.status_code, 302)
167        response = self.client.get(
168            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
169        )
170        self.assertStageResponse(response, component="xak-flow-redirect", to="/")
171
172    def test_last_auth_stage_pk(self):
173        """Test MFA cookie with wrong stage PK"""
174        ident_stage = IdentificationStage.objects.create(
175            name=generate_id(),
176            user_fields=[
177                UserFields.USERNAME,
178            ],
179        )
180        device: TOTPDevice = TOTPDevice.objects.create(
181            user=self.user,
182            confirmed=True,
183        )
184        stage = AuthenticatorValidateStage.objects.create(
185            name=generate_id(),
186            last_auth_threshold="hours=1",
187            not_configured_action=NotConfiguredAction.CONFIGURE,
188            device_classes=[DeviceClasses.TOTP],
189        )
190        stage.configuration_stages.set([ident_stage])
191        FlowStageBinding.objects.create(target=self.flow, stage=ident_stage, order=0)
192        FlowStageBinding.objects.create(target=self.flow, stage=stage, order=1)
193        self.client.cookies[COOKIE_NAME_MFA] = encode(
194            payload={
195                "device": device.pk,
196                "stage": stage.pk.hex + generate_id(),
197                "exp": (datetime.now() + timedelta(days=3)).timestamp(),
198            },
199            key=sha256(f"{get_install_id()}:{stage.pk.hex}".encode("ascii")).hexdigest(),
200        )
201        response = self.client.post(
202            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
203            {"uid_field": self.user.username},
204        )
205        self.assertEqual(response.status_code, 302)
206        response = self.client.get(
207            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
208        )
209        self.assertStageResponse(response, component="ak-stage-authenticator-validate")
210
211    def test_last_auth_stage_device(self):
212        """Test MFA cookie with wrong device PK"""
213        ident_stage = IdentificationStage.objects.create(
214            name=generate_id(),
215            user_fields=[
216                UserFields.USERNAME,
217            ],
218        )
219        device: TOTPDevice = TOTPDevice.objects.create(
220            user=self.user,
221            confirmed=True,
222        )
223        stage = AuthenticatorValidateStage.objects.create(
224            name=generate_id(),
225            last_auth_threshold="hours=1",
226            not_configured_action=NotConfiguredAction.CONFIGURE,
227            device_classes=[DeviceClasses.TOTP],
228        )
229        stage.configuration_stages.set([ident_stage])
230        FlowStageBinding.objects.create(target=self.flow, stage=ident_stage, order=0)
231        FlowStageBinding.objects.create(target=self.flow, stage=stage, order=1)
232        self.client.cookies[COOKIE_NAME_MFA] = encode(
233            payload={
234                "device": device.pk + 1,
235                "stage": stage.pk.hex,
236                "exp": (datetime.now() + timedelta(days=3)).timestamp(),
237            },
238            key=sha256(f"{get_install_id()}:{stage.pk.hex}".encode("ascii")).hexdigest(),
239        )
240        response = self.client.post(
241            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
242            {"uid_field": self.user.username},
243        )
244        self.assertEqual(response.status_code, 302)
245        response = self.client.get(
246            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
247        )
248        self.assertStageResponse(response, component="ak-stage-authenticator-validate")
249
250    def test_last_auth_stage_expired(self):
251        """Test MFA cookie with expired cookie"""
252        ident_stage = IdentificationStage.objects.create(
253            name=generate_id(),
254            user_fields=[
255                UserFields.USERNAME,
256            ],
257        )
258        device: TOTPDevice = TOTPDevice.objects.create(
259            user=self.user,
260            confirmed=True,
261        )
262        stage = AuthenticatorValidateStage.objects.create(
263            name=generate_id(),
264            last_auth_threshold="hours=1",
265            not_configured_action=NotConfiguredAction.CONFIGURE,
266            device_classes=[DeviceClasses.TOTP],
267        )
268        stage.configuration_stages.set([ident_stage])
269        FlowStageBinding.objects.create(target=self.flow, stage=ident_stage, order=0)
270        FlowStageBinding.objects.create(target=self.flow, stage=stage, order=1)
271        self.client.cookies[COOKIE_NAME_MFA] = encode(
272            payload={
273                "device": device.pk,
274                "stage": stage.pk.hex,
275                "exp": (datetime.now() - timedelta(days=3)).timestamp(),
276            },
277            key=sha256(f"{get_install_id()}:{stage.pk.hex}".encode("ascii")).hexdigest(),
278        )
279        response = self.client.post(
280            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
281            {"uid_field": self.user.username},
282        )
283        self.assertEqual(response.status_code, 302)
284        response = self.client.get(
285            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
286        )
287        self.assertStageResponse(response, component="ak-stage-authenticator-validate")
288
289    def test_device_challenge_totp(self):
290        """Test device challenge"""
291        request = self.request_factory.get("/")
292        totp_device = TOTPDevice.objects.create(user=self.user, confirmed=True, digits=6)
293        stage = AuthenticatorValidateStage.objects.create(
294            name=generate_id(),
295            last_auth_threshold="hours=1",
296            not_configured_action=NotConfiguredAction.CONFIGURE,
297            device_classes=[DeviceClasses.TOTP],
298        )
299        self.assertEqual(get_challenge_for_device(request, stage, totp_device), {})
300        with self.assertRaises(ValidationError):
301            validate_challenge_code(
302                "1234", StageView(FlowExecutorView(current_stage=stage), request=request), self.user
303            )

Test validator stage

def setUp(self) -> None:
34    def setUp(self) -> None:
35        self.user = create_test_admin_user()
36        self.request_factory = RequestFactory()
37        self.flow = create_test_flow(FlowDesignation.AUTHENTICATION)

Hook method for setting up the test fixture before exercising it.

def test_last_auth_threshold(self):
39    def test_last_auth_threshold(self):
40        """Test last_auth_threshold"""
41        ident_stage = IdentificationStage.objects.create(
42            name=generate_id(),
43            user_fields=[
44                UserFields.USERNAME,
45            ],
46        )
47        device: TOTPDevice = TOTPDevice.objects.create(
48            user=self.user,
49            confirmed=True,
50        )
51        # Verify token once here to set last_t etc
52        totp = TOTP(device.bin_key)
53        sleep(1)
54        self.assertTrue(device.verify_token(totp.token()))
55        stage = AuthenticatorValidateStage.objects.create(
56            name=generate_id(),
57            last_auth_threshold="milliseconds=0",
58            not_configured_action=NotConfiguredAction.CONFIGURE,
59            device_classes=[DeviceClasses.TOTP],
60        )
61        stage.configuration_stages.set([ident_stage])
62        FlowStageBinding.objects.create(target=self.flow, stage=ident_stage, order=0)
63        FlowStageBinding.objects.create(target=self.flow, stage=stage, order=1)
64
65        response = self.client.post(
66            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
67            {"uid_field": self.user.username},
68        )
69        self.assertEqual(response.status_code, 302)
70        response = self.client.get(
71            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
72            follow=True,
73        )
74        self.assertStageResponse(
75            response,
76            self.flow,
77            component="ak-stage-authenticator-validate",
78        )

Test last_auth_threshold

def test_last_auth_threshold_valid(self):
 80    def test_last_auth_threshold_valid(self):
 81        """Test last_auth_threshold"""
 82        ident_stage = IdentificationStage.objects.create(
 83            name=generate_id(),
 84            user_fields=[
 85                UserFields.USERNAME,
 86            ],
 87        )
 88        device: TOTPDevice = TOTPDevice.objects.create(
 89            user=self.user,
 90            confirmed=True,
 91        )
 92        stage = AuthenticatorValidateStage.objects.create(
 93            name=generate_id(),
 94            last_auth_threshold="hours=1",
 95            not_configured_action=NotConfiguredAction.CONFIGURE,
 96            device_classes=[DeviceClasses.TOTP],
 97        )
 98        stage.configuration_stages.set([ident_stage])
 99        FlowStageBinding.objects.create(target=self.flow, stage=ident_stage, order=0)
100        FlowStageBinding.objects.create(target=self.flow, stage=stage, order=1)
101
102        response = self.client.post(
103            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
104            {"uid_field": self.user.username},
105        )
106        self.assertEqual(response.status_code, 302)
107        response = self.client.get(
108            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
109        )
110        # Verify token once here to set last_t etc
111        totp = TOTP(device.bin_key)
112        sleep(1)
113        response = self.client.post(
114            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
115            {"code": str(totp.token())},
116        )
117        self.assertIn(COOKIE_NAME_MFA, response.cookies)
118        self.assertStageResponse(response, component="xak-flow-redirect", to="/")

Test that a valid TOTP code sets the MFA cookie and finishes the flow

def test_last_auth_skip(self):
120    def test_last_auth_skip(self):
121        """Test valid cookie"""
122        ident_stage = IdentificationStage.objects.create(
123            name=generate_id(),
124            user_fields=[
125                UserFields.USERNAME,
126            ],
127        )
128        device: TOTPDevice = TOTPDevice.objects.create(
129            user=self.user,
130            confirmed=True,
131        )
132        stage = AuthenticatorValidateStage.objects.create(
133            name=generate_id(),
134            last_auth_threshold="hours=1",
135            not_configured_action=NotConfiguredAction.CONFIGURE,
136            device_classes=[DeviceClasses.TOTP],
137        )
138        stage.configuration_stages.set([ident_stage])
139        FlowStageBinding.objects.create(target=self.flow, stage=ident_stage, order=0)
140        FlowStageBinding.objects.create(target=self.flow, stage=stage, order=1)
141
142        response = self.client.post(
143            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
144            {"uid_field": self.user.username},
145        )
146        self.assertEqual(response.status_code, 302)
147        response = self.client.get(
148            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
149        )
150        # Verify token once here to set last_t etc
151        totp = TOTP(device.bin_key)
152        sleep(1)
153        response = self.client.post(
154            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
155            {"code": str(totp.token())},
156        )
157        self.assertIn(COOKIE_NAME_MFA, response.cookies)
158        self.assertStageResponse(response, component="xak-flow-redirect", to="/")
159        mfa_cookie = response.cookies[COOKIE_NAME_MFA]
160        self.client.logout()
161        self.client.cookies[COOKIE_NAME_MFA] = mfa_cookie
162        response = self.client.post(
163            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
164            {"uid_field": self.user.username},
165        )
166        self.assertEqual(response.status_code, 302)
167        response = self.client.get(
168            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
169        )
170        self.assertStageResponse(response, component="xak-flow-redirect", to="/")

Test that a valid MFA cookie skips the stage on the next login

def test_last_auth_stage_pk(self):
172    def test_last_auth_stage_pk(self):
173        """Test MFA cookie with wrong stage PK"""
174        ident_stage = IdentificationStage.objects.create(
175            name=generate_id(),
176            user_fields=[
177                UserFields.USERNAME,
178            ],
179        )
180        device: TOTPDevice = TOTPDevice.objects.create(
181            user=self.user,
182            confirmed=True,
183        )
184        stage = AuthenticatorValidateStage.objects.create(
185            name=generate_id(),
186            last_auth_threshold="hours=1",
187            not_configured_action=NotConfiguredAction.CONFIGURE,
188            device_classes=[DeviceClasses.TOTP],
189        )
190        stage.configuration_stages.set([ident_stage])
191        FlowStageBinding.objects.create(target=self.flow, stage=ident_stage, order=0)
192        FlowStageBinding.objects.create(target=self.flow, stage=stage, order=1)
193        self.client.cookies[COOKIE_NAME_MFA] = encode(
194            payload={
195                "device": device.pk,
196                "stage": stage.pk.hex + generate_id(),
197                "exp": (datetime.now() + timedelta(days=3)).timestamp(),
198            },
199            key=sha256(f"{get_install_id()}:{stage.pk.hex}".encode("ascii")).hexdigest(),
200        )
201        response = self.client.post(
202            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
203            {"uid_field": self.user.username},
204        )
205        self.assertEqual(response.status_code, 302)
206        response = self.client.get(
207            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
208        )
209        self.assertStageResponse(response, component="ak-stage-authenticator-validate")

Test MFA cookie with wrong stage PK

def test_last_auth_stage_device(self):
211    def test_last_auth_stage_device(self):
212        """Test MFA cookie with wrong device PK"""
213        ident_stage = IdentificationStage.objects.create(
214            name=generate_id(),
215            user_fields=[
216                UserFields.USERNAME,
217            ],
218        )
219        device: TOTPDevice = TOTPDevice.objects.create(
220            user=self.user,
221            confirmed=True,
222        )
223        stage = AuthenticatorValidateStage.objects.create(
224            name=generate_id(),
225            last_auth_threshold="hours=1",
226            not_configured_action=NotConfiguredAction.CONFIGURE,
227            device_classes=[DeviceClasses.TOTP],
228        )
229        stage.configuration_stages.set([ident_stage])
230        FlowStageBinding.objects.create(target=self.flow, stage=ident_stage, order=0)
231        FlowStageBinding.objects.create(target=self.flow, stage=stage, order=1)
232        self.client.cookies[COOKIE_NAME_MFA] = encode(
233            payload={
234                "device": device.pk + 1,
235                "stage": stage.pk.hex,
236                "exp": (datetime.now() + timedelta(days=3)).timestamp(),
237            },
238            key=sha256(f"{get_install_id()}:{stage.pk.hex}".encode("ascii")).hexdigest(),
239        )
240        response = self.client.post(
241            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
242            {"uid_field": self.user.username},
243        )
244        self.assertEqual(response.status_code, 302)
245        response = self.client.get(
246            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
247        )
248        self.assertStageResponse(response, component="ak-stage-authenticator-validate")

Test MFA cookie with wrong device PK

def test_last_auth_stage_expired(self):
250    def test_last_auth_stage_expired(self):
251        """Test MFA cookie with expired cookie"""
252        ident_stage = IdentificationStage.objects.create(
253            name=generate_id(),
254            user_fields=[
255                UserFields.USERNAME,
256            ],
257        )
258        device: TOTPDevice = TOTPDevice.objects.create(
259            user=self.user,
260            confirmed=True,
261        )
262        stage = AuthenticatorValidateStage.objects.create(
263            name=generate_id(),
264            last_auth_threshold="hours=1",
265            not_configured_action=NotConfiguredAction.CONFIGURE,
266            device_classes=[DeviceClasses.TOTP],
267        )
268        stage.configuration_stages.set([ident_stage])
269        FlowStageBinding.objects.create(target=self.flow, stage=ident_stage, order=0)
270        FlowStageBinding.objects.create(target=self.flow, stage=stage, order=1)
271        self.client.cookies[COOKIE_NAME_MFA] = encode(
272            payload={
273                "device": device.pk,
274                "stage": stage.pk.hex,
275                "exp": (datetime.now() - timedelta(days=3)).timestamp(),
276            },
277            key=sha256(f"{get_install_id()}:{stage.pk.hex}".encode("ascii")).hexdigest(),
278        )
279        response = self.client.post(
280            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
281            {"uid_field": self.user.username},
282        )
283        self.assertEqual(response.status_code, 302)
284        response = self.client.get(
285            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
286        )
287        self.assertStageResponse(response, component="ak-stage-authenticator-validate")

Test MFA cookie with expired cookie

def test_device_challenge_totp(self):
289    def test_device_challenge_totp(self):
290        """Test device challenge"""
291        request = self.request_factory.get("/")
292        totp_device = TOTPDevice.objects.create(user=self.user, confirmed=True, digits=6)
293        stage = AuthenticatorValidateStage.objects.create(
294            name=generate_id(),
295            last_auth_threshold="hours=1",
296            not_configured_action=NotConfiguredAction.CONFIGURE,
297            device_classes=[DeviceClasses.TOTP],
298        )
299        self.assertEqual(get_challenge_for_device(request, stage, totp_device), {})
300        with self.assertRaises(ValidationError):
301            validate_challenge_code(
302                "1234", StageView(FlowExecutorView(current_stage=stage), request=request), self.user
303            )

Test device challenge