authentik.stages.identification.tests

identification tests

  1"""identification tests"""
  2
  3from django.urls import reverse
  4from requests_mock import Mocker
  5from rest_framework.exceptions import ValidationError
  6
  7from authentik.core.tests.utils import create_test_admin_user, create_test_flow
  8from authentik.flows.models import FlowDesignation, FlowStageBinding
  9from authentik.flows.stage import PLAN_CONTEXT_PENDING_USER_IDENTIFIER
 10from authentik.flows.tests import FlowTestCase
 11from authentik.lib.generators import generate_id
 12from authentik.sources.oauth.models import OAuthSource
 13from authentik.stages.authenticator_validate.models import AuthenticatorValidateStage, DeviceClasses
 14from authentik.stages.authenticator_webauthn.models import WebAuthnDevice
 15from authentik.stages.captcha.models import CaptchaStage
 16from authentik.stages.captcha.tests import RECAPTCHA_PRIVATE_KEY, RECAPTCHA_PUBLIC_KEY
 17from authentik.stages.identification.api import IdentificationStageSerializer
 18from authentik.stages.identification.models import IdentificationStage, UserFields
 19from authentik.stages.password import BACKEND_INBUILT
 20from authentik.stages.password.models import PasswordStage
 21
 22
 23class TestIdentificationStagePasskey(FlowTestCase):
 24    """Passkey authentication tests"""
 25
 26    def setUp(self):
 27        super().setUp()
 28        self.user = create_test_admin_user()
 29        self.flow = create_test_flow(FlowDesignation.AUTHENTICATION)
 30        self.webauthn_stage = AuthenticatorValidateStage.objects.create(
 31            name="webauthn-validate",
 32            device_classes=[DeviceClasses.WEBAUTHN],
 33        )
 34        self.stage = IdentificationStage.objects.create(
 35            name="identification",
 36            user_fields=[UserFields.E_MAIL],
 37            webauthn_stage=self.webauthn_stage,
 38        )
 39        FlowStageBinding.objects.create(target=self.flow, stage=self.stage, order=0)
 40        self.device = WebAuthnDevice.objects.create(
 41            user=self.user,
 42            name="Test Passkey",
 43            credential_id="test-credential-id",
 44            public_key="test-public-key",
 45            sign_count=0,
 46            rp_id="testserver",
 47        )
 48
 49    def test_passkey_auth_success(self):
 50        """Test passkey sets device, user, backend and updates last_used"""
 51        from unittest.mock import patch
 52
 53        # Get challenge to initialize session
 54        url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug})
 55        self.client.get(url)
 56
 57        with patch(
 58            "authentik.stages.identification.stage.validate_challenge_webauthn",
 59            return_value=self.device,
 60        ):
 61            response = self.client.post(
 62                url, {"passkey": {"id": "test"}}, content_type="application/json"
 63            )
 64
 65        self.assertStageRedirects(response, reverse("authentik_core:root-redirect"))
 66        # Verify device last_used was updated
 67        self.device.refresh_from_db()
 68        self.assertIsNotNone(self.device.last_used)
 69
 70    def test_passkey_challenge_disabled(self):
 71        """Test that passkey challenge is not included when webauthn_stage is not set"""
 72        self.stage.webauthn_stage = None
 73        self.stage.save()
 74        response = self.client.get(
 75            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug})
 76        )
 77        self.assertEqual(response.status_code, 200)
 78        data = response.json()
 79        self.assertIsNone(data.get("passkey_challenge"))
 80
 81    def test_passkey_challenge_enabled(self):
 82        """Test that passkey challenge is included when webauthn_stage is set"""
 83        response = self.client.get(
 84            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug})
 85        )
 86        self.assertEqual(response.status_code, 200)
 87        data = response.json()
 88        self.assertIsNotNone(data.get("passkey_challenge"))
 89        passkey_challenge = data["passkey_challenge"]
 90        self.assertIn("challenge", passkey_challenge)
 91        self.assertIn("rpId", passkey_challenge)
 92        self.assertEqual(passkey_challenge["allowCredentials"], [])
 93
 94    def test_passkey_challenge_generation(self):
 95        """Test passkey challenge is generated correctly"""
 96        response = self.client.get(
 97            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug})
 98        )
 99        self.assertEqual(response.status_code, 200)
100        data = response.json()
101        self.assertIsNotNone(data.get("passkey_challenge"))
102
103    def test_passkey_no_uid_field_required(self):
104        """Test that uid_field is not required when passkey is provided"""
105        # Get the challenge first to set up the session
106        response = self.client.get(
107            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug})
108        )
109        self.assertEqual(response.status_code, 200)
110
111        # Submit without uid_field but with passkey (invalid passkey will fail validation)
112        form_data = {
113            "passkey": {
114                "id": "invalid",
115                "rawId": "invalid",
116                "type": "public-key",
117                "response": {
118                    "clientDataJSON": "invalid",
119                    "authenticatorData": "invalid",
120                    "signature": "invalid",
121                },
122            }
123        }
124        url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug})
125        response = self.client.post(url, form_data, content_type="application/json")
126        self.assertEqual(response.status_code, 200)
127        data = response.json()
128        self.assertIn("response_errors", data)
129        errors = data.get("response_errors", {})
130        self.assertNotIn("uid_field", errors)
131
132
133class TestIdentificationStage(FlowTestCase):
134    """Identification tests"""
135
136    def setUp(self):
137        super().setUp()
138        self.user = create_test_admin_user()
139
140        # OAuthSource for the login view
141        self.source = OAuthSource.objects.create(name=generate_id(), slug=generate_id())
142
143        self.flow = create_test_flow(FlowDesignation.AUTHENTICATION)
144        self.stage = IdentificationStage.objects.create(
145            name="identification",
146            user_fields=[UserFields.E_MAIL],
147            pretend_user_exists=False,
148        )
149        self.stage.sources.set([self.source])
150        self.stage.save()
151        FlowStageBinding.objects.create(
152            target=self.flow,
153            stage=self.stage,
154            order=0,
155        )
156
157    def test_valid_render(self):
158        """Test that View renders correctly"""
159        response = self.client.get(
160            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug})
161        )
162        self.assertEqual(response.status_code, 200)
163
164    def test_valid_with_email(self):
165        """Test with valid email, check that URL redirects back to itself"""
166        form_data = {"uid_field": self.user.email}
167        url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug})
168        response = self.client.post(url, form_data)
169        self.assertEqual(response.status_code, 200)
170        self.assertStageRedirects(response, reverse("authentik_core:root-redirect"))
171
172    def test_valid_with_password(self):
173        """Test with valid email and password in single step"""
174        pw_stage = PasswordStage.objects.create(name="password", backends=[BACKEND_INBUILT])
175        self.stage.password_stage = pw_stage
176        self.stage.save()
177        form_data = {"uid_field": self.user.email, "password": self.user.username}
178        url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug})
179        response = self.client.post(url, form_data)
180        self.assertEqual(response.status_code, 200)
181        self.assertStageRedirects(response, reverse("authentik_core:root-redirect"))
182
183    def test_invalid_with_password(self):
184        """Test with valid email and invalid password in single step"""
185        pw_stage = PasswordStage.objects.create(name="password", backends=[BACKEND_INBUILT])
186        self.stage.password_stage = pw_stage
187        self.stage.save()
188        form_data = {
189            "uid_field": self.user.email,
190            "password": self.user.username + "test",
191        }
192        url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug})
193        response = self.client.post(url, form_data)
194        self.assertStageResponse(
195            response,
196            self.flow,
197            component="ak-stage-identification",
198            password_fields=True,
199            primary_action="Log in",
200            response_errors={
201                "non_field_errors": [{"code": "invalid", "string": "Failed to authenticate."}]
202            },
203            sources=[
204                {
205                    "challenge": {
206                        "component": "xak-flow-redirect",
207                        "to": f"/source/oauth/login/{self.source.slug}/",
208                    },
209                    "icon_url": "/static/authentik/sources/default.svg",
210                    "name": self.source.name,
211                    "promoted": False,
212                }
213            ],
214            show_source_labels=False,
215            user_fields=["email"],
216        )
217
218    def test_invalid_with_password_pretend(self):
219        """Test with invalid email and invalid password in single step (with pretend_user_exists)"""
220        self.stage.pretend_user_exists = True
221        pw_stage = PasswordStage.objects.create(name="password", backends=[BACKEND_INBUILT])
222        self.stage.password_stage = pw_stage
223        self.stage.save()
224        form_data = {
225            "uid_field": self.user.email + "test",
226            "password": self.user.username + "test",
227        }
228        url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug})
229        response = self.client.post(url, form_data)
230        self.assertStageResponse(
231            response,
232            self.flow,
233            component="ak-stage-identification",
234            password_fields=True,
235            primary_action="Log in",
236            response_errors={
237                "non_field_errors": [{"code": "invalid", "string": "Failed to authenticate."}]
238            },
239            sources=[
240                {
241                    "challenge": {
242                        "component": "xak-flow-redirect",
243                        "to": f"/source/oauth/login/{self.source.slug}/",
244                    },
245                    "icon_url": "/static/authentik/sources/default.svg",
246                    "name": self.source.name,
247                    "promoted": False,
248                }
249            ],
250            show_source_labels=False,
251            user_fields=["email"],
252        )
253
254    @Mocker()
255    def test_valid_with_captcha(self, mock: Mocker):
256        """Test with valid email and captcha token in single step"""
257        mock.post(
258            "https://www.recaptcha.net/recaptcha/api/siteverify",
259            json={
260                "success": True,
261                "score": 0.5,
262            },
263        )
264
265        captcha_stage = CaptchaStage.objects.create(
266            name="captcha",
267            public_key=RECAPTCHA_PUBLIC_KEY,
268            private_key=RECAPTCHA_PRIVATE_KEY,
269        )
270        self.stage.captcha_stage = captcha_stage
271        self.stage.save()
272
273        form_data = {"uid_field": self.user.email, "captcha_token": "PASSED"}
274        url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug})
275        response = self.client.post(url, form_data)
276        self.assertEqual(response.status_code, 200)
277        self.assertStageRedirects(response, reverse("authentik_core:root-redirect"))
278
279    @Mocker()
280    def test_invalid_with_captcha(self, mock: Mocker):
281        """Test with valid email and invalid captcha token in single step"""
282        mock.post(
283            "https://www.recaptcha.net/recaptcha/api/siteverify",
284            json={
285                "success": False,
286                "score": 0.5,
287            },
288        )
289
290        captcha_stage = CaptchaStage.objects.create(
291            name="captcha",
292            public_key=RECAPTCHA_PUBLIC_KEY,
293            private_key=RECAPTCHA_PRIVATE_KEY,
294        )
295
296        self.stage.captcha_stage = captcha_stage
297        self.stage.save()
298
299        form_data = {
300            "uid_field": self.user.email,
301            "captcha_token": "FAILED",
302        }
303        url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug})
304        response = self.client.post(url, form_data)
305        self.assertStageResponse(
306            response,
307            self.flow,
308            component="ak-stage-identification",
309            password_fields=False,
310            primary_action="Log in",
311            response_errors={
312                "non_field_errors": [{"code": "invalid", "string": "Failed to authenticate."}]
313            },
314            sources=[
315                {
316                    "challenge": {
317                        "component": "xak-flow-redirect",
318                        "to": f"/source/oauth/login/{self.source.slug}/",
319                    },
320                    "icon_url": "/static/authentik/sources/default.svg",
321                    "name": self.source.name,
322                    "promoted": False,
323                }
324            ],
325            show_source_labels=False,
326            user_fields=["email"],
327        )
328
329    @Mocker()
330    def test_invalid_with_captcha_retriable(self, mock: Mocker):
331        """Test with valid email and invalid captcha token in single step"""
332        mock.post(
333            "https://www.recaptcha.net/recaptcha/api/siteverify",
334            json={
335                "success": False,
336                "score": 0.5,
337                "error-codes": ["timeout-or-duplicate"],
338            },
339        )
340
341        captcha_stage = CaptchaStage.objects.create(
342            name="captcha",
343            public_key=RECAPTCHA_PUBLIC_KEY,
344            private_key=RECAPTCHA_PRIVATE_KEY,
345        )
346
347        self.stage.captcha_stage = captcha_stage
348        self.stage.save()
349
350        form_data = {
351            "uid_field": self.user.email,
352            "captcha_token": "FAILED",
353        }
354        url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug})
355        response = self.client.post(url, form_data)
356        self.assertStageResponse(
357            response,
358            self.flow,
359            component="ak-stage-identification",
360            password_fields=False,
361            primary_action="Log in",
362            response_errors={
363                "non_field_errors": [
364                    {
365                        "code": "invalid",
366                        "string": "Failed to authenticate.",
367                    }
368                ]
369            },
370            sources=[
371                {
372                    "challenge": {
373                        "component": "xak-flow-redirect",
374                        "to": f"/source/oauth/login/{self.source.slug}/",
375                    },
376                    "icon_url": "/static/authentik/sources/default.svg",
377                    "name": self.source.name,
378                    "promoted": False,
379                }
380            ],
381            show_source_labels=False,
382            user_fields=["email"],
383        )
384
385    def test_invalid_with_username(self):
386        """Test invalid with username (user exists but stage only allows email)"""
387        form_data = {"uid_field": self.user.username}
388        response = self.client.post(
389            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
390            form_data,
391        )
392        self.assertEqual(response.status_code, 200)
393        self.assertStageResponse(
394            response,
395            self.flow,
396            component="ak-stage-identification",
397            response_errors={
398                "non_field_errors": [{"string": "Failed to authenticate.", "code": "invalid"}]
399            },
400        )
401
402    def test_invalid_with_username_pretend(self):
403        """Test invalid with username (user exists but stage only allows email)"""
404        self.stage.pretend_user_exists = True
405        self.stage.save()
406        form_data = {"uid_field": self.user.username}
407        response = self.client.post(
408            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
409            form_data,
410        )
411        self.assertEqual(response.status_code, 200)
412        self.assertStageRedirects(response, reverse("authentik_core:root-redirect"))
413
414    def test_invalid_no_fields(self):
415        """Test invalid with username (no user fields are enabled)"""
416        self.stage.user_fields = []
417        self.stage.save()
418        form_data = {"uid_field": self.user.username}
419        response = self.client.post(
420            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
421            form_data,
422        )
423        self.assertStageResponse(
424            response,
425            self.flow,
426            component="ak-stage-identification",
427            password_fields=False,
428            primary_action="Log in",
429            response_errors={
430                "non_field_errors": [{"code": "invalid", "string": "Failed to authenticate."}]
431            },
432            show_source_labels=False,
433            sources=[
434                {
435                    "challenge": {
436                        "component": "xak-flow-redirect",
437                        "to": f"/source/oauth/login/{self.source.slug}/",
438                    },
439                    "icon_url": "/static/authentik/sources/default.svg",
440                    "name": self.source.name,
441                    "promoted": False,
442                }
443            ],
444            user_fields=[],
445        )
446
447    def test_invalid_with_invalid_email(self):
448        """Test with invalid email (user doesn't exist) -> Will return to login form"""
449        form_data = {"uid_field": self.user.email + "test"}
450        response = self.client.post(
451            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
452            form_data,
453        )
454        self.assertEqual(response.status_code, 200)
455
456    def test_enrollment_flow(self):
457        """Test that enrollment flow is linked correctly"""
458        flow = create_test_flow()
459        self.stage.enrollment_flow = flow
460        self.stage.save()
461        FlowStageBinding.objects.create(
462            target=flow,
463            stage=self.stage,
464            order=0,
465        )
466
467        response = self.client.get(
468            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
469        )
470        self.assertStageResponse(
471            response,
472            self.flow,
473            component="ak-stage-identification",
474            user_fields=["email"],
475            password_fields=False,
476            enroll_url=reverse(
477                "authentik_core:if-flow",
478                kwargs={"flow_slug": flow.slug},
479            ),
480            show_source_labels=False,
481            primary_action="Log in",
482            sources=[
483                {
484                    "icon_url": "/static/authentik/sources/default.svg",
485                    "name": self.source.name,
486                    "challenge": {
487                        "component": "xak-flow-redirect",
488                        "to": f"/source/oauth/login/{self.source.slug}/",
489                    },
490                    "promoted": False,
491                }
492            ],
493        )
494
495    def test_link_recovery_flow(self):
496        """Test that recovery flow is linked correctly"""
497        flow = create_test_flow()
498        self.stage.recovery_flow = flow
499        self.stage.save()
500        FlowStageBinding.objects.create(
501            target=flow,
502            stage=self.stage,
503            order=0,
504        )
505        response = self.client.get(
506            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
507        )
508        self.assertStageResponse(
509            response,
510            self.flow,
511            component="ak-stage-identification",
512            user_fields=["email"],
513            password_fields=False,
514            recovery_url=reverse(
515                "authentik_core:if-flow",
516                kwargs={"flow_slug": flow.slug},
517            ),
518            show_source_labels=False,
519            primary_action="Log in",
520            sources=[
521                {
522                    "challenge": {
523                        "component": "xak-flow-redirect",
524                        "to": f"/source/oauth/login/{self.source.slug}/",
525                    },
526                    "icon_url": "/static/authentik/sources/default.svg",
527                    "name": self.source.name,
528                    "promoted": False,
529                }
530            ],
531        )
532
533    def test_recovery_flow_invalid_user(self):
534        """Test that an invalid user can proceed in a recovery flow"""
535        self.flow.designation = FlowDesignation.RECOVERY
536        self.flow.save()
537        response = self.client.get(
538            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
539        )
540        self.assertStageResponse(
541            response,
542            self.flow,
543            component="ak-stage-identification",
544            user_fields=["email"],
545            password_fields=False,
546            show_source_labels=False,
547            primary_action="Continue",
548            sources=[
549                {
550                    "challenge": {
551                        "component": "xak-flow-redirect",
552                        "to": f"/source/oauth/login/{self.source.slug}/",
553                    },
554                    "icon_url": "/static/authentik/sources/default.svg",
555                    "name": self.source.name,
556                    "promoted": False,
557                }
558            ],
559        )
560        form_data = {"uid_field": generate_id()}
561        url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug})
562        response = self.client.post(url, form_data)
563        self.assertEqual(response.status_code, 200)
564
565    def test_api_validate(self):
566        """Test API validation"""
567        self.assertTrue(
568            IdentificationStageSerializer(
569                data={
570                    "name": generate_id(),
571                    "user_fields": [UserFields.E_MAIL, UserFields.USERNAME],
572                }
573            ).is_valid(raise_exception=True)
574        )
575        with self.assertRaises(ValidationError):
576            IdentificationStageSerializer(
577                data={
578                    "name": generate_id(),
579                    "user_fields": [],
580                    "sources": [],
581                }
582            ).is_valid(raise_exception=True)
583
584    def test_prefill(self):
585        """Username prefill from existing flow context"""
586        pw_stage = PasswordStage.objects.create(name=generate_id(), backends=[BACKEND_INBUILT])
587        self.stage.password_stage = pw_stage
588        self.stage.save()
589
590        self.client.get(
591            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug})
592        )
593
594        plan = self.get_flow_plan()
595        plan.context[PLAN_CONTEXT_PENDING_USER_IDENTIFIER] = "foo"
596        self.set_flow_plan(plan)
597        with self.assertFlowFinishes() as plan:
598            response = self.client.get(
599                reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug})
600            )
601            self.assertEqual(response.status_code, 200)
602            self.assertStageResponse(
603                response,
604                self.flow,
605                component="ak-stage-identification",
606                pending_user_identifier="foo",
607            )
608
609    def test_prefill_simple(self):
610        """Username prefill from existing flow context"""
611        self.stage.pretend_user_exists = True
612        self.stage.save()
613
614        self.client.get(
615            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug})
616        )
617        plan = self.get_flow_plan()
618        plan.context[PLAN_CONTEXT_PENDING_USER_IDENTIFIER] = "foo"
619        self.set_flow_plan(plan)
620        response = self.client.get(
621            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug})
622        )
623        self.assertStageRedirects(response, reverse("authentik_core:root-redirect"))
class TestIdentificationStagePasskey(authentik.flows.tests.FlowTestCase):
 24class TestIdentificationStagePasskey(FlowTestCase):
 25    """Passkey authentication tests"""
 26
 27    def setUp(self):
 28        super().setUp()
 29        self.user = create_test_admin_user()
 30        self.flow = create_test_flow(FlowDesignation.AUTHENTICATION)
 31        self.webauthn_stage = AuthenticatorValidateStage.objects.create(
 32            name="webauthn-validate",
 33            device_classes=[DeviceClasses.WEBAUTHN],
 34        )
 35        self.stage = IdentificationStage.objects.create(
 36            name="identification",
 37            user_fields=[UserFields.E_MAIL],
 38            webauthn_stage=self.webauthn_stage,
 39        )
 40        FlowStageBinding.objects.create(target=self.flow, stage=self.stage, order=0)
 41        self.device = WebAuthnDevice.objects.create(
 42            user=self.user,
 43            name="Test Passkey",
 44            credential_id="test-credential-id",
 45            public_key="test-public-key",
 46            sign_count=0,
 47            rp_id="testserver",
 48        )
 49
 50    def test_passkey_auth_success(self):
 51        """Test passkey sets device, user, backend and updates last_used"""
 52        from unittest.mock import patch
 53
 54        # Get challenge to initialize session
 55        url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug})
 56        self.client.get(url)
 57
 58        with patch(
 59            "authentik.stages.identification.stage.validate_challenge_webauthn",
 60            return_value=self.device,
 61        ):
 62            response = self.client.post(
 63                url, {"passkey": {"id": "test"}}, content_type="application/json"
 64            )
 65
 66        self.assertStageRedirects(response, reverse("authentik_core:root-redirect"))
 67        # Verify device last_used was updated
 68        self.device.refresh_from_db()
 69        self.assertIsNotNone(self.device.last_used)
 70
 71    def test_passkey_challenge_disabled(self):
 72        """Test that passkey challenge is not included when webauthn_stage is not set"""
 73        self.stage.webauthn_stage = None
 74        self.stage.save()
 75        response = self.client.get(
 76            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug})
 77        )
 78        self.assertEqual(response.status_code, 200)
 79        data = response.json()
 80        self.assertIsNone(data.get("passkey_challenge"))
 81
 82    def test_passkey_challenge_enabled(self):
 83        """Test that passkey challenge is included when webauthn_stage is set"""
 84        response = self.client.get(
 85            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug})
 86        )
 87        self.assertEqual(response.status_code, 200)
 88        data = response.json()
 89        self.assertIsNotNone(data.get("passkey_challenge"))
 90        passkey_challenge = data["passkey_challenge"]
 91        self.assertIn("challenge", passkey_challenge)
 92        self.assertIn("rpId", passkey_challenge)
 93        self.assertEqual(passkey_challenge["allowCredentials"], [])
 94
 95    def test_passkey_challenge_generation(self):
 96        """Test passkey challenge is generated correctly"""
 97        response = self.client.get(
 98            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug})
 99        )
100        self.assertEqual(response.status_code, 200)
101        data = response.json()
102        self.assertIsNotNone(data.get("passkey_challenge"))
103
104    def test_passkey_no_uid_field_required(self):
105        """Test that uid_field is not required when passkey is provided"""
106        # Get the challenge first to set up the session
107        response = self.client.get(
108            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug})
109        )
110        self.assertEqual(response.status_code, 200)
111
112        # Submit without uid_field but with passkey (invalid passkey will fail validation)
113        form_data = {
114            "passkey": {
115                "id": "invalid",
116                "rawId": "invalid",
117                "type": "public-key",
118                "response": {
119                    "clientDataJSON": "invalid",
120                    "authenticatorData": "invalid",
121                    "signature": "invalid",
122                },
123            }
124        }
125        url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug})
126        response = self.client.post(url, form_data, content_type="application/json")
127        self.assertEqual(response.status_code, 200)
128        data = response.json()
129        self.assertIn("response_errors", data)
130        errors = data.get("response_errors", {})
131        self.assertNotIn("uid_field", errors)

Passkey authentication tests

def setUp(self):
27    def setUp(self):
28        super().setUp()
29        self.user = create_test_admin_user()
30        self.flow = create_test_flow(FlowDesignation.AUTHENTICATION)
31        self.webauthn_stage = AuthenticatorValidateStage.objects.create(
32            name="webauthn-validate",
33            device_classes=[DeviceClasses.WEBAUTHN],
34        )
35        self.stage = IdentificationStage.objects.create(
36            name="identification",
37            user_fields=[UserFields.E_MAIL],
38            webauthn_stage=self.webauthn_stage,
39        )
40        FlowStageBinding.objects.create(target=self.flow, stage=self.stage, order=0)
41        self.device = WebAuthnDevice.objects.create(
42            user=self.user,
43            name="Test Passkey",
44            credential_id="test-credential-id",
45            public_key="test-public-key",
46            sign_count=0,
47            rp_id="testserver",
48        )

Hook method for setting up the test fixture before exercising it.

def test_passkey_auth_success(self):
50    def test_passkey_auth_success(self):
51        """Test passkey sets device, user, backend and updates last_used"""
52        from unittest.mock import patch
53
54        # Get challenge to initialize session
55        url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug})
56        self.client.get(url)
57
58        with patch(
59            "authentik.stages.identification.stage.validate_challenge_webauthn",
60            return_value=self.device,
61        ):
62            response = self.client.post(
63                url, {"passkey": {"id": "test"}}, content_type="application/json"
64            )
65
66        self.assertStageRedirects(response, reverse("authentik_core:root-redirect"))
67        # Verify device last_used was updated
68        self.device.refresh_from_db()
69        self.assertIsNotNone(self.device.last_used)

Test passkey sets device, user, backend and updates last_used

def test_passkey_challenge_disabled(self):
71    def test_passkey_challenge_disabled(self):
72        """Test that passkey challenge is not included when webauthn_stage is not set"""
73        self.stage.webauthn_stage = None
74        self.stage.save()
75        response = self.client.get(
76            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug})
77        )
78        self.assertEqual(response.status_code, 200)
79        data = response.json()
80        self.assertIsNone(data.get("passkey_challenge"))

Test that passkey challenge is not included when webauthn_stage is not set

def test_passkey_challenge_enabled(self):
82    def test_passkey_challenge_enabled(self):
83        """Test that passkey challenge is included when webauthn_stage is set"""
84        response = self.client.get(
85            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug})
86        )
87        self.assertEqual(response.status_code, 200)
88        data = response.json()
89        self.assertIsNotNone(data.get("passkey_challenge"))
90        passkey_challenge = data["passkey_challenge"]
91        self.assertIn("challenge", passkey_challenge)
92        self.assertIn("rpId", passkey_challenge)
93        self.assertEqual(passkey_challenge["allowCredentials"], [])

Test that passkey challenge is included when webauthn_stage is set

def test_passkey_challenge_generation(self):
 95    def test_passkey_challenge_generation(self):
 96        """Test passkey challenge is generated correctly"""
 97        response = self.client.get(
 98            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug})
 99        )
100        self.assertEqual(response.status_code, 200)
101        data = response.json()
102        self.assertIsNotNone(data.get("passkey_challenge"))

Test passkey challenge is generated correctly

def test_passkey_no_uid_field_required(self):
104    def test_passkey_no_uid_field_required(self):
105        """Test that uid_field is not required when passkey is provided"""
106        # Get the challenge first to set up the session
107        response = self.client.get(
108            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug})
109        )
110        self.assertEqual(response.status_code, 200)
111
112        # Submit without uid_field but with passkey (invalid passkey will fail validation)
113        form_data = {
114            "passkey": {
115                "id": "invalid",
116                "rawId": "invalid",
117                "type": "public-key",
118                "response": {
119                    "clientDataJSON": "invalid",
120                    "authenticatorData": "invalid",
121                    "signature": "invalid",
122                },
123            }
124        }
125        url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug})
126        response = self.client.post(url, form_data, content_type="application/json")
127        self.assertEqual(response.status_code, 200)
128        data = response.json()
129        self.assertIn("response_errors", data)
130        errors = data.get("response_errors", {})
131        self.assertNotIn("uid_field", errors)

Test that uid_field is not required when passkey is provided

class TestIdentificationStage(authentik.flows.tests.FlowTestCase):
134class TestIdentificationStage(FlowTestCase):
135    """Identification tests"""
136
137    def setUp(self):
138        super().setUp()
139        self.user = create_test_admin_user()
140
141        # OAuthSource for the login view
142        self.source = OAuthSource.objects.create(name=generate_id(), slug=generate_id())
143
144        self.flow = create_test_flow(FlowDesignation.AUTHENTICATION)
145        self.stage = IdentificationStage.objects.create(
146            name="identification",
147            user_fields=[UserFields.E_MAIL],
148            pretend_user_exists=False,
149        )
150        self.stage.sources.set([self.source])
151        self.stage.save()
152        FlowStageBinding.objects.create(
153            target=self.flow,
154            stage=self.stage,
155            order=0,
156        )
157
158    def test_valid_render(self):
159        """Test that View renders correctly"""
160        response = self.client.get(
161            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug})
162        )
163        self.assertEqual(response.status_code, 200)
164
165    def test_valid_with_email(self):
166        """Test with valid email, check that URL redirects back to itself"""
167        form_data = {"uid_field": self.user.email}
168        url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug})
169        response = self.client.post(url, form_data)
170        self.assertEqual(response.status_code, 200)
171        self.assertStageRedirects(response, reverse("authentik_core:root-redirect"))
172
173    def test_valid_with_password(self):
174        """Test with valid email and password in single step"""
175        pw_stage = PasswordStage.objects.create(name="password", backends=[BACKEND_INBUILT])
176        self.stage.password_stage = pw_stage
177        self.stage.save()
178        form_data = {"uid_field": self.user.email, "password": self.user.username}
179        url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug})
180        response = self.client.post(url, form_data)
181        self.assertEqual(response.status_code, 200)
182        self.assertStageRedirects(response, reverse("authentik_core:root-redirect"))
183
184    def test_invalid_with_password(self):
185        """Test with valid email and invalid password in single step"""
186        pw_stage = PasswordStage.objects.create(name="password", backends=[BACKEND_INBUILT])
187        self.stage.password_stage = pw_stage
188        self.stage.save()
189        form_data = {
190            "uid_field": self.user.email,
191            "password": self.user.username + "test",
192        }
193        url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug})
194        response = self.client.post(url, form_data)
195        self.assertStageResponse(
196            response,
197            self.flow,
198            component="ak-stage-identification",
199            password_fields=True,
200            primary_action="Log in",
201            response_errors={
202                "non_field_errors": [{"code": "invalid", "string": "Failed to authenticate."}]
203            },
204            sources=[
205                {
206                    "challenge": {
207                        "component": "xak-flow-redirect",
208                        "to": f"/source/oauth/login/{self.source.slug}/",
209                    },
210                    "icon_url": "/static/authentik/sources/default.svg",
211                    "name": self.source.name,
212                    "promoted": False,
213                }
214            ],
215            show_source_labels=False,
216            user_fields=["email"],
217        )
218
219    def test_invalid_with_password_pretend(self):
220        """Test with invalid email and invalid password in single step (with pretend_user_exists)"""
221        self.stage.pretend_user_exists = True
222        pw_stage = PasswordStage.objects.create(name="password", backends=[BACKEND_INBUILT])
223        self.stage.password_stage = pw_stage
224        self.stage.save()
225        form_data = {
226            "uid_field": self.user.email + "test",
227            "password": self.user.username + "test",
228        }
229        url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug})
230        response = self.client.post(url, form_data)
231        self.assertStageResponse(
232            response,
233            self.flow,
234            component="ak-stage-identification",
235            password_fields=True,
236            primary_action="Log in",
237            response_errors={
238                "non_field_errors": [{"code": "invalid", "string": "Failed to authenticate."}]
239            },
240            sources=[
241                {
242                    "challenge": {
243                        "component": "xak-flow-redirect",
244                        "to": f"/source/oauth/login/{self.source.slug}/",
245                    },
246                    "icon_url": "/static/authentik/sources/default.svg",
247                    "name": self.source.name,
248                    "promoted": False,
249                }
250            ],
251            show_source_labels=False,
252            user_fields=["email"],
253        )
254
255    @Mocker()
256    def test_valid_with_captcha(self, mock: Mocker):
257        """Test with valid email and captcha token in single step"""
258        mock.post(
259            "https://www.recaptcha.net/recaptcha/api/siteverify",
260            json={
261                "success": True,
262                "score": 0.5,
263            },
264        )
265
266        captcha_stage = CaptchaStage.objects.create(
267            name="captcha",
268            public_key=RECAPTCHA_PUBLIC_KEY,
269            private_key=RECAPTCHA_PRIVATE_KEY,
270        )
271        self.stage.captcha_stage = captcha_stage
272        self.stage.save()
273
274        form_data = {"uid_field": self.user.email, "captcha_token": "PASSED"}
275        url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug})
276        response = self.client.post(url, form_data)
277        self.assertEqual(response.status_code, 200)
278        self.assertStageRedirects(response, reverse("authentik_core:root-redirect"))
279
280    @Mocker()
281    def test_invalid_with_captcha(self, mock: Mocker):
282        """Test with valid email and invalid captcha token in single step"""
283        mock.post(
284            "https://www.recaptcha.net/recaptcha/api/siteverify",
285            json={
286                "success": False,
287                "score": 0.5,
288            },
289        )
290
291        captcha_stage = CaptchaStage.objects.create(
292            name="captcha",
293            public_key=RECAPTCHA_PUBLIC_KEY,
294            private_key=RECAPTCHA_PRIVATE_KEY,
295        )
296
297        self.stage.captcha_stage = captcha_stage
298        self.stage.save()
299
300        form_data = {
301            "uid_field": self.user.email,
302            "captcha_token": "FAILED",
303        }
304        url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug})
305        response = self.client.post(url, form_data)
306        self.assertStageResponse(
307            response,
308            self.flow,
309            component="ak-stage-identification",
310            password_fields=False,
311            primary_action="Log in",
312            response_errors={
313                "non_field_errors": [{"code": "invalid", "string": "Failed to authenticate."}]
314            },
315            sources=[
316                {
317                    "challenge": {
318                        "component": "xak-flow-redirect",
319                        "to": f"/source/oauth/login/{self.source.slug}/",
320                    },
321                    "icon_url": "/static/authentik/sources/default.svg",
322                    "name": self.source.name,
323                    "promoted": False,
324                }
325            ],
326            show_source_labels=False,
327            user_fields=["email"],
328        )
329
330    @Mocker()
331    def test_invalid_with_captcha_retriable(self, mock: Mocker):
332        """Test with valid email and invalid captcha token in single step"""
333        mock.post(
334            "https://www.recaptcha.net/recaptcha/api/siteverify",
335            json={
336                "success": False,
337                "score": 0.5,
338                "error-codes": ["timeout-or-duplicate"],
339            },
340        )
341
342        captcha_stage = CaptchaStage.objects.create(
343            name="captcha",
344            public_key=RECAPTCHA_PUBLIC_KEY,
345            private_key=RECAPTCHA_PRIVATE_KEY,
346        )
347
348        self.stage.captcha_stage = captcha_stage
349        self.stage.save()
350
351        form_data = {
352            "uid_field": self.user.email,
353            "captcha_token": "FAILED",
354        }
355        url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug})
356        response = self.client.post(url, form_data)
357        self.assertStageResponse(
358            response,
359            self.flow,
360            component="ak-stage-identification",
361            password_fields=False,
362            primary_action="Log in",
363            response_errors={
364                "non_field_errors": [
365                    {
366                        "code": "invalid",
367                        "string": "Failed to authenticate.",
368                    }
369                ]
370            },
371            sources=[
372                {
373                    "challenge": {
374                        "component": "xak-flow-redirect",
375                        "to": f"/source/oauth/login/{self.source.slug}/",
376                    },
377                    "icon_url": "/static/authentik/sources/default.svg",
378                    "name": self.source.name,
379                    "promoted": False,
380                }
381            ],
382            show_source_labels=False,
383            user_fields=["email"],
384        )
385
386    def test_invalid_with_username(self):
387        """Test invalid with username (user exists but stage only allows email)"""
388        form_data = {"uid_field": self.user.username}
389        response = self.client.post(
390            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
391            form_data,
392        )
393        self.assertEqual(response.status_code, 200)
394        self.assertStageResponse(
395            response,
396            self.flow,
397            component="ak-stage-identification",
398            response_errors={
399                "non_field_errors": [{"string": "Failed to authenticate.", "code": "invalid"}]
400            },
401        )
402
403    def test_invalid_with_username_pretend(self):
404        """Test invalid with username (user exists but stage only allows email)"""
405        self.stage.pretend_user_exists = True
406        self.stage.save()
407        form_data = {"uid_field": self.user.username}
408        response = self.client.post(
409            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
410            form_data,
411        )
412        self.assertEqual(response.status_code, 200)
413        self.assertStageRedirects(response, reverse("authentik_core:root-redirect"))
414
415    def test_invalid_no_fields(self):
416        """Test invalid with username (no user fields are enabled)"""
417        self.stage.user_fields = []
418        self.stage.save()
419        form_data = {"uid_field": self.user.username}
420        response = self.client.post(
421            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
422            form_data,
423        )
424        self.assertStageResponse(
425            response,
426            self.flow,
427            component="ak-stage-identification",
428            password_fields=False,
429            primary_action="Log in",
430            response_errors={
431                "non_field_errors": [{"code": "invalid", "string": "Failed to authenticate."}]
432            },
433            show_source_labels=False,
434            sources=[
435                {
436                    "challenge": {
437                        "component": "xak-flow-redirect",
438                        "to": f"/source/oauth/login/{self.source.slug}/",
439                    },
440                    "icon_url": "/static/authentik/sources/default.svg",
441                    "name": self.source.name,
442                    "promoted": False,
443                }
444            ],
445            user_fields=[],
446        )
447
448    def test_invalid_with_invalid_email(self):
449        """Test with invalid email (user doesn't exist) -> Will return to login form"""
450        form_data = {"uid_field": self.user.email + "test"}
451        response = self.client.post(
452            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
453            form_data,
454        )
455        self.assertEqual(response.status_code, 200)
456
457    def test_enrollment_flow(self):
458        """Test that enrollment flow is linked correctly"""
459        flow = create_test_flow()
460        self.stage.enrollment_flow = flow
461        self.stage.save()
462        FlowStageBinding.objects.create(
463            target=flow,
464            stage=self.stage,
465            order=0,
466        )
467
468        response = self.client.get(
469            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
470        )
471        self.assertStageResponse(
472            response,
473            self.flow,
474            component="ak-stage-identification",
475            user_fields=["email"],
476            password_fields=False,
477            enroll_url=reverse(
478                "authentik_core:if-flow",
479                kwargs={"flow_slug": flow.slug},
480            ),
481            show_source_labels=False,
482            primary_action="Log in",
483            sources=[
484                {
485                    "icon_url": "/static/authentik/sources/default.svg",
486                    "name": self.source.name,
487                    "challenge": {
488                        "component": "xak-flow-redirect",
489                        "to": f"/source/oauth/login/{self.source.slug}/",
490                    },
491                    "promoted": False,
492                }
493            ],
494        )
495
496    def test_link_recovery_flow(self):
497        """Test that recovery flow is linked correctly"""
498        flow = create_test_flow()
499        self.stage.recovery_flow = flow
500        self.stage.save()
501        FlowStageBinding.objects.create(
502            target=flow,
503            stage=self.stage,
504            order=0,
505        )
506        response = self.client.get(
507            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
508        )
509        self.assertStageResponse(
510            response,
511            self.flow,
512            component="ak-stage-identification",
513            user_fields=["email"],
514            password_fields=False,
515            recovery_url=reverse(
516                "authentik_core:if-flow",
517                kwargs={"flow_slug": flow.slug},
518            ),
519            show_source_labels=False,
520            primary_action="Log in",
521            sources=[
522                {
523                    "challenge": {
524                        "component": "xak-flow-redirect",
525                        "to": f"/source/oauth/login/{self.source.slug}/",
526                    },
527                    "icon_url": "/static/authentik/sources/default.svg",
528                    "name": self.source.name,
529                    "promoted": False,
530                }
531            ],
532        )
533
534    def test_recovery_flow_invalid_user(self):
535        """Test that an invalid user can proceed in a recovery flow"""
536        self.flow.designation = FlowDesignation.RECOVERY
537        self.flow.save()
538        response = self.client.get(
539            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
540        )
541        self.assertStageResponse(
542            response,
543            self.flow,
544            component="ak-stage-identification",
545            user_fields=["email"],
546            password_fields=False,
547            show_source_labels=False,
548            primary_action="Continue",
549            sources=[
550                {
551                    "challenge": {
552                        "component": "xak-flow-redirect",
553                        "to": f"/source/oauth/login/{self.source.slug}/",
554                    },
555                    "icon_url": "/static/authentik/sources/default.svg",
556                    "name": self.source.name,
557                    "promoted": False,
558                }
559            ],
560        )
561        form_data = {"uid_field": generate_id()}
562        url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug})
563        response = self.client.post(url, form_data)
564        self.assertEqual(response.status_code, 200)
565
566    def test_api_validate(self):
567        """Test API validation"""
568        self.assertTrue(
569            IdentificationStageSerializer(
570                data={
571                    "name": generate_id(),
572                    "user_fields": [UserFields.E_MAIL, UserFields.USERNAME],
573                }
574            ).is_valid(raise_exception=True)
575        )
576        with self.assertRaises(ValidationError):
577            IdentificationStageSerializer(
578                data={
579                    "name": generate_id(),
580                    "user_fields": [],
581                    "sources": [],
582                }
583            ).is_valid(raise_exception=True)
584
585    def test_prefill(self):
586        """Username prefill from existing flow context"""
587        pw_stage = PasswordStage.objects.create(name=generate_id(), backends=[BACKEND_INBUILT])
588        self.stage.password_stage = pw_stage
589        self.stage.save()
590
591        self.client.get(
592            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug})
593        )
594
595        plan = self.get_flow_plan()
596        plan.context[PLAN_CONTEXT_PENDING_USER_IDENTIFIER] = "foo"
597        self.set_flow_plan(plan)
598        with self.assertFlowFinishes() as plan:
599            response = self.client.get(
600                reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug})
601            )
602            self.assertEqual(response.status_code, 200)
603            self.assertStageResponse(
604                response,
605                self.flow,
606                component="ak-stage-identification",
607                pending_user_identifier="foo",
608            )
609
610    def test_prefill_simple(self):
611        """Username prefill from existing flow context"""
612        self.stage.pretend_user_exists = True
613        self.stage.save()
614
615        self.client.get(
616            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug})
617        )
618        plan = self.get_flow_plan()
619        plan.context[PLAN_CONTEXT_PENDING_USER_IDENTIFIER] = "foo"
620        self.set_flow_plan(plan)
621        response = self.client.get(
622            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug})
623        )
624        self.assertStageRedirects(response, reverse("authentik_core:root-redirect"))

Identification tests

def setUp(self):
137    def setUp(self):
138        super().setUp()
139        self.user = create_test_admin_user()
140
141        # OAuthSource for the login view
142        self.source = OAuthSource.objects.create(name=generate_id(), slug=generate_id())
143
144        self.flow = create_test_flow(FlowDesignation.AUTHENTICATION)
145        self.stage = IdentificationStage.objects.create(
146            name="identification",
147            user_fields=[UserFields.E_MAIL],
148            pretend_user_exists=False,
149        )
150        self.stage.sources.set([self.source])
151        self.stage.save()
152        FlowStageBinding.objects.create(
153            target=self.flow,
154            stage=self.stage,
155            order=0,
156        )

Hook method for setting up the test fixture before exercising it.

def test_valid_render(self):
158    def test_valid_render(self):
159        """Test that View renders correctly"""
160        response = self.client.get(
161            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug})
162        )
163        self.assertEqual(response.status_code, 200)

Test that View renders correctly

def test_valid_with_email(self):
165    def test_valid_with_email(self):
166        """Test with valid email, check that URL redirects back to itself"""
167        form_data = {"uid_field": self.user.email}
168        url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug})
169        response = self.client.post(url, form_data)
170        self.assertEqual(response.status_code, 200)
171        self.assertStageRedirects(response, reverse("authentik_core:root-redirect"))

Test with valid email, check that URL redirects back to itself

def test_valid_with_password(self):
173    def test_valid_with_password(self):
174        """Test with valid email and password in single step"""
175        pw_stage = PasswordStage.objects.create(name="password", backends=[BACKEND_INBUILT])
176        self.stage.password_stage = pw_stage
177        self.stage.save()
178        form_data = {"uid_field": self.user.email, "password": self.user.username}
179        url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug})
180        response = self.client.post(url, form_data)
181        self.assertEqual(response.status_code, 200)
182        self.assertStageRedirects(response, reverse("authentik_core:root-redirect"))

Test with valid email and password in single step

def test_invalid_with_password(self):
184    def test_invalid_with_password(self):
185        """Test with valid email and invalid password in single step"""
186        pw_stage = PasswordStage.objects.create(name="password", backends=[BACKEND_INBUILT])
187        self.stage.password_stage = pw_stage
188        self.stage.save()
189        form_data = {
190            "uid_field": self.user.email,
191            "password": self.user.username + "test",
192        }
193        url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug})
194        response = self.client.post(url, form_data)
195        self.assertStageResponse(
196            response,
197            self.flow,
198            component="ak-stage-identification",
199            password_fields=True,
200            primary_action="Log in",
201            response_errors={
202                "non_field_errors": [{"code": "invalid", "string": "Failed to authenticate."}]
203            },
204            sources=[
205                {
206                    "challenge": {
207                        "component": "xak-flow-redirect",
208                        "to": f"/source/oauth/login/{self.source.slug}/",
209                    },
210                    "icon_url": "/static/authentik/sources/default.svg",
211                    "name": self.source.name,
212                    "promoted": False,
213                }
214            ],
215            show_source_labels=False,
216            user_fields=["email"],
217        )

Test with valid email and invalid password in single step

def test_invalid_with_password_pretend(self):
219    def test_invalid_with_password_pretend(self):
220        """Test with invalid email and invalid password in single step (with pretend_user_exists)"""
221        self.stage.pretend_user_exists = True
222        pw_stage = PasswordStage.objects.create(name="password", backends=[BACKEND_INBUILT])
223        self.stage.password_stage = pw_stage
224        self.stage.save()
225        form_data = {
226            "uid_field": self.user.email + "test",
227            "password": self.user.username + "test",
228        }
229        url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug})
230        response = self.client.post(url, form_data)
231        self.assertStageResponse(
232            response,
233            self.flow,
234            component="ak-stage-identification",
235            password_fields=True,
236            primary_action="Log in",
237            response_errors={
238                "non_field_errors": [{"code": "invalid", "string": "Failed to authenticate."}]
239            },
240            sources=[
241                {
242                    "challenge": {
243                        "component": "xak-flow-redirect",
244                        "to": f"/source/oauth/login/{self.source.slug}/",
245                    },
246                    "icon_url": "/static/authentik/sources/default.svg",
247                    "name": self.source.name,
248                    "promoted": False,
249                }
250            ],
251            show_source_labels=False,
252            user_fields=["email"],
253        )

Test with invalid email and invalid password in single step (with pretend_user_exists)

@Mocker()
def test_valid_with_captcha(self, mock: requests_mock.mocker.Mocker):
255    @Mocker()
256    def test_valid_with_captcha(self, mock: Mocker):
257        """Test with valid email and captcha token in single step"""
258        mock.post(
259            "https://www.recaptcha.net/recaptcha/api/siteverify",
260            json={
261                "success": True,
262                "score": 0.5,
263            },
264        )
265
266        captcha_stage = CaptchaStage.objects.create(
267            name="captcha",
268            public_key=RECAPTCHA_PUBLIC_KEY,
269            private_key=RECAPTCHA_PRIVATE_KEY,
270        )
271        self.stage.captcha_stage = captcha_stage
272        self.stage.save()
273
274        form_data = {"uid_field": self.user.email, "captcha_token": "PASSED"}
275        url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug})
276        response = self.client.post(url, form_data)
277        self.assertEqual(response.status_code, 200)
278        self.assertStageRedirects(response, reverse("authentik_core:root-redirect"))

Test with valid email and captcha token in single step

@Mocker()
def test_invalid_with_captcha(self, mock: requests_mock.mocker.Mocker):
280    @Mocker()
281    def test_invalid_with_captcha(self, mock: Mocker):
282        """Test with valid email and invalid captcha token in single step"""
283        mock.post(
284            "https://www.recaptcha.net/recaptcha/api/siteverify",
285            json={
286                "success": False,
287                "score": 0.5,
288            },
289        )
290
291        captcha_stage = CaptchaStage.objects.create(
292            name="captcha",
293            public_key=RECAPTCHA_PUBLIC_KEY,
294            private_key=RECAPTCHA_PRIVATE_KEY,
295        )
296
297        self.stage.captcha_stage = captcha_stage
298        self.stage.save()
299
300        form_data = {
301            "uid_field": self.user.email,
302            "captcha_token": "FAILED",
303        }
304        url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug})
305        response = self.client.post(url, form_data)
306        self.assertStageResponse(
307            response,
308            self.flow,
309            component="ak-stage-identification",
310            password_fields=False,
311            primary_action="Log in",
312            response_errors={
313                "non_field_errors": [{"code": "invalid", "string": "Failed to authenticate."}]
314            },
315            sources=[
316                {
317                    "challenge": {
318                        "component": "xak-flow-redirect",
319                        "to": f"/source/oauth/login/{self.source.slug}/",
320                    },
321                    "icon_url": "/static/authentik/sources/default.svg",
322                    "name": self.source.name,
323                    "promoted": False,
324                }
325            ],
326            show_source_labels=False,
327            user_fields=["email"],
328        )

Test with valid email and invalid captcha token in single step

@Mocker()
def test_invalid_with_captcha_retriable(self, mock: requests_mock.mocker.Mocker):
330    @Mocker()
331    def test_invalid_with_captcha_retriable(self, mock: Mocker):
332        """Test with valid email and invalid captcha token in single step"""
333        mock.post(
334            "https://www.recaptcha.net/recaptcha/api/siteverify",
335            json={
336                "success": False,
337                "score": 0.5,
338                "error-codes": ["timeout-or-duplicate"],
339            },
340        )
341
342        captcha_stage = CaptchaStage.objects.create(
343            name="captcha",
344            public_key=RECAPTCHA_PUBLIC_KEY,
345            private_key=RECAPTCHA_PRIVATE_KEY,
346        )
347
348        self.stage.captcha_stage = captcha_stage
349        self.stage.save()
350
351        form_data = {
352            "uid_field": self.user.email,
353            "captcha_token": "FAILED",
354        }
355        url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug})
356        response = self.client.post(url, form_data)
357        self.assertStageResponse(
358            response,
359            self.flow,
360            component="ak-stage-identification",
361            password_fields=False,
362            primary_action="Log in",
363            response_errors={
364                "non_field_errors": [
365                    {
366                        "code": "invalid",
367                        "string": "Failed to authenticate.",
368                    }
369                ]
370            },
371            sources=[
372                {
373                    "challenge": {
374                        "component": "xak-flow-redirect",
375                        "to": f"/source/oauth/login/{self.source.slug}/",
376                    },
377                    "icon_url": "/static/authentik/sources/default.svg",
378                    "name": self.source.name,
379                    "promoted": False,
380                }
381            ],
382            show_source_labels=False,
383            user_fields=["email"],
384        )

Test with valid email and invalid captcha token in single step

def test_invalid_with_username(self):
386    def test_invalid_with_username(self):
387        """Test invalid with username (user exists but stage only allows email)"""
388        form_data = {"uid_field": self.user.username}
389        response = self.client.post(
390            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
391            form_data,
392        )
393        self.assertEqual(response.status_code, 200)
394        self.assertStageResponse(
395            response,
396            self.flow,
397            component="ak-stage-identification",
398            response_errors={
399                "non_field_errors": [{"string": "Failed to authenticate.", "code": "invalid"}]
400            },
401        )

Test invalid with username (user exists but stage only allows email)

def test_invalid_with_username_pretend(self):
403    def test_invalid_with_username_pretend(self):
404        """Test invalid with username (user exists but stage only allows email)"""
405        self.stage.pretend_user_exists = True
406        self.stage.save()
407        form_data = {"uid_field": self.user.username}
408        response = self.client.post(
409            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
410            form_data,
411        )
412        self.assertEqual(response.status_code, 200)
413        self.assertStageRedirects(response, reverse("authentik_core:root-redirect"))

Test invalid with username (user exists but stage only allows email)

def test_invalid_no_fields(self):
415    def test_invalid_no_fields(self):
416        """Test invalid with username (no user fields are enabled)"""
417        self.stage.user_fields = []
418        self.stage.save()
419        form_data = {"uid_field": self.user.username}
420        response = self.client.post(
421            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
422            form_data,
423        )
424        self.assertStageResponse(
425            response,
426            self.flow,
427            component="ak-stage-identification",
428            password_fields=False,
429            primary_action="Log in",
430            response_errors={
431                "non_field_errors": [{"code": "invalid", "string": "Failed to authenticate."}]
432            },
433            show_source_labels=False,
434            sources=[
435                {
436                    "challenge": {
437                        "component": "xak-flow-redirect",
438                        "to": f"/source/oauth/login/{self.source.slug}/",
439                    },
440                    "icon_url": "/static/authentik/sources/default.svg",
441                    "name": self.source.name,
442                    "promoted": False,
443                }
444            ],
445            user_fields=[],
446        )

Test invalid with username (no user fields are enabled)

def test_invalid_with_invalid_email(self):
448    def test_invalid_with_invalid_email(self):
449        """Test with invalid email (user doesn't exist) -> Will return to login form"""
450        form_data = {"uid_field": self.user.email + "test"}
451        response = self.client.post(
452            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
453            form_data,
454        )
455        self.assertEqual(response.status_code, 200)

Test with invalid email (user doesn't exist) -> Will return to login form

def test_enrollment_flow(self):
457    def test_enrollment_flow(self):
458        """Test that enrollment flow is linked correctly"""
459        flow = create_test_flow()
460        self.stage.enrollment_flow = flow
461        self.stage.save()
462        FlowStageBinding.objects.create(
463            target=flow,
464            stage=self.stage,
465            order=0,
466        )
467
468        response = self.client.get(
469            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
470        )
471        self.assertStageResponse(
472            response,
473            self.flow,
474            component="ak-stage-identification",
475            user_fields=["email"],
476            password_fields=False,
477            enroll_url=reverse(
478                "authentik_core:if-flow",
479                kwargs={"flow_slug": flow.slug},
480            ),
481            show_source_labels=False,
482            primary_action="Log in",
483            sources=[
484                {
485                    "icon_url": "/static/authentik/sources/default.svg",
486                    "name": self.source.name,
487                    "challenge": {
488                        "component": "xak-flow-redirect",
489                        "to": f"/source/oauth/login/{self.source.slug}/",
490                    },
491                    "promoted": False,
492                }
493            ],
494        )

Test that enrollment flow is linked correctly

def test_recovery_flow_invalid_user(self):
534    def test_recovery_flow_invalid_user(self):
535        """Test that an invalid user can proceed in a recovery flow"""
536        self.flow.designation = FlowDesignation.RECOVERY
537        self.flow.save()
538        response = self.client.get(
539            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
540        )
541        self.assertStageResponse(
542            response,
543            self.flow,
544            component="ak-stage-identification",
545            user_fields=["email"],
546            password_fields=False,
547            show_source_labels=False,
548            primary_action="Continue",
549            sources=[
550                {
551                    "challenge": {
552                        "component": "xak-flow-redirect",
553                        "to": f"/source/oauth/login/{self.source.slug}/",
554                    },
555                    "icon_url": "/static/authentik/sources/default.svg",
556                    "name": self.source.name,
557                    "promoted": False,
558                }
559            ],
560        )
561        form_data = {"uid_field": generate_id()}
562        url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug})
563        response = self.client.post(url, form_data)
564        self.assertEqual(response.status_code, 200)

Test that an invalid user can proceed in a recovery flow

def test_api_validate(self):
566    def test_api_validate(self):
567        """Test API validation"""
568        self.assertTrue(
569            IdentificationStageSerializer(
570                data={
571                    "name": generate_id(),
572                    "user_fields": [UserFields.E_MAIL, UserFields.USERNAME],
573                }
574            ).is_valid(raise_exception=True)
575        )
576        with self.assertRaises(ValidationError):
577            IdentificationStageSerializer(
578                data={
579                    "name": generate_id(),
580                    "user_fields": [],
581                    "sources": [],
582                }
583            ).is_valid(raise_exception=True)

Test API validation

def test_prefill(self):
585    def test_prefill(self):
586        """Username prefill from existing flow context"""
587        pw_stage = PasswordStage.objects.create(name=generate_id(), backends=[BACKEND_INBUILT])
588        self.stage.password_stage = pw_stage
589        self.stage.save()
590
591        self.client.get(
592            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug})
593        )
594
595        plan = self.get_flow_plan()
596        plan.context[PLAN_CONTEXT_PENDING_USER_IDENTIFIER] = "foo"
597        self.set_flow_plan(plan)
598        with self.assertFlowFinishes() as plan:
599            response = self.client.get(
600                reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug})
601            )
602            self.assertEqual(response.status_code, 200)
603            self.assertStageResponse(
604                response,
605                self.flow,
606                component="ak-stage-identification",
607                pending_user_identifier="foo",
608            )

Username prefill from existing flow context

def test_prefill_simple(self):
610    def test_prefill_simple(self):
611        """Username prefill from existing flow context"""
612        self.stage.pretend_user_exists = True
613        self.stage.save()
614
615        self.client.get(
616            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug})
617        )
618        plan = self.get_flow_plan()
619        plan.context[PLAN_CONTEXT_PENDING_USER_IDENTIFIER] = "foo"
620        self.set_flow_plan(plan)
621        response = self.client.get(
622            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug})
623        )
624        self.assertStageRedirects(response, reverse("authentik_core:root-redirect"))

Username prefill from existing flow context