authentik.stages.identification.tests
identification tests
1"""identification tests""" 2 3from django.urls import reverse 4from requests_mock import Mocker 5from rest_framework.exceptions import ValidationError 6 7from authentik.core.tests.utils import create_test_admin_user, create_test_flow 8from authentik.flows.models import FlowDesignation, FlowStageBinding 9from authentik.flows.stage import PLAN_CONTEXT_PENDING_USER_IDENTIFIER 10from authentik.flows.tests import FlowTestCase 11from authentik.lib.generators import generate_id 12from authentik.sources.oauth.models import OAuthSource 13from authentik.stages.authenticator_validate.models import AuthenticatorValidateStage, DeviceClasses 14from authentik.stages.authenticator_webauthn.models import WebAuthnDevice 15from authentik.stages.captcha.models import CaptchaStage 16from authentik.stages.captcha.tests import RECAPTCHA_PRIVATE_KEY, RECAPTCHA_PUBLIC_KEY 17from authentik.stages.identification.api import IdentificationStageSerializer 18from authentik.stages.identification.models import IdentificationStage, UserFields 19from authentik.stages.password import BACKEND_INBUILT 20from authentik.stages.password.models import PasswordStage 21 22 23class TestIdentificationStagePasskey(FlowTestCase): 24 """Passkey authentication tests""" 25 26 def setUp(self): 27 super().setUp() 28 self.user = create_test_admin_user() 29 self.flow = create_test_flow(FlowDesignation.AUTHENTICATION) 30 self.webauthn_stage = AuthenticatorValidateStage.objects.create( 31 name="webauthn-validate", 32 device_classes=[DeviceClasses.WEBAUTHN], 33 ) 34 self.stage = IdentificationStage.objects.create( 35 name="identification", 36 user_fields=[UserFields.E_MAIL], 37 webauthn_stage=self.webauthn_stage, 38 ) 39 FlowStageBinding.objects.create(target=self.flow, stage=self.stage, order=0) 40 self.device = WebAuthnDevice.objects.create( 41 user=self.user, 42 name="Test Passkey", 43 credential_id="test-credential-id", 44 public_key="test-public-key", 45 sign_count=0, 46 rp_id="testserver", 47 ) 48 49 def test_passkey_auth_success(self): 50 """Test passkey sets device, user, backend and updates last_used""" 51 from unittest.mock import patch 52 53 # Get challenge to initialize session 54 url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}) 55 self.client.get(url) 56 57 with patch( 58 "authentik.stages.identification.stage.validate_challenge_webauthn", 59 return_value=self.device, 60 ): 61 response = self.client.post( 62 url, {"passkey": {"id": "test"}}, content_type="application/json" 63 ) 64 65 self.assertStageRedirects(response, reverse("authentik_core:root-redirect")) 66 # Verify device last_used was updated 67 self.device.refresh_from_db() 68 self.assertIsNotNone(self.device.last_used) 69 70 def test_passkey_challenge_disabled(self): 71 """Test that passkey challenge is not included when webauthn_stage is not set""" 72 self.stage.webauthn_stage = None 73 self.stage.save() 74 response = self.client.get( 75 reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}) 76 ) 77 self.assertEqual(response.status_code, 200) 78 data = response.json() 79 self.assertIsNone(data.get("passkey_challenge")) 80 81 def test_passkey_challenge_enabled(self): 82 """Test that passkey challenge is included when webauthn_stage is set""" 83 response = self.client.get( 84 reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}) 85 ) 86 self.assertEqual(response.status_code, 200) 87 data = response.json() 88 self.assertIsNotNone(data.get("passkey_challenge")) 89 passkey_challenge = data["passkey_challenge"] 90 self.assertIn("challenge", passkey_challenge) 91 self.assertIn("rpId", passkey_challenge) 92 self.assertEqual(passkey_challenge["allowCredentials"], []) 93 94 def test_passkey_challenge_generation(self): 95 """Test passkey challenge is generated correctly""" 96 response = self.client.get( 97 reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}) 98 ) 99 self.assertEqual(response.status_code, 200) 100 data = response.json() 101 self.assertIsNotNone(data.get("passkey_challenge")) 102 103 def test_passkey_no_uid_field_required(self): 104 """Test that uid_field is not required when passkey is provided""" 105 # Get the challenge first to set up the session 106 response = self.client.get( 107 reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}) 108 ) 109 self.assertEqual(response.status_code, 200) 110 111 # Submit without uid_field but with passkey (invalid passkey will fail validation) 112 form_data = { 113 "passkey": { 114 "id": "invalid", 115 "rawId": "invalid", 116 "type": "public-key", 117 "response": { 118 "clientDataJSON": "invalid", 119 "authenticatorData": "invalid", 120 "signature": "invalid", 121 }, 122 } 123 } 124 url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}) 125 response = self.client.post(url, form_data, content_type="application/json") 126 self.assertEqual(response.status_code, 200) 127 data = response.json() 128 self.assertIn("response_errors", data) 129 errors = data.get("response_errors", {}) 130 self.assertNotIn("uid_field", errors) 131 132 133class TestIdentificationStage(FlowTestCase): 134 """Identification tests""" 135 136 def setUp(self): 137 super().setUp() 138 self.user = create_test_admin_user() 139 140 # OAuthSource for the login view 141 self.source = OAuthSource.objects.create(name=generate_id(), slug=generate_id()) 142 143 self.flow = create_test_flow(FlowDesignation.AUTHENTICATION) 144 self.stage = IdentificationStage.objects.create( 145 name="identification", 146 user_fields=[UserFields.E_MAIL], 147 pretend_user_exists=False, 148 ) 149 self.stage.sources.set([self.source]) 150 self.stage.save() 151 FlowStageBinding.objects.create( 152 target=self.flow, 153 stage=self.stage, 154 order=0, 155 ) 156 157 def test_valid_render(self): 158 """Test that View renders correctly""" 159 response = self.client.get( 160 reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}) 161 ) 162 self.assertEqual(response.status_code, 200) 163 164 def test_valid_with_email(self): 165 """Test with valid email, check that URL redirects back to itself""" 166 form_data = {"uid_field": self.user.email} 167 url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}) 168 response = self.client.post(url, form_data) 169 self.assertEqual(response.status_code, 200) 170 self.assertStageRedirects(response, reverse("authentik_core:root-redirect")) 171 172 def test_valid_with_password(self): 173 """Test with valid email and password in single step""" 174 pw_stage = PasswordStage.objects.create(name="password", backends=[BACKEND_INBUILT]) 175 self.stage.password_stage = pw_stage 176 self.stage.save() 177 form_data = {"uid_field": self.user.email, "password": self.user.username} 178 url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}) 179 response = self.client.post(url, form_data) 180 self.assertEqual(response.status_code, 200) 181 self.assertStageRedirects(response, reverse("authentik_core:root-redirect")) 182 183 def test_invalid_with_password(self): 184 """Test with valid email and invalid password in single step""" 185 pw_stage = PasswordStage.objects.create(name="password", backends=[BACKEND_INBUILT]) 186 self.stage.password_stage = pw_stage 187 self.stage.save() 188 form_data = { 189 "uid_field": self.user.email, 190 "password": self.user.username + "test", 191 } 192 url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}) 193 response = self.client.post(url, form_data) 194 self.assertStageResponse( 195 response, 196 self.flow, 197 component="ak-stage-identification", 198 password_fields=True, 199 primary_action="Log in", 200 response_errors={ 201 "non_field_errors": [{"code": "invalid", "string": "Failed to authenticate."}] 202 }, 203 sources=[ 204 { 205 "challenge": { 206 "component": "xak-flow-redirect", 207 "to": f"/source/oauth/login/{self.source.slug}/", 208 }, 209 "icon_url": "/static/authentik/sources/default.svg", 210 "name": self.source.name, 211 "promoted": False, 212 } 213 ], 214 show_source_labels=False, 215 user_fields=["email"], 216 ) 217 218 def test_invalid_with_password_pretend(self): 219 """Test with invalid email and invalid password in single step (with pretend_user_exists)""" 220 self.stage.pretend_user_exists = True 221 pw_stage = PasswordStage.objects.create(name="password", backends=[BACKEND_INBUILT]) 222 self.stage.password_stage = pw_stage 223 self.stage.save() 224 form_data = { 225 "uid_field": self.user.email + "test", 226 "password": self.user.username + "test", 227 } 228 url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}) 229 response = self.client.post(url, form_data) 230 self.assertStageResponse( 231 response, 232 self.flow, 233 component="ak-stage-identification", 234 password_fields=True, 235 primary_action="Log in", 236 response_errors={ 237 "non_field_errors": [{"code": "invalid", "string": "Failed to authenticate."}] 238 }, 239 sources=[ 240 { 241 "challenge": { 242 "component": "xak-flow-redirect", 243 "to": f"/source/oauth/login/{self.source.slug}/", 244 }, 245 "icon_url": "/static/authentik/sources/default.svg", 246 "name": self.source.name, 247 "promoted": False, 248 } 249 ], 250 show_source_labels=False, 251 user_fields=["email"], 252 ) 253 254 @Mocker() 255 def test_valid_with_captcha(self, mock: Mocker): 256 """Test with valid email and captcha token in single step""" 257 mock.post( 258 "https://www.recaptcha.net/recaptcha/api/siteverify", 259 json={ 260 "success": True, 261 "score": 0.5, 262 }, 263 ) 264 265 captcha_stage = CaptchaStage.objects.create( 266 name="captcha", 267 public_key=RECAPTCHA_PUBLIC_KEY, 268 private_key=RECAPTCHA_PRIVATE_KEY, 269 ) 270 self.stage.captcha_stage = captcha_stage 271 self.stage.save() 272 273 form_data = {"uid_field": self.user.email, "captcha_token": "PASSED"} 274 url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}) 275 response = self.client.post(url, form_data) 276 self.assertEqual(response.status_code, 200) 277 self.assertStageRedirects(response, reverse("authentik_core:root-redirect")) 278 279 @Mocker() 280 def test_invalid_with_captcha(self, mock: Mocker): 281 """Test with valid email and invalid captcha token in single step""" 282 mock.post( 283 "https://www.recaptcha.net/recaptcha/api/siteverify", 284 json={ 285 "success": False, 286 "score": 0.5, 287 }, 288 ) 289 290 captcha_stage = CaptchaStage.objects.create( 291 name="captcha", 292 public_key=RECAPTCHA_PUBLIC_KEY, 293 private_key=RECAPTCHA_PRIVATE_KEY, 294 ) 295 296 self.stage.captcha_stage = captcha_stage 297 self.stage.save() 298 299 form_data = { 300 "uid_field": self.user.email, 301 "captcha_token": "FAILED", 302 } 303 url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}) 304 response = self.client.post(url, form_data) 305 self.assertStageResponse( 306 response, 307 self.flow, 308 component="ak-stage-identification", 309 password_fields=False, 310 primary_action="Log in", 311 response_errors={ 312 "non_field_errors": [{"code": "invalid", "string": "Failed to authenticate."}] 313 }, 314 sources=[ 315 { 316 "challenge": { 317 "component": "xak-flow-redirect", 318 "to": f"/source/oauth/login/{self.source.slug}/", 319 }, 320 "icon_url": "/static/authentik/sources/default.svg", 321 "name": self.source.name, 322 "promoted": False, 323 } 324 ], 325 show_source_labels=False, 326 user_fields=["email"], 327 ) 328 329 @Mocker() 330 def test_invalid_with_captcha_retriable(self, mock: Mocker): 331 """Test with valid email and invalid captcha token in single step""" 332 mock.post( 333 "https://www.recaptcha.net/recaptcha/api/siteverify", 334 json={ 335 "success": False, 336 "score": 0.5, 337 "error-codes": ["timeout-or-duplicate"], 338 }, 339 ) 340 341 captcha_stage = CaptchaStage.objects.create( 342 name="captcha", 343 public_key=RECAPTCHA_PUBLIC_KEY, 344 private_key=RECAPTCHA_PRIVATE_KEY, 345 ) 346 347 self.stage.captcha_stage = captcha_stage 348 self.stage.save() 349 350 form_data = { 351 "uid_field": self.user.email, 352 "captcha_token": "FAILED", 353 } 354 url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}) 355 response = self.client.post(url, form_data) 356 self.assertStageResponse( 357 response, 358 self.flow, 359 component="ak-stage-identification", 360 password_fields=False, 361 primary_action="Log in", 362 response_errors={ 363 "non_field_errors": [ 364 { 365 "code": "invalid", 366 "string": "Failed to authenticate.", 367 } 368 ] 369 }, 370 sources=[ 371 { 372 "challenge": { 373 "component": "xak-flow-redirect", 374 "to": f"/source/oauth/login/{self.source.slug}/", 375 }, 376 "icon_url": "/static/authentik/sources/default.svg", 377 "name": self.source.name, 378 "promoted": False, 379 } 380 ], 381 show_source_labels=False, 382 user_fields=["email"], 383 ) 384 385 def test_invalid_with_username(self): 386 """Test invalid with username (user exists but stage only allows email)""" 387 form_data = {"uid_field": self.user.username} 388 response = self.client.post( 389 reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}), 390 form_data, 391 ) 392 self.assertEqual(response.status_code, 200) 393 self.assertStageResponse( 394 response, 395 self.flow, 396 component="ak-stage-identification", 397 response_errors={ 398 "non_field_errors": [{"string": "Failed to authenticate.", "code": "invalid"}] 399 }, 400 ) 401 402 def test_invalid_with_username_pretend(self): 403 """Test invalid with username (user exists but stage only allows email)""" 404 self.stage.pretend_user_exists = True 405 self.stage.save() 406 form_data = {"uid_field": self.user.username} 407 response = self.client.post( 408 reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}), 409 form_data, 410 ) 411 self.assertEqual(response.status_code, 200) 412 self.assertStageRedirects(response, reverse("authentik_core:root-redirect")) 413 414 def test_invalid_no_fields(self): 415 """Test invalid with username (no user fields are enabled)""" 416 self.stage.user_fields = [] 417 self.stage.save() 418 form_data = {"uid_field": self.user.username} 419 response = self.client.post( 420 reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}), 421 form_data, 422 ) 423 self.assertStageResponse( 424 response, 425 self.flow, 426 component="ak-stage-identification", 427 password_fields=False, 428 primary_action="Log in", 429 response_errors={ 430 "non_field_errors": [{"code": "invalid", "string": "Failed to authenticate."}] 431 }, 432 show_source_labels=False, 433 sources=[ 434 { 435 "challenge": { 436 "component": "xak-flow-redirect", 437 "to": f"/source/oauth/login/{self.source.slug}/", 438 }, 439 "icon_url": "/static/authentik/sources/default.svg", 440 "name": self.source.name, 441 "promoted": False, 442 } 443 ], 444 user_fields=[], 445 ) 446 447 def test_invalid_with_invalid_email(self): 448 """Test with invalid email (user doesn't exist) -> Will return to login form""" 449 form_data = {"uid_field": self.user.email + "test"} 450 response = self.client.post( 451 reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}), 452 form_data, 453 ) 454 self.assertEqual(response.status_code, 200) 455 456 def test_enrollment_flow(self): 457 """Test that enrollment flow is linked correctly""" 458 flow = create_test_flow() 459 self.stage.enrollment_flow = flow 460 self.stage.save() 461 FlowStageBinding.objects.create( 462 target=flow, 463 stage=self.stage, 464 order=0, 465 ) 466 467 response = self.client.get( 468 reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}), 469 ) 470 self.assertStageResponse( 471 response, 472 self.flow, 473 component="ak-stage-identification", 474 user_fields=["email"], 475 password_fields=False, 476 enroll_url=reverse( 477 "authentik_core:if-flow", 478 kwargs={"flow_slug": flow.slug}, 479 ), 480 show_source_labels=False, 481 primary_action="Log in", 482 sources=[ 483 { 484 "icon_url": "/static/authentik/sources/default.svg", 485 "name": self.source.name, 486 "challenge": { 487 "component": "xak-flow-redirect", 488 "to": f"/source/oauth/login/{self.source.slug}/", 489 }, 490 "promoted": False, 491 } 492 ], 493 ) 494 495 def test_link_recovery_flow(self): 496 """Test that recovery flow is linked correctly""" 497 flow = create_test_flow() 498 self.stage.recovery_flow = flow 499 self.stage.save() 500 FlowStageBinding.objects.create( 501 target=flow, 502 stage=self.stage, 503 order=0, 504 ) 505 response = self.client.get( 506 reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}), 507 ) 508 self.assertStageResponse( 509 response, 510 self.flow, 511 component="ak-stage-identification", 512 user_fields=["email"], 513 password_fields=False, 514 recovery_url=reverse( 515 "authentik_core:if-flow", 516 kwargs={"flow_slug": flow.slug}, 517 ), 518 show_source_labels=False, 519 primary_action="Log in", 520 sources=[ 521 { 522 "challenge": { 523 "component": "xak-flow-redirect", 524 "to": f"/source/oauth/login/{self.source.slug}/", 525 }, 526 "icon_url": "/static/authentik/sources/default.svg", 527 "name": self.source.name, 528 "promoted": False, 529 } 530 ], 531 ) 532 533 def test_recovery_flow_invalid_user(self): 534 """Test that an invalid user can proceed in a recovery flow""" 535 self.flow.designation = FlowDesignation.RECOVERY 536 self.flow.save() 537 response = self.client.get( 538 reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}), 539 ) 540 self.assertStageResponse( 541 response, 542 self.flow, 543 component="ak-stage-identification", 544 user_fields=["email"], 545 password_fields=False, 546 show_source_labels=False, 547 primary_action="Continue", 548 sources=[ 549 { 550 "challenge": { 551 "component": "xak-flow-redirect", 552 "to": f"/source/oauth/login/{self.source.slug}/", 553 }, 554 "icon_url": "/static/authentik/sources/default.svg", 555 "name": self.source.name, 556 "promoted": False, 557 } 558 ], 559 ) 560 form_data = {"uid_field": generate_id()} 561 url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}) 562 response = self.client.post(url, form_data) 563 self.assertEqual(response.status_code, 200) 564 565 def test_api_validate(self): 566 """Test API validation""" 567 self.assertTrue( 568 IdentificationStageSerializer( 569 data={ 570 "name": generate_id(), 571 "user_fields": [UserFields.E_MAIL, UserFields.USERNAME], 572 } 573 ).is_valid(raise_exception=True) 574 ) 575 with self.assertRaises(ValidationError): 576 IdentificationStageSerializer( 577 data={ 578 "name": generate_id(), 579 "user_fields": [], 580 "sources": [], 581 } 582 ).is_valid(raise_exception=True) 583 584 def test_prefill(self): 585 """Username prefill from existing flow context""" 586 pw_stage = PasswordStage.objects.create(name=generate_id(), backends=[BACKEND_INBUILT]) 587 self.stage.password_stage = pw_stage 588 self.stage.save() 589 590 self.client.get( 591 reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}) 592 ) 593 594 plan = self.get_flow_plan() 595 plan.context[PLAN_CONTEXT_PENDING_USER_IDENTIFIER] = "foo" 596 self.set_flow_plan(plan) 597 with self.assertFlowFinishes() as plan: 598 response = self.client.get( 599 reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}) 600 ) 601 self.assertEqual(response.status_code, 200) 602 self.assertStageResponse( 603 response, 604 self.flow, 605 component="ak-stage-identification", 606 pending_user_identifier="foo", 607 ) 608 609 def test_prefill_simple(self): 610 """Username prefill from existing flow context""" 611 self.stage.pretend_user_exists = True 612 self.stage.save() 613 614 self.client.get( 615 reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}) 616 ) 617 plan = self.get_flow_plan() 618 plan.context[PLAN_CONTEXT_PENDING_USER_IDENTIFIER] = "foo" 619 self.set_flow_plan(plan) 620 response = self.client.get( 621 reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}) 622 ) 623 self.assertStageRedirects(response, reverse("authentik_core:root-redirect"))
24class TestIdentificationStagePasskey(FlowTestCase): 25 """Passkey authentication tests""" 26 27 def setUp(self): 28 super().setUp() 29 self.user = create_test_admin_user() 30 self.flow = create_test_flow(FlowDesignation.AUTHENTICATION) 31 self.webauthn_stage = AuthenticatorValidateStage.objects.create( 32 name="webauthn-validate", 33 device_classes=[DeviceClasses.WEBAUTHN], 34 ) 35 self.stage = IdentificationStage.objects.create( 36 name="identification", 37 user_fields=[UserFields.E_MAIL], 38 webauthn_stage=self.webauthn_stage, 39 ) 40 FlowStageBinding.objects.create(target=self.flow, stage=self.stage, order=0) 41 self.device = WebAuthnDevice.objects.create( 42 user=self.user, 43 name="Test Passkey", 44 credential_id="test-credential-id", 45 public_key="test-public-key", 46 sign_count=0, 47 rp_id="testserver", 48 ) 49 50 def test_passkey_auth_success(self): 51 """Test passkey sets device, user, backend and updates last_used""" 52 from unittest.mock import patch 53 54 # Get challenge to initialize session 55 url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}) 56 self.client.get(url) 57 58 with patch( 59 "authentik.stages.identification.stage.validate_challenge_webauthn", 60 return_value=self.device, 61 ): 62 response = self.client.post( 63 url, {"passkey": {"id": "test"}}, content_type="application/json" 64 ) 65 66 self.assertStageRedirects(response, reverse("authentik_core:root-redirect")) 67 # Verify device last_used was updated 68 self.device.refresh_from_db() 69 self.assertIsNotNone(self.device.last_used) 70 71 def test_passkey_challenge_disabled(self): 72 """Test that passkey challenge is not included when webauthn_stage is not set""" 73 self.stage.webauthn_stage = None 74 self.stage.save() 75 response = self.client.get( 76 reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}) 77 ) 78 self.assertEqual(response.status_code, 200) 79 data = response.json() 80 self.assertIsNone(data.get("passkey_challenge")) 81 82 def test_passkey_challenge_enabled(self): 83 """Test that passkey challenge is included when webauthn_stage is set""" 84 response = self.client.get( 85 reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}) 86 ) 87 self.assertEqual(response.status_code, 200) 88 data = response.json() 89 self.assertIsNotNone(data.get("passkey_challenge")) 90 passkey_challenge = data["passkey_challenge"] 91 self.assertIn("challenge", passkey_challenge) 92 self.assertIn("rpId", passkey_challenge) 93 self.assertEqual(passkey_challenge["allowCredentials"], []) 94 95 def test_passkey_challenge_generation(self): 96 """Test passkey challenge is generated correctly""" 97 response = self.client.get( 98 reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}) 99 ) 100 self.assertEqual(response.status_code, 200) 101 data = response.json() 102 self.assertIsNotNone(data.get("passkey_challenge")) 103 104 def test_passkey_no_uid_field_required(self): 105 """Test that uid_field is not required when passkey is provided""" 106 # Get the challenge first to set up the session 107 response = self.client.get( 108 reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}) 109 ) 110 self.assertEqual(response.status_code, 200) 111 112 # Submit without uid_field but with passkey (invalid passkey will fail validation) 113 form_data = { 114 "passkey": { 115 "id": "invalid", 116 "rawId": "invalid", 117 "type": "public-key", 118 "response": { 119 "clientDataJSON": "invalid", 120 "authenticatorData": "invalid", 121 "signature": "invalid", 122 }, 123 } 124 } 125 url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}) 126 response = self.client.post(url, form_data, content_type="application/json") 127 self.assertEqual(response.status_code, 200) 128 data = response.json() 129 self.assertIn("response_errors", data) 130 errors = data.get("response_errors", {}) 131 self.assertNotIn("uid_field", errors)
Passkey authentication tests
27 def setUp(self): 28 super().setUp() 29 self.user = create_test_admin_user() 30 self.flow = create_test_flow(FlowDesignation.AUTHENTICATION) 31 self.webauthn_stage = AuthenticatorValidateStage.objects.create( 32 name="webauthn-validate", 33 device_classes=[DeviceClasses.WEBAUTHN], 34 ) 35 self.stage = IdentificationStage.objects.create( 36 name="identification", 37 user_fields=[UserFields.E_MAIL], 38 webauthn_stage=self.webauthn_stage, 39 ) 40 FlowStageBinding.objects.create(target=self.flow, stage=self.stage, order=0) 41 self.device = WebAuthnDevice.objects.create( 42 user=self.user, 43 name="Test Passkey", 44 credential_id="test-credential-id", 45 public_key="test-public-key", 46 sign_count=0, 47 rp_id="testserver", 48 )
Hook method for setting up the test fixture before exercising it.
50 def test_passkey_auth_success(self): 51 """Test passkey sets device, user, backend and updates last_used""" 52 from unittest.mock import patch 53 54 # Get challenge to initialize session 55 url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}) 56 self.client.get(url) 57 58 with patch( 59 "authentik.stages.identification.stage.validate_challenge_webauthn", 60 return_value=self.device, 61 ): 62 response = self.client.post( 63 url, {"passkey": {"id": "test"}}, content_type="application/json" 64 ) 65 66 self.assertStageRedirects(response, reverse("authentik_core:root-redirect")) 67 # Verify device last_used was updated 68 self.device.refresh_from_db() 69 self.assertIsNotNone(self.device.last_used)
Test passkey sets device, user, backend and updates last_used
71 def test_passkey_challenge_disabled(self): 72 """Test that passkey challenge is not included when webauthn_stage is not set""" 73 self.stage.webauthn_stage = None 74 self.stage.save() 75 response = self.client.get( 76 reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}) 77 ) 78 self.assertEqual(response.status_code, 200) 79 data = response.json() 80 self.assertIsNone(data.get("passkey_challenge"))
Test that passkey challenge is not included when webauthn_stage is not set
82 def test_passkey_challenge_enabled(self): 83 """Test that passkey challenge is included when webauthn_stage is set""" 84 response = self.client.get( 85 reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}) 86 ) 87 self.assertEqual(response.status_code, 200) 88 data = response.json() 89 self.assertIsNotNone(data.get("passkey_challenge")) 90 passkey_challenge = data["passkey_challenge"] 91 self.assertIn("challenge", passkey_challenge) 92 self.assertIn("rpId", passkey_challenge) 93 self.assertEqual(passkey_challenge["allowCredentials"], [])
Test that passkey challenge is included when webauthn_stage is set
95 def test_passkey_challenge_generation(self): 96 """Test passkey challenge is generated correctly""" 97 response = self.client.get( 98 reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}) 99 ) 100 self.assertEqual(response.status_code, 200) 101 data = response.json() 102 self.assertIsNotNone(data.get("passkey_challenge"))
Test passkey challenge is generated correctly
104 def test_passkey_no_uid_field_required(self): 105 """Test that uid_field is not required when passkey is provided""" 106 # Get the challenge first to set up the session 107 response = self.client.get( 108 reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}) 109 ) 110 self.assertEqual(response.status_code, 200) 111 112 # Submit without uid_field but with passkey (invalid passkey will fail validation) 113 form_data = { 114 "passkey": { 115 "id": "invalid", 116 "rawId": "invalid", 117 "type": "public-key", 118 "response": { 119 "clientDataJSON": "invalid", 120 "authenticatorData": "invalid", 121 "signature": "invalid", 122 }, 123 } 124 } 125 url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}) 126 response = self.client.post(url, form_data, content_type="application/json") 127 self.assertEqual(response.status_code, 200) 128 data = response.json() 129 self.assertIn("response_errors", data) 130 errors = data.get("response_errors", {}) 131 self.assertNotIn("uid_field", errors)
Test that uid_field is not required when passkey is provided
134class TestIdentificationStage(FlowTestCase): 135 """Identification tests""" 136 137 def setUp(self): 138 super().setUp() 139 self.user = create_test_admin_user() 140 141 # OAuthSource for the login view 142 self.source = OAuthSource.objects.create(name=generate_id(), slug=generate_id()) 143 144 self.flow = create_test_flow(FlowDesignation.AUTHENTICATION) 145 self.stage = IdentificationStage.objects.create( 146 name="identification", 147 user_fields=[UserFields.E_MAIL], 148 pretend_user_exists=False, 149 ) 150 self.stage.sources.set([self.source]) 151 self.stage.save() 152 FlowStageBinding.objects.create( 153 target=self.flow, 154 stage=self.stage, 155 order=0, 156 ) 157 158 def test_valid_render(self): 159 """Test that View renders correctly""" 160 response = self.client.get( 161 reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}) 162 ) 163 self.assertEqual(response.status_code, 200) 164 165 def test_valid_with_email(self): 166 """Test with valid email, check that URL redirects back to itself""" 167 form_data = {"uid_field": self.user.email} 168 url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}) 169 response = self.client.post(url, form_data) 170 self.assertEqual(response.status_code, 200) 171 self.assertStageRedirects(response, reverse("authentik_core:root-redirect")) 172 173 def test_valid_with_password(self): 174 """Test with valid email and password in single step""" 175 pw_stage = PasswordStage.objects.create(name="password", backends=[BACKEND_INBUILT]) 176 self.stage.password_stage = pw_stage 177 self.stage.save() 178 form_data = {"uid_field": self.user.email, "password": self.user.username} 179 url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}) 180 response = self.client.post(url, form_data) 181 self.assertEqual(response.status_code, 200) 182 self.assertStageRedirects(response, reverse("authentik_core:root-redirect")) 183 184 def test_invalid_with_password(self): 185 """Test with valid email and invalid password in single step""" 186 pw_stage = PasswordStage.objects.create(name="password", backends=[BACKEND_INBUILT]) 187 self.stage.password_stage = pw_stage 188 self.stage.save() 189 form_data = { 190 "uid_field": self.user.email, 191 "password": self.user.username + "test", 192 } 193 url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}) 194 response = self.client.post(url, form_data) 195 self.assertStageResponse( 196 response, 197 self.flow, 198 component="ak-stage-identification", 199 password_fields=True, 200 primary_action="Log in", 201 response_errors={ 202 "non_field_errors": [{"code": "invalid", "string": "Failed to authenticate."}] 203 }, 204 sources=[ 205 { 206 "challenge": { 207 "component": "xak-flow-redirect", 208 "to": f"/source/oauth/login/{self.source.slug}/", 209 }, 210 "icon_url": "/static/authentik/sources/default.svg", 211 "name": self.source.name, 212 "promoted": False, 213 } 214 ], 215 show_source_labels=False, 216 user_fields=["email"], 217 ) 218 219 def test_invalid_with_password_pretend(self): 220 """Test with invalid email and invalid password in single step (with pretend_user_exists)""" 221 self.stage.pretend_user_exists = True 222 pw_stage = PasswordStage.objects.create(name="password", backends=[BACKEND_INBUILT]) 223 self.stage.password_stage = pw_stage 224 self.stage.save() 225 form_data = { 226 "uid_field": self.user.email + "test", 227 "password": self.user.username + "test", 228 } 229 url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}) 230 response = self.client.post(url, form_data) 231 self.assertStageResponse( 232 response, 233 self.flow, 234 component="ak-stage-identification", 235 password_fields=True, 236 primary_action="Log in", 237 response_errors={ 238 "non_field_errors": [{"code": "invalid", "string": "Failed to authenticate."}] 239 }, 240 sources=[ 241 { 242 "challenge": { 243 "component": "xak-flow-redirect", 244 "to": f"/source/oauth/login/{self.source.slug}/", 245 }, 246 "icon_url": "/static/authentik/sources/default.svg", 247 "name": self.source.name, 248 "promoted": False, 249 } 250 ], 251 show_source_labels=False, 252 user_fields=["email"], 253 ) 254 255 @Mocker() 256 def test_valid_with_captcha(self, mock: Mocker): 257 """Test with valid email and captcha token in single step""" 258 mock.post( 259 "https://www.recaptcha.net/recaptcha/api/siteverify", 260 json={ 261 "success": True, 262 "score": 0.5, 263 }, 264 ) 265 266 captcha_stage = CaptchaStage.objects.create( 267 name="captcha", 268 public_key=RECAPTCHA_PUBLIC_KEY, 269 private_key=RECAPTCHA_PRIVATE_KEY, 270 ) 271 self.stage.captcha_stage = captcha_stage 272 self.stage.save() 273 274 form_data = {"uid_field": self.user.email, "captcha_token": "PASSED"} 275 url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}) 276 response = self.client.post(url, form_data) 277 self.assertEqual(response.status_code, 200) 278 self.assertStageRedirects(response, reverse("authentik_core:root-redirect")) 279 280 @Mocker() 281 def test_invalid_with_captcha(self, mock: Mocker): 282 """Test with valid email and invalid captcha token in single step""" 283 mock.post( 284 "https://www.recaptcha.net/recaptcha/api/siteverify", 285 json={ 286 "success": False, 287 "score": 0.5, 288 }, 289 ) 290 291 captcha_stage = CaptchaStage.objects.create( 292 name="captcha", 293 public_key=RECAPTCHA_PUBLIC_KEY, 294 private_key=RECAPTCHA_PRIVATE_KEY, 295 ) 296 297 self.stage.captcha_stage = captcha_stage 298 self.stage.save() 299 300 form_data = { 301 "uid_field": self.user.email, 302 "captcha_token": "FAILED", 303 } 304 url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}) 305 response = self.client.post(url, form_data) 306 self.assertStageResponse( 307 response, 308 self.flow, 309 component="ak-stage-identification", 310 password_fields=False, 311 primary_action="Log in", 312 response_errors={ 313 "non_field_errors": [{"code": "invalid", "string": "Failed to authenticate."}] 314 }, 315 sources=[ 316 { 317 "challenge": { 318 "component": "xak-flow-redirect", 319 "to": f"/source/oauth/login/{self.source.slug}/", 320 }, 321 "icon_url": "/static/authentik/sources/default.svg", 322 "name": self.source.name, 323 "promoted": False, 324 } 325 ], 326 show_source_labels=False, 327 user_fields=["email"], 328 ) 329 330 @Mocker() 331 def test_invalid_with_captcha_retriable(self, mock: Mocker): 332 """Test with valid email and invalid captcha token in single step""" 333 mock.post( 334 "https://www.recaptcha.net/recaptcha/api/siteverify", 335 json={ 336 "success": False, 337 "score": 0.5, 338 "error-codes": ["timeout-or-duplicate"], 339 }, 340 ) 341 342 captcha_stage = CaptchaStage.objects.create( 343 name="captcha", 344 public_key=RECAPTCHA_PUBLIC_KEY, 345 private_key=RECAPTCHA_PRIVATE_KEY, 346 ) 347 348 self.stage.captcha_stage = captcha_stage 349 self.stage.save() 350 351 form_data = { 352 "uid_field": self.user.email, 353 "captcha_token": "FAILED", 354 } 355 url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}) 356 response = self.client.post(url, form_data) 357 self.assertStageResponse( 358 response, 359 self.flow, 360 component="ak-stage-identification", 361 password_fields=False, 362 primary_action="Log in", 363 response_errors={ 364 "non_field_errors": [ 365 { 366 "code": "invalid", 367 "string": "Failed to authenticate.", 368 } 369 ] 370 }, 371 sources=[ 372 { 373 "challenge": { 374 "component": "xak-flow-redirect", 375 "to": f"/source/oauth/login/{self.source.slug}/", 376 }, 377 "icon_url": "/static/authentik/sources/default.svg", 378 "name": self.source.name, 379 "promoted": False, 380 } 381 ], 382 show_source_labels=False, 383 user_fields=["email"], 384 ) 385 386 def test_invalid_with_username(self): 387 """Test invalid with username (user exists but stage only allows email)""" 388 form_data = {"uid_field": self.user.username} 389 response = self.client.post( 390 reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}), 391 form_data, 392 ) 393 self.assertEqual(response.status_code, 200) 394 self.assertStageResponse( 395 response, 396 self.flow, 397 component="ak-stage-identification", 398 response_errors={ 399 "non_field_errors": [{"string": "Failed to authenticate.", "code": "invalid"}] 400 }, 401 ) 402 403 def test_invalid_with_username_pretend(self): 404 """Test invalid with username (user exists but stage only allows email)""" 405 self.stage.pretend_user_exists = True 406 self.stage.save() 407 form_data = {"uid_field": self.user.username} 408 response = self.client.post( 409 reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}), 410 form_data, 411 ) 412 self.assertEqual(response.status_code, 200) 413 self.assertStageRedirects(response, reverse("authentik_core:root-redirect")) 414 415 def test_invalid_no_fields(self): 416 """Test invalid with username (no user fields are enabled)""" 417 self.stage.user_fields = [] 418 self.stage.save() 419 form_data = {"uid_field": self.user.username} 420 response = self.client.post( 421 reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}), 422 form_data, 423 ) 424 self.assertStageResponse( 425 response, 426 self.flow, 427 component="ak-stage-identification", 428 password_fields=False, 429 primary_action="Log in", 430 response_errors={ 431 "non_field_errors": [{"code": "invalid", "string": "Failed to authenticate."}] 432 }, 433 show_source_labels=False, 434 sources=[ 435 { 436 "challenge": { 437 "component": "xak-flow-redirect", 438 "to": f"/source/oauth/login/{self.source.slug}/", 439 }, 440 "icon_url": "/static/authentik/sources/default.svg", 441 "name": self.source.name, 442 "promoted": False, 443 } 444 ], 445 user_fields=[], 446 ) 447 448 def test_invalid_with_invalid_email(self): 449 """Test with invalid email (user doesn't exist) -> Will return to login form""" 450 form_data = {"uid_field": self.user.email + "test"} 451 response = self.client.post( 452 reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}), 453 form_data, 454 ) 455 self.assertEqual(response.status_code, 200) 456 457 def test_enrollment_flow(self): 458 """Test that enrollment flow is linked correctly""" 459 flow = create_test_flow() 460 self.stage.enrollment_flow = flow 461 self.stage.save() 462 FlowStageBinding.objects.create( 463 target=flow, 464 stage=self.stage, 465 order=0, 466 ) 467 468 response = self.client.get( 469 reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}), 470 ) 471 self.assertStageResponse( 472 response, 473 self.flow, 474 component="ak-stage-identification", 475 user_fields=["email"], 476 password_fields=False, 477 enroll_url=reverse( 478 "authentik_core:if-flow", 479 kwargs={"flow_slug": flow.slug}, 480 ), 481 show_source_labels=False, 482 primary_action="Log in", 483 sources=[ 484 { 485 "icon_url": "/static/authentik/sources/default.svg", 486 "name": self.source.name, 487 "challenge": { 488 "component": "xak-flow-redirect", 489 "to": f"/source/oauth/login/{self.source.slug}/", 490 }, 491 "promoted": False, 492 } 493 ], 494 ) 495 496 def test_link_recovery_flow(self): 497 """Test that recovery flow is linked correctly""" 498 flow = create_test_flow() 499 self.stage.recovery_flow = flow 500 self.stage.save() 501 FlowStageBinding.objects.create( 502 target=flow, 503 stage=self.stage, 504 order=0, 505 ) 506 response = self.client.get( 507 reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}), 508 ) 509 self.assertStageResponse( 510 response, 511 self.flow, 512 component="ak-stage-identification", 513 user_fields=["email"], 514 password_fields=False, 515 recovery_url=reverse( 516 "authentik_core:if-flow", 517 kwargs={"flow_slug": flow.slug}, 518 ), 519 show_source_labels=False, 520 primary_action="Log in", 521 sources=[ 522 { 523 "challenge": { 524 "component": "xak-flow-redirect", 525 "to": f"/source/oauth/login/{self.source.slug}/", 526 }, 527 "icon_url": "/static/authentik/sources/default.svg", 528 "name": self.source.name, 529 "promoted": False, 530 } 531 ], 532 ) 533 534 def test_recovery_flow_invalid_user(self): 535 """Test that an invalid user can proceed in a recovery flow""" 536 self.flow.designation = FlowDesignation.RECOVERY 537 self.flow.save() 538 response = self.client.get( 539 reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}), 540 ) 541 self.assertStageResponse( 542 response, 543 self.flow, 544 component="ak-stage-identification", 545 user_fields=["email"], 546 password_fields=False, 547 show_source_labels=False, 548 primary_action="Continue", 549 sources=[ 550 { 551 "challenge": { 552 "component": "xak-flow-redirect", 553 "to": f"/source/oauth/login/{self.source.slug}/", 554 }, 555 "icon_url": "/static/authentik/sources/default.svg", 556 "name": self.source.name, 557 "promoted": False, 558 } 559 ], 560 ) 561 form_data = {"uid_field": generate_id()} 562 url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}) 563 response = self.client.post(url, form_data) 564 self.assertEqual(response.status_code, 200) 565 566 def test_api_validate(self): 567 """Test API validation""" 568 self.assertTrue( 569 IdentificationStageSerializer( 570 data={ 571 "name": generate_id(), 572 "user_fields": [UserFields.E_MAIL, UserFields.USERNAME], 573 } 574 ).is_valid(raise_exception=True) 575 ) 576 with self.assertRaises(ValidationError): 577 IdentificationStageSerializer( 578 data={ 579 "name": generate_id(), 580 "user_fields": [], 581 "sources": [], 582 } 583 ).is_valid(raise_exception=True) 584 585 def test_prefill(self): 586 """Username prefill from existing flow context""" 587 pw_stage = PasswordStage.objects.create(name=generate_id(), backends=[BACKEND_INBUILT]) 588 self.stage.password_stage = pw_stage 589 self.stage.save() 590 591 self.client.get( 592 reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}) 593 ) 594 595 plan = self.get_flow_plan() 596 plan.context[PLAN_CONTEXT_PENDING_USER_IDENTIFIER] = "foo" 597 self.set_flow_plan(plan) 598 with self.assertFlowFinishes() as plan: 599 response = self.client.get( 600 reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}) 601 ) 602 self.assertEqual(response.status_code, 200) 603 self.assertStageResponse( 604 response, 605 self.flow, 606 component="ak-stage-identification", 607 pending_user_identifier="foo", 608 ) 609 610 def test_prefill_simple(self): 611 """Username prefill from existing flow context""" 612 self.stage.pretend_user_exists = True 613 self.stage.save() 614 615 self.client.get( 616 reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}) 617 ) 618 plan = self.get_flow_plan() 619 plan.context[PLAN_CONTEXT_PENDING_USER_IDENTIFIER] = "foo" 620 self.set_flow_plan(plan) 621 response = self.client.get( 622 reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}) 623 ) 624 self.assertStageRedirects(response, reverse("authentik_core:root-redirect"))
Identification tests
137 def setUp(self): 138 super().setUp() 139 self.user = create_test_admin_user() 140 141 # OAuthSource for the login view 142 self.source = OAuthSource.objects.create(name=generate_id(), slug=generate_id()) 143 144 self.flow = create_test_flow(FlowDesignation.AUTHENTICATION) 145 self.stage = IdentificationStage.objects.create( 146 name="identification", 147 user_fields=[UserFields.E_MAIL], 148 pretend_user_exists=False, 149 ) 150 self.stage.sources.set([self.source]) 151 self.stage.save() 152 FlowStageBinding.objects.create( 153 target=self.flow, 154 stage=self.stage, 155 order=0, 156 )
Hook method for setting up the test fixture before exercising it.
158 def test_valid_render(self): 159 """Test that View renders correctly""" 160 response = self.client.get( 161 reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}) 162 ) 163 self.assertEqual(response.status_code, 200)
Test that View renders correctly
165 def test_valid_with_email(self): 166 """Test with valid email, check that URL redirects back to itself""" 167 form_data = {"uid_field": self.user.email} 168 url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}) 169 response = self.client.post(url, form_data) 170 self.assertEqual(response.status_code, 200) 171 self.assertStageRedirects(response, reverse("authentik_core:root-redirect"))
Test with valid email, check that URL redirects back to itself
173 def test_valid_with_password(self): 174 """Test with valid email and password in single step""" 175 pw_stage = PasswordStage.objects.create(name="password", backends=[BACKEND_INBUILT]) 176 self.stage.password_stage = pw_stage 177 self.stage.save() 178 form_data = {"uid_field": self.user.email, "password": self.user.username} 179 url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}) 180 response = self.client.post(url, form_data) 181 self.assertEqual(response.status_code, 200) 182 self.assertStageRedirects(response, reverse("authentik_core:root-redirect"))
Test with valid email and password in single step
184 def test_invalid_with_password(self): 185 """Test with valid email and invalid password in single step""" 186 pw_stage = PasswordStage.objects.create(name="password", backends=[BACKEND_INBUILT]) 187 self.stage.password_stage = pw_stage 188 self.stage.save() 189 form_data = { 190 "uid_field": self.user.email, 191 "password": self.user.username + "test", 192 } 193 url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}) 194 response = self.client.post(url, form_data) 195 self.assertStageResponse( 196 response, 197 self.flow, 198 component="ak-stage-identification", 199 password_fields=True, 200 primary_action="Log in", 201 response_errors={ 202 "non_field_errors": [{"code": "invalid", "string": "Failed to authenticate."}] 203 }, 204 sources=[ 205 { 206 "challenge": { 207 "component": "xak-flow-redirect", 208 "to": f"/source/oauth/login/{self.source.slug}/", 209 }, 210 "icon_url": "/static/authentik/sources/default.svg", 211 "name": self.source.name, 212 "promoted": False, 213 } 214 ], 215 show_source_labels=False, 216 user_fields=["email"], 217 )
Test with valid email and invalid password in single step
219 def test_invalid_with_password_pretend(self): 220 """Test with invalid email and invalid password in single step (with pretend_user_exists)""" 221 self.stage.pretend_user_exists = True 222 pw_stage = PasswordStage.objects.create(name="password", backends=[BACKEND_INBUILT]) 223 self.stage.password_stage = pw_stage 224 self.stage.save() 225 form_data = { 226 "uid_field": self.user.email + "test", 227 "password": self.user.username + "test", 228 } 229 url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}) 230 response = self.client.post(url, form_data) 231 self.assertStageResponse( 232 response, 233 self.flow, 234 component="ak-stage-identification", 235 password_fields=True, 236 primary_action="Log in", 237 response_errors={ 238 "non_field_errors": [{"code": "invalid", "string": "Failed to authenticate."}] 239 }, 240 sources=[ 241 { 242 "challenge": { 243 "component": "xak-flow-redirect", 244 "to": f"/source/oauth/login/{self.source.slug}/", 245 }, 246 "icon_url": "/static/authentik/sources/default.svg", 247 "name": self.source.name, 248 "promoted": False, 249 } 250 ], 251 show_source_labels=False, 252 user_fields=["email"], 253 )
Test with invalid email and invalid password in single step (with pretend_user_exists)
255 @Mocker() 256 def test_valid_with_captcha(self, mock: Mocker): 257 """Test with valid email and captcha token in single step""" 258 mock.post( 259 "https://www.recaptcha.net/recaptcha/api/siteverify", 260 json={ 261 "success": True, 262 "score": 0.5, 263 }, 264 ) 265 266 captcha_stage = CaptchaStage.objects.create( 267 name="captcha", 268 public_key=RECAPTCHA_PUBLIC_KEY, 269 private_key=RECAPTCHA_PRIVATE_KEY, 270 ) 271 self.stage.captcha_stage = captcha_stage 272 self.stage.save() 273 274 form_data = {"uid_field": self.user.email, "captcha_token": "PASSED"} 275 url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}) 276 response = self.client.post(url, form_data) 277 self.assertEqual(response.status_code, 200) 278 self.assertStageRedirects(response, reverse("authentik_core:root-redirect"))
Test with valid email and captcha token in single step
280 @Mocker() 281 def test_invalid_with_captcha(self, mock: Mocker): 282 """Test with valid email and invalid captcha token in single step""" 283 mock.post( 284 "https://www.recaptcha.net/recaptcha/api/siteverify", 285 json={ 286 "success": False, 287 "score": 0.5, 288 }, 289 ) 290 291 captcha_stage = CaptchaStage.objects.create( 292 name="captcha", 293 public_key=RECAPTCHA_PUBLIC_KEY, 294 private_key=RECAPTCHA_PRIVATE_KEY, 295 ) 296 297 self.stage.captcha_stage = captcha_stage 298 self.stage.save() 299 300 form_data = { 301 "uid_field": self.user.email, 302 "captcha_token": "FAILED", 303 } 304 url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}) 305 response = self.client.post(url, form_data) 306 self.assertStageResponse( 307 response, 308 self.flow, 309 component="ak-stage-identification", 310 password_fields=False, 311 primary_action="Log in", 312 response_errors={ 313 "non_field_errors": [{"code": "invalid", "string": "Failed to authenticate."}] 314 }, 315 sources=[ 316 { 317 "challenge": { 318 "component": "xak-flow-redirect", 319 "to": f"/source/oauth/login/{self.source.slug}/", 320 }, 321 "icon_url": "/static/authentik/sources/default.svg", 322 "name": self.source.name, 323 "promoted": False, 324 } 325 ], 326 show_source_labels=False, 327 user_fields=["email"], 328 )
Test with valid email and invalid captcha token in single step
330 @Mocker() 331 def test_invalid_with_captcha_retriable(self, mock: Mocker): 332 """Test with valid email and invalid captcha token in single step""" 333 mock.post( 334 "https://www.recaptcha.net/recaptcha/api/siteverify", 335 json={ 336 "success": False, 337 "score": 0.5, 338 "error-codes": ["timeout-or-duplicate"], 339 }, 340 ) 341 342 captcha_stage = CaptchaStage.objects.create( 343 name="captcha", 344 public_key=RECAPTCHA_PUBLIC_KEY, 345 private_key=RECAPTCHA_PRIVATE_KEY, 346 ) 347 348 self.stage.captcha_stage = captcha_stage 349 self.stage.save() 350 351 form_data = { 352 "uid_field": self.user.email, 353 "captcha_token": "FAILED", 354 } 355 url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}) 356 response = self.client.post(url, form_data) 357 self.assertStageResponse( 358 response, 359 self.flow, 360 component="ak-stage-identification", 361 password_fields=False, 362 primary_action="Log in", 363 response_errors={ 364 "non_field_errors": [ 365 { 366 "code": "invalid", 367 "string": "Failed to authenticate.", 368 } 369 ] 370 }, 371 sources=[ 372 { 373 "challenge": { 374 "component": "xak-flow-redirect", 375 "to": f"/source/oauth/login/{self.source.slug}/", 376 }, 377 "icon_url": "/static/authentik/sources/default.svg", 378 "name": self.source.name, 379 "promoted": False, 380 } 381 ], 382 show_source_labels=False, 383 user_fields=["email"], 384 )
Test with valid email and invalid captcha token in single step
386 def test_invalid_with_username(self): 387 """Test invalid with username (user exists but stage only allows email)""" 388 form_data = {"uid_field": self.user.username} 389 response = self.client.post( 390 reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}), 391 form_data, 392 ) 393 self.assertEqual(response.status_code, 200) 394 self.assertStageResponse( 395 response, 396 self.flow, 397 component="ak-stage-identification", 398 response_errors={ 399 "non_field_errors": [{"string": "Failed to authenticate.", "code": "invalid"}] 400 }, 401 )
Test invalid with username (user exists but stage only allows email)
403 def test_invalid_with_username_pretend(self): 404 """Test invalid with username (user exists but stage only allows email)""" 405 self.stage.pretend_user_exists = True 406 self.stage.save() 407 form_data = {"uid_field": self.user.username} 408 response = self.client.post( 409 reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}), 410 form_data, 411 ) 412 self.assertEqual(response.status_code, 200) 413 self.assertStageRedirects(response, reverse("authentik_core:root-redirect"))
Test invalid with username (user exists but stage only allows email)
415 def test_invalid_no_fields(self): 416 """Test invalid with username (no user fields are enabled)""" 417 self.stage.user_fields = [] 418 self.stage.save() 419 form_data = {"uid_field": self.user.username} 420 response = self.client.post( 421 reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}), 422 form_data, 423 ) 424 self.assertStageResponse( 425 response, 426 self.flow, 427 component="ak-stage-identification", 428 password_fields=False, 429 primary_action="Log in", 430 response_errors={ 431 "non_field_errors": [{"code": "invalid", "string": "Failed to authenticate."}] 432 }, 433 show_source_labels=False, 434 sources=[ 435 { 436 "challenge": { 437 "component": "xak-flow-redirect", 438 "to": f"/source/oauth/login/{self.source.slug}/", 439 }, 440 "icon_url": "/static/authentik/sources/default.svg", 441 "name": self.source.name, 442 "promoted": False, 443 } 444 ], 445 user_fields=[], 446 )
Test invalid with username (no user fields are enabled)
448 def test_invalid_with_invalid_email(self): 449 """Test with invalid email (user doesn't exist) -> Will return to login form""" 450 form_data = {"uid_field": self.user.email + "test"} 451 response = self.client.post( 452 reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}), 453 form_data, 454 ) 455 self.assertEqual(response.status_code, 200)
Test with invalid email (user doesn't exist) -> Will return to login form
457 def test_enrollment_flow(self): 458 """Test that enrollment flow is linked correctly""" 459 flow = create_test_flow() 460 self.stage.enrollment_flow = flow 461 self.stage.save() 462 FlowStageBinding.objects.create( 463 target=flow, 464 stage=self.stage, 465 order=0, 466 ) 467 468 response = self.client.get( 469 reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}), 470 ) 471 self.assertStageResponse( 472 response, 473 self.flow, 474 component="ak-stage-identification", 475 user_fields=["email"], 476 password_fields=False, 477 enroll_url=reverse( 478 "authentik_core:if-flow", 479 kwargs={"flow_slug": flow.slug}, 480 ), 481 show_source_labels=False, 482 primary_action="Log in", 483 sources=[ 484 { 485 "icon_url": "/static/authentik/sources/default.svg", 486 "name": self.source.name, 487 "challenge": { 488 "component": "xak-flow-redirect", 489 "to": f"/source/oauth/login/{self.source.slug}/", 490 }, 491 "promoted": False, 492 } 493 ], 494 )
Test that enrollment flow is linked correctly
496 def test_link_recovery_flow(self): 497 """Test that recovery flow is linked correctly""" 498 flow = create_test_flow() 499 self.stage.recovery_flow = flow 500 self.stage.save() 501 FlowStageBinding.objects.create( 502 target=flow, 503 stage=self.stage, 504 order=0, 505 ) 506 response = self.client.get( 507 reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}), 508 ) 509 self.assertStageResponse( 510 response, 511 self.flow, 512 component="ak-stage-identification", 513 user_fields=["email"], 514 password_fields=False, 515 recovery_url=reverse( 516 "authentik_core:if-flow", 517 kwargs={"flow_slug": flow.slug}, 518 ), 519 show_source_labels=False, 520 primary_action="Log in", 521 sources=[ 522 { 523 "challenge": { 524 "component": "xak-flow-redirect", 525 "to": f"/source/oauth/login/{self.source.slug}/", 526 }, 527 "icon_url": "/static/authentik/sources/default.svg", 528 "name": self.source.name, 529 "promoted": False, 530 } 531 ], 532 )
Test that recovery flow is linked correctly
534 def test_recovery_flow_invalid_user(self): 535 """Test that an invalid user can proceed in a recovery flow""" 536 self.flow.designation = FlowDesignation.RECOVERY 537 self.flow.save() 538 response = self.client.get( 539 reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}), 540 ) 541 self.assertStageResponse( 542 response, 543 self.flow, 544 component="ak-stage-identification", 545 user_fields=["email"], 546 password_fields=False, 547 show_source_labels=False, 548 primary_action="Continue", 549 sources=[ 550 { 551 "challenge": { 552 "component": "xak-flow-redirect", 553 "to": f"/source/oauth/login/{self.source.slug}/", 554 }, 555 "icon_url": "/static/authentik/sources/default.svg", 556 "name": self.source.name, 557 "promoted": False, 558 } 559 ], 560 ) 561 form_data = {"uid_field": generate_id()} 562 url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}) 563 response = self.client.post(url, form_data) 564 self.assertEqual(response.status_code, 200)
Test that an invalid user can proceed in a recovery flow
566 def test_api_validate(self): 567 """Test API validation""" 568 self.assertTrue( 569 IdentificationStageSerializer( 570 data={ 571 "name": generate_id(), 572 "user_fields": [UserFields.E_MAIL, UserFields.USERNAME], 573 } 574 ).is_valid(raise_exception=True) 575 ) 576 with self.assertRaises(ValidationError): 577 IdentificationStageSerializer( 578 data={ 579 "name": generate_id(), 580 "user_fields": [], 581 "sources": [], 582 } 583 ).is_valid(raise_exception=True)
Test API validation
585 def test_prefill(self): 586 """Username prefill from existing flow context""" 587 pw_stage = PasswordStage.objects.create(name=generate_id(), backends=[BACKEND_INBUILT]) 588 self.stage.password_stage = pw_stage 589 self.stage.save() 590 591 self.client.get( 592 reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}) 593 ) 594 595 plan = self.get_flow_plan() 596 plan.context[PLAN_CONTEXT_PENDING_USER_IDENTIFIER] = "foo" 597 self.set_flow_plan(plan) 598 with self.assertFlowFinishes() as plan: 599 response = self.client.get( 600 reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}) 601 ) 602 self.assertEqual(response.status_code, 200) 603 self.assertStageResponse( 604 response, 605 self.flow, 606 component="ak-stage-identification", 607 pending_user_identifier="foo", 608 )
Username prefill from existing flow context
610 def test_prefill_simple(self): 611 """Username prefill from existing flow context""" 612 self.stage.pretend_user_exists = True 613 self.stage.save() 614 615 self.client.get( 616 reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}) 617 ) 618 plan = self.get_flow_plan() 619 plan.context[PLAN_CONTEXT_PENDING_USER_IDENTIFIER] = "foo" 620 self.set_flow_plan(plan) 621 response = self.client.get( 622 reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}) 623 ) 624 self.assertStageRedirects(response, reverse("authentik_core:root-redirect"))
Username prefill from existing flow context